]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
ITS#2404 fix from HEAD
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2003 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20 #include <ac/errno.h>
21
22 #include "ldap-int.h"
23 #include "ldap_pvt_thread.h"
24 #include "ldap_queue.h"
25
26 #ifndef LDAP_THREAD_HAVE_TPOOL
27
28 enum ldap_int_thread_pool_state {
29         LDAP_INT_THREAD_POOL_RUNNING,
30         LDAP_INT_THREAD_POOL_FINISHING,
31         LDAP_INT_THREAD_POOL_STOPPING
32 };
33
34 typedef struct ldap_int_thread_key_s {
35         void *ltk_key;
36         void *ltk_data;
37         ldap_pvt_thread_pool_keyfree_t *ltk_free;
38 } ldap_int_thread_key_t;
39
40 /* Max number of thread-specific keys we store per thread.
41  * We don't expect to use many...
42  */
43 #define MAXKEYS 32
44
45 typedef struct ldap_int_thread_ctx_s {
46         union {
47         LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
48         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;
49         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;
50         } ltc_next;
51         ldap_pvt_thread_start_t *ltc_start_routine;
52         void *ltc_arg;
53         ldap_pvt_thread_t ltc_thread_id;
54         ldap_int_thread_key_t *ltc_key;
55 } ldap_int_thread_ctx_t;
56
57 struct ldap_int_thread_pool_s {
58         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
59         ldap_pvt_thread_mutex_t ltp_mutex;
60         ldap_pvt_thread_cond_t ltp_cond;
61         LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
62         LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
63         LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
64         long ltp_state;
65         long ltp_max_count;
66         long ltp_max_pending;
67         long ltp_pending_count;
68         long ltp_active_count;
69         long ltp_open_count;
70         long ltp_starting;
71 };
72
73 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
74         ldap_int_thread_pool_list =
75         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
76
77 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
78
79 static void *ldap_int_thread_pool_wrapper( void *pool );
80
81 int
82 ldap_int_thread_pool_startup ( void )
83 {
84         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
85 }
86
87 int
88 ldap_int_thread_pool_shutdown ( void )
89 {
90         struct ldap_int_thread_pool_s *pool;
91
92         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
93                 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
94                 ldap_pvt_thread_pool_destroy( &pool, 0);
95         }
96         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
97         return(0);
98 }
99
100 int
101 ldap_pvt_thread_pool_init (
102         ldap_pvt_thread_pool_t *tpool,
103         int max_threads,
104         int max_pending )
105 {
106         ldap_pvt_thread_pool_t pool;
107         int rc;
108
109         *tpool = NULL;
110         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
111                 sizeof(struct ldap_int_thread_pool_s));
112
113         if (pool == NULL) return(-1);
114
115         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
116         if (rc != 0)
117                 return(rc);
118         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
119         if (rc != 0)
120                 return(rc);
121         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
122         pool->ltp_max_count = max_threads;
123         pool->ltp_max_pending = max_pending;
124         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
125         LDAP_SLIST_INIT(&pool->ltp_free_list);
126         LDAP_SLIST_INIT(&pool->ltp_active_list);
127         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
128         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
129         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
130
131 #if 0
132         /* THIS WILL NOT WORK on some systems.  If the process
133          * forks after starting a thread, there is no guarantee
134          * that the thread will survive the fork.  For example,
135          * slapd forks in order to daemonize, and does so after
136          * calling ldap_pvt_thread_pool_init.  On some systems,
137          * this initial thread does not run in the child process,
138          * but ltp_open_count == 1, so two things happen: 
139          * 1) the first client connection fails, and 2) when
140          * slapd is kill'ed, it never terminates since it waits
141          * for all worker threads to exit. */
142
143         /* start up one thread, just so there is one. no need to
144          * lock the mutex right now, since no threads are running.
145          */
146         pool->ltp_open_count++;
147
148         ldap_pvt_thread_t thr;
149         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
150
151         if( rc != 0) {
152                 /* couldn't start one?  then don't start any */
153                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
154                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
155                         ldap_int_thread_pool_s, ltp_next);
156                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
157                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
158                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
159                 LDAP_FREE(pool);
160                 return(-1);
161         }
162 #endif
163
164         *tpool = pool;
165         return(0);
166 }
167
168 int
169 ldap_pvt_thread_pool_submit (
170         ldap_pvt_thread_pool_t *tpool,
171         ldap_pvt_thread_start_t *start_routine, void *arg )
172 {
173         struct ldap_int_thread_pool_s *pool;
174         ldap_int_thread_ctx_t *ctx;
175         int need_thread = 0;
176         ldap_pvt_thread_t thr;
177
178         if (tpool == NULL)
179                 return(-1);
180
181         pool = *tpool;
182
183         if (pool == NULL)
184                 return(-1);
185
186         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
187         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
188                 || (pool->ltp_max_pending > 0
189                         && pool->ltp_pending_count >= pool->ltp_max_pending))
190         {
191                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
192                 return(-1);
193         }
194         ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
195         if (ctx) {
196                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
197         } else {
198                 int i;
199                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
200                         sizeof(ldap_int_thread_ctx_t));
201                 if (ctx == NULL) {
202                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
203                         return(-1);
204                 }
205         }
206
207         ctx->ltc_start_routine = start_routine;
208         ctx->ltc_arg = arg;
209
210         pool->ltp_pending_count++;
211         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
212         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
213         if ((pool->ltp_open_count <= 0
214 #if 0
215                         || pool->ltp_pending_count > 1
216 #endif
217                         || pool->ltp_open_count == pool->ltp_active_count)
218                 && (pool->ltp_max_count <= 0
219                         || pool->ltp_open_count < pool->ltp_max_count))
220         {
221                 pool->ltp_open_count++;
222                 pool->ltp_starting++;
223                 need_thread = 1;
224         }
225         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
226
227         if (need_thread) {
228                 int rc = ldap_pvt_thread_create( &thr, 1,
229                         ldap_int_thread_pool_wrapper, pool );
230                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
231                 if (rc == 0) {
232                         pool->ltp_starting--;
233                 } else {
234                         /* couldn't create thread.  back out of
235                          * ltp_open_count and check for even worse things.
236                          */
237                         pool->ltp_open_count--;
238                         pool->ltp_starting--;
239                         if (pool->ltp_open_count == 0) {
240                                 /* no open threads at all?!?
241                                  */
242                                 ldap_int_thread_ctx_t *ptr;
243                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
244                                         if (ptr == ctx) break;
245                                 if (ptr == ctx) {
246                                         /* no open threads, context not handled, so
247                                          * back out of ltp_pending_count, free the context,
248                                          * report the error.
249                                          */
250                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
251                                                 ldap_int_thread_ctx_s, ltc_next.q);
252                                         pool->ltp_pending_count++;
253                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
254                                         LDAP_FREE(ctx);
255                                         return(-1);
256                                 }
257                         }
258                         /* there is another open thread, so this
259                          * context will be handled eventually.
260                          * continue on and signal that the context
261                          * is waiting.
262                          */
263                 }
264                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
265         }
266
267         return(0);
268 }
269
270 int
271 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
272 {
273         struct ldap_int_thread_pool_s *pool;
274
275         if (tpool == NULL)
276                 return(-1);
277
278         pool = *tpool;
279
280         if (pool == NULL)
281                 return(-1);
282
283         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
284         pool->ltp_max_count = max_threads;
285         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
286         return(0);
287 }
288
289 int
290 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
291 {
292         struct ldap_int_thread_pool_s *pool;
293         int count;
294
295         if (tpool == NULL)
296                 return(-1);
297
298         pool = *tpool;
299
300         if (pool == NULL)
301                 return(0);
302
303         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
304         count = pool->ltp_pending_count + pool->ltp_active_count;
305         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
306         return(count);
307 }
308
309 int
310 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
311 {
312         struct ldap_int_thread_pool_s *pool, *pptr;
313         long waiting;
314         ldap_int_thread_ctx_t *ctx;
315
316         if (tpool == NULL)
317                 return(-1);
318
319         pool = *tpool;
320
321         if (pool == NULL) return(-1);
322
323         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
324         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
325                 if (pptr == pool) break;
326         if (pptr == pool)
327                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
328                         ldap_int_thread_pool_s, ltp_next);
329         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
330
331         if (pool != pptr) return(-1);
332
333         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
334         pool->ltp_state = run_pending
335                 ? LDAP_INT_THREAD_POOL_FINISHING
336                 : LDAP_INT_THREAD_POOL_STOPPING;
337
338         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
339         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
340
341         do {
342                 ldap_pvt_thread_yield();
343                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
344                 waiting = pool->ltp_open_count;
345                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
346         } while (waiting > 0);
347
348         while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
349         {
350                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
351                 LDAP_FREE(ctx);
352         }
353
354         while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
355         {
356                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
357                 LDAP_FREE(ctx);
358         }
359
360         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
361         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
362         LDAP_FREE(pool);
363         return(0);
364 }
365
366 static void *
367 ldap_int_thread_pool_wrapper ( 
368         void *xpool )
369 {
370         struct ldap_int_thread_pool_s *pool = xpool;
371         ldap_int_thread_ctx_t *ctx;
372         ldap_int_thread_key_t ltc_key[MAXKEYS];
373         int i;
374
375         if (pool == NULL)
376                 return NULL;
377
378         for ( i=0; i<MAXKEYS; i++ ) {
379                 ltc_key[i].ltk_key = NULL;
380         }
381
382         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
383
384         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
385                 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
386                 if (ctx) {
387                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
388                 } else {
389                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
390                                 break;
391                         if (pool->ltp_max_count > 0
392                                 && pool->ltp_open_count > pool->ltp_max_count)
393                         {
394                                 /* too many threads running (can happen if the
395                                  * maximum threads value is set during ongoing
396                                  * operation using ldap_pvt_thread_pool_maxthreads)
397                                  * so let this thread die.
398                                  */
399                                 break;
400                         }
401
402                         /* we could check an idle timer here, and let the
403                          * thread die if it has been inactive for a while.
404                          * only die if there are other open threads (i.e.,
405                          * always have at least one thread open).  the check
406                          * should be like this:
407                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
408                          *       check timer, leave thread (break;)
409                          */
410
411                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
412                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
413
414                         continue;
415                 }
416
417                 pool->ltp_pending_count--;
418
419                 ctx->ltc_key = ltc_key;
420                 ctx->ltc_thread_id = ldap_pvt_thread_self();
421
422                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
423                 pool->ltp_active_count++;
424                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
425
426                 ctx->ltc_start_routine(ctx, ctx->ltc_arg);
427                 ctx->ltc_key = NULL;
428
429                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
430                 LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
431                         ldap_int_thread_ctx_s, ltc_next.al);
432                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
433                 pool->ltp_active_count--;
434                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
435
436                 ldap_pvt_thread_yield();
437
438                 /* if we use an idle timer, here's
439                  * a good place to update it
440                  */
441
442                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
443         }
444
445         for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
446                 if (ltc_key[i].ltk_free)
447                         ltc_key[i].ltk_free(
448                                 ltc_key[i].ltk_key,
449                                 ltc_key[i].ltk_data );
450         }
451
452         pool->ltp_open_count--;
453         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
454
455         ldap_pvt_thread_exit(NULL);
456         return(NULL);
457 }
458
459 int ldap_pvt_thread_pool_getkey(
460         void *xctx,
461         void *key,
462         void **data,
463         ldap_pvt_thread_pool_keyfree_t **kfree )
464 {
465         ldap_int_thread_ctx_t *ctx = xctx;
466         int i;
467
468         if ( !ctx || !data || !ctx->ltc_key ) return EINVAL;
469
470         for ( i=0; i<MAXKEYS && ctx->ltc_key[i].ltk_key; i++ ) {
471                 if ( ctx->ltc_key[i].ltk_key == key ) {
472                         *data = ctx->ltc_key[i].ltk_data;
473                         if ( kfree ) *kfree = ctx->ltc_key[i].ltk_free;
474                         return 0;
475                 }
476         }
477         return ENOENT;
478 }
479
480 int ldap_pvt_thread_pool_setkey(
481         void *xctx,
482         void *key,
483         void *data,
484         ldap_pvt_thread_pool_keyfree_t *kfree )
485 {
486         ldap_int_thread_ctx_t *ctx = xctx;
487         int i;
488
489         if ( !ctx || !key || !ctx->ltc_key ) return EINVAL;
490
491         for ( i=0; i<MAXKEYS; i++ ) {
492                 if ( !ctx->ltc_key[i].ltk_key || ctx->ltc_key[i].ltk_key == key ) {
493                         ctx->ltc_key[i].ltk_key = key;
494                         ctx->ltc_key[i].ltk_data = data;
495                         ctx->ltc_key[i].ltk_free = kfree;
496                         return 0;
497                 }
498         }
499         return ENOMEM;
500 }
501
502 /*
503  * This is necessary if the caller does not have access to the
504  * thread context handle (for example, a slapd plugin calling
505  * slapi_search_internal()). No doubt it is more efficient to
506  * for the application to keep track of the thread context
507  * handles itself.
508  */
509 void *ldap_pvt_thread_pool_context( ldap_pvt_thread_pool_t *tpool )
510 {
511         ldap_pvt_thread_pool_t pool;
512         ldap_pvt_thread_t tid;
513         ldap_int_thread_ctx_t *ptr;
514
515         pool = *tpool;
516         if (pool == NULL) {
517                 return NULL;
518         }
519
520         tid = ldap_pvt_thread_self();
521
522         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
523         LDAP_SLIST_FOREACH(ptr, &pool->ltp_active_list, ltc_next.al)
524                 if (ptr != NULL && ptr->ltc_thread_id == tid) break;
525         if (ptr != NULL && ptr->ltc_thread_id != tid) {
526                 ptr = NULL;
527         }
528         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
529
530         return ptr;
531 }
532
533 #endif /* LDAP_HAVE_THREAD_POOL */