]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
4cecaee9da7b28dcfa014457391c16294eca452e
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2003 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20 #include <ac/errno.h>
21
22 #include "ldap-int.h"
23 #include "ldap_pvt_thread.h"
24 #include "ldap_queue.h"
25
26 #ifndef LDAP_THREAD_HAVE_TPOOL
27
/* Life-cycle states of a thread pool (stored in ltp_state). */
enum ldap_int_thread_pool_state {
	/* normal operation: submissions are accepted */
	LDAP_INT_THREAD_POOL_RUNNING,
	/* being destroyed; workers drain the pending list, then exit */
	LDAP_INT_THREAD_POOL_FINISHING,
	/* being destroyed; workers exit without taking further requests */
	LDAP_INT_THREAD_POOL_STOPPING
};
33
34 typedef struct ldap_int_thread_key_s {
35         void *ltk_key;
36         void *ltk_data;
37         ldap_pvt_thread_pool_keyfree_t *ltk_free;
38 } ldap_int_thread_key_t;
39
/* Maximum number of thread-specific keys stored per worker thread.
 * We don't expect to use many...
 */
#define MAXKEYS 32

/* Number of slots in the thread_keys table.  Must be a power of 2,
 * because slot indices are wrapped with "& (MAXTHREADS-1)".
 */
#define MAXTHREADS	1024
45
46 static struct {
47         ldap_pvt_thread_t id;
48         ldap_int_thread_key_t *ctx;
49 } thread_keys[MAXTHREADS];
50         
51
52 typedef struct ldap_int_thread_ctx_s {
53         union {
54         LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
55         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;
56         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;
57         } ltc_next;
58         ldap_pvt_thread_start_t *ltc_start_routine;
59         void *ltc_arg;
60 } ldap_int_thread_ctx_t;
61
62 struct ldap_int_thread_pool_s {
63         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
64         ldap_pvt_thread_mutex_t ltp_mutex;
65         ldap_pvt_thread_cond_t ltp_cond;
66         LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
67         LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
68         LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
69         long ltp_state;
70         long ltp_max_count;
71         long ltp_max_pending;
72         long ltp_pending_count;
73         long ltp_active_count;
74         long ltp_open_count;
75         long ltp_starting;
76 };
77
78 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
79         ldap_int_thread_pool_list =
80         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
81
82 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
83
84 static void *ldap_int_thread_pool_wrapper( void *pool );
85
86 int
87 ldap_int_thread_pool_startup ( void )
88 {
89         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
90 }
91
92 int
93 ldap_int_thread_pool_shutdown ( void )
94 {
95         struct ldap_int_thread_pool_s *pool;
96
97         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
98                 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
99                 ldap_pvt_thread_pool_destroy( &pool, 0);
100         }
101         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
102         return(0);
103 }
104
105 int
106 ldap_pvt_thread_pool_init (
107         ldap_pvt_thread_pool_t *tpool,
108         int max_threads,
109         int max_pending )
110 {
111         ldap_pvt_thread_pool_t pool;
112         int rc;
113
114         *tpool = NULL;
115         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
116                 sizeof(struct ldap_int_thread_pool_s));
117
118         if (pool == NULL) return(-1);
119
120         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
121         if (rc != 0)
122                 return(rc);
123         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
124         if (rc != 0)
125                 return(rc);
126         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
127         pool->ltp_max_count = max_threads;
128         pool->ltp_max_pending = max_pending;
129         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
130         LDAP_SLIST_INIT(&pool->ltp_free_list);
131         LDAP_SLIST_INIT(&pool->ltp_active_list);
132         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
133         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
134         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
135
136 #if 0
137         /* THIS WILL NOT WORK on some systems.  If the process
138          * forks after starting a thread, there is no guarantee
139          * that the thread will survive the fork.  For example,
140          * slapd forks in order to daemonize, and does so after
141          * calling ldap_pvt_thread_pool_init.  On some systems,
142          * this initial thread does not run in the child process,
143          * but ltp_open_count == 1, so two things happen: 
144          * 1) the first client connection fails, and 2) when
145          * slapd is kill'ed, it never terminates since it waits
146          * for all worker threads to exit. */
147
148         /* start up one thread, just so there is one. no need to
149          * lock the mutex right now, since no threads are running.
150          */
151         pool->ltp_open_count++;
152
153         ldap_pvt_thread_t thr;
154         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
155
156         if( rc != 0) {
157                 /* couldn't start one?  then don't start any */
158                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
159                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
160                         ldap_int_thread_pool_s, ltp_next);
161                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
162                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
163                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
164                 LDAP_FREE(pool);
165                 return(-1);
166         }
167 #endif
168
169         *tpool = pool;
170         return(0);
171 }
172
/* Hash a thread ID by summing the bytes of its object representation.
 * The sum is stored in `hash` (an int lvalue); callers reduce it to a
 * table index themselves.
 */
#define TID_HASH(tid, hash) do { int tid_hash_n_; \
	const unsigned char *tid_hash_p_ = (const unsigned char *)&(tid); \
	(hash) = 0; \
	for (tid_hash_n_ = 0; tid_hash_n_ < (int)sizeof(tid); tid_hash_n_++) \
		(hash) += tid_hash_p_[tid_hash_n_]; } while(0)
176
177 int
178 ldap_pvt_thread_pool_submit (
179         ldap_pvt_thread_pool_t *tpool,
180         ldap_pvt_thread_start_t *start_routine, void *arg )
181 {
182         struct ldap_int_thread_pool_s *pool;
183         ldap_int_thread_ctx_t *ctx;
184         int need_thread = 0;
185         ldap_pvt_thread_t thr;
186
187         if (tpool == NULL)
188                 return(-1);
189
190         pool = *tpool;
191
192         if (pool == NULL)
193                 return(-1);
194
195         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
196         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
197                 || (pool->ltp_max_pending > 0
198                         && pool->ltp_pending_count >= pool->ltp_max_pending))
199         {
200                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
201                 return(-1);
202         }
203         ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
204         if (ctx) {
205                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
206         } else {
207                 int i;
208                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
209                         sizeof(ldap_int_thread_ctx_t));
210                 if (ctx == NULL) {
211                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
212                         return(-1);
213                 }
214         }
215
216         ctx->ltc_start_routine = start_routine;
217         ctx->ltc_arg = arg;
218
219         pool->ltp_pending_count++;
220         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
221         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
222         if ((pool->ltp_open_count <= 0
223 #if 0
224                         || pool->ltp_pending_count > 1
225 #endif
226                         || pool->ltp_open_count == pool->ltp_active_count)
227                 && (pool->ltp_max_count <= 0
228                         || pool->ltp_open_count < pool->ltp_max_count))
229         {
230                 pool->ltp_open_count++;
231                 pool->ltp_starting++;
232                 need_thread = 1;
233         }
234         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
235
236         if (need_thread) {
237                 int rc;
238
239                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
240
241                 rc = ldap_pvt_thread_create( &thr, 1,
242                         ldap_int_thread_pool_wrapper, pool );
243                 if (rc == 0) {
244                         int hash;
245                         pool->ltp_starting--;
246
247                         /* assign this thread ID to a key slot; start
248                          * at the thread ID itself (mod MAXTHREADS) and
249                          * look for an empty slot.
250                          */
251                         TID_HASH(thr, hash);
252                         for (rc = hash & (MAXTHREADS-1); thread_keys[rc].id;
253                                 rc = (rc+1) & (MAXTHREADS-1));
254                         thread_keys[rc].id = thr;
255                 } else {
256                         /* couldn't create thread.  back out of
257                          * ltp_open_count and check for even worse things.
258                          */
259                         pool->ltp_open_count--;
260                         pool->ltp_starting--;
261                         if (pool->ltp_open_count == 0) {
262                                 /* no open threads at all?!?
263                                  */
264                                 ldap_int_thread_ctx_t *ptr;
265                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
266                                         if (ptr == ctx) break;
267                                 if (ptr == ctx) {
268                                         /* no open threads, context not handled, so
269                                          * back out of ltp_pending_count, free the context,
270                                          * report the error.
271                                          */
272                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
273                                                 ldap_int_thread_ctx_s, ltc_next.q);
274                                         pool->ltp_pending_count++;
275                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
276                                         LDAP_FREE(ctx);
277                                         return(-1);
278                                 }
279                         }
280                         /* there is another open thread, so this
281                          * context will be handled eventually.
282                          * continue on and signal that the context
283                          * is waiting.
284                          */
285                 }
286                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
287         }
288
289         return(0);
290 }
291
292 int
293 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
294 {
295         struct ldap_int_thread_pool_s *pool;
296
297         if (tpool == NULL)
298                 return(-1);
299
300         pool = *tpool;
301
302         if (pool == NULL)
303                 return(-1);
304
305         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
306         pool->ltp_max_count = max_threads;
307         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
308         return(0);
309 }
310
311 int
312 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
313 {
314         struct ldap_int_thread_pool_s *pool;
315         int count;
316
317         if (tpool == NULL)
318                 return(-1);
319
320         pool = *tpool;
321
322         if (pool == NULL)
323                 return(0);
324
325         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
326         count = pool->ltp_pending_count + pool->ltp_active_count;
327         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
328         return(count);
329 }
330
331 int
332 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
333 {
334         struct ldap_int_thread_pool_s *pool, *pptr;
335         long waiting;
336         ldap_int_thread_ctx_t *ctx;
337
338         if (tpool == NULL)
339                 return(-1);
340
341         pool = *tpool;
342
343         if (pool == NULL) return(-1);
344
345         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
346         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
347                 if (pptr == pool) break;
348         if (pptr == pool)
349                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
350                         ldap_int_thread_pool_s, ltp_next);
351         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
352
353         if (pool != pptr) return(-1);
354
355         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
356         pool->ltp_state = run_pending
357                 ? LDAP_INT_THREAD_POOL_FINISHING
358                 : LDAP_INT_THREAD_POOL_STOPPING;
359
360         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
361         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
362
363         do {
364                 ldap_pvt_thread_yield();
365                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
366                 waiting = pool->ltp_open_count;
367                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
368         } while (waiting > 0);
369
370         while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
371         {
372                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
373                 LDAP_FREE(ctx);
374         }
375
376         while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
377         {
378                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
379                 LDAP_FREE(ctx);
380         }
381
382         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
383         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
384         LDAP_FREE(pool);
385         return(0);
386 }
387
388 static void *
389 ldap_int_thread_pool_wrapper ( 
390         void *xpool )
391 {
392         struct ldap_int_thread_pool_s *pool = xpool;
393         ldap_int_thread_ctx_t *ctx;
394         ldap_int_thread_key_t ltc_key[MAXKEYS];
395         ldap_pvt_thread_t tid;
396         int i, keyslot, hash;
397
398         if (pool == NULL)
399                 return NULL;
400
401         for ( i=0; i<MAXKEYS; i++ ) {
402                 ltc_key[i].ltk_key = NULL;
403         }
404
405         tid = ldap_pvt_thread_self();
406
407         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
408
409         /* store pointer to our keys */
410         TID_HASH(tid, hash);
411         for (i = hash & (MAXTHREADS-1); thread_keys[i].id != tid;
412                                 i = (i+1) & (MAXTHREADS-1));
413         thread_keys[i].ctx = ltc_key;
414         keyslot = i;
415
416         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
417                 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
418                 if (ctx) {
419                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
420                 } else {
421                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
422                                 break;
423                         if (pool->ltp_max_count > 0
424                                 && pool->ltp_open_count > pool->ltp_max_count)
425                         {
426                                 /* too many threads running (can happen if the
427                                  * maximum threads value is set during ongoing
428                                  * operation using ldap_pvt_thread_pool_maxthreads)
429                                  * so let this thread die.
430                                  */
431                                 break;
432                         }
433
434                         /* we could check an idle timer here, and let the
435                          * thread die if it has been inactive for a while.
436                          * only die if there are other open threads (i.e.,
437                          * always have at least one thread open).  the check
438                          * should be like this:
439                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
440                          *       check timer, leave thread (break;)
441                          */
442
443                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
444                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
445
446                         continue;
447                 }
448
449                 pool->ltp_pending_count--;
450
451                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
452                 pool->ltp_active_count++;
453                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
454
455                 ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
456
457                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
458                 LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
459                         ldap_int_thread_ctx_s, ltc_next.al);
460                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
461                 pool->ltp_active_count--;
462                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
463
464                 ldap_pvt_thread_yield();
465
466                 /* if we use an idle timer, here's
467                  * a good place to update it
468                  */
469
470                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
471         }
472
473         for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
474                 if (ltc_key[i].ltk_free)
475                         ltc_key[i].ltk_free(
476                                 ltc_key[i].ltk_key,
477                                 ltc_key[i].ltk_data );
478         }
479
480         thread_keys[keyslot].ctx = NULL;
481         thread_keys[keyslot].id = 0;
482
483         pool->ltp_open_count--;
484         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
485
486         ldap_pvt_thread_exit(NULL);
487         return(NULL);
488 }
489
490 int ldap_pvt_thread_pool_getkey(
491         void *xctx,
492         void *key,
493         void **data,
494         ldap_pvt_thread_pool_keyfree_t **kfree )
495 {
496         ldap_int_thread_key_t *ctx = xctx;
497         int i;
498
499         if ( !ctx || !data ) return EINVAL;
500
501         for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++ ) {
502                 if ( ctx[i].ltk_key == key ) {
503                         *data = ctx[i].ltk_data;
504                         if ( kfree ) *kfree = ctx[i].ltk_free;
505                         return 0;
506                 }
507         }
508         return ENOENT;
509 }
510
511 int ldap_pvt_thread_pool_setkey(
512         void *xctx,
513         void *key,
514         void *data,
515         ldap_pvt_thread_pool_keyfree_t *kfree )
516 {
517         ldap_int_thread_key_t *ctx = xctx;
518         int i;
519
520         if ( !ctx || !key ) return EINVAL;
521
522         for ( i=0; i<MAXKEYS; i++ ) {
523                 if ( !ctx[i].ltk_key || ctx[i].ltk_key == key ) {
524                         ctx[i].ltk_key = key;
525                         ctx[i].ltk_data = data;
526                         ctx[i].ltk_free = kfree;
527                         return 0;
528                 }
529         }
530         return ENOMEM;
531 }
532
533 /*
534  * This is necessary if the caller does not have access to the
535  * thread context handle (for example, a slapd plugin calling
536  * slapi_search_internal()). No doubt it is more efficient to
537  * for the application to keep track of the thread context
538  * handles itself.
539  */
540 void *ldap_pvt_thread_pool_context( )
541 {
542         ldap_pvt_thread_t tid;
543         int i, hash;
544
545         tid = ldap_pvt_thread_self();
546
547         TID_HASH( tid, hash );
548         for (i = hash & (MAXTHREADS-1); thread_keys[i].id &&
549                 thread_keys[i].id != tid; i = (i+1) & (MAXTHREADS-1));
550
551         return thread_keys[i].ctx;
552 }
553
#endif /* LDAP_THREAD_HAVE_TPOOL */