]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
a runqueue for periodic thread execution (for syncrepl)
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2003 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20 #include <ac/errno.h>
21
22 #include "ldap-int.h"
23 #include "ldap_pvt_thread.h"
24 #include "ldap_queue.h"
25
26 #ifndef LDAP_THREAD_HAVE_TPOOL
27
28 enum ldap_int_thread_pool_state {
29         LDAP_INT_THREAD_POOL_RUNNING,
30         LDAP_INT_THREAD_POOL_FINISHING,
31         LDAP_INT_THREAD_POOL_STOPPING
32 };
33
34 typedef struct ldap_int_thread_key_s {
35         void *ltk_key;
36         void *ltk_data;
37         ldap_pvt_thread_pool_keyfree_t *ltk_free;
38 } ldap_int_thread_key_t;
39
40 /* Max number of thread-specific keys we store per thread.
41  * We don't expect to use many...
42  */
43 #define MAXKEYS 32
44 #define MAXTHREADS      1024    /* must be a power of 2 */
45
46 static ldap_pvt_thread_t tid_zero;
47
48 #ifdef HAVE_PTHREADS
49 #define TID_EQ(a,b)     pthread_equal((a),(b))
50 #else
51 #define TID_EQ(a,b)     ((a) == (b))
52 #endif
53 static struct {
54         ldap_pvt_thread_t id;
55         ldap_int_thread_key_t *ctx;
56 } thread_keys[MAXTHREADS];
57         
58
59 typedef struct ldap_int_thread_ctx_s {
60         union {
61         LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
62         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;
63         LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;
64         } ltc_next;
65         ldap_pvt_thread_start_t *ltc_start_routine;
66         void *ltc_arg;
67 } ldap_int_thread_ctx_t;
68
69 struct ldap_int_thread_pool_s {
70         LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
71         ldap_pvt_thread_mutex_t ltp_mutex;
72         ldap_pvt_thread_cond_t ltp_cond;
73         LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
74         LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
75         LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
76         long ltp_state;
77         long ltp_max_count;
78         long ltp_max_pending;
79         long ltp_pending_count;
80         long ltp_active_count;
81         long ltp_open_count;
82         long ltp_starting;
83 };
84
85 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
86         ldap_int_thread_pool_list =
87         LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
88
89 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
90
91 static void *ldap_int_thread_pool_wrapper( void *pool );
92
93 int
94 ldap_int_thread_pool_startup ( void )
95 {
96         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
97 }
98
99 int
100 ldap_int_thread_pool_shutdown ( void )
101 {
102         struct ldap_int_thread_pool_s *pool;
103
104         while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
105                 LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
106                 ldap_pvt_thread_pool_destroy( &pool, 0);
107         }
108         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
109         return(0);
110 }
111
112 int
113 ldap_pvt_thread_pool_init (
114         ldap_pvt_thread_pool_t *tpool,
115         int max_threads,
116         int max_pending )
117 {
118         ldap_pvt_thread_pool_t pool;
119         int rc;
120
121         *tpool = NULL;
122         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
123                 sizeof(struct ldap_int_thread_pool_s));
124
125         if (pool == NULL) return(-1);
126
127         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
128         if (rc != 0)
129                 return(rc);
130         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
131         if (rc != 0)
132                 return(rc);
133         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
134         pool->ltp_max_count = max_threads;
135         pool->ltp_max_pending = max_pending;
136         LDAP_STAILQ_INIT(&pool->ltp_pending_list);
137         LDAP_SLIST_INIT(&pool->ltp_free_list);
138         LDAP_SLIST_INIT(&pool->ltp_active_list);
139         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
140         LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
141         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
142
143 #if 0
144         /* THIS WILL NOT WORK on some systems.  If the process
145          * forks after starting a thread, there is no guarantee
146          * that the thread will survive the fork.  For example,
147          * slapd forks in order to daemonize, and does so after
148          * calling ldap_pvt_thread_pool_init.  On some systems,
149          * this initial thread does not run in the child process,
150          * but ltp_open_count == 1, so two things happen: 
151          * 1) the first client connection fails, and 2) when
152          * slapd is kill'ed, it never terminates since it waits
153          * for all worker threads to exit. */
154
155         /* start up one thread, just so there is one. no need to
156          * lock the mutex right now, since no threads are running.
157          */
158         pool->ltp_open_count++;
159
160         ldap_pvt_thread_t thr;
161         rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
162
163         if( rc != 0) {
164                 /* couldn't start one?  then don't start any */
165                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
166                 LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
167                         ldap_int_thread_pool_s, ltp_next);
168                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
169                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
170                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
171                 LDAP_FREE(pool);
172                 return(-1);
173         }
174 #endif
175
176         *tpool = pool;
177         return(0);
178 }
179
180 #define TID_HASH(tid, hash) do { int i; \
181         unsigned char *ptr = (unsigned char *)&(tid); \
182         for (i=0, hash=0; i<sizeof(tid); i++) hash += ptr[i]; } while(0)
183
184 int
185 ldap_pvt_thread_pool_submit (
186         ldap_pvt_thread_pool_t *tpool,
187         ldap_pvt_thread_start_t *start_routine, void *arg )
188 {
189         struct ldap_int_thread_pool_s *pool;
190         ldap_int_thread_ctx_t *ctx;
191         int need_thread = 0;
192         ldap_pvt_thread_t thr;
193
194         if (tpool == NULL)
195                 return(-1);
196
197         pool = *tpool;
198
199         if (pool == NULL)
200                 return(-1);
201
202         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
203         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
204                 || (pool->ltp_max_pending > 0
205                         && pool->ltp_pending_count >= pool->ltp_max_pending))
206         {
207                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
208                 return(-1);
209         }
210         ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
211         if (ctx) {
212                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
213         } else {
214                 int i;
215                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
216                         sizeof(ldap_int_thread_ctx_t));
217                 if (ctx == NULL) {
218                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
219                         return(-1);
220                 }
221         }
222
223         ctx->ltc_start_routine = start_routine;
224         ctx->ltc_arg = arg;
225
226         pool->ltp_pending_count++;
227         LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
228         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
229         if ((pool->ltp_open_count <= 0
230 #if 0
231                         || pool->ltp_pending_count > 1
232 #endif
233                         || pool->ltp_open_count == pool->ltp_active_count)
234                 && (pool->ltp_max_count <= 0
235                         || pool->ltp_open_count < pool->ltp_max_count))
236         {
237                 pool->ltp_open_count++;
238                 pool->ltp_starting++;
239                 need_thread = 1;
240         }
241         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
242
243         if (need_thread) {
244                 int rc;
245
246                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
247
248                 rc = ldap_pvt_thread_create( &thr, 1,
249                         ldap_int_thread_pool_wrapper, pool );
250                 if (rc == 0) {
251                         int hash;
252                         pool->ltp_starting--;
253
254                         /* assign this thread ID to a key slot; start
255                          * at the thread ID itself (mod MAXTHREADS) and
256                          * look for an empty slot.
257                          */
258                         TID_HASH(thr, hash);
259                         for (rc = hash & (MAXTHREADS-1);
260                                 !TID_EQ(thread_keys[rc].id, tid_zero);
261                                 rc = (rc+1) & (MAXTHREADS-1));
262                         thread_keys[rc].id = thr;
263                 } else {
264                         /* couldn't create thread.  back out of
265                          * ltp_open_count and check for even worse things.
266                          */
267                         pool->ltp_open_count--;
268                         pool->ltp_starting--;
269                         if (pool->ltp_open_count == 0) {
270                                 /* no open threads at all?!?
271                                  */
272                                 ldap_int_thread_ctx_t *ptr;
273                                 LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
274                                         if (ptr == ctx) break;
275                                 if (ptr == ctx) {
276                                         /* no open threads, context not handled, so
277                                          * back out of ltp_pending_count, free the context,
278                                          * report the error.
279                                          */
280                                         LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
281                                                 ldap_int_thread_ctx_s, ltc_next.q);
282                                         pool->ltp_pending_count++;
283                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
284                                         LDAP_FREE(ctx);
285                                         return(-1);
286                                 }
287                         }
288                         /* there is another open thread, so this
289                          * context will be handled eventually.
290                          * continue on and signal that the context
291                          * is waiting.
292                          */
293                 }
294                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
295         }
296
297         return(0);
298 }
299
300 int
301 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
302 {
303         struct ldap_int_thread_pool_s *pool;
304
305         if (tpool == NULL)
306                 return(-1);
307
308         pool = *tpool;
309
310         if (pool == NULL)
311                 return(-1);
312
313         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
314         pool->ltp_max_count = max_threads;
315         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
316         return(0);
317 }
318
319 int
320 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
321 {
322         struct ldap_int_thread_pool_s *pool;
323         int count;
324
325         if (tpool == NULL)
326                 return(-1);
327
328         pool = *tpool;
329
330         if (pool == NULL)
331                 return(0);
332
333         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
334         count = pool->ltp_pending_count + pool->ltp_active_count;
335         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
336         return(count);
337 }
338
339 int
340 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
341 {
342         struct ldap_int_thread_pool_s *pool, *pptr;
343         long waiting;
344         ldap_int_thread_ctx_t *ctx;
345
346         if (tpool == NULL)
347                 return(-1);
348
349         pool = *tpool;
350
351         if (pool == NULL) return(-1);
352
353         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
354         LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
355                 if (pptr == pool) break;
356         if (pptr == pool)
357                 LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
358                         ldap_int_thread_pool_s, ltp_next);
359         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
360
361         if (pool != pptr) return(-1);
362
363         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
364         pool->ltp_state = run_pending
365                 ? LDAP_INT_THREAD_POOL_FINISHING
366                 : LDAP_INT_THREAD_POOL_STOPPING;
367
368         ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
369         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
370
371         do {
372                 ldap_pvt_thread_yield();
373                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
374                 waiting = pool->ltp_open_count;
375                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
376         } while (waiting > 0);
377
378         while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
379         {
380                 LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
381                 LDAP_FREE(ctx);
382         }
383
384         while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
385         {
386                 LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
387                 LDAP_FREE(ctx);
388         }
389
390         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
391         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
392         LDAP_FREE(pool);
393         return(0);
394 }
395
396 static void *
397 ldap_int_thread_pool_wrapper ( 
398         void *xpool )
399 {
400         struct ldap_int_thread_pool_s *pool = xpool;
401         ldap_int_thread_ctx_t *ctx;
402         ldap_int_thread_key_t ltc_key[MAXKEYS];
403         ldap_pvt_thread_t tid;
404         int i, keyslot, hash;
405
406         if (pool == NULL)
407                 return NULL;
408
409         for ( i=0; i<MAXKEYS; i++ ) {
410                 ltc_key[i].ltk_key = NULL;
411         }
412
413         tid = ldap_pvt_thread_self();
414
415         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
416
417         /* store pointer to our keys */
418         TID_HASH(tid, hash);
419         for (i = hash & (MAXTHREADS-1); !TID_EQ(thread_keys[i].id, tid);
420                                 i = (i+1) & (MAXTHREADS-1));
421         thread_keys[i].ctx = ltc_key;
422         keyslot = i;
423
424         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
425                 ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
426                 if (ctx) {
427                         LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
428                 } else {
429                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
430                                 break;
431                         if (pool->ltp_max_count > 0
432                                 && pool->ltp_open_count > pool->ltp_max_count)
433                         {
434                                 /* too many threads running (can happen if the
435                                  * maximum threads value is set during ongoing
436                                  * operation using ldap_pvt_thread_pool_maxthreads)
437                                  * so let this thread die.
438                                  */
439                                 break;
440                         }
441
442                         /* we could check an idle timer here, and let the
443                          * thread die if it has been inactive for a while.
444                          * only die if there are other open threads (i.e.,
445                          * always have at least one thread open).  the check
446                          * should be like this:
447                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
448                          *       check timer, leave thread (break;)
449                          */
450
451                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
452                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
453
454                         continue;
455                 }
456
457                 pool->ltp_pending_count--;
458
459                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
460                 pool->ltp_active_count++;
461                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
462
463                 ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
464
465                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
466                 LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
467                         ldap_int_thread_ctx_s, ltc_next.al);
468                 LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
469                 pool->ltp_active_count--;
470                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
471
472                 ldap_pvt_thread_yield();
473
474                 /* if we use an idle timer, here's
475                  * a good place to update it
476                  */
477
478                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
479         }
480
481         for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
482                 if (ltc_key[i].ltk_free)
483                         ltc_key[i].ltk_free(
484                                 ltc_key[i].ltk_key,
485                                 ltc_key[i].ltk_data );
486         }
487
488         thread_keys[keyslot].ctx = NULL;
489         thread_keys[keyslot].id = tid_zero;
490
491         pool->ltp_open_count--;
492         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
493
494         ldap_pvt_thread_exit(NULL);
495         return(NULL);
496 }
497
498 int ldap_pvt_thread_pool_getkey(
499         void *xctx,
500         void *key,
501         void **data,
502         ldap_pvt_thread_pool_keyfree_t **kfree )
503 {
504         ldap_int_thread_key_t *ctx = xctx;
505         int i;
506
507         if ( !ctx || !data ) return EINVAL;
508
509         for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++ ) {
510                 if ( ctx[i].ltk_key == key ) {
511                         *data = ctx[i].ltk_data;
512                         if ( kfree ) *kfree = ctx[i].ltk_free;
513                         return 0;
514                 }
515         }
516         return ENOENT;
517 }
518
519 int ldap_pvt_thread_pool_setkey(
520         void *xctx,
521         void *key,
522         void *data,
523         ldap_pvt_thread_pool_keyfree_t *kfree )
524 {
525         ldap_int_thread_key_t *ctx = xctx;
526         int i;
527
528         if ( !ctx || !key ) return EINVAL;
529
530         for ( i=0; i<MAXKEYS; i++ ) {
531                 if ( !ctx[i].ltk_key || ctx[i].ltk_key == key ) {
532                         ctx[i].ltk_key = key;
533                         ctx[i].ltk_data = data;
534                         ctx[i].ltk_free = kfree;
535                         return 0;
536                 }
537         }
538         return ENOMEM;
539 }
540
541 /*
542  * This is necessary if the caller does not have access to the
543  * thread context handle (for example, a slapd plugin calling
544  * slapi_search_internal()). No doubt it is more efficient to
545  * for the application to keep track of the thread context
546  * handles itself.
547  */
548 void *ldap_pvt_thread_pool_context( )
549 {
550         ldap_pvt_thread_t tid;
551         int i, hash;
552
553         tid = ldap_pvt_thread_self();
554
555         TID_HASH( tid, hash );
556         for (i = hash & (MAXTHREADS-1); !TID_EQ(thread_keys[i].id, tid_zero) &&
557                 !TID_EQ(thread_keys[i].id, tid); i = (i+1) & (MAXTHREADS-1));
558
559         return thread_keys[i].ctx;
560 }
561
562 #endif /* LDAP_THREAD_HAVE_TPOOL */