]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
Use queue-compat.h for queue management. Cache ctx structures on a free
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2000 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20
21 #include "ldap-int.h"
22 #include "ldap_pvt_thread.h"
23 #include "queue-compat.h"
24
25 #ifndef LDAP_THREAD_HAVE_TPOOL
26
/*
 * Lifecycle of a thread pool, as stored in ldap_int_thread_pool_s.ltp_state.
 * New work is only accepted while the pool is RUNNING.
 */
enum ldap_int_thread_pool_state {
	/* normal operation: work is accepted and executed */
	LDAP_INT_THREAD_POOL_RUNNING = 0,
	/* destroy requested with run_pending: workers drain the queue, then exit */
	LDAP_INT_THREAD_POOL_FINISHING,
	/* destroy requested without run_pending: workers exit without draining */
	LDAP_INT_THREAD_POOL_STOPPING
};
32
33 typedef struct ldap_int_thread_ctx_s {
34         union {
35         SLIST_ENTRY(ldap_int_thread_ctx_s) l;
36         STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
37         } ltc_next;
38         void *(*ltc_start_routine)( void *);
39         void *ltc_arg;
40 } ldap_int_thread_ctx_t;
41
42 struct ldap_int_thread_pool_s {
43         STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
44         ldap_pvt_thread_mutex_t ltp_mutex;
45         ldap_pvt_thread_cond_t ltp_cond;
46         STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
47         long ltp_state;
48         long ltp_max_count;
49         long ltp_max_pending;
50         long ltp_pending_count;
51         long ltp_active_count;
52         long ltp_open_count;
53         long ltp_starting;
54 };
55
56 static STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
57         ldap_int_thread_pool_list =
58         STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
59
60 static SLIST_HEAD(tcl, ldap_int_thread_ctx_s)
61         ldap_int_ctx_free_list = 
62         SLIST_HEAD_INITIALIZER(ldap_int_ctx_free_list);
63
64 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
65 static ldap_pvt_thread_mutex_t ldap_pvt_ctx_free_mutex;
66
67 static void *ldap_int_thread_pool_wrapper(
68         struct ldap_int_thread_pool_s *pool );
69
70 int
71 ldap_int_thread_pool_startup ( void )
72 {
73         int rc = ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
74         if (rc == 0)
75                 rc = ldap_pvt_thread_mutex_init(&ldap_pvt_ctx_free_mutex);
76         return rc;
77 }
78
79 int
80 ldap_int_thread_pool_shutdown ( void )
81 {
82         ldap_int_thread_ctx_t *ctx;
83         struct ldap_int_thread_pool_s *pool;
84
85         while ((pool = STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
86                 STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
87                 ldap_pvt_thread_pool_destroy( &pool, 0);
88         }
89         while ((ctx = SLIST_FIRST(&ldap_int_ctx_free_list))) {
90                 SLIST_REMOVE_HEAD(&ldap_int_ctx_free_list, ltc_next.l);
91                 free(ctx);
92         }
93         ldap_pvt_thread_mutex_destroy(&ldap_pvt_ctx_free_mutex);
94         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
95         return(0);
96 }
97
98 int
99 ldap_pvt_thread_pool_init (
100         ldap_pvt_thread_pool_t *tpool,
101         int max_threads,
102         int max_pending )
103 {
104         ldap_pvt_thread_pool_t pool;
105         int rc;
106
107         *tpool = NULL;
108         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
109                 sizeof(struct ldap_int_thread_pool_s));
110
111         if (pool == NULL) return(-1);
112
113         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
114         if (rc != 0)
115                 return(rc);
116         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
117         if (rc != 0)
118                 return(rc);
119         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
120         pool->ltp_max_count = max_threads;
121         pool->ltp_max_pending = max_pending;
122         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
123         if (STAILQ_EMPTY(&ldap_int_thread_pool_list)) {
124                 STAILQ_INSERT_HEAD(&ldap_int_thread_pool_list, pool, ltp_next);
125         } else {
126                 STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
127         }
128         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
129
130 #if 0
131         /* THIS WILL NOT WORK on some systems.  If the process
132          * forks after starting a thread, there is no guarantee
133          * that the thread will survive the fork.  For example,
134          * slapd forks in order to daemonize, and does so after
135          * calling ldap_pvt_thread_pool_init.  On some systems,
136          * this initial thread does not run in the child process,
137          * but ltp_open_count == 1, so two things happen: 
138          * 1) the first client connection fails, and 2) when
139          * slapd is kill'ed, it never terminates since it waits
140          * for all worker threads to exit. */
141
142         /* start up one thread, just so there is one. no need to
143          * lock the mutex right now, since no threads are running.
144          */
145         pool->ltp_open_count++;
146
147         ldap_pvt_thread_t thr;
148         rc = ldap_pvt_thread_create( &thr, 1,
149                 (void *) ldap_int_thread_pool_wrapper, pool );
150
151         if( rc != 0) {
152                 /* couldn't start one?  then don't start any */
153                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
154                 STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
155                         ldap_int_thread_element_s, ltp_next);
156                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
157                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
158                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
159                 free(pool);
160                 return(-1);
161         }
162 #endif
163
164         *tpool = pool;
165         return(0);
166 }
167
168 int
169 ldap_pvt_thread_pool_submit (
170         ldap_pvt_thread_pool_t *tpool,
171         void *(*start_routine)( void * ), void *arg )
172 {
173         struct ldap_int_thread_pool_s *pool;
174         ldap_int_thread_ctx_t *ctx;
175         int need_thread = 0;
176         ldap_pvt_thread_t thr;
177
178         if (tpool == NULL)
179                 return(-1);
180
181         pool = *tpool;
182
183         if (pool == NULL)
184                 return(-1);
185
186         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
187         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
188                 || (pool->ltp_max_pending > 0
189                         && pool->ltp_pending_count >= pool->ltp_max_pending))
190         {
191                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
192                 return(-1);
193         }
194         ldap_pvt_thread_mutex_lock(&ldap_pvt_ctx_free_mutex);
195         ctx = SLIST_FIRST(&ldap_int_ctx_free_list);
196         if (ctx) {
197                 SLIST_REMOVE_HEAD(&ldap_int_ctx_free_list, ltc_next.l);
198                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_ctx_free_mutex);
199         } else {
200                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_ctx_free_mutex);
201                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
202                         sizeof(ldap_int_thread_ctx_t));
203                 if (ctx == NULL) {
204                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
205                         return(-1);
206                 }
207         }
208
209         ctx->ltc_start_routine = start_routine;
210         ctx->ltc_arg = arg;
211
212         pool->ltp_pending_count++;
213         if (STAILQ_EMPTY(&pool->ltp_pending_list)) {
214                 STAILQ_INSERT_HEAD(&pool->ltp_pending_list, ctx, ltc_next.q);
215         } else {
216                 STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
217         }
218         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
219         if ((pool->ltp_open_count <= 0
220                         || pool->ltp_pending_count > 1
221                         || pool->ltp_open_count == pool->ltp_active_count)
222                 && (pool->ltp_max_count <= 0
223                         || pool->ltp_open_count < pool->ltp_max_count))
224         {
225                 pool->ltp_open_count++;
226                 pool->ltp_starting++;
227                 need_thread = 1;
228         }
229         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
230
231         if (need_thread) {
232                 int rc = ldap_pvt_thread_create( &thr, 1,
233                         (void *)ldap_int_thread_pool_wrapper, pool );
234                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
235                 if (rc == 0) {
236                         pool->ltp_starting--;
237                 } else {
238                         /* couldn't create thread.  back out of
239                          * ltp_open_count and check for even worse things.
240                          */
241                         pool->ltp_open_count--;
242                         pool->ltp_starting--;
243                         if (pool->ltp_open_count == 0) {
244                                 /* no open threads at all?!?
245                                  */
246                                 ldap_int_thread_ctx_t *ptr;
247                                 STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
248                                         if (ptr == ctx) break;
249                                 if (ptr == ctx) {
250                                         /* no open threads, context not handled, so
251                                          * back out of ltp_pending_count, free the context,
252                                          * report the error.
253                                          */
254                                         STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
255                                                 ldap_int_thread_ctx_s, ltc_next.q);
256                                         pool->ltp_pending_count++;
257                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
258                                         free(ctx);
259                                         return(-1);
260                                 }
261                         }
262                         /* there is another open thread, so this
263                          * context will be handled eventually.
264                          * continue on and signal that the context
265                          * is waiting.
266                          */
267                 }
268                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
269         }
270
271         return(0);
272 }
273
274 int
275 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
276 {
277         struct ldap_int_thread_pool_s *pool;
278
279         if (tpool == NULL)
280                 return(-1);
281
282         pool = *tpool;
283
284         if (pool == NULL)
285                 return(-1);
286
287         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
288         pool->ltp_max_count = max_threads;
289         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
290         return(0);
291 }
292
293 int
294 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
295 {
296         struct ldap_int_thread_pool_s *pool;
297         int count;
298
299         if (tpool == NULL)
300                 return(-1);
301
302         pool = *tpool;
303
304         if (pool == NULL)
305                 return(0);
306
307         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
308         count = pool->ltp_pending_count + pool->ltp_active_count;
309         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
310         return(count);
311 }
312
313 int
314 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
315 {
316         struct ldap_int_thread_pool_s *pool, *pptr;
317         long waiting;
318         ldap_int_thread_ctx_t *ctx;
319
320         if (tpool == NULL)
321                 return(-1);
322
323         pool = *tpool;
324
325         if (pool == NULL) return(-1);
326
327         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
328         STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
329                 if (pptr == pool) break;
330         if (pptr == pool)
331                 STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
332                         ldap_int_thread_pool_s, ltp_next);
333         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
334
335         if (pool != pptr) return(-1);
336
337         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
338         pool->ltp_state = run_pending
339                 ? LDAP_INT_THREAD_POOL_FINISHING
340                 : LDAP_INT_THREAD_POOL_STOPPING;
341         waiting = pool->ltp_open_count;
342
343         /* broadcast could be used here, but only after
344          * it is fixed in the NT thread implementation
345          */
346         while (--waiting >= 0) {
347                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
348         }
349         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
350
351         do {
352                 ldap_pvt_thread_yield();
353                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
354                 waiting = pool->ltp_open_count;
355                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
356         } while (waiting > 0);
357
358         while ((ctx = STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
359         {
360                 STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
361                 free(ctx);
362         }
363
364         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
365         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
366         free(pool);
367         return(0);
368 }
369
370 static void *
371 ldap_int_thread_pool_wrapper ( 
372         struct ldap_int_thread_pool_s *pool )
373 {
374         ldap_int_thread_ctx_t *ctx;
375
376         if (pool == NULL)
377                 return NULL;
378
379         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
380
381         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
382                 ctx = STAILQ_FIRST(&pool->ltp_pending_list);
383                 if (ctx) {
384                         STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
385                 } else {
386                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
387                                 break;
388                         if (pool->ltp_max_count > 0
389                                 && pool->ltp_open_count > pool->ltp_max_count)
390                         {
391                                 /* too many threads running (can happen if the
392                                  * maximum threads value is set during ongoing
393                                  * operation using ldap_pvt_thread_pool_maxthreads)
394                                  * so let this thread die.
395                                  */
396                                 break;
397                         }
398
399                         /* we could check an idle timer here, and let the
400                          * thread die if it has been inactive for a while.
401                          * only die if there are other open threads (i.e.,
402                          * always have at least one thread open).  the check
403                          * should be like this:
404                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
405                          *       check timer, leave thread (break;)
406                          */
407
408                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
409                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
410
411                         continue;
412                 }
413
414                 pool->ltp_pending_count--;
415                 pool->ltp_active_count++;
416                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
417
418                 (ctx->ltc_start_routine)(ctx->ltc_arg);
419                 ldap_pvt_thread_mutex_lock(&ldap_pvt_ctx_free_mutex);
420                 SLIST_INSERT_HEAD(&ldap_int_ctx_free_list, ctx, ltc_next.l);
421                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_ctx_free_mutex);
422                 ldap_pvt_thread_yield();
423
424                 /* if we use an idle timer, here's
425                  * a good place to update it
426                  */
427
428                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
429                 pool->ltp_active_count--;
430         }
431
432         pool->ltp_open_count--;
433         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
434
435         ldap_pvt_thread_exit(NULL);
436         return(NULL);
437 }
#endif /* LDAP_THREAD_HAVE_TPOOL */