]> git.sur5r.net Git - openldap/commitdiff
Add thread pool routines.
authorMark Valence <mrv@openldap.org>
Wed, 7 Jun 2000 19:21:45 +0000 (19:21 +0000)
committerMark Valence <mrv@openldap.org>
Wed, 7 Jun 2000 19:21:45 +0000 (19:21 +0000)
include/ldap_pvt_thread.h
libraries/libldap_r/threads.c

index 668f58a1eb689d93b36e40d8e26f273fb78a3765..7d818c4b09e46a907c63dae8aa49c4b60b978852 100644 (file)
@@ -148,6 +148,32 @@ ldap_pvt_thread_rdwr_active LDAP_P((ldap_pvt_thread_rdwr_t *rdwrp));
 #define LDAP_PVT_THREAD_EINVAL EINVAL
 #define LDAP_PVT_THREAD_EBUSY EINVAL
 
+
+typedef struct t_ldap_pvt_thread_pool *ldap_pvt_thread_pool_t;
+
+
+LIBLDAP_F( int )
+ldap_pvt_thread_pool_initialize LDAP_P((
+                                               ldap_pvt_thread_pool_t *pool_out,
+                                               int max_concurrency,
+                                               int max_pending ));
+
+LIBLDAP_F( int )
+ldap_pvt_thread_pool_submit LDAP_P((
+                                               ldap_pvt_thread_pool_t pool,
+                                               void *(*start_routine)( void * ),
+                                               void *arg ));
+
+LIBLDAP_F( int )
+ldap_pvt_thread_pool_backload LDAP_P((
+                                               ldap_pvt_thread_pool_t pool ));
+
+LIBLDAP_F( int )
+ldap_pvt_thread_pool_destroy LDAP_P((
+                                               ldap_pvt_thread_pool_t pool,
+                                               int run_pending ));
+
+
 LDAP_END_DECL
 
 #endif /* _LDAP_THREAD_H */
index 74ce67c2d39b2e56f25b21488e5842308633813f..a1650db3149613ef7d072ea7ae123c3e96245ce1 100644 (file)
 
 #include "portable.h"
 
+#include <ac/stdlib.h>
+#include <ac/string.h>
+
 #include "ldap_int_thread.h"
 #include "ldap_pvt_thread.h"
 
-LIBLDAP_F( int )
+enum {
+       LDAP_PVT_THREAD_POOL_RUNNING,
+       LDAP_PVT_THREAD_POOL_FINISHING,
+       LDAP_PVT_THREAD_POOL_STOPPING
+};
+
+typedef struct t_ldap_pvt_thread_listelement {
+       struct t_ldap_pvt_thread_listelement *next;
+} ldap_pvt_thread_listelement, *ldap_pvt_thread_list;
+
+struct t_ldap_pvt_thread_pool {
+       struct t_ldap_pvt_thread_pool *ltp_next;
+       ldap_pvt_thread_mutex_t ltp_mutex;
+       ldap_pvt_thread_cond_t ltp_cond;
+       ldap_pvt_thread_list ltp_pending_list;
+       long ltp_state;
+       long ltp_max_count;
+       long ltp_max_pending;
+       long ltp_pending_count;
+       long ltp_active_count;
+       long ltp_open_count;
+};
+
+typedef struct t_ldap_pvt_thread_ctx {
+       struct t_ldap_pvt_thread_ctx *ltc_next;
+       void *(*ltc_start_routine)( void *);
+       void *ltc_arg;
+} ldap_pvt_thread_ctx;
+
+ldap_pvt_thread_list ldap_pvt_thread_pool_list = NULL;
+ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
+
+void *ldap_pvt_thread_pool_wrapper( ldap_pvt_thread_pool_t pool );
+void *ldap_pvt_thread_enlist( ldap_pvt_thread_list *list, void *elem );
+void *ldap_pvt_thread_delist( ldap_pvt_thread_list *list, void *elem );
+void *ldap_pvt_thread_onlist( ldap_pvt_thread_list *list, void *elem );
+
+
+int
 ldap_pvt_thread_initialize ( void )
 {
-       return ldap_int_thread_initialize();
+       int rc;
+
+       rc = ldap_int_thread_initialize();
+       if (rc == 0) {
+               /* init the mutex that protext the list of pools
+                */
+               ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
+       }
+       return rc;
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_destroy ( void )
 {
+       while (ldap_pvt_thread_pool_list != NULL) {
+               ldap_pvt_thread_pool_destroy((ldap_pvt_thread_pool_t)ldap_pvt_thread_pool_list, 0);
+       }
+       ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
+
        return ldap_int_thread_destroy();
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_get_concurrency ( void )
 {
 #ifdef HAVE_GETCONCURRENCY
@@ -36,7 +90,7 @@ ldap_pvt_thread_get_concurrency ( void )
 #endif
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_set_concurrency ( int concurrency )
 {
 #ifdef HAVE_SETCONCURRENCY
@@ -46,7 +100,7 @@ ldap_pvt_thread_set_concurrency ( int concurrency )
 #endif
 }
 
-LIBLDAP_F( int ) 
+int 
 ldap_pvt_thread_create (
        ldap_pvt_thread_t * thread, 
        int     detach,
@@ -56,55 +110,55 @@ ldap_pvt_thread_create (
        return ldap_int_thread_create(thread, detach, start_routine, arg);
 }
 
-LIBLDAP_F( void ) 
+void
 ldap_pvt_thread_exit ( void *retval )
 {
        ldap_int_thread_exit(retval);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_join ( ldap_pvt_thread_t thread, void **status )
 {
        return ldap_int_thread_join(thread, status);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_kill ( ldap_pvt_thread_t thread, int signo )
 {
        return ldap_int_thread_kill(thread, signo);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_yield ( void )
 {
        return ldap_int_thread_yield();
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_cond_init ( ldap_pvt_thread_cond_t *cond )
 {
        return ldap_int_thread_cond_init(cond);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_cond_destroy ( ldap_pvt_thread_cond_t *cond )
 {
        return ldap_int_thread_cond_destroy(cond);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_cond_signal ( ldap_pvt_thread_cond_t *cond )
 {
        return ldap_int_thread_cond_signal(cond);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_cond_broadcast ( ldap_pvt_thread_cond_t *cond )
 {
        return ldap_int_thread_cond_broadcast(cond);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_cond_wait (
        ldap_pvt_thread_cond_t *cond, 
        ldap_pvt_thread_mutex_t *mutex )
@@ -112,32 +166,374 @@ ldap_pvt_thread_cond_wait (
        return ldap_int_thread_cond_wait(cond, mutex);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_mutex_init ( ldap_pvt_thread_mutex_t *mutex )
 {
        return ldap_int_thread_mutex_init(mutex);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_mutex_destroy ( ldap_pvt_thread_mutex_t *mutex )
 {
        return ldap_int_thread_mutex_destroy(mutex);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_mutex_lock ( ldap_pvt_thread_mutex_t *mutex )
 {
        return ldap_int_thread_mutex_lock(mutex);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_mutex_trylock ( ldap_pvt_thread_mutex_t *mutex )
 {
        return ldap_int_thread_mutex_trylock(mutex);
 }
 
-LIBLDAP_F( int )
+int
 ldap_pvt_thread_mutex_unlock ( ldap_pvt_thread_mutex_t *mutex )
 {
        return ldap_int_thread_mutex_unlock(mutex);
 }
+
+#ifdef NO_THREADS
+
+/* There must be a separate implementation when NO_THREADS is on.
+ * Since ldap_pvt_thread_pool_wrapper loops, there's no way to
+ * simply let the underlying (stub) thread implementation take
+ * care of things (unless there was an #ifdef that removed the
+ * "while" in ldap_pvt_thread_pool_wrapper, but why do all the
+ * extra work of init/submit/destroy when all that's needed
+ * are these stubs?)
+ */
+int
+ldap_pvt_thread_pool_startup ( void )
+{
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_shutdown ( void )
+{
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_initialize ( ldap_pvt_thread_pool_t *pool_out, int max_concurrency, int max_pending )
+{
+       *pool_out = NULL;
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_submit ( ldap_pvt_thread_pool_t pool, void *(*start_routine)( void * ), void *arg )
+{
+       (start_routine)(arg);
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t pool )
+{
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t pool, int run_pending )
+{
+       return(0);
+}
+
+#else
+
+int
+ldap_pvt_thread_pool_startup ( void )
+{
+       return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
+}
+
+int
+ldap_pvt_thread_pool_shutdown ( void )
+{
+       while (ldap_pvt_thread_pool_list != NULL) {
+               ldap_pvt_thread_pool_destroy((ldap_pvt_thread_pool_t)ldap_pvt_thread_pool_list, 0);
+       }
+       ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_initialize ( ldap_pvt_thread_pool_t *pool_out, int max_concurrency, int max_pending )
+{
+       ldap_pvt_thread_pool_t pool;
+       ldap_pvt_thread_t thr;
+
+       *pool_out = NULL;
+       pool = (ldap_pvt_thread_pool_t)calloc(1, sizeof(struct t_ldap_pvt_thread_pool));
+       if (pool == NULL)
+               return(-1);
+
+       ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
+       ldap_pvt_thread_cond_init(&pool->ltp_cond);
+       pool->ltp_state = LDAP_PVT_THREAD_POOL_RUNNING;
+       pool->ltp_max_count = max_concurrency;
+       pool->ltp_max_pending = max_pending;
+       ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
+       ldap_pvt_thread_enlist(&ldap_pvt_thread_pool_list, pool);
+       ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
+
+       /* start up one thread, just so there is one */
+       pool->ltp_open_count++;
+       if (ldap_pvt_thread_create( &thr, 1, (void *)ldap_pvt_thread_pool_wrapper, pool ) != 0) {
+               /* couldn't start one?  then don't start any */
+               ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
+               ldap_pvt_thread_delist(&ldap_pvt_thread_pool_list, pool);
+               ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
+               ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
+               ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
+               free(pool);
+               return(-1);
+       }
+
+       *pool_out = pool;
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_submit ( ldap_pvt_thread_pool_t pool, void *(*start_routine)( void * ), void *arg )
+{
+       ldap_pvt_thread_ctx *ctx;
+       int need_thread = 0;
+       ldap_pvt_thread_t thr;
+
+       if (pool == NULL)
+               return(-1);
+
+       ctx = (ldap_pvt_thread_ctx *)calloc(1, sizeof(ldap_pvt_thread_ctx));
+       if (ctx == NULL)
+               return(-1);
+
+       ctx->ltc_start_routine = start_routine;
+       ctx->ltc_arg = arg;
+
+       ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+       if (pool->ltp_state != LDAP_PVT_THREAD_POOL_RUNNING
+               || (pool->ltp_max_pending > 0 && pool->ltp_pending_count >= pool->ltp_max_pending))
+       {
+               ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+               free(ctx);
+               return(-1);
+       }
+       pool->ltp_pending_count++;
+       ldap_pvt_thread_enlist(&pool->ltp_pending_list, ctx);
+       if ((pool->ltp_open_count <= 0 || pool->ltp_open_count == pool->ltp_active_count)
+               && (pool->ltp_max_count <= 0 || pool->ltp_open_count < pool->ltp_max_count))
+       {
+               pool->ltp_open_count++;
+               need_thread = 1;
+       }
+       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
+       if (need_thread) {
+               if (ldap_pvt_thread_create( &thr, 1, (void *)ldap_pvt_thread_pool_wrapper, pool ) != 0) {
+                       /* couldn't create thread.  back out of
+                        * ltp_open_count and check for even worse things.
+                        */
+                       ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+                       pool->ltp_open_count--;
+                       if (pool->ltp_open_count == 0) {
+                               /* no open threads at all?!?  this will never happen
+                                * because we always leave at least one thread open.
+                                */
+                               if (ldap_pvt_thread_delist(&pool->ltp_pending_list, ctx)) {
+                                       /* no open threads, context not handled, so
+                                        * back out of ltp_pending_count, free the context,
+                                        * report the error.
+                                        */
+                                       pool->ltp_pending_count++;
+                                       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+                                       free(ctx);
+                                       return(-1);
+                               }
+                       }
+                       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+                       /* there is another open thread, so this
+                        * context will be handled eventually.
+                        * continue on and signal that the context
+                        * is waiting.
+                        */
+               }
+       }
+
+       ldap_pvt_thread_cond_signal(&pool->ltp_cond);
+       return(0);
+}
+
+int
+ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t pool )
+{
+       int count;
+
+       if (pool == NULL)
+               return(0);
+
+       ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+       count = pool->ltp_pending_count + pool->ltp_active_count;
+       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+       return(count);
+}
+
+int
+ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t pool, int run_pending )
+{
+       long waiting;
+       ldap_pvt_thread_ctx *ctx;
+
+       if (pool == NULL)
+               return(-1);
+
+       ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
+       pool = ldap_pvt_thread_delist(&ldap_pvt_thread_pool_list, pool);
+       ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
+
+       if (pool == NULL)
+               return(-1);
+
+       ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+       if (run_pending)
+               pool->ltp_state = LDAP_PVT_THREAD_POOL_FINISHING;
+       else
+               pool->ltp_state = LDAP_PVT_THREAD_POOL_STOPPING;
+       waiting = pool->ltp_open_count;
+       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
+       /* broadcast could be used here, but only after
+        * it is fixed in the NT thread implementation
+        */
+       while (--waiting >= 0)
+               ldap_pvt_thread_cond_signal(&pool->ltp_cond);
+       do {
+               ldap_pvt_thread_yield();
+               ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+               waiting = pool->ltp_open_count;
+               ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+       } while (waiting > 0);
+
+       while (ctx = (ldap_pvt_thread_ctx *)ldap_pvt_thread_delist(&pool->ltp_pending_list, NULL))
+               free(ctx);
+
+       ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
+       ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
+       free(pool);
+       return(0);
+}
+
+void *
+ldap_pvt_thread_pool_wrapper ( ldap_pvt_thread_pool_t pool )
+{
+       ldap_pvt_thread_ctx *ctx;
+
+       ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+
+       while (pool->ltp_state != LDAP_PVT_THREAD_POOL_STOPPING) {
+
+               if (pool->ltp_state == LDAP_PVT_THREAD_POOL_RUNNING) {
+                       ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+                       if (pool->ltp_state == LDAP_PVT_THREAD_POOL_STOPPING)
+                               break;
+               }
+
+               ctx = ldap_pvt_thread_delist(&pool->ltp_pending_list, NULL);
+               if (ctx == NULL) {
+                       if (pool->ltp_state == LDAP_PVT_THREAD_POOL_FINISHING)
+                               break;
+                       /* we could check an idle timer here, and let the
+                        * thread die if it has been inactive for a while.
+                        * only die if there are other open threads (i.e.,
+                        * always have at least one thread open).
+                        */
+                       continue;
+               }
+
+               pool->ltp_pending_count--;
+               pool->ltp_active_count++;
+               ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
+               (ctx->ltc_start_routine)(ctx->ltc_arg);
+               free(ctx);
+
+               /* if we use an idle timer, here's
+                * a good place to update it
+                */
+
+               ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+               pool->ltp_active_count--;
+       }
+
+       pool->ltp_open_count--;
+       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
+       ldap_pvt_thread_exit(NULL);
+       return(NULL);
+}
+
+void *
+ldap_pvt_thread_enlist( ldap_pvt_thread_list *list, void *elem )
+{
+       ldap_pvt_thread_listelement *prev;
+
+       if (elem == NULL)
+               return(NULL);
+
+       ((ldap_pvt_thread_listelement *)elem)->next = NULL;
+       if (*list == NULL) {
+               *list = elem;
+               return(elem);
+       }
+
+       for (prev = *list ; prev->next != NULL; prev = prev->next) ;
+       prev->next = elem;
+       return(elem);
+}
+
+void *
+ldap_pvt_thread_delist( ldap_pvt_thread_list *list, void *elem )
+{
+       ldap_pvt_thread_listelement *prev;
+
+       if (*list == NULL)
+               return(NULL);
+
+       if (elem == NULL)
+               elem = *list;
+
+       if (*list == elem) {
+               *list = ((ldap_pvt_thread_listelement *)elem)->next;
+               return(elem);
+       }
+
+       for (prev = *list ; prev->next != NULL; prev = prev->next) {
+               if (prev->next == elem) {
+                       prev->next = ((ldap_pvt_thread_listelement *)elem)->next;
+                       return(elem);
+               }
+       }
+       return(NULL);
+}
+
+void *
+ldap_pvt_thread_onlist( ldap_pvt_thread_list *list, void *elem )
+{
+       ldap_pvt_thread_listelement *prev;
+
+       if (elem == NULL || *list == NULL)
+               return(NULL);
+
+       for (prev = *list ; prev != NULL; prev = prev->next) {
+               if (prev == elem)
+                       return(elem);
+       }
+
+       return(NULL);
+}
+#endif /* NO_THREADS */