]> git.sur5r.net Git - openldap/blob - libraries/libldap_r/tpool.c
Add filter.c
[openldap] / libraries / libldap_r / tpool.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2000 The OpenLDAP Foundation, Redwood City, California, USA
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms are permitted only
7  * as authorized by the OpenLDAP Public License.  A copy of this
8  * license is available at http://www.OpenLDAP.org/license.html or
9  * in file LICENSE in the top-level directory of the distribution.
10  */
11
12 #include "portable.h"
13
14 #include <stdio.h>
15
16 #include <ac/stdarg.h>
17 #include <ac/stdlib.h>
18 #include <ac/string.h>
19 #include <ac/time.h>
20 #include <ac/queue.h>
21
22 #include "ldap-int.h"
23 #include "ldap_pvt_thread.h"
24
25
26 #ifndef LDAP_THREAD_HAVE_TPOOL
27
/*
 * Lifecycle state of a pool, stored in ltp_state.
 *
 *   RUNNING   - work may be submitted and is executed by the workers.
 *   FINISHING - the pool is being destroyed; workers keep draining the
 *               pending list and exit once it is empty.
 *   STOPPING  - the pool is being destroyed; workers exit without
 *               picking up any further pending work.
 */
enum ldap_int_thread_pool_state {
        LDAP_INT_THREAD_POOL_RUNNING = 0,
        LDAP_INT_THREAD_POOL_FINISHING = 1,
        LDAP_INT_THREAD_POOL_STOPPING = 2
};
33
34 typedef struct ldap_int_thread_ctx_s {
35         union {
36         STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
37         SLIST_ENTRY(ldap_int_thread_ctx_s) l;
38         } ltc_next;
39         void *(*ltc_start_routine)( void *);
40         void *ltc_arg;
41 } ldap_int_thread_ctx_t;
42
43 struct ldap_int_thread_pool_s {
44         STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
45         ldap_pvt_thread_mutex_t ltp_mutex;
46         ldap_pvt_thread_cond_t ltp_cond;
47         STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
48         SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
49         long ltp_state;
50         long ltp_max_count;
51         long ltp_max_pending;
52         long ltp_pending_count;
53         long ltp_active_count;
54         long ltp_open_count;
55         long ltp_starting;
56 };
57
58 static STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
59         ldap_int_thread_pool_list =
60         STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
61
62 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
63
64 static void *ldap_int_thread_pool_wrapper(
65         struct ldap_int_thread_pool_s *pool );
66
67 int
68 ldap_int_thread_pool_startup ( void )
69 {
70         return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
71 }
72
73 int
74 ldap_int_thread_pool_shutdown ( void )
75 {
76         struct ldap_int_thread_pool_s *pool;
77
78         while ((pool = STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
79                 STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
80                 ldap_pvt_thread_pool_destroy( &pool, 0);
81         }
82         ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
83         return(0);
84 }
85
86 int
87 ldap_pvt_thread_pool_init (
88         ldap_pvt_thread_pool_t *tpool,
89         int max_threads,
90         int max_pending )
91 {
92         ldap_pvt_thread_pool_t pool;
93         int rc;
94
95         *tpool = NULL;
96         pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
97                 sizeof(struct ldap_int_thread_pool_s));
98
99         if (pool == NULL) return(-1);
100
101         rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
102         if (rc != 0)
103                 return(rc);
104         rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
105         if (rc != 0)
106                 return(rc);
107         pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
108         pool->ltp_max_count = max_threads;
109         pool->ltp_max_pending = max_pending;
110         STAILQ_INIT(&pool->ltp_pending_list);
111         SLIST_INIT(&pool->ltp_free_list);
112         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
113         STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
114         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
115
116 #if 0
117         /* THIS WILL NOT WORK on some systems.  If the process
118          * forks after starting a thread, there is no guarantee
119          * that the thread will survive the fork.  For example,
120          * slapd forks in order to daemonize, and does so after
121          * calling ldap_pvt_thread_pool_init.  On some systems,
122          * this initial thread does not run in the child process,
123          * but ltp_open_count == 1, so two things happen: 
124          * 1) the first client connection fails, and 2) when
125          * slapd is kill'ed, it never terminates since it waits
126          * for all worker threads to exit. */
127
128         /* start up one thread, just so there is one. no need to
129          * lock the mutex right now, since no threads are running.
130          */
131         pool->ltp_open_count++;
132
133         ldap_pvt_thread_t thr;
134         rc = ldap_pvt_thread_create( &thr, 1,
135                 (void *) ldap_int_thread_pool_wrapper, pool );
136
137         if( rc != 0) {
138                 /* couldn't start one?  then don't start any */
139                 ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
140                 STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
141                         ldap_int_thread_pool_s, ltp_next);
142                 ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
143                 ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
144                 ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
145                 free(pool);
146                 return(-1);
147         }
148 #endif
149
150         *tpool = pool;
151         return(0);
152 }
153
154 int
155 ldap_pvt_thread_pool_submit (
156         ldap_pvt_thread_pool_t *tpool,
157         void *(*start_routine)( void * ), void *arg )
158 {
159         struct ldap_int_thread_pool_s *pool;
160         ldap_int_thread_ctx_t *ctx;
161         int need_thread = 0;
162         ldap_pvt_thread_t thr;
163
164         if (tpool == NULL)
165                 return(-1);
166
167         pool = *tpool;
168
169         if (pool == NULL)
170                 return(-1);
171
172         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
173         if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
174                 || (pool->ltp_max_pending > 0
175                         && pool->ltp_pending_count >= pool->ltp_max_pending))
176         {
177                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
178                 return(-1);
179         }
180         ctx = SLIST_FIRST(&pool->ltp_free_list);
181         if (ctx) {
182                 SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
183         } else {
184                 ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
185                         sizeof(ldap_int_thread_ctx_t));
186                 if (ctx == NULL) {
187                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
188                         return(-1);
189                 }
190         }
191
192         ctx->ltc_start_routine = start_routine;
193         ctx->ltc_arg = arg;
194
195         pool->ltp_pending_count++;
196         STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
197         ldap_pvt_thread_cond_signal(&pool->ltp_cond);
198         if ((pool->ltp_open_count <= 0
199                         || pool->ltp_pending_count > 1
200                         || pool->ltp_open_count == pool->ltp_active_count)
201                 && (pool->ltp_max_count <= 0
202                         || pool->ltp_open_count < pool->ltp_max_count))
203         {
204                 pool->ltp_open_count++;
205                 pool->ltp_starting++;
206                 need_thread = 1;
207         }
208         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
209
210         if (need_thread) {
211                 int rc = ldap_pvt_thread_create( &thr, 1,
212                         (void *)ldap_int_thread_pool_wrapper, pool );
213                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
214                 if (rc == 0) {
215                         pool->ltp_starting--;
216                 } else {
217                         /* couldn't create thread.  back out of
218                          * ltp_open_count and check for even worse things.
219                          */
220                         pool->ltp_open_count--;
221                         pool->ltp_starting--;
222                         if (pool->ltp_open_count == 0) {
223                                 /* no open threads at all?!?
224                                  */
225                                 ldap_int_thread_ctx_t *ptr;
226                                 STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
227                                         if (ptr == ctx) break;
228                                 if (ptr == ctx) {
229                                         /* no open threads, context not handled, so
230                                          * back out of ltp_pending_count, free the context,
231                                          * report the error.
232                                          */
233                                         STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
234                                                 ldap_int_thread_ctx_s, ltc_next.q);
235                                         pool->ltp_pending_count++;
236                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
237                                         free(ctx);
238                                         return(-1);
239                                 }
240                         }
241                         /* there is another open thread, so this
242                          * context will be handled eventually.
243                          * continue on and signal that the context
244                          * is waiting.
245                          */
246                 }
247                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
248         }
249
250         return(0);
251 }
252
253 int
254 ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
255 {
256         struct ldap_int_thread_pool_s *pool;
257
258         if (tpool == NULL)
259                 return(-1);
260
261         pool = *tpool;
262
263         if (pool == NULL)
264                 return(-1);
265
266         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
267         pool->ltp_max_count = max_threads;
268         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
269         return(0);
270 }
271
272 int
273 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
274 {
275         struct ldap_int_thread_pool_s *pool;
276         int count;
277
278         if (tpool == NULL)
279                 return(-1);
280
281         pool = *tpool;
282
283         if (pool == NULL)
284                 return(0);
285
286         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
287         count = pool->ltp_pending_count + pool->ltp_active_count;
288         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
289         return(count);
290 }
291
292 int
293 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
294 {
295         struct ldap_int_thread_pool_s *pool, *pptr;
296         long waiting;
297         ldap_int_thread_ctx_t *ctx;
298
299         if (tpool == NULL)
300                 return(-1);
301
302         pool = *tpool;
303
304         if (pool == NULL) return(-1);
305
306         ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
307         STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
308                 if (pptr == pool) break;
309         if (pptr == pool)
310                 STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
311                         ldap_int_thread_pool_s, ltp_next);
312         ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
313
314         if (pool != pptr) return(-1);
315
316         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
317         pool->ltp_state = run_pending
318                 ? LDAP_INT_THREAD_POOL_FINISHING
319                 : LDAP_INT_THREAD_POOL_STOPPING;
320         waiting = pool->ltp_open_count;
321
322         /* broadcast could be used here, but only after
323          * it is fixed in the NT thread implementation
324          */
325         while (--waiting >= 0) {
326                 ldap_pvt_thread_cond_signal(&pool->ltp_cond);
327         }
328         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
329
330         do {
331                 ldap_pvt_thread_yield();
332                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
333                 waiting = pool->ltp_open_count;
334                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
335         } while (waiting > 0);
336
337         while ((ctx = STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
338         {
339                 STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
340                 free(ctx);
341         }
342
343         while ((ctx = SLIST_FIRST(&pool->ltp_free_list)) != NULL)
344         {
345                 SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
346                 free(ctx);
347         }
348
349         ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
350         ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
351         free(pool);
352         return(0);
353 }
354
355 static void *
356 ldap_int_thread_pool_wrapper ( 
357         struct ldap_int_thread_pool_s *pool )
358 {
359         ldap_int_thread_ctx_t *ctx;
360
361         if (pool == NULL)
362                 return NULL;
363
364         ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
365
366         while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
367                 ctx = STAILQ_FIRST(&pool->ltp_pending_list);
368                 if (ctx) {
369                         STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
370                 } else {
371                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
372                                 break;
373                         if (pool->ltp_max_count > 0
374                                 && pool->ltp_open_count > pool->ltp_max_count)
375                         {
376                                 /* too many threads running (can happen if the
377                                  * maximum threads value is set during ongoing
378                                  * operation using ldap_pvt_thread_pool_maxthreads)
379                                  * so let this thread die.
380                                  */
381                                 break;
382                         }
383
384                         /* we could check an idle timer here, and let the
385                          * thread die if it has been inactive for a while.
386                          * only die if there are other open threads (i.e.,
387                          * always have at least one thread open).  the check
388                          * should be like this:
389                          *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
390                          *       check timer, leave thread (break;)
391                          */
392
393                         if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
394                                 ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
395
396                         continue;
397                 }
398
399                 pool->ltp_pending_count--;
400                 pool->ltp_active_count++;
401                 ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
402
403                 (ctx->ltc_start_routine)(ctx->ltc_arg);
404                 SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
405                 ldap_pvt_thread_yield();
406
407                 /* if we use an idle timer, here's
408                  * a good place to update it
409                  */
410
411                 ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
412                 pool->ltp_active_count--;
413         }
414
415         pool->ltp_open_count--;
416         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
417
418         ldap_pvt_thread_exit(NULL);
419         return(NULL);
420 }
421 #endif /* LDAP_THREAD_HAVE_TPOOL */