]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
Merge remote-tracking branch 'origin/mdb.master'
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2013 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15
16 #include "portable.h"
17
18 #include <stdio.h>
19
20 #include <ac/signal.h>
21 #include <ac/stdarg.h>
22 #include <ac/stdlib.h>
23 #include <ac/string.h>
24 #include <ac/time.h>
25 #include <ac/errno.h>
26
27 #include "ldap-int.h"
28 #include "ldap_pvt_thread.h" /* Get the thread interface */
29 #include "ldap_queue.h"
30 #define LDAP_THREAD_POOL_IMPLEMENTATION
31 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
32
33 #ifndef LDAP_THREAD_HAVE_TPOOL
34
35 /* Thread-specific key with data and optional free function */
36 typedef struct ldap_int_tpool_key_s {
37         void *ltk_key;
38         void *ltk_data;
39         ldap_pvt_thread_pool_keyfree_t *ltk_free;
40 } ldap_int_tpool_key_t;
41
/* Capacity of the per-thread key array (ltu_key[]).
 * We don't expect to use many keys.
 */
#define MAXKEYS 32

/* Max number of threads; also the number of thread_keys[] slots.
 * Must be a power of 2: slots are selected by masking a hash with
 * (LDAP_MAXTHR-1).
 */
#define LDAP_MAXTHR	1024

/* (Theoretical) max number of pending requests.
 * INT_MAX minus room to avoid overflow.
 */
#define MAX_PENDING (INT_MAX/2)

/* Values taken by pool->ltp_pause */
enum {
	NOT_PAUSED = 0,
	WANT_PAUSE = 1,
	PAUSED = 2
};
55
56 /* Context: thread ID and thread-specific key/data pairs */
57 typedef struct ldap_int_thread_userctx_s {
58         struct ldap_int_thread_poolq_s *ltu_pq;
59         ldap_pvt_thread_t ltu_id;
60         ldap_int_tpool_key_t ltu_key[MAXKEYS];
61 } ldap_int_thread_userctx_t;
62
63
64 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
65  * Protected by ldap_pvt_thread_pool_mutex except during pauses,
66  * when it is read-only (used by pool_purgekey and pool_context).
67  * Protected by ldap_pvt_thread_pool_mutex.
68  */
69 static struct {
70         ldap_int_thread_userctx_t *ctx;
71         /* ctx is valid when not NULL or DELETED_THREAD_CTX */
72 #       define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
73 } thread_keys[LDAP_MAXTHR];
74
/* Hash the raw bytes of thread ID 'tid' into the lvalue 'hash'.
 * The hash starts as the first byte; every following byte b is
 * folded in with hash += (hash << 5) ^ b.
 */
#define TID_HASH(tid, hash) do { \
	const unsigned char *bytes_ = (const unsigned char *)&(tid); \
	unsigned k_; \
	(hash) = bytes_[0]; \
	for (k_ = 1; k_ < sizeof(tid); k_++) \
		(hash) += ((hash) << 5) ^ bytes_[k_]; \
} while(0)
81
82
83 /* Task for a thread to perform */
84 typedef struct ldap_int_thread_task_s {
85         union {
86                 LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
87                 LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
88         } ltt_next;
89         ldap_pvt_thread_start_t *ltt_start_routine;
90         void *ltt_arg;
91 } ldap_int_thread_task_t;
92
93 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
94
95 struct ldap_int_thread_poolq_s {
96         struct ldap_int_thread_pool_s *ltp_pool;
97
98         /* protect members below */
99         ldap_pvt_thread_mutex_t ltp_mutex;
100
101         /* not paused and something to do for pool_<wrapper/pause/destroy>()
102          * Used for normal pool operation, to synch between submitter and
103          * worker threads. Not used for pauses. In normal operation multiple
104          * queues can rendezvous without acquiring the main pool lock.
105          */
106         ldap_pvt_thread_cond_t ltp_cond;
107
108         /* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
109          * maintaned to reduce work for pool_wrapper()
110          */
111         ldap_int_tpool_plist_t *ltp_work_list;
112
113         /* pending tasks, and unused task objects */
114         ldap_int_tpool_plist_t ltp_pending_list;
115         LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
116
117         /* Max number of threads in this queue */
118         int ltp_max_count;
119
120         /* Max pending + paused + idle tasks, negated when ltp_finishing */
121         int ltp_max_pending;
122
123         int ltp_pending_count;          /* Pending + paused + idle tasks */
124         int ltp_active_count;           /* Active, not paused/idle tasks */
125         int ltp_open_count;                     /* Number of threads, negated when ltp_pause */
126         int ltp_starting;                       /* Currently starting threads */
127 };
128
129 struct ldap_int_thread_pool_s {
130         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
131
132         struct ldap_int_thread_poolq_s *ltp_wqs;
133
134         /* number of poolqs */
135         int ltp_numqs;
136
137         /* protect members below */
138         ldap_pvt_thread_mutex_t ltp_mutex;
139
140         /* paused and waiting for resume
141          * When a pause is in effect all workers switch to waiting on
142          * this cond instead of their per-queue cond.
143          */
144         ldap_pvt_thread_cond_t ltp_cond;
145
146         /* ltp_active_queues < 1 && ltp_pause */
147         ldap_pvt_thread_cond_t ltp_pcond;
148
149         /* number of active queues */
150         int ltp_active_queues;
151
152         /* The pool is finishing, waiting for its threads to close.
153          * They close when ltp_pending_list is done.  pool_submit()
154          * rejects new tasks.  ltp_max_pending = -(its old value).
155          */
156         int ltp_finishing;
157
158         /* Some active task needs to be the sole active task.
159          * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
160          */
161         volatile sig_atomic_t ltp_pause;
162
163         /* Max number of threads in pool */
164         int ltp_max_count;
165
166         /* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
167         int ltp_conf_max_count;
168
169         /* Max pending + paused + idle tasks, negated when ltp_finishing */
170         int ltp_max_pending;
171 };
172
173 static ldap_int_tpool_plist_t empty_pending_list =
174         LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
175
176 static int ldap_int_has_thread_pool = 0;
177 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
178         ldap_int_thread_pool_list =
179         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
180
181 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
182
183 static void *ldap_int_thread_pool_wrapper( void *pool );
184
185 static ldap_pvt_thread_key_t    ldap_tpool_key;
186
187 /* Context of the main thread */
188 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
189
190 int
191 ldap_int_thread_pool_startup ( void )
192 {
193         ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
194         ldap_pvt_thread_key_create( &ldap_tpool_key );
195         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
196 }
197
198 int
199 ldap_int_thread_pool_shutdown ( void )
200 {
201         struct ldap_int_thread_pool_s *pool;
202
203         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
204                 (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
205         }
206         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
207         ldap_pvt_thread_key_destroy( ldap_tpool_key );
208         return(0);
209 }
210
211
212 /* Create a thread pool */
213 int
214 ldap_pvt_thread_pool_init_q (
215         ldap_pvt_thread_pool_t *tpool,
216         int max_threads,
217         int max_pending,
218         int numqs )
219 {
220         ldap_pvt_thread_pool_t pool;
221         struct ldap_int_thread_poolq_s *pq;
222         int i, rc, rem_thr, rem_pend;
223
224         /* multiple pools are currently not supported (ITS#4943) */
225         assert(!ldap_int_has_thread_pool);
226
227         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
228                 max_threads = 0;
229         if (! (1 <= max_pending && max_pending <= MAX_PENDING))
230                 max_pending = MAX_PENDING;
231
232         *tpool = NULL;
233         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
234                 sizeof(struct ldap_int_thread_pool_s) +
235                 numqs * sizeof(struct ldap_int_thread_poolq_s));
236
237         if (pool == NULL) return(-1);
238
239         pool->ltp_wqs = (struct ldap_int_thread_poolq_s *)(pool+1);
240         pool->ltp_numqs = numqs;
241         pool->ltp_conf_max_count = max_threads;
242         if ( !max_threads )
243                 max_threads = LDAP_MAXTHR;
244
245         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
246         if (rc != 0)
247                 return(rc);
248
249         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
250         if (rc != 0)
251                 return(rc);
252
253         rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
254         if (rc != 0)
255                 return(rc);
256
257         rem_thr = max_threads % numqs;
258         rem_pend = max_pending % numqs;
259         for ( i=0; i<numqs; i++ ) {
260                 pq = &pool->ltp_wqs[i];
261                 pq->ltp_pool = pool;
262                 rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
263                 if (rc != 0)
264                         return(rc);
265                 rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
266                 if (rc != 0)
267                         return(rc);
268                 LDAP_STAILQ_INIT(&pq->ltp_pending_list);
269                 pq->ltp_work_list = &pq->ltp_pending_list;
270                 LDAP_SLIST_INIT(&pq->ltp_free_list);
271
272                 pq->ltp_max_count = max_threads / numqs;
273                 if ( rem_thr ) {
274                         pq->ltp_max_count++;
275                         rem_thr--;
276                 }
277                 pq->ltp_max_pending = max_pending / numqs;
278                 if ( rem_pend ) {
279                         pq->ltp_max_pending++;
280                         rem_pend--;
281                 }
282         }
283
284         ldap_int_has_thread_pool = 1;
285
286         pool->ltp_max_count = max_threads;
287         pool->ltp_max_pending = max_pending;
288
289         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
290         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
291         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
292
293         /* Start no threads just yet.  That can break if the process forks
294          * later, as slapd does in order to daemonize.  On at least POSIX,
295          * only the forking thread would survive in the child.  Yet fork()
296          * can't unlock/clean up other threads' locks and data structures,
297          * unless pthread_atfork() handlers have been set up to do so.
298          */
299
300         *tpool = pool;
301         return(0);
302 }
303
304 int
305 ldap_pvt_thread_pool_init (
306         ldap_pvt_thread_pool_t *tpool,
307         int max_threads,
308         int max_pending )
309 {
310         return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
311 }
312
313 static int
314 ldap_int_poolq_hash(
315         struct ldap_int_thread_pool_s *pool,
316         void *arg )
317 {
318         int i = 0, j;
319         unsigned char *ptr = (unsigned char *)&arg;
320         /* dumb hash of arg to choose a queue */
321         for (j=0; j<sizeof(arg); j++)
322                 i += *ptr++;
323         i %= pool->ltp_numqs;
324         return i;
325 }
326
327 /* Submit a task to be performed by the thread pool */
328 int
329 ldap_pvt_thread_pool_submit (
330         ldap_pvt_thread_pool_t *tpool,
331         ldap_pvt_thread_start_t *start_routine, void *arg )
332 {
333         struct ldap_int_thread_pool_s *pool;
334         struct ldap_int_thread_poolq_s *pq;
335         ldap_int_thread_task_t *task;
336         ldap_pvt_thread_t thr;
337         int i, j;
338
339         if (tpool == NULL)
340                 return(-1);
341
342         pool = *tpool;
343
344         if (pool == NULL)
345                 return(-1);
346
347         if ( pool->ltp_numqs > 1 )
348                 i = ldap_int_poolq_hash( pool, arg );
349         else
350                 i = 0;
351
352         j = i;
353         while(1) {
354                 ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i].ltp_mutex);
355                 if (pool->ltp_wqs[i].ltp_pending_count < pool->ltp_wqs[i].ltp_max_pending) {
356                         break;
357                 }
358                 ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i].ltp_mutex);
359                 i++;
360                 i %= pool->ltp_numqs;
361                 if ( i == j )
362                         return -1;
363         }
364
365         pq = &pool->ltp_wqs[i];
366         task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
367         if (task) {
368                 LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
369         } else {
370                 task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
371                 if (task == NULL)
372                         goto failed;
373         }
374
375         task->ltt_start_routine = start_routine;
376         task->ltt_arg = arg;
377
378         pq->ltp_pending_count++;
379         LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
380
381         if (pool->ltp_pause)
382                 goto done;
383
384         /* should we open (create) a thread? */
385         if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
386                 pq->ltp_open_count < pq->ltp_max_count)
387         {
388                 pq->ltp_starting++;
389                 pq->ltp_open_count++;
390
391                 if (0 != ldap_pvt_thread_create(
392                         &thr, 1, ldap_int_thread_pool_wrapper, pq))
393                 {
394                         /* couldn't create thread.  back out of
395                          * ltp_open_count and check for even worse things.
396                          */
397                         pq->ltp_starting--;
398                         pq->ltp_open_count--;
399
400                         if (pq->ltp_open_count == 0) {
401                                 /* no open threads at all?!?
402                                  */
403                                 ldap_int_thread_task_t *ptr;
404
405                                 /* let pool_destroy know there are no more threads */
406                                 ldap_pvt_thread_cond_signal(&pq->ltp_cond);
407
408                                 LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
409                                         if (ptr == task) break;
410                                 if (ptr == task) {
411                                         /* no open threads, task not handled, so
412                                          * back out of ltp_pending_count, free the task,
413                                          * report the error.
414                                          */
415                                         pq->ltp_pending_count--;
416                                         LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
417                                                 ldap_int_thread_task_s, ltt_next.q);
418                                         LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
419                                                 ltt_next.l);
420                                         goto failed;
421                                 }
422                         }
423                         /* there is another open thread, so this
424                          * task will be handled eventually.
425                          */
426                 }
427         }
428         ldap_pvt_thread_cond_signal(&pq->ltp_cond);
429
430  done:
431         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
432         return(0);
433
434  failed:
435         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
436         return(-1);
437 }
438
/* Task routine that does nothing and returns NULL; pool_retract()
 * substitutes it for the routine of a cancelled task.
 */
static void *
no_task( void *ctx, void *arg )
{
	(void)ctx;
	(void)arg;
	return NULL;
}
444
445 /* Cancel a pending task that was previously submitted.
446  * Return 1 if the task was successfully cancelled, 0 if
447  * not found, -1 for invalid parameters
448  */
449 int
450 ldap_pvt_thread_pool_retract (
451         ldap_pvt_thread_pool_t *tpool,
452         ldap_pvt_thread_start_t *start_routine, void *arg )
453 {
454         struct ldap_int_thread_pool_s *pool;
455         struct ldap_int_thread_poolq_s *pq;
456         ldap_int_thread_task_t *task;
457         int i;
458
459         if (tpool == NULL)
460                 return(-1);
461
462         pool = *tpool;
463
464         if (pool == NULL)
465                 return(-1);
466
467         i = ldap_int_poolq_hash( pool, arg );
468         pq = &pool->ltp_wqs[i];
469
470         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
471         LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
472                 if (task->ltt_start_routine == start_routine &&
473                         task->ltt_arg == arg) {
474                         /* Could LDAP_STAILQ_REMOVE the task, but that
475                          * walks ltp_pending_list again to find it.
476                          */
477                         task->ltt_start_routine = no_task;
478                         task->ltt_arg = NULL;
479                         break;
480                 }
481         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
482         return task != NULL;
483 }
484
485 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
486 int
487 ldap_pvt_thread_pool_maxthreads(
488         ldap_pvt_thread_pool_t *tpool,
489         int max_threads )
490 {
491         struct ldap_int_thread_pool_s *pool;
492         struct ldap_int_thread_poolq_s *pq;
493         int remthr, i;
494
495         if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
496                 max_threads = 0;
497
498         if (tpool == NULL)
499                 return(-1);
500
501         pool = *tpool;
502
503         if (pool == NULL)
504                 return(-1);
505
506         pool->ltp_conf_max_count = max_threads;
507         if ( !max_threads )
508                 max_threads = LDAP_MAXTHR;
509         pool->ltp_max_count = max_threads;
510
511         remthr = max_threads % pool->ltp_numqs;
512         max_threads /= pool->ltp_numqs;
513
514         for (i=0; i<pool->ltp_numqs; i++) {
515                 pq = &pool->ltp_wqs[i];
516                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
517                 pq->ltp_max_count = max_threads;
518                 if (remthr) {
519                         pq->ltp_max_count++;
520                         remthr--;
521                 }
522                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
523         }
524         return(0);
525 }
526
527 /* Inspect the pool */
528 int
529 ldap_pvt_thread_pool_query(
530         ldap_pvt_thread_pool_t *tpool,
531         ldap_pvt_thread_pool_param_t param,
532         void *value )
533 {
534         struct ldap_int_thread_pool_s   *pool;
535         int                             count = -1;
536
537         if ( tpool == NULL || value == NULL ) {
538                 return -1;
539         }
540
541         pool = *tpool;
542
543         if ( pool == NULL ) {
544                 return 0;
545         }
546
547         switch ( param ) {
548         case LDAP_PVT_THREAD_POOL_PARAM_MAX:
549                 count = pool->ltp_conf_max_count;
550                 break;
551
552         case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
553                 count = pool->ltp_max_pending;
554                 if (count < 0)
555                         count = -count;
556                 if (count == MAX_PENDING)
557                         count = 0;
558                 break;
559
560         case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
561                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
562                 count = (pool->ltp_pause != 0);
563                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
564                 break;
565
566         case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
567         case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
568         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
569         case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
570         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
571                 {
572                         int i;
573                         count = 0;
574                         for (i=0; i<pool->ltp_numqs; i++) {
575                                 struct ldap_int_thread_poolq_s *pq = &pool->ltp_wqs[i];
576                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
577                                 switch(param) {
578                                         case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
579                                                 count += pq->ltp_open_count;
580                                                 break;
581                                         case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
582                                                 count += pq->ltp_starting;
583                                                 break;
584                                         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
585                                                 count += pq->ltp_active_count;
586                                                 break;
587                                         case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
588                                                 count += pq->ltp_pending_count;
589                                                 break;
590                                         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
591                                                 count += pq->ltp_pending_count + pq->ltp_active_count;
592                                                 break;
593                                 }
594                                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
595                         }
596                         if (count < 0)
597                                 count = -count;
598                 }
599                 break;
600
601         case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
602                 break;
603
604         case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
605                 break;
606
607         case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
608                 break;
609
610         case LDAP_PVT_THREAD_POOL_PARAM_STATE:
611                 if (pool->ltp_pause)
612                         *((char **)value) = "pausing";
613                 else if (!pool->ltp_finishing)
614                         *((char **)value) = "running";
615                 else {
616                         int i;
617                         for (i=0; i<pool->ltp_numqs; i++)
618                                 if (pool->ltp_wqs[i].ltp_pending_count) break;
619                         if (i<pool->ltp_numqs)
620                                 *((char **)value) = "finishing";
621                         else
622                                 *((char **)value) = "stopping";
623                 }
624                 break;
625
626         case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
627                 break;
628         }
629
630         if ( count > -1 ) {
631                 *((int *)value) = count;
632         }
633
634         return ( count == -1 ? -1 : 0 );
635 }
636
637 /*
638  * true if pool is pausing; does not lock any mutex to check.
639  * 0 if not pause, 1 if pause, -1 if error or no pool.
640  */
641 int
642 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
643 {
644         int rc = -1;
645         struct ldap_int_thread_pool_s *pool;
646
647         if ( tpool != NULL && (pool = *tpool) != NULL ) {
648                 rc = (pool->ltp_pause != 0);
649         }
650
651         return rc;
652 }
653
654 /*
655  * wrapper for ldap_pvt_thread_pool_query(), left around
656  * for backwards compatibility
657  */
658 int
659 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
660 {
661         int     rc, count;
662
663         rc = ldap_pvt_thread_pool_query( tpool,
664                 LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
665
666         if ( rc == 0 ) {
667                 return count;
668         }
669
670         return rc;
671 }
672
673 /* Destroy the pool after making its threads finish */
674 int
675 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
676 {
677         struct ldap_int_thread_pool_s *pool, *pptr;
678         struct ldap_int_thread_poolq_s *pq;
679         ldap_int_thread_task_t *task;
680         int i;
681
682         if (tpool == NULL)
683                 return(-1);
684
685         pool = *tpool;
686
687         if (pool == NULL) return(-1);
688
689         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
690         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
691                 if (pptr == pool) break;
692         if (pptr == pool)
693                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
694                         ldap_int_thread_pool_s, ltp_next);
695         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
696
697         if (pool != pptr) return(-1);
698
699         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
700
701         pool->ltp_finishing = 1;
702         if (pool->ltp_max_pending > 0)
703                 pool->ltp_max_pending = -pool->ltp_max_pending;
704
705         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
706         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
707
708         for (i=0; i<pool->ltp_numqs; i++) {
709                 pq = &pool->ltp_wqs[i];
710                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
711                 if (pq->ltp_max_pending > 0)
712                         pq->ltp_max_pending = -pq->ltp_max_pending;
713                 if (!run_pending) {
714                         while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
715                                 LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
716                                 LDAP_FREE(task);
717                         }
718                         pq->ltp_pending_count = 0;
719                 }
720
721                 while (pq->ltp_open_count) {
722                         ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
723                         ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
724                 }
725
726                 while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
727                 {
728                         LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
729                         LDAP_FREE(task);
730                 }
731                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
732                 ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
733                 ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
734         }
735
736         ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
737         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
738         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
739         LDAP_FREE(pool);
740         *tpool = NULL;
741         ldap_int_has_thread_pool = 0;
742         return(0);
743 }
744
745 /* Thread loop.  Accept and handle submitted tasks. */
746 static void *
747 ldap_int_thread_pool_wrapper ( 
748         void *xpool )
749 {
750         struct ldap_int_thread_poolq_s *pq = xpool;
751         struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
752         ldap_int_thread_task_t *task;
753         ldap_int_tpool_plist_t *work_list;
754         ldap_int_thread_userctx_t ctx, *kctx;
755         unsigned i, keyslot, hash;
756         int pool_lock = 0;
757
758         assert(pool != NULL);
759
760         for ( i=0; i<MAXKEYS; i++ ) {
761                 ctx.ltu_key[i].ltk_key = NULL;
762         }
763
764         ctx.ltu_pq = pq;
765         ctx.ltu_id = ldap_pvt_thread_self();
766         TID_HASH(ctx.ltu_id, hash);
767
768         ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
769
770         if (pool->ltp_pause) {
771                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
772                 /* thread_keys[] is read-only when paused */
773                 while (pool->ltp_pause)
774                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
775                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
776         }
777
778         /* find a key slot to give this thread ID and store a
779          * pointer to our keys there; start at the thread ID
780          * itself (mod LDAP_MAXTHR) and look for an empty slot.
781          */
782         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
783         for (keyslot = hash & (LDAP_MAXTHR-1);
784                 (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
785                 keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
786         thread_keys[keyslot].ctx = &ctx;
787         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
788
789         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
790         pq->ltp_starting--;
791         pq->ltp_active_count++;
792
793         for (;;) {
794                 work_list = pq->ltp_work_list; /* help the compiler a bit */
795                 task = LDAP_STAILQ_FIRST(work_list);
796                 if (task == NULL) {     /* paused or no pending tasks */
797                         if (--(pq->ltp_active_count) < 1) {
798                                 if (pool->ltp_pause) {
799                                         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
800                                         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
801                                         pool_lock = 1;
802                                         if (--(pool->ltp_active_queues) < 1) {
803                                                 /* Notify pool_pause it is the sole active thread. */
804                                                 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
805                                         }
806                                 }
807                         }
808
809                         do {
810                                 if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
811                                         /* Not paused, and either finishing or too many
812                                          * threads running (can happen if ltp_max_count
813                                          * was reduced).  Let this thread die.
814                                          */
815                                         goto done;
816                                 }
817
818                                 /* We could check an idle timer here, and let the
819                                  * thread die if it has been inactive for a while.
820                                  * Only die if there are other open threads (i.e.,
821                                  * always have at least one thread open).
822                                  * The check should be like this:
823                                  *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
824                                  *       check timer, wait if ltp_pause, leave thread;
825                                  *
826                                  * Just use pthread_cond_timedwait() if we want to
827                                  * check idle time.
828                                  */
829                                 if (pool_lock) {
830                                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
831                                         if (!pool->ltp_pause) {
832                                                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
833                                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
834                                                 pool_lock = 0;
835                                         }
836                                 } else
837                                         ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
838
839                                 work_list = pq->ltp_work_list;
840                                 task = LDAP_STAILQ_FIRST(work_list);
841                         } while (task == NULL);
842
843                         if (pool_lock) {
844                                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
845                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
846                                 pool_lock = 0;
847                         }
848                         pq->ltp_active_count++;
849                 }
850
851                 LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
852                 pq->ltp_pending_count--;
853                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
854
855                 task->ltt_start_routine(&ctx, task->ltt_arg);
856
857                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
858                 LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
859         }
860  done:
861
862         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
863
864         /* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
865          * during this call, since it prevents new pauses. */
866         ldap_pvt_thread_pool_context_reset(&ctx);
867
868         thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
869         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
870
871         pq->ltp_open_count--;
872         /* let pool_destroy know we're all done */
873         if (pq->ltp_open_count == 0)
874                 ldap_pvt_thread_cond_signal(&pq->ltp_cond);
875
876         if (pool_lock)
877                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
878         else
879                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
880
881         ldap_pvt_thread_exit(NULL);
882         return(NULL);
883 }
884
885 /* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()).  arg=PAUSE_ARG
886  * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*.
887  */
888 #define GO_IDLE         8
889 #define GO_UNIDLE       16
890 #define CHECK_PAUSE     32      /* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
891 #define DO_PAUSE        64      /* CHECK_PAUSE; pause the pool */
892 #define PAUSE_ARG(a) \
893                 ((a) | ((a) & (GO_IDLE|GO_UNIDLE) ? GO_IDLE-1 : CHECK_PAUSE))
894
895 static int
896 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
897 {
898         struct ldap_int_thread_pool_s *pool;
899         struct ldap_int_thread_poolq_s *pq;
900         int ret = 0, pause, max_ltp_pause;
901
902         if (tpool == NULL)
903                 return(-1);
904
905         pool = *tpool;
906
907         if (pool == NULL)
908                 return(0);
909
910         if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
911                 return(0);
912
913         {
914                 ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
915                 pq = ctx->ltu_pq;
916         }
917
918         /* Let pool_unidle() ignore requests for new pauses */
919         max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
920
921         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
922
923         pause = pool->ltp_pause;        /* NOT_PAUSED, WANT_PAUSE or PAUSED */
924
925         /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
926         pause_type -= pause;
927
928         if (pause_type & GO_IDLE) {
929                 int do_pool = 0;
930                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
931                 pq->ltp_pending_count++;
932                 pq->ltp_active_count--;
933                 if (pause && pq->ltp_active_count < 1) {
934                         do_pool = 1;
935                 }
936                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
937                 if (do_pool) {
938                         pool->ltp_active_queues--;
939                         if (pool->ltp_active_queues < 1)
940                         /* Tell the task waiting to DO_PAUSE it can proceed */
941                                 ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
942                 }
943         }
944
945         if (pause_type & GO_UNIDLE) {
946                 /* Wait out pause if any, then cancel GO_IDLE */
947                 if (pause > max_ltp_pause) {
948                         ret = 1;
949                         do {
950                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
951                         } while (pool->ltp_pause > max_ltp_pause);
952                 }
953                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
954                 pq->ltp_pending_count--;
955                 pq->ltp_active_count++;
956                 ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
957         }
958
959         if (pause_type & DO_PAUSE) {
960                 int i, j;
961                 /* Tell everyone else to pause or finish, then await that */
962                 ret = 0;
963                 assert(!pool->ltp_pause);
964                 pool->ltp_pause = WANT_PAUSE;
965                 pool->ltp_active_queues = 0;
966
967                 for (i=0; i<pool->ltp_numqs; i++)
968                         if (&pool->ltp_wqs[i] == pq) break;
969
970                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
971                 /* temporarily remove ourself from active count */
972                 pq->ltp_active_count--;
973
974                 j=i;
975                 do {
976                         pq = &pool->ltp_wqs[j];
977                         if (j != i)
978                                 ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
979
980                         /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
981                          * and do not finish threads in ldap_pvt_thread_pool_wrapper() */
982                         pq->ltp_open_count = -pq->ltp_open_count;
983                         /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
984                         pq->ltp_work_list = &empty_pending_list;
985
986                         if (pq->ltp_active_count > 0)
987                                 pool->ltp_active_queues++;
988
989                         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
990                         if (pool->ltp_numqs > 1) {
991                                 j++;
992                                 j %= pool->ltp_numqs;
993                         }
994                 } while (j != i);
995
996                 /* Wait for this task to become the sole active task */
997                 while (pool->ltp_active_queues > 0)
998                         ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
999
1000                 /* restore us to active count */
1001                 pool->ltp_wqs[i].ltp_active_count++;
1002
1003                 assert(pool->ltp_pause == WANT_PAUSE);
1004                 pool->ltp_pause = PAUSED;
1005         }
1006         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1007
1008         return(ret);
1009 }
1010
1011 /* Consider this task idle: It will not block pool_pause() in other tasks. */
1012 void
1013 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
1014 {
1015         handle_pause(tpool, PAUSE_ARG(GO_IDLE));
1016 }
1017
1018 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
1019 void
1020 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
1021 {
1022         handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
1023 }
1024
1025 /*
1026  * If a pause was requested, wait for it.  If several threads
1027  * are waiting to pause, let through one or more pauses.
1028  * The calling task must be active, not idle.
1029  * Return 1 if we waited, 0 if not, -1 at parameter error.
1030  */
1031 int
1032 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
1033 {
1034         return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
1035 }
1036
1037 /*
1038  * Pause the pool.  The calling task must be active, not idle.
1039  * Return when all other tasks are paused or idle.
1040  */
1041 int
1042 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
1043 {
1044         return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
1045 }
1046
1047 /* End a pause */
1048 int
1049 ldap_pvt_thread_pool_resume ( 
1050         ldap_pvt_thread_pool_t *tpool )
1051 {
1052         struct ldap_int_thread_pool_s *pool;
1053         struct ldap_int_thread_poolq_s *pq;
1054         int i;
1055
1056         if (tpool == NULL)
1057                 return(-1);
1058
1059         pool = *tpool;
1060
1061         if (pool == NULL)
1062                 return(0);
1063
1064         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1065         assert(pool->ltp_pause == PAUSED);
1066         pool->ltp_pause = 0;
1067         for (i=0; i<pool->ltp_numqs; i++) {
1068                 pq = &pool->ltp_wqs[i];
1069                 if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */
1070                         pq->ltp_open_count = -pq->ltp_open_count;
1071                 pq->ltp_work_list = &pq->ltp_pending_list;
1072                 ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
1073         }
1074         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
1075         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1076         return(0);
1077 }
1078
1079 /*
1080  * Get the key's data and optionally free function in the given context.
1081  */
1082 int ldap_pvt_thread_pool_getkey(
1083         void *xctx,
1084         void *key,
1085         void **data,
1086         ldap_pvt_thread_pool_keyfree_t **kfree )
1087 {
1088         ldap_int_thread_userctx_t *ctx = xctx;
1089         int i;
1090
1091         if ( !ctx || !key || !data ) return EINVAL;
1092
1093         for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
1094                 if ( ctx->ltu_key[i].ltk_key == key ) {
1095                         *data = ctx->ltu_key[i].ltk_data;
1096                         if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
1097                         return 0;
1098                 }
1099         }
1100         return ENOENT;
1101 }
1102
1103 static void
1104 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
1105 {
1106         for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
1107                 ctx->ltu_key[i] = ctx->ltu_key[i+1];
1108         ctx->ltu_key[i].ltk_key = NULL;
1109 }
1110
1111 /*
1112  * Set or remove data for the key in the given context.
1113  * key can be any unique pointer.
1114  * kfree() is an optional function to free the data (but not the key):
1115  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
1116  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
1117  *   responsibility to free any existing data with the same key.
1118  *   kfree() must not call functions taking a tpool argument.
1119  */
1120 int ldap_pvt_thread_pool_setkey(
1121         void *xctx,
1122         void *key,
1123         void *data,
1124         ldap_pvt_thread_pool_keyfree_t *kfree,
1125         void **olddatap,
1126         ldap_pvt_thread_pool_keyfree_t **oldkfreep )
1127 {
1128         ldap_int_thread_userctx_t *ctx = xctx;
1129         int i, found;
1130
1131         if ( !ctx || !key ) return EINVAL;
1132
1133         for ( i=found=0; i<MAXKEYS; i++ ) {
1134                 if ( ctx->ltu_key[i].ltk_key == key ) {
1135                         found = 1;
1136                         break;
1137                 } else if ( !ctx->ltu_key[i].ltk_key ) {
1138                         break;
1139                 }
1140         }
1141
1142         if ( olddatap ) {
1143                 if ( found ) {
1144                         *olddatap = ctx->ltu_key[i].ltk_data;
1145                 } else {
1146                         *olddatap = NULL;
1147                 }
1148         }
1149
1150         if ( oldkfreep ) {
1151                 if ( found ) {
1152                         *oldkfreep = ctx->ltu_key[i].ltk_free;
1153                 } else {
1154                         *oldkfreep = 0;
1155                 }
1156         }
1157
1158         if ( data || kfree ) {
1159                 if ( i>=MAXKEYS )
1160                         return ENOMEM;
1161                 ctx->ltu_key[i].ltk_key = key;
1162                 ctx->ltu_key[i].ltk_data = data;
1163                 ctx->ltu_key[i].ltk_free = kfree;
1164         } else if ( found ) {
1165                 clear_key_idx( ctx, i );
1166         }
1167
1168         return 0;
1169 }
1170
1171 /* Free all elements with this key, no matter which thread they're in.
1172  * May only be called while the pool is paused.
1173  */
1174 void ldap_pvt_thread_pool_purgekey( void *key )
1175 {
1176         int i, j;
1177         ldap_int_thread_userctx_t *ctx;
1178
1179         assert ( key != NULL );
1180
1181         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1182         for ( i=0; i<LDAP_MAXTHR; i++ ) {
1183                 ctx = thread_keys[i].ctx;
1184                 if ( ctx && ctx != DELETED_THREAD_CTX ) {
1185                         for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
1186                                 if ( ctx->ltu_key[j].ltk_key == key ) {
1187                                         if (ctx->ltu_key[j].ltk_free)
1188                                                 ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
1189                                                 ctx->ltu_key[j].ltk_data );
1190                                         clear_key_idx( ctx, j );
1191                                         break;
1192                                 }
1193                         }
1194                 }
1195         }
1196         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1197 }
1198
1199 /*
1200  * Find the context of the current thread.
1201  * This is necessary if the caller does not have access to the
1202  * thread context handle (for example, a slapd plugin calling
1203  * slapi_search_internal()). No doubt it is more efficient
1204  * for the application to keep track of the thread context
1205  * handles itself.
1206  */
1207 void *ldap_pvt_thread_pool_context( )
1208 {
1209         void *ctx = NULL;
1210
1211         ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1212         return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1213 }
1214
1215 /*
1216  * Free the context's keys.
1217  * Must not call functions taking a tpool argument (because this
1218  * thread already holds ltp_mutex when called from pool_wrapper()).
1219  */
1220 void ldap_pvt_thread_pool_context_reset( void *vctx )
1221 {
1222         ldap_int_thread_userctx_t *ctx = vctx;
1223         int i;
1224
1225         for ( i=MAXKEYS-1; i>=0; i--) {
1226                 if ( !ctx->ltu_key[i].ltk_key )
1227                         continue;
1228                 if ( ctx->ltu_key[i].ltk_free )
1229                         ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1230                         ctx->ltu_key[i].ltk_data );
1231                 ctx->ltu_key[i].ltk_key = NULL;
1232         }
1233 }
1234
1235 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1236 {
1237         ldap_int_thread_userctx_t *ctx = vctx;
1238
1239         return ctx->ltu_id;
1240 }
1241 #endif /* LDAP_THREAD_HAVE_TPOOL */