]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
c6eeed959cf85b1c8c823e4741c0ce944bb4668c
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2003 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20 #include <ac/errno.h>
21
22 #include "ldap-int.h"
23 #include "ldap_pvt_thread.h"
24 #include "ldap_queue.h"
25
26 #ifndef LDAP_THREAD_HAVE_TPOOL
27
/* Lifecycle state of a pool.  The value is stored in ltp_state and is
 * examined by ldap_pvt_thread_pool_submit() and by the worker loop in
 * ldap_int_thread_pool_wrapper().
 */
28 enum ldap_int_thread_pool_state {
29         LDAP_INT_THREAD_POOL_RUNNING,   /* set by ldap_pvt_thread_pool_init(); the only state in which submit accepts work */
30         LDAP_INT_THREAD_POOL_FINISHING, /* destroy with run_pending != 0: workers exit once the pending list is empty */
31         LDAP_INT_THREAD_POOL_STOPPING   /* destroy with run_pending == 0: workers leave their loop without taking more work */
32 };
33
/* One thread-specific key slot.  Each worker thread owns an array of
 * MAXKEYS of these; a slot whose ltk_key is NULL is unused and ends the
 * scan in ldap_pvt_thread_pool_getkey().
 */
34 typedef struct ldap_int_thread_key_s {
35         void *ltk_key;          /* caller-chosen key; compared by pointer value */
36         void *ltk_data;         /* value stored under ltk_key */
37         ldap_pvt_thread_pool_keyfree_t *ltk_free;       /* if non-NULL, called with (ltk_key, ltk_data) when the worker exits */
38 } ldap_int_thread_key_t;
39
40 /* Max number of thread-specific keys we store per thread.
41  * We don't expect to use many; ldap_pvt_thread_pool_setkey()
42  * returns ENOMEM once all slots of a thread are taken.
 *
 * MAXTHREADS is the number of slots in the thread_keys table.  Slot
 * indices are computed with "& (MAXTHREADS-1)", which only works as a
 * modulo when MAXTHREADS is a power of 2.
 */
43 #define MAXKEYS 32
44 #define MAXTHREADS      1024    /* must be a power of 2 */
45
/* Table mapping a worker thread's id to its array of thread-specific
 * keys.  Lookups start at (id & (MAXTHREADS-1)) and step forward one
 * slot at a time, wrapping around.  An id of 0 marks an empty slot.
 * A slot is claimed in ldap_pvt_thread_pool_submit() right after the
 * thread is created, filled in and finally cleared again by
 * ldap_int_thread_pool_wrapper().
 */
46 static struct {
47         ldap_pvt_thread_t id;           /* owning thread, 0 if the slot is free */
48         ldap_int_thread_key_t *ctx;     /* that thread's key array (MAXKEYS entries), NULL until the worker sets it */
49 } thread_keys[MAXTHREADS];
50         
51
/* One unit of work handed to the pool: a start routine and its argument.
 * The same object moves from the pending queue to the active list and
 * then to the free list, where ldap_pvt_thread_pool_submit() picks it
 * up for reuse; the three list linkages therefore share storage.
 */
52 typedef struct ldap_int_thread_ctx_s {
53         union {
54         LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;    /* link while on ltp_pending_list */
55         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;     /* link while on ltp_free_list */
56         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;    /* link while on ltp_active_list */
57         } ltc_next;
58         ldap_pvt_thread_start_t *ltc_start_routine;    /* function the worker calls */
59         void *ltc_arg;                                 /* passed as its second argument */
60 } ldap_int_thread_ctx_t;
61
/* State of one thread pool.  The functions in this file lock ltp_mutex
 * around their accesses to the lists and counters while worker threads
 * may be running.
 */
62 struct ldap_int_thread_pool_s {
63         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;    /* link in ldap_int_thread_pool_list */
64         ldap_pvt_thread_mutex_t ltp_mutex;
65         ldap_pvt_thread_cond_t ltp_cond;        /* signalled when work is queued, broadcast when the pool is destroyed */
66         LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list; /* submitted work, taken from the head, appended at the tail */
67         LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;     /* finished contexts kept for reuse */
68         LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;  /* contexts whose start routine is running */
69         long ltp_state;                 /* an enum ldap_int_thread_pool_state value */
70         long ltp_max_count;             /* limit on ltp_open_count; <= 0 means no limit */
71         long ltp_max_pending;           /* limit on ltp_pending_count; <= 0 means no limit */
72         long ltp_pending_count;         /* number of contexts on ltp_pending_list */
73         long ltp_active_count;          /* number of contexts on ltp_active_list */
74         long ltp_open_count;            /* worker threads that exist or are being created */
75         long ltp_starting;              /* raised by submit before it creates a thread, lowered after the attempt; not read by the code in this file */
76 };
77
/* All pools created by ldap_pvt_thread_pool_init() and not yet destroyed. */
78 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
79         ldap_int_thread_pool_list =
80         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
81
/* Locked by init and destroy while they modify ldap_int_thread_pool_list;
 * created in ldap_int_thread_pool_startup() and destroyed in
 * ldap_int_thread_pool_shutdown().
 */
82 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
83
/* Entry point of every worker thread; the argument is the owning pool. */
84 static void *ldap_int_thread_pool_wrapper( void *pool );
85
86 int
87 ldap_int_thread_pool_startup ( void )
88 {
89         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
90 }
91
92 int
93 ldap_int_thread_pool_shutdown ( void )
94 {
95         struct ldap_int_thread_pool_s *pool;
96
97         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
98                 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
99                 ldap_pvt_thread_pool_destroy( &pool, 0);
100         }
101         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
102         return(0);
103 }
104
105 int
106 ldap_pvt_thread_pool_init (
107         ldap_pvt_thread_pool_t *tpool,
108         int max_threads,
109         int max_pending )
110 {
111         ldap_pvt_thread_pool_t pool;
112         int rc;
113
114         *tpool = NULL;
115         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
116                 sizeof(struct ldap_int_thread_pool_s));
117
118         if (pool == NULL) return(-1);
119
120         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
121         if (rc != 0)
122                 return(rc);
123         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
124         if (rc != 0)
125                 return(rc);
126         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
127         pool->ltp_max_count = max_threads;
128         pool->ltp_max_pending = max_pending;
129         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
130         LDAP_SLIST_INIT(&pool->ltp_free_list);
131         LDAP_SLIST_INIT(&pool->ltp_active_list);
132         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
133         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
134         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
135
136 #if 0
137         /* THIS WILL NOT WORK on some systems.  If the process
138          * forks after starting a thread, there is no guarantee
139          * that the thread will survive the fork.  For example,
140          * slapd forks in order to daemonize, and does so after
141          * calling ldap_pvt_thread_pool_init.  On some systems,
142          * this initial thread does not run in the child process,
143          * but ltp_open_count == 1, so two things happen: 
144          * 1) the first client connection fails, and 2) when
145          * slapd is kill'ed, it never terminates since it waits
146          * for all worker threads to exit. */
147
148         /* start up one thread, just so there is one. no need to
149          * lock the mutex right now, since no threads are running.
150          */
151         pool->ltp_open_count++;
152
153         ldap_pvt_thread_t thr;
154         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
155
156         if( rc != 0) {
157                 /* couldn't start one?  then don't start any */
158                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
159                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
160                         ldap_int_thread_pool_s, ltp_next);
161                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
162                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
163                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
164                 LDAP_FREE(pool);
165                 return(-1);
166         }
167 #endif
168
169         *tpool = pool;
170         return(0);
171 }
172
173 int
174 ldap_pvt_thread_pool_submit (
175         ldap_pvt_thread_pool_t *tpool,
176         ldap_pvt_thread_start_t *start_routine, void *arg )
177 {
178         struct ldap_int_thread_pool_s *pool;
179         ldap_int_thread_ctx_t *ctx;
180         int need_thread = 0;
181         ldap_pvt_thread_t thr;
182
183         if (tpool == NULL)
184                 return(-1);
185
186         pool = *tpool;
187
188         if (pool == NULL)
189                 return(-1);
190
191         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
192         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
193                 || (pool->ltp_max_pending > 0
194                         && pool->ltp_pending_count >= pool->ltp_max_pending))
195         {
196                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
197                 return(-1);
198         }
199         ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
200         if (ctx) {
201                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
202         } else {
203                 int i;
204                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
205                         sizeof(ldap_int_thread_ctx_t));
206                 if (ctx == NULL) {
207                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
208                         return(-1);
209                 }
210         }
211
212         ctx->ltc_start_routine = start_routine;
213         ctx->ltc_arg = arg;
214
215         pool->ltp_pending_count++;
216         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
217         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
218         if ((pool->ltp_open_count <= 0
219 #if 0
220                         || pool->ltp_pending_count > 1
221 #endif
222                         || pool->ltp_open_count == pool->ltp_active_count)
223                 && (pool->ltp_max_count <= 0
224                         || pool->ltp_open_count < pool->ltp_max_count))
225         {
226                 pool->ltp_open_count++;
227                 pool->ltp_starting++;
228                 need_thread = 1;
229         }
230         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
231
232         if (need_thread) {
233                 int rc;
234
235                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
236
237                 rc = ldap_pvt_thread_create( &thr, 1,
238                         ldap_int_thread_pool_wrapper, pool );
239                 if (rc == 0) {
240                         pool->ltp_starting--;
241
242                         /* assign this thread ID to a key slot; start
243                          * at the thread ID itself (mod MAXTHREADS) and
244                          * look for an empty slot.
245                          */
246                         for (rc = thr & (MAXTHREADS-1); thread_keys[rc].id;
247                                 rc = (rc+1) & (MAXTHREADS-1));
248                         thread_keys[rc].id = thr;
249                 } else {
250                         /* couldn't create thread.  back out of
251                          * ltp_open_count and check for even worse things.
252                          */
253                         pool->ltp_open_count--;
254                         pool->ltp_starting--;
255                         if (pool->ltp_open_count == 0) {
256                                 /* no open threads at all?!?
257                                  */
258                                 ldap_int_thread_ctx_t *ptr;
259                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
260                                         if (ptr == ctx) break;
261                                 if (ptr == ctx) {
262                                         /* no open threads, context not handled, so
263                                          * back out of ltp_pending_count, free the context,
264                                          * report the error.
265                                          */
266                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
267                                                 ldap_int_thread_ctx_s, ltc_next.q);
268                                         pool->ltp_pending_count++;
269                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
270                                         LDAP_FREE(ctx);
271                                         return(-1);
272                                 }
273                         }
274                         /* there is another open thread, so this
275                          * context will be handled eventually.
276                          * continue on and signal that the context
277                          * is waiting.
278                          */
279                 }
280                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
281         }
282
283         return(0);
284 }
285
286 int
287 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
288 {
289         struct ldap_int_thread_pool_s *pool;
290
291         if (tpool == NULL)
292                 return(-1);
293
294         pool = *tpool;
295
296         if (pool == NULL)
297                 return(-1);
298
299         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
300         pool->ltp_max_count = max_threads;
301         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
302         return(0);
303 }
304
305 int
306 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
307 {
308         struct ldap_int_thread_pool_s *pool;
309         int count;
310
311         if (tpool == NULL)
312                 return(-1);
313
314         pool = *tpool;
315
316         if (pool == NULL)
317                 return(0);
318
319         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
320         count = pool->ltp_pending_count + pool->ltp_active_count;
321         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
322         return(count);
323 }
324
325 int
326 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
327 {
328         struct ldap_int_thread_pool_s *pool, *pptr;
329         long waiting;
330         ldap_int_thread_ctx_t *ctx;
331
332         if (tpool == NULL)
333                 return(-1);
334
335         pool = *tpool;
336
337         if (pool == NULL) return(-1);
338
339         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
340         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
341                 if (pptr == pool) break;
342         if (pptr == pool)
343                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
344                         ldap_int_thread_pool_s, ltp_next);
345         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
346
347         if (pool != pptr) return(-1);
348
349         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
350         pool->ltp_state = run_pending
351                 ? LDAP_INT_THREAD_POOL_FINISHING
352                 : LDAP_INT_THREAD_POOL_STOPPING;
353
354         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
355         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
356
357         do {
358                 ldap_pvt_thread_yield();
359                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
360                 waiting = pool->ltp_open_count;
361                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
362         } while (waiting > 0);
363
364         while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
365         {
366                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
367                 LDAP_FREE(ctx);
368         }
369
370         while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
371         {
372                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
373                 LDAP_FREE(ctx);
374         }
375
376         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
377         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
378         LDAP_FREE(pool);
379         return(0);
380 }
381
382 static void *
383 ldap_int_thread_pool_wrapper ( 
384         void *xpool )
385 {
386         struct ldap_int_thread_pool_s *pool = xpool;
387         ldap_int_thread_ctx_t *ctx;
388         ldap_int_thread_key_t ltc_key[MAXKEYS];
389         ldap_pvt_thread_t tid;
390         int i, keyslot;
391
392         if (pool == NULL)
393                 return NULL;
394
395         for ( i=0; i<MAXKEYS; i++ ) {
396                 ltc_key[i].ltk_key = NULL;
397         }
398
399         tid = ldap_pvt_thread_self();
400
401         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
402
403         /* store pointer to our keys */
404         for (i = tid & (MAXTHREADS-1); thread_keys[i].id != tid;
405                                 i = (i+1) & (MAXTHREADS-1));
406         thread_keys[i].ctx = ltc_key;
407         keyslot = i;
408
409         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
410                 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
411                 if (ctx) {
412                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
413                 } else {
414                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
415                                 break;
416                         if (pool->ltp_max_count > 0
417                                 && pool->ltp_open_count > pool->ltp_max_count)
418                         {
419                                 /* too many threads running (can happen if the
420                                  * maximum threads value is set during ongoing
421                                  * operation using ldap_pvt_thread_pool_maxthreads)
422                                  * so let this thread die.
423                                  */
424                                 break;
425                         }
426
427                         /* we could check an idle timer here, and let the
428                          * thread die if it has been inactive for a while.
429                          * only die if there are other open threads (i.e.,
430                          * always have at least one thread open).  the check
431                          * should be like this:
432                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
433                          *       check timer, leave thread (break;)
434                          */
435
436                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
437                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
438
439                         continue;
440                 }
441
442                 pool->ltp_pending_count--;
443
444                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
445                 pool->ltp_active_count++;
446                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
447
448                 ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
449
450                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
451                 LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
452                         ldap_int_thread_ctx_s, ltc_next.al);
453                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
454                 pool->ltp_active_count--;
455                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
456
457                 ldap_pvt_thread_yield();
458
459                 /* if we use an idle timer, here's
460                  * a good place to update it
461                  */
462
463                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
464         }
465
466         for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
467                 if (ltc_key[i].ltk_free)
468                         ltc_key[i].ltk_free(
469                                 ltc_key[i].ltk_key,
470                                 ltc_key[i].ltk_data );
471         }
472
473         thread_keys[keyslot].ctx = NULL;
474         thread_keys[keyslot].id = 0;
475
476         pool->ltp_open_count--;
477         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
478
479         ldap_pvt_thread_exit(NULL);
480         return(NULL);
481 }
482
483 int ldap_pvt_thread_pool_getkey(
484         void *xctx,
485         void *key,
486         void **data,
487         ldap_pvt_thread_pool_keyfree_t **kfree )
488 {
489         ldap_int_thread_key_t *ctx = xctx;
490         int i;
491
492         if ( !ctx || !data ) return EINVAL;
493
494         for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++ ) {
495                 if ( ctx[i].ltk_key == key ) {
496                         *data = ctx[i].ltk_data;
497                         if ( kfree ) *kfree = ctx[i].ltk_free;
498                         return 0;
499                 }
500         }
501         return ENOENT;
502 }
503
504 int ldap_pvt_thread_pool_setkey(
505         void *xctx,
506         void *key,
507         void *data,
508         ldap_pvt_thread_pool_keyfree_t *kfree )
509 {
510         ldap_int_thread_key_t *ctx = xctx;
511         int i;
512
513         if ( !ctx || !key ) return EINVAL;
514
515         for ( i=0; i<MAXKEYS; i++ ) {
516                 if ( !ctx[i].ltk_key || ctx[i].ltk_key == key ) {
517                         ctx[i].ltk_key = key;
518                         ctx[i].ltk_data = data;
519                         ctx[i].ltk_free = kfree;
520                         return 0;
521                 }
522         }
523         return ENOMEM;
524 }
525
526 /*
527  * This is necessary if the caller does not have access to the
528  * thread context handle (for example, a slapd plugin calling
529  * slapi_search_internal()). No doubt it is more efficient to
530  * for the application to keep track of the thread context
531  * handles itself.
532  */
533 void *ldap_pvt_thread_pool_context( )
534 {
535         ldap_pvt_thread_t tid;
536         int i;
537
538         tid = ldap_pvt_thread_self();
539
540         for (i = tid & (MAXTHREADS-1); thread_keys[i].id &&
541                 thread_keys[i].id != tid; i = (i+1) & (MAXTHREADS-1));
542
543         return thread_keys[i].ctx;
544 }
545
546 #endif /* LDAP_THREAD_HAVE_TPOOL */