]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
Changes
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2002 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20 #include <ac/errno.h>
21
22 #include "ldap-int.h"
23 #include "ldap_pvt_thread.h"
24 #include "ldap_queue.h"
25
26 #ifndef LDAP_THREAD_HAVE_TPOOL
27
/* Lifecycle state of a thread pool, stored in ltp_state. */
enum ldap_int_thread_pool_state {
        /* normal operation: submissions are accepted */
        LDAP_INT_THREAD_POOL_RUNNING,
        /* destroy requested with run_pending != 0: a worker exits once
         * it finds the pending list empty */
        LDAP_INT_THREAD_POOL_FINISHING,
        /* destroy requested with run_pending == 0: a worker exits without
         * taking any more pending work */
        LDAP_INT_THREAD_POOL_STOPPING
};
33
/* One thread-specific key slot held inside a thread context. */
typedef struct ldap_int_thread_key_s {
        /* identifies the slot; NULL marks the first unused slot */
        void *ltk_key;
        /* value stored for the key */
        void *ltk_data;
        /* optional; called with (ltk_key, ltk_data) when the owning
         * context is released by ldap_int_thread_pool_keyfree() */
        ldap_pvt_thread_pool_keyfree_t *ltk_free;
} ldap_int_thread_key_t;
39
/* Max number of thread-specific keys we store per thread.
 * We don't expect to use many...  Once all slots of a context hold
 * other keys, ldap_pvt_thread_pool_setkey() returns ENOMEM.
 */
#define MAXKEYS 32
44
/* Context for one unit of submitted work, plus its key slots.
 * The three list links live in a union and therefore share storage:
 * linking a context into one list overwrites its link for the others.
 */
typedef struct ldap_int_thread_ctx_s {
        union {
        LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;     /* link in ltp_pending_list */
        LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;      /* link in ltp_free_list */
        LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;     /* link in ltp_active_list */
        } ltc_next;
        /* routine a worker calls as ltc_start_routine(ctx, ltc_arg) */
        ldap_pvt_thread_start_t *ltc_start_routine;
        /* caller-supplied argument passed through to the routine */
        void *ltc_arg;
        /* id of the worker thread running this context */
        ldap_pvt_thread_t ltc_thread_id;
        /* key slots; used slots are contiguous from index 0 */
        ldap_int_thread_key_t ltc_key[MAXKEYS];
} ldap_int_thread_ctx_t;
56
/* A thread pool.  The functions in this file lock ltp_mutex around
 * their accesses to the lists and counters below.
 */
struct ldap_int_thread_pool_s {
        /* link in the file-scope ldap_int_thread_pool_list */
        LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
        ldap_pvt_thread_mutex_t ltp_mutex;
        /* signalled when work is queued and when the pool is destroyed;
         * idle workers wait on it */
        ldap_pvt_thread_cond_t ltp_cond;
        /* contexts waiting for a worker, in submission order */
        LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
        /* finished contexts kept for reuse by later submissions */
        LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
        /* contexts currently being run by a worker */
        LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
        /* one of enum ldap_int_thread_pool_state */
        long ltp_state;
        /* thread limit; a value <= 0 means no limit is enforced */
        long ltp_max_count;
        /* pending-queue limit; only enforced when > 0 */
        long ltp_max_pending;
        /* number of contexts on ltp_pending_list */
        long ltp_pending_count;
        /* number of contexts currently being run */
        long ltp_active_count;
        /* worker threads created (or being created) and not yet exited */
        long ltp_open_count;
        /* worker threads whose creation call is still in progress */
        long ltp_starting;
};
72
/* Every pool created by ldap_pvt_thread_pool_init() and not yet
 * destroyed; new pools are appended at the tail.
 */
static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
        ldap_int_thread_pool_list =
        LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);

/* Locked by pool init and destroy while they modify
 * ldap_int_thread_pool_list; initialized by ldap_int_thread_pool_startup().
 */
static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;

/* Body of every worker thread; defined further down in this file. */
static void *ldap_int_thread_pool_wrapper( void *pool );
80
81 int
82 ldap_int_thread_pool_startup ( void )
83 {
84         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
85 }
86
87 int
88 ldap_int_thread_pool_shutdown ( void )
89 {
90         struct ldap_int_thread_pool_s *pool;
91
92         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
93                 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
94                 ldap_pvt_thread_pool_destroy( &pool, 0);
95         }
96         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
97         return(0);
98 }
99
100 int
101 ldap_pvt_thread_pool_init (
102         ldap_pvt_thread_pool_t *tpool,
103         int max_threads,
104         int max_pending )
105 {
106         ldap_pvt_thread_pool_t pool;
107         int rc;
108
109         *tpool = NULL;
110         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
111                 sizeof(struct ldap_int_thread_pool_s));
112
113         if (pool == NULL) return(-1);
114
115         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
116         if (rc != 0)
117                 return(rc);
118         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
119         if (rc != 0)
120                 return(rc);
121         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
122         pool->ltp_max_count = max_threads;
123         pool->ltp_max_pending = max_pending;
124         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
125         LDAP_SLIST_INIT(&pool->ltp_free_list);
126         LDAP_SLIST_INIT(&pool->ltp_active_list);
127         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
128         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
129         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
130
131 #if 0
132         /* THIS WILL NOT WORK on some systems.  If the process
133          * forks after starting a thread, there is no guarantee
134          * that the thread will survive the fork.  For example,
135          * slapd forks in order to daemonize, and does so after
136          * calling ldap_pvt_thread_pool_init.  On some systems,
137          * this initial thread does not run in the child process,
138          * but ltp_open_count == 1, so two things happen: 
139          * 1) the first client connection fails, and 2) when
140          * slapd is kill'ed, it never terminates since it waits
141          * for all worker threads to exit. */
142
143         /* start up one thread, just so there is one. no need to
144          * lock the mutex right now, since no threads are running.
145          */
146         pool->ltp_open_count++;
147
148         ldap_pvt_thread_t thr;
149         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
150
151         if( rc != 0) {
152                 /* couldn't start one?  then don't start any */
153                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
154                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
155                         ldap_int_thread_pool_s, ltp_next);
156                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
157                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
158                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
159                 LDAP_FREE(pool);
160                 return(-1);
161         }
162 #endif
163
164         *tpool = pool;
165         return(0);
166 }
167
168 int
169 ldap_pvt_thread_pool_submit (
170         ldap_pvt_thread_pool_t *tpool,
171         ldap_pvt_thread_start_t *start_routine, void *arg )
172 {
173         struct ldap_int_thread_pool_s *pool;
174         ldap_int_thread_ctx_t *ctx;
175         int need_thread = 0;
176         ldap_pvt_thread_t thr;
177
178         if (tpool == NULL)
179                 return(-1);
180
181         pool = *tpool;
182
183         if (pool == NULL)
184                 return(-1);
185
186         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
187         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
188                 || (pool->ltp_max_pending > 0
189                         && pool->ltp_pending_count >= pool->ltp_max_pending))
190         {
191                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
192                 return(-1);
193         }
194         ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
195         if (ctx) {
196                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
197         } else {
198                 int i;
199                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
200                         sizeof(ldap_int_thread_ctx_t));
201                 if (ctx == NULL) {
202                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
203                         return(-1);
204                 }
205                 for ( i=0; i<MAXKEYS; i++ ) {
206                         ctx->ltc_key[i].ltk_key = NULL;
207                 }
208         }
209
210         ctx->ltc_start_routine = start_routine;
211         ctx->ltc_arg = arg;
212
213         pool->ltp_pending_count++;
214         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
215         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
216         if ((pool->ltp_open_count <= 0
217                         || pool->ltp_pending_count > 1
218                         || pool->ltp_open_count == pool->ltp_active_count)
219                 && (pool->ltp_max_count <= 0
220                         || pool->ltp_open_count < pool->ltp_max_count))
221         {
222                 pool->ltp_open_count++;
223                 pool->ltp_starting++;
224                 need_thread = 1;
225         }
226         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
227
228         if (need_thread) {
229                 int rc = ldap_pvt_thread_create( &thr, 1,
230                         ldap_int_thread_pool_wrapper, pool );
231                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
232                 if (rc == 0) {
233                         pool->ltp_starting--;
234                 } else {
235                         /* couldn't create thread.  back out of
236                          * ltp_open_count and check for even worse things.
237                          */
238                         pool->ltp_open_count--;
239                         pool->ltp_starting--;
240                         if (pool->ltp_open_count == 0) {
241                                 /* no open threads at all?!?
242                                  */
243                                 ldap_int_thread_ctx_t *ptr;
244                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
245                                         if (ptr == ctx) break;
246                                 if (ptr == ctx) {
247                                         /* no open threads, context not handled, so
248                                          * back out of ltp_pending_count, free the context,
249                                          * report the error.
250                                          */
251                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
252                                                 ldap_int_thread_ctx_s, ltc_next.q);
253                                         pool->ltp_pending_count++;
254                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
255                                         LDAP_FREE(ctx);
256                                         return(-1);
257                                 }
258                         }
259                         /* there is another open thread, so this
260                          * context will be handled eventually.
261                          * continue on and signal that the context
262                          * is waiting.
263                          */
264                 }
265                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
266         }
267
268         return(0);
269 }
270
271 int
272 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
273 {
274         struct ldap_int_thread_pool_s *pool;
275
276         if (tpool == NULL)
277                 return(-1);
278
279         pool = *tpool;
280
281         if (pool == NULL)
282                 return(-1);
283
284         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
285         pool->ltp_max_count = max_threads;
286         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
287         return(0);
288 }
289
290 int
291 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
292 {
293         struct ldap_int_thread_pool_s *pool;
294         int count;
295
296         if (tpool == NULL)
297                 return(-1);
298
299         pool = *tpool;
300
301         if (pool == NULL)
302                 return(0);
303
304         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
305         count = pool->ltp_pending_count + pool->ltp_active_count;
306         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
307         return(count);
308 }
309
310 static void ldap_int_thread_pool_keyfree( ldap_int_thread_ctx_t *ctx )
311 {
312         int i;
313         for ( i=0; i<MAXKEYS && ctx->ltc_key[i].ltk_key; i++ ) {
314                 if (ctx->ltc_key[i].ltk_free)
315                         ctx->ltc_key[i].ltk_free(
316                                 ctx->ltc_key[i].ltk_key,
317                                 ctx->ltc_key[i].ltk_data );
318         }
319 }
320
321 int
322 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
323 {
324         struct ldap_int_thread_pool_s *pool, *pptr;
325         long waiting;
326         ldap_int_thread_ctx_t *ctx;
327
328         if (tpool == NULL)
329                 return(-1);
330
331         pool = *tpool;
332
333         if (pool == NULL) return(-1);
334
335         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
336         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
337                 if (pptr == pool) break;
338         if (pptr == pool)
339                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
340                         ldap_int_thread_pool_s, ltp_next);
341         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
342
343         if (pool != pptr) return(-1);
344
345         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
346         pool->ltp_state = run_pending
347                 ? LDAP_INT_THREAD_POOL_FINISHING
348                 : LDAP_INT_THREAD_POOL_STOPPING;
349         waiting = pool->ltp_open_count;
350
351         /* broadcast could be used here, but only after
352          * it is fixed in the NT thread implementation
353          */
354         while (--waiting >= 0) {
355                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
356         }
357         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
358
359         do {
360                 ldap_pvt_thread_yield();
361                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
362                 waiting = pool->ltp_open_count;
363                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
364         } while (waiting > 0);
365
366         while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
367         {
368                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
369                 ldap_int_thread_pool_keyfree( ctx );
370                 LDAP_FREE(ctx);
371         }
372
373         while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
374         {
375                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
376                 ldap_int_thread_pool_keyfree( ctx );
377                 LDAP_FREE(ctx);
378         }
379
380         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
381         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
382         LDAP_FREE(pool);
383         return(0);
384 }
385
386 static void *
387 ldap_int_thread_pool_wrapper ( 
388         void *xpool )
389 {
390         struct ldap_int_thread_pool_s *pool = xpool;
391         ldap_int_thread_ctx_t *ctx;
392
393         if (pool == NULL)
394                 return NULL;
395
396         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
397
398         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
399                 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
400                 if (ctx) {
401                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
402                 } else {
403                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
404                                 break;
405                         if (pool->ltp_max_count > 0
406                                 && pool->ltp_open_count > pool->ltp_max_count)
407                         {
408                                 /* too many threads running (can happen if the
409                                  * maximum threads value is set during ongoing
410                                  * operation using ldap_pvt_thread_pool_maxthreads)
411                                  * so let this thread die.
412                                  */
413                                 break;
414                         }
415
416                         /* we could check an idle timer here, and let the
417                          * thread die if it has been inactive for a while.
418                          * only die if there are other open threads (i.e.,
419                          * always have at least one thread open).  the check
420                          * should be like this:
421                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
422                          *       check timer, leave thread (break;)
423                          */
424
425                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
426                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
427
428                         continue;
429                 }
430
431                 pool->ltp_pending_count--;
432                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
433                 pool->ltp_active_count++;
434                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
435
436                 ctx->ltc_thread_id = ldap_pvt_thread_self();
437                 ctx->ltc_start_routine(ctx, ctx->ltc_arg);
438
439                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
440                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
441                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
442
443                 ldap_pvt_thread_yield();
444
445                 /* if we use an idle timer, here's
446                  * a good place to update it
447                  */
448
449                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
450                 pool->ltp_active_count--;
451                 ctx = LDAP_SLIST_FIRST(&pool->ltp_active_list);
452                 if (ctx) {
453                         LDAP_SLIST_REMOVE_HEAD(&pool->ltp_active_list, ltc_next.al);
454                 }
455         }
456
457         pool->ltp_open_count--;
458         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
459
460
461         ldap_pvt_thread_exit(NULL);
462         return(NULL);
463 }
464
465 int ldap_pvt_thread_pool_getkey(
466         void *xctx,
467         void *key,
468         void **data,
469         ldap_pvt_thread_pool_keyfree_t **kfree )
470 {
471         ldap_int_thread_ctx_t *ctx = xctx;
472         int i;
473
474         if ( !ctx || !data ) return EINVAL;
475
476         for ( i=0; i<MAXKEYS && ctx->ltc_key[i].ltk_key; i++ ) {
477                 if ( ctx->ltc_key[i].ltk_key == key ) {
478                         *data = ctx->ltc_key[i].ltk_data;
479                         if ( kfree ) *kfree = ctx->ltc_key[i].ltk_free;
480                         return 0;
481                 }
482         }
483         return ENOENT;
484 }
485
486 int ldap_pvt_thread_pool_setkey(
487         void *xctx,
488         void *key,
489         void *data,
490         ldap_pvt_thread_pool_keyfree_t *kfree )
491 {
492         ldap_int_thread_ctx_t *ctx = xctx;
493         int i;
494
495         if ( !ctx || !key ) return EINVAL;
496
497         for ( i=0; i<MAXKEYS; i++ ) {
498                 if ( !ctx->ltc_key[i].ltk_key || ctx->ltc_key[i].ltk_key == key ) {
499                         ctx->ltc_key[i].ltk_key = key;
500                         ctx->ltc_key[i].ltk_data = data;
501                         ctx->ltc_key[i].ltk_free = kfree;
502                         return 0;
503                 }
504         }
505         return ENOMEM;
506 }
507
508 /*
509  * This is necessary if the caller does not have access to the
510  * thread context handle (for example, a slapd plugin calling
511  * slapi_search_internal()). No doubt it is more efficient to
512  * for the application to keep track of the thread context
513  * handles itself.
514  */
515 void *ldap_pvt_thread_pool_context( ldap_pvt_thread_pool_t *tpool )
516 {
517         ldap_pvt_thread_pool_t pool;
518         ldap_pvt_thread_t tid;
519         ldap_int_thread_ctx_t *ptr;
520
521         pool = *tpool;
522         if (pool == NULL) {
523                 return NULL;
524         }
525
526         tid = ldap_pvt_thread_self();
527
528         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
529         LDAP_SLIST_FOREACH(ptr, &pool->ltp_active_list, ltc_next.al)
530                 if (ptr != NULL && ptr->ltc_thread_id == tid) break;
531         if (ptr != NULL && ptr->ltc_thread_id != tid) {
532                 ptr = NULL;
533         }
534         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
535
536         return ptr;
537 }
538
#endif /* LDAP_THREAD_HAVE_TPOOL */