]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
ITS#2997 change MAXTHREADS to LDAP_MAXTHR
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2004 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15
16 #include "portable.h"
17
18 #include <stdio.h>
19
20 #include <ac/stdarg.h>
21 #include <ac/stdlib.h>
22 #include <ac/string.h>
23 #include <ac/time.h>
24 #include <ac/errno.h>
25
26 #include "ldap-int.h"
27 #include "ldap_pvt_thread.h"
28 #include "ldap_queue.h"
29
30 #ifndef LDAP_THREAD_HAVE_TPOOL
31
/* Lifecycle state of a thread pool; stored in ltp_state. */
32 enum ldap_int_thread_pool_state {
33         LDAP_INT_THREAD_POOL_RUNNING,   /* normal operation; the only state in which submit accepts work */
34         LDAP_INT_THREAD_POOL_FINISHING, /* destroy called with run_pending != 0: workers drain the pending list, then exit */
35         LDAP_INT_THREAD_POOL_STOPPING   /* destroy called with run_pending == 0: workers leave their loop without taking more work */
36 };
37
/* One thread-specific key/value entry.  Each worker thread owns an
 * array of MAXKEYS of these; an entry whose ltk_key is NULL marks the
 * end of the used entries.
 */
38 typedef struct ldap_int_thread_key_s {
39         void *ltk_key;   /* lookup key, compared by pointer value */
40         void *ltk_data;  /* value stored under ltk_key */
41         ldap_pvt_thread_pool_keyfree_t *ltk_free; /* if non-NULL, called with (ltk_key, ltk_data) when the worker thread exits */
42 } ldap_int_thread_key_t;
43
44 /* Max number of thread-specific keys we store per thread.
45  * We don't expect to use many...
46  */
47 #define MAXKEYS 32
/* Number of slots in the thread_keys table.  Slot indices are computed
 * with "& (LDAP_MAXTHR-1)", which is why the value must be a power of 2.
 */
48 #define LDAP_MAXTHR     1024    /* must be a power of 2 */
49
/* Thread ID used to mark an unused thread_keys slot.  As a file-scope
 * static without an initializer it is zero-initialized.
 */
50 static ldap_pvt_thread_t tid_zero;
51
/* Thread ID comparison. */
52 #ifdef HAVE_PTHREADS
53 #define TID_EQ(a,b)     pthread_equal((a),(b))
54 #else
55 #define TID_EQ(a,b)     ((a) == (b))
56 #endif
/* Table mapping a worker thread's ID to its key array.  Slots are
 * located by hashing the thread ID (TID_HASH) and probing linearly;
 * a slot whose id equals tid_zero is free.
 */
57 static struct {
58         ldap_pvt_thread_t id;         /* owning thread, or tid_zero if the slot is free */
59         ldap_int_thread_key_t *ctx;   /* that thread's key array; set by the worker itself */
60 } thread_keys[LDAP_MAXTHR];
61         
62
/* One unit of work submitted to a pool.  A context is on at most one
 * of the pool's lists at a time, so the list linkage shares storage.
 */
63 typedef struct ldap_int_thread_ctx_s {
64         union {
65         LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;   /* linkage while on ltp_pending_list */
66         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;    /* linkage while on ltp_free_list */
67         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;   /* linkage while on ltp_active_list */
68         } ltc_next;
69         ldap_pvt_thread_start_t *ltc_start_routine;  /* function the worker calls */
70         void *ltc_arg;                               /* second argument passed to ltc_start_routine */
71 } ldap_int_thread_ctx_t;
72
/* A thread pool.  The functions in this file access the lists and
 * counters below while holding ltp_mutex.
 */
73 struct ldap_int_thread_pool_s {
74         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next; /* linkage in ldap_int_thread_pool_list */
75         ldap_pvt_thread_mutex_t ltp_mutex;  /* held while the fields below are read or changed */
76         ldap_pvt_thread_cond_t ltp_cond;    /* signalled when work is queued, broadcast on destroy; idle workers wait on it */
77         LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list; /* submitted, not yet started (FIFO) */
78         LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;     /* finished contexts kept for reuse */
79         LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;  /* contexts currently being executed */
80         long ltp_state;          /* an enum ldap_int_thread_pool_state value */
81         long ltp_max_count;      /* thread limit; <= 0 means no limit */
82         long ltp_max_pending;    /* limit on queued contexts; <= 0 means no limit */
83         long ltp_pending_count;  /* number of contexts on ltp_pending_list */
84         long ltp_active_count;   /* number of contexts being executed */
85         long ltp_open_count;     /* worker threads started and not yet exited */
86         long ltp_starting;       /* raised when submit decides to create a thread, lowered once the create call returns */
87 };
88
/* All pools created by ldap_pvt_thread_pool_init() and not yet destroyed. */
89 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
90         ldap_int_thread_pool_list =
91         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
92
/* Held by pool init and destroy while they modify ldap_int_thread_pool_list. */
93 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
94
/* Body of every worker thread; defined below. */
95 static void *ldap_int_thread_pool_wrapper( void *pool );
96
97 int
98 ldap_int_thread_pool_startup ( void )
99 {
100         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
101 }
102
103 int
104 ldap_int_thread_pool_shutdown ( void )
105 {
106         struct ldap_int_thread_pool_s *pool;
107
108         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
109                 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
110                 ldap_pvt_thread_pool_destroy( &pool, 0);
111         }
112         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
113         return(0);
114 }
115
116 int
117 ldap_pvt_thread_pool_init (
118         ldap_pvt_thread_pool_t *tpool,
119         int max_threads,
120         int max_pending )
121 {
122         ldap_pvt_thread_pool_t pool;
123         int rc;
124
125         *tpool = NULL;
126         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
127                 sizeof(struct ldap_int_thread_pool_s));
128
129         if (pool == NULL) return(-1);
130
131         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
132         if (rc != 0)
133                 return(rc);
134         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
135         if (rc != 0)
136                 return(rc);
137         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
138         pool->ltp_max_count = max_threads;
139         pool->ltp_max_pending = max_pending;
140         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
141         LDAP_SLIST_INIT(&pool->ltp_free_list);
142         LDAP_SLIST_INIT(&pool->ltp_active_list);
143         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
144         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
145         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
146
147 #if 0
148         /* THIS WILL NOT WORK on some systems.  If the process
149          * forks after starting a thread, there is no guarantee
150          * that the thread will survive the fork.  For example,
151          * slapd forks in order to daemonize, and does so after
152          * calling ldap_pvt_thread_pool_init.  On some systems,
153          * this initial thread does not run in the child process,
154          * but ltp_open_count == 1, so two things happen: 
155          * 1) the first client connection fails, and 2) when
156          * slapd is kill'ed, it never terminates since it waits
157          * for all worker threads to exit. */
158
159         /* start up one thread, just so there is one. no need to
160          * lock the mutex right now, since no threads are running.
161          */
162         pool->ltp_open_count++;
163
164         ldap_pvt_thread_t thr;
165         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
166
167         if( rc != 0) {
168                 /* couldn't start one?  then don't start any */
169                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
170                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
171                         ldap_int_thread_pool_s, ltp_next);
172                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
173                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
174                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
175                 LDAP_FREE(pool);
176                 return(-1);
177         }
178 #endif
179
180         *tpool = pool;
181         return(0);
182 }
183
/* Store in (hash) the sum of the bytes of the object (tid).
 * (tid) must be an lvalue; (hash) must be an assignable integer.
 * The macro's own temporaries use names unlikely to collide with the
 * caller's, so any variable (including one called "i") may be passed
 * as the hash argument.
 */
#define TID_HASH(tid, hash) do { unsigned int tid_hash_i_; \
	unsigned char *tid_hash_p_ = (unsigned char *)&(tid); \
	(hash) = 0; \
	for (tid_hash_i_ = 0; tid_hash_i_ < sizeof(tid); tid_hash_i_++) \
		(hash) += tid_hash_p_[tid_hash_i_]; } while(0)
187
188 int
189 ldap_pvt_thread_pool_submit (
190         ldap_pvt_thread_pool_t *tpool,
191         ldap_pvt_thread_start_t *start_routine, void *arg )
192 {
193         struct ldap_int_thread_pool_s *pool;
194         ldap_int_thread_ctx_t *ctx;
195         int need_thread = 0;
196         ldap_pvt_thread_t thr;
197
198         if (tpool == NULL)
199                 return(-1);
200
201         pool = *tpool;
202
203         if (pool == NULL)
204                 return(-1);
205
206         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
207         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
208                 || (pool->ltp_max_pending > 0
209                         && pool->ltp_pending_count >= pool->ltp_max_pending))
210         {
211                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
212                 return(-1);
213         }
214         ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
215         if (ctx) {
216                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
217         } else {
218                 int i;
219                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
220                         sizeof(ldap_int_thread_ctx_t));
221                 if (ctx == NULL) {
222                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
223                         return(-1);
224                 }
225         }
226
227         ctx->ltc_start_routine = start_routine;
228         ctx->ltc_arg = arg;
229
230         pool->ltp_pending_count++;
231         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
232         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
233         if ((pool->ltp_open_count <= 0
234 #if 0
235                         || pool->ltp_pending_count > 1
236 #endif
237                         || pool->ltp_open_count == pool->ltp_active_count)
238                 && (pool->ltp_max_count <= 0
239                         || pool->ltp_open_count < pool->ltp_max_count))
240         {
241                 pool->ltp_open_count++;
242                 pool->ltp_starting++;
243                 need_thread = 1;
244         }
245         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
246
247         if (need_thread) {
248                 int rc;
249
250                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
251
252                 rc = ldap_pvt_thread_create( &thr, 1,
253                         ldap_int_thread_pool_wrapper, pool );
254                 if (rc == 0) {
255                         int hash;
256                         pool->ltp_starting--;
257
258                         /* assign this thread ID to a key slot; start
259                          * at the thread ID itself (mod LDAP_MAXTHR) and
260                          * look for an empty slot.
261                          */
262                         TID_HASH(thr, hash);
263                         for (rc = hash & (LDAP_MAXTHR-1);
264                                 !TID_EQ(thread_keys[rc].id, tid_zero);
265                                 rc = (rc+1) & (LDAP_MAXTHR-1));
266                         thread_keys[rc].id = thr;
267                 } else {
268                         /* couldn't create thread.  back out of
269                          * ltp_open_count and check for even worse things.
270                          */
271                         pool->ltp_open_count--;
272                         pool->ltp_starting--;
273                         if (pool->ltp_open_count == 0) {
274                                 /* no open threads at all?!?
275                                  */
276                                 ldap_int_thread_ctx_t *ptr;
277                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
278                                         if (ptr == ctx) break;
279                                 if (ptr == ctx) {
280                                         /* no open threads, context not handled, so
281                                          * back out of ltp_pending_count, free the context,
282                                          * report the error.
283                                          */
284                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
285                                                 ldap_int_thread_ctx_s, ltc_next.q);
286                                         pool->ltp_pending_count++;
287                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
288                                         LDAP_FREE(ctx);
289                                         return(-1);
290                                 }
291                         }
292                         /* there is another open thread, so this
293                          * context will be handled eventually.
294                          * continue on and signal that the context
295                          * is waiting.
296                          */
297                 }
298                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
299         }
300
301         return(0);
302 }
303
304 int
305 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
306 {
307         struct ldap_int_thread_pool_s *pool;
308
309         if (tpool == NULL)
310                 return(-1);
311
312         pool = *tpool;
313
314         if (pool == NULL)
315                 return(-1);
316
317         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
318         pool->ltp_max_count = max_threads;
319         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
320         return(0);
321 }
322
323 int
324 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
325 {
326         struct ldap_int_thread_pool_s *pool;
327         int count;
328
329         if (tpool == NULL)
330                 return(-1);
331
332         pool = *tpool;
333
334         if (pool == NULL)
335                 return(0);
336
337         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
338         count = pool->ltp_pending_count + pool->ltp_active_count;
339         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
340         return(count);
341 }
342
343 int
344 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
345 {
346         struct ldap_int_thread_pool_s *pool, *pptr;
347         long waiting;
348         ldap_int_thread_ctx_t *ctx;
349
350         if (tpool == NULL)
351                 return(-1);
352
353         pool = *tpool;
354
355         if (pool == NULL) return(-1);
356
357         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
358         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
359                 if (pptr == pool) break;
360         if (pptr == pool)
361                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
362                         ldap_int_thread_pool_s, ltp_next);
363         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
364
365         if (pool != pptr) return(-1);
366
367         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
368         pool->ltp_state = run_pending
369                 ? LDAP_INT_THREAD_POOL_FINISHING
370                 : LDAP_INT_THREAD_POOL_STOPPING;
371
372         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
373         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
374
375         do {
376                 ldap_pvt_thread_yield();
377                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
378                 waiting = pool->ltp_open_count;
379                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
380         } while (waiting > 0);
381
382         while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
383         {
384                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
385                 LDAP_FREE(ctx);
386         }
387
388         while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
389         {
390                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
391                 LDAP_FREE(ctx);
392         }
393
394         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
395         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
396         LDAP_FREE(pool);
397         return(0);
398 }
399
400 static void *
401 ldap_int_thread_pool_wrapper ( 
402         void *xpool )
403 {
404         struct ldap_int_thread_pool_s *pool = xpool;
405         ldap_int_thread_ctx_t *ctx;
406         ldap_int_thread_key_t ltc_key[MAXKEYS];
407         ldap_pvt_thread_t tid;
408         int i, keyslot, hash;
409
410         if (pool == NULL)
411                 return NULL;
412
413         for ( i=0; i<MAXKEYS; i++ ) {
414                 ltc_key[i].ltk_key = NULL;
415         }
416
417         tid = ldap_pvt_thread_self();
418
419         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
420
421         /* store pointer to our keys */
422         TID_HASH(tid, hash);
423         for (i = hash & (LDAP_MAXTHR-1); !TID_EQ(thread_keys[i].id, tid);
424                                 i = (i+1) & (LDAP_MAXTHR-1));
425         thread_keys[i].ctx = ltc_key;
426         keyslot = i;
427
428         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
429                 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
430                 if (ctx) {
431                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
432                 } else {
433                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
434                                 break;
435                         if (pool->ltp_max_count > 0
436                                 && pool->ltp_open_count > pool->ltp_max_count)
437                         {
438                                 /* too many threads running (can happen if the
439                                  * maximum threads value is set during ongoing
440                                  * operation using ldap_pvt_thread_pool_maxthreads)
441                                  * so let this thread die.
442                                  */
443                                 break;
444                         }
445
446                         /* we could check an idle timer here, and let the
447                          * thread die if it has been inactive for a while.
448                          * only die if there are other open threads (i.e.,
449                          * always have at least one thread open).  the check
450                          * should be like this:
451                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
452                          *       check timer, leave thread (break;)
453                          */
454
455                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
456                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
457
458                         continue;
459                 }
460
461                 pool->ltp_pending_count--;
462
463                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
464                 pool->ltp_active_count++;
465                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
466
467                 ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
468
469                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
470                 LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
471                         ldap_int_thread_ctx_s, ltc_next.al);
472                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
473                 pool->ltp_active_count--;
474                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
475
476                 ldap_pvt_thread_yield();
477
478                 /* if we use an idle timer, here's
479                  * a good place to update it
480                  */
481
482                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
483         }
484
485         for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
486                 if (ltc_key[i].ltk_free)
487                         ltc_key[i].ltk_free(
488                                 ltc_key[i].ltk_key,
489                                 ltc_key[i].ltk_data );
490         }
491
492         thread_keys[keyslot].ctx = NULL;
493         thread_keys[keyslot].id = tid_zero;
494
495         pool->ltp_open_count--;
496         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
497
498         ldap_pvt_thread_exit(NULL);
499         return(NULL);
500 }
501
502 int ldap_pvt_thread_pool_getkey(
503         void *xctx,
504         void *key,
505         void **data,
506         ldap_pvt_thread_pool_keyfree_t **kfree )
507 {
508         ldap_int_thread_key_t *ctx = xctx;
509         int i;
510
511         if ( !ctx || !data ) return EINVAL;
512
513         for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++ ) {
514                 if ( ctx[i].ltk_key == key ) {
515                         *data = ctx[i].ltk_data;
516                         if ( kfree ) *kfree = ctx[i].ltk_free;
517                         return 0;
518                 }
519         }
520         return ENOENT;
521 }
522
523 int ldap_pvt_thread_pool_setkey(
524         void *xctx,
525         void *key,
526         void *data,
527         ldap_pvt_thread_pool_keyfree_t *kfree )
528 {
529         ldap_int_thread_key_t *ctx = xctx;
530         int i;
531
532         if ( !ctx || !key ) return EINVAL;
533
534         for ( i=0; i<MAXKEYS; i++ ) {
535                 if ( !ctx[i].ltk_key || ctx[i].ltk_key == key ) {
536                         ctx[i].ltk_key = key;
537                         ctx[i].ltk_data = data;
538                         ctx[i].ltk_free = kfree;
539                         return 0;
540                 }
541         }
542         return ENOMEM;
543 }
544
545 /*
546  * This is necessary if the caller does not have access to the
547  * thread context handle (for example, a slapd plugin calling
548  * slapi_search_internal()). No doubt it is more efficient to
549  * for the application to keep track of the thread context
550  * handles itself.
551  */
552 void *ldap_pvt_thread_pool_context( )
553 {
554         ldap_pvt_thread_t tid;
555         int i, hash;
556
557         tid = ldap_pvt_thread_self();
558
559         TID_HASH( tid, hash );
560         for (i = hash & (LDAP_MAXTHR-1); !TID_EQ(thread_keys[i].id, tid_zero) &&
561                 !TID_EQ(thread_keys[i].id, tid); i = (i+1) & (LDAP_MAXTHR-1));
562
563         return thread_keys[i].ctx;
564 }
565
566 #endif /* LDAP_THREAD_HAVE_TPOOL */