]> git.sur5r.net Git - openldap/blobdiff - libraries/libldap_r/tpool.c
ITS#3950 delete gratuitous yield. (could use pthread_cond_timedwait
[openldap] / libraries / libldap_r / tpool.c
index 675fa4d0b46e5f1319ad86b7ea6651cc52d99b52..e23f9ad09cf8b252c31e89308af11490431b2a41 100644 (file)
@@ -1,7 +1,7 @@
 /* $OpenLDAP$ */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *
- * Copyright 1998-2005 The OpenLDAP Foundation.
+ * Copyright 1998-2006 The OpenLDAP Foundation.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 #include <ac/errno.h>
 
 #include "ldap-int.h"
-#include "ldap_pvt_thread.h"
+#include "ldap_pvt_thread.h" /* Get the thread interface */
 #include "ldap_queue.h"
+#define LDAP_THREAD_POOL_IMPLEMENTATION
+#include "ldap_thr_debug.h"  /* May rename symbols defined below */
 
 #ifndef LDAP_THREAD_HAVE_TPOOL
 
-enum ldap_int_thread_pool_state {
+typedef enum ldap_int_thread_pool_state_e {
        LDAP_INT_THREAD_POOL_RUNNING,
        LDAP_INT_THREAD_POOL_FINISHING,
        LDAP_INT_THREAD_POOL_STOPPING,
        LDAP_INT_THREAD_POOL_PAUSING
-};
+} ldap_int_thread_pool_state_t;
 
 typedef struct ldap_int_thread_key_s {
        void *ltk_key;
@@ -50,11 +52,6 @@ typedef struct ldap_int_thread_key_s {
 
 static ldap_pvt_thread_t tid_zero;
 
-#ifdef HAVE_PTHREADS
-#define        TID_EQ(a,b)     pthread_equal((a),(b))
-#else
-#define        TID_EQ(a,b)     ((a) == (b))
-#endif
 static struct {
        ldap_pvt_thread_t id;
        ldap_int_thread_key_t *ctx;
@@ -79,7 +76,7 @@ struct ldap_int_thread_pool_s {
        LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
        LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
        LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
-       long ltp_state;
+       ldap_int_thread_pool_state_t ltp_state;
        long ltp_max_count;
        long ltp_max_pending;
        long ltp_pending_count;
@@ -96,9 +93,15 @@ static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
 
 static void *ldap_int_thread_pool_wrapper( void *pool );
 
+static ldap_pvt_thread_t ldap_int_main_tid;
+
+static ldap_int_thread_key_t ldap_int_main_thrctx[LDAP_MAXTHR];
+
 int
 ldap_int_thread_pool_startup ( void )
 {
+       ldap_int_main_tid = ldap_pvt_thread_self();
+
        return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
 }
 
@@ -109,12 +112,137 @@ ldap_int_thread_pool_shutdown ( void )
 
        while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
                LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
-               ldap_pvt_thread_pool_destroy( &pool, 0);
+               (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
        }
        ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
        return(0);
 }
 
+typedef struct ldap_lazy_sem_t {
+       ldap_pvt_thread_mutex_t ls_mutex;
+       ldap_pvt_thread_cond_t  ls_cond;
+       int ls_sem_value;
+       /*
+        * when more than ls_lazy_count number of resources
+        * becmoes available, the thread wating for the resources will
+        * be waken up in order to prevent frequent blocking/waking-up
+        */
+       unsigned int ls_lazy_count;
+       /*
+        * only one thread(listener) will wait on this semaphore
+        * using a flag instead of a list
+        */
+       int ls_wait;
+} ldap_lazy_sem_t;
+
+ldap_lazy_sem_t* thread_pool_sem = NULL;
+
+int
+ldap_lazy_sem_init( unsigned int value, unsigned int lazyness )
+{
+       thread_pool_sem = (ldap_lazy_sem_t*) LDAP_CALLOC(1,
+               sizeof( ldap_lazy_sem_t ));
+
+       if( thread_pool_sem == NULL ) return -1;
+
+       ldap_pvt_thread_mutex_init( &thread_pool_sem->ls_mutex );
+       ldap_pvt_thread_cond_init( &thread_pool_sem->ls_cond );
+       thread_pool_sem->ls_sem_value = value;
+       thread_pool_sem->ls_lazy_count = lazyness;
+       thread_pool_sem->ls_wait = 0;
+
+       return 0;
+}
+
+/* FIXME: move to some approprite header */
+int ldap_lazy_sem_dec( ldap_lazy_sem_t* ls );
+int ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls );
+
+/*
+ * ldap_lazy_sem_wait is used if a caller is blockable(listener).
+ * Otherwise use ldap_lazy_sem_dec (worker)
+ */
+int
+ldap_lazy_sem_op_submit( ldap_lazy_sem_t* ls )
+{
+       if ( ls == NULL ) return -1;
+
+       /* only worker thread has its thread ctx */
+       if ( ldap_pvt_thread_pool_context() ) {
+               /* worker thread */
+               return ldap_lazy_sem_dec( ls );
+       } else {
+               /* listener */
+               return ldap_lazy_sem_wait( ls );
+       }
+}
+
+/*
+ * test if given semaphore's count is zero.
+ * If 0, the caller is blocked 
+ * If not, the count is decremented.
+ */
+int
+ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls )
+{
+       ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
+
+lazy_sem_retry:
+       if ( ls->ls_sem_value <= 0 ) {
+               /* no more avaliable resources */
+               ls->ls_wait = 1;
+               ldap_pvt_thread_cond_wait( &ls->ls_cond, &ls->ls_mutex );
+               goto lazy_sem_retry;
+       } else {
+               /* avaliable resources */
+               ls->ls_sem_value--;
+       }
+
+       ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
+
+       return 0;
+}
+
+/*
+ * decrement the count without blocking
+ * even when the count becomes less than or equal to 0
+ */
+int
+ldap_lazy_sem_dec( ldap_lazy_sem_t* ls )
+{
+       ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
+
+       ls->ls_sem_value--;
+
+       ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
+
+       return 0;
+}
+
+/*
+ * Increment the count by one and test if it is greater or
+ * equal to lazyness. If it is, wake up a blocked thread.
+ */
+int
+ldap_lazy_sem_post( ldap_lazy_sem_t* ls )
+{
+       if( ls == NULL ) return (-1);
+
+       ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
+
+       ls->ls_sem_value++;
+       if ( ls->ls_wait ) {
+               if ( ls->ls_sem_value >= ls->ls_lazy_count ) {
+                       ls->ls_wait = 0;
+                       ldap_pvt_thread_cond_signal( &ls->ls_cond );
+               }
+       }
+
+       ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
+
+       return 0;
+}
+
 int
 ldap_pvt_thread_pool_init (
        ldap_pvt_thread_pool_t *tpool,
@@ -187,7 +315,7 @@ ldap_pvt_thread_pool_init (
        return(0);
 }
 
-#define        TID_HASH(tid, hash) do { int i; \
+#define        TID_HASH(tid, hash) do { unsigned i; \
        unsigned char *ptr = (unsigned char *)&(tid); \
        for (i=0, hash=0; i<sizeof(tid); i++) hash += ptr[i]; } while(0)
 
@@ -240,13 +368,9 @@ ldap_pvt_thread_pool_submit (
                return(0);
        }
        ldap_pvt_thread_cond_signal(&pool->ltp_cond);
-       if ((pool->ltp_open_count <= 0
-#if 0
-                       || pool->ltp_pending_count > 1
-#endif
-                       || pool->ltp_open_count == pool->ltp_active_count)
-               && (pool->ltp_max_count <= 0
-                       || pool->ltp_open_count < pool->ltp_max_count))
+       if (pool->ltp_open_count < pool->ltp_active_count + pool->ltp_pending_count
+               && (pool->ltp_open_count < pool->ltp_max_count ||
+                       pool->ltp_max_count <= 0 ))
        {
                pool->ltp_open_count++;
                pool->ltp_starting++;
@@ -254,6 +378,10 @@ ldap_pvt_thread_pool_submit (
        }
        ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
 
+#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
+       ldap_lazy_sem_op_submit( thread_pool_sem );
+#endif
+
        if (need_thread) {
                int rc;
 
@@ -271,7 +399,7 @@ ldap_pvt_thread_pool_submit (
                         */
                        TID_HASH(thr, hash);
                        for (rc = hash & (LDAP_MAXTHR-1);
-                               !TID_EQ(thread_keys[rc].id, tid_zero);
+                               !ldap_pvt_thread_equal(thread_keys[rc].id, tid_zero);
                                rc = (rc+1) & (LDAP_MAXTHR-1));
                        thread_keys[rc].id = thr;
                } else {
@@ -405,6 +533,11 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
        ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
        ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
        LDAP_FREE(pool);
+#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
+       if ( thread_pool_sem ) {
+               LDAP_FREE( thread_pool_sem );
+       }
+#endif
        return(0);
 }
 
@@ -431,7 +564,8 @@ ldap_int_thread_pool_wrapper (
 
        /* store pointer to our keys */
        TID_HASH(tid, hash);
-       for (i = hash & (LDAP_MAXTHR-1); !TID_EQ(thread_keys[i].id, tid);
+       for (i = hash & (LDAP_MAXTHR-1);
+                               !ldap_pvt_thread_equal(thread_keys[i].id, tid);
                                i = (i+1) & (LDAP_MAXTHR-1));
        thread_keys[i].ctx = ltc_key;
        keyslot = i;
@@ -461,10 +595,16 @@ ldap_int_thread_pool_wrapper (
                         * should be like this:
                         *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
                         *       check timer, leave thread (break;)
+                        *
+                        * Just use pthread_cond_timedwait if we want to
+                        * check idle time.
                         */
 
-                       if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
+                       if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING
+                               || pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING)
+                       {
                                ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+                       }
 
                        continue;
                }
@@ -477,6 +617,9 @@ ldap_int_thread_pool_wrapper (
 
                ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
 
+#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
+               ldap_lazy_sem_post( thread_pool_sem );
+#endif
                ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
                LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
                        ldap_int_thread_ctx_s, ltc_next.al);
@@ -489,15 +632,6 @@ ldap_int_thread_pool_wrapper (
                        }
                        ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
                }
-               ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
-
-               ldap_pvt_thread_yield();
-
-               /* if we use an idle timer, here's
-                * a good place to update it
-                */
-
-               ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
        }
 
        for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
@@ -534,7 +668,7 @@ ldap_pvt_thread_pool_pause (
        ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
 
        /* If someone else has already requested a pause, we have to wait */
-       if (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
+       while (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
                pool->ltp_pending_count++;
                pool->ltp_active_count--;
                ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
@@ -652,29 +786,27 @@ void *ldap_pvt_thread_pool_context( )
        int i, hash;
 
        tid = ldap_pvt_thread_self();
+       if ( ldap_pvt_thread_equal( tid, ldap_int_main_tid ))
+               return ldap_int_main_thrctx;
 
        TID_HASH( tid, hash );
-       for (i = hash & (LDAP_MAXTHR-1); !TID_EQ(thread_keys[i].id, tid_zero) &&
-               !TID_EQ(thread_keys[i].id, tid); i = (i+1) & (LDAP_MAXTHR-1));
+       for (i = hash & (LDAP_MAXTHR-1);
+               !ldap_pvt_thread_equal(thread_keys[i].id, tid_zero) &&
+               !ldap_pvt_thread_equal(thread_keys[i].id, tid);
+               i = (i+1) & (LDAP_MAXTHR-1));
 
        return thread_keys[i].ctx;
 }
 
-void *ldap_pvt_thread_pool_fake_context_init( )
-{
-       return LDAP_CALLOC( LDAP_MAXTHR, sizeof(ldap_int_thread_key_t) );
-}
-
-void ldap_pvt_thread_pool_fake_context_destroy( void *vctx )
+void ldap_pvt_thread_pool_context_reset( void *vctx )
 {
        ldap_int_thread_key_t *ctx = vctx;
        int i;
 
-       for ( i=0; ctx[i].ltk_key; i++) {
+       for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++) {
                if ( ctx[i].ltk_free )
                        ctx[i].ltk_free( ctx[i].ltk_key, ctx[i].ltk_data );
                ctx[i].ltk_key = NULL;
        }
-       LDAP_FREE( vctx );
 }
 #endif /* LDAP_THREAD_HAVE_TPOOL */