/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
- * Copyright 1998-2005 The OpenLDAP Foundation.
+ * Copyright 1998-2006 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
#include <ac/errno.h>
#include "ldap-int.h"
-#include "ldap_pvt_thread.h"
+#include "ldap_pvt_thread.h" /* Get the thread interface */
#include "ldap_queue.h"
+#define LDAP_THREAD_POOL_IMPLEMENTATION
+#include "ldap_thr_debug.h" /* May rename symbols defined below */
#ifndef LDAP_THREAD_HAVE_TPOOL
-enum ldap_int_thread_pool_state {
+typedef enum ldap_int_thread_pool_state_e {
LDAP_INT_THREAD_POOL_RUNNING,
LDAP_INT_THREAD_POOL_FINISHING,
LDAP_INT_THREAD_POOL_STOPPING,
LDAP_INT_THREAD_POOL_PAUSING
-};
+} ldap_int_thread_pool_state_t;
typedef struct ldap_int_thread_key_s {
void *ltk_key;
#define MAXKEYS 32
#define LDAP_MAXTHR 1024 /* must be a power of 2 */
+typedef struct ldap_int_thread_userctx_s {
+ ldap_pvt_thread_t ltu_id;
+ ldap_int_thread_key_t ltu_key[MAXKEYS];
+} ldap_int_thread_userctx_t;
+
static ldap_pvt_thread_t tid_zero;
-#ifdef HAVE_PTHREADS
-#define TID_EQ(a,b) pthread_equal((a),(b))
-#else
-#define TID_EQ(a,b) ((a) == (b))
-#endif
static struct {
ldap_pvt_thread_t id;
- ldap_int_thread_key_t *ctx;
+ ldap_int_thread_userctx_t *ctx;
} thread_keys[LDAP_MAXTHR];
LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
- long ltp_state;
+ ldap_int_thread_pool_state_t ltp_state;
long ltp_max_count;
long ltp_max_pending;
long ltp_pending_count;
static ldap_pvt_thread_t ldap_int_main_tid;
-static ldap_int_thread_key_t ldap_int_main_thrctx[LDAP_MAXTHR];
+static ldap_int_thread_userctx_t ldap_int_main_thrctx;
int
ldap_int_thread_pool_startup ( void )
{
ldap_int_main_tid = ldap_pvt_thread_self();
+ ldap_int_main_thrctx.ltu_id = ldap_int_main_tid;
return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
}
struct ldap_int_thread_pool_s *pool;
while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
- LDAP_STAILQ_REMOVE_HEAD(&ldap_int_thread_pool_list, ltp_next);
- ldap_pvt_thread_pool_destroy( &pool, 0);
+ (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
}
ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
return(0);
}
+typedef struct ldap_lazy_sem_t {
+ ldap_pvt_thread_mutex_t ls_mutex;
+ ldap_pvt_thread_cond_t ls_cond;
+ int ls_sem_value;
+ /*
+ * when more than ls_lazy_count number of resources
+ * becmoes available, the thread wating for the resources will
+ * be waken up in order to prevent frequent blocking/waking-up
+ */
+ unsigned int ls_lazy_count;
+ /*
+ * only one thread(listener) will wait on this semaphore
+ * using a flag instead of a list
+ */
+ int ls_wait;
+} ldap_lazy_sem_t;
+
+ldap_lazy_sem_t* thread_pool_sem = NULL;
+
+int
+ldap_lazy_sem_init( unsigned int value, unsigned int lazyness )
+{
+ thread_pool_sem = (ldap_lazy_sem_t*) LDAP_CALLOC(1,
+ sizeof( ldap_lazy_sem_t ));
+
+ if( thread_pool_sem == NULL ) return -1;
+
+ ldap_pvt_thread_mutex_init( &thread_pool_sem->ls_mutex );
+ ldap_pvt_thread_cond_init( &thread_pool_sem->ls_cond );
+ thread_pool_sem->ls_sem_value = value;
+ thread_pool_sem->ls_lazy_count = lazyness;
+ thread_pool_sem->ls_wait = 0;
+
+ return 0;
+}
+
+/* FIXME: move to some approprite header */
+int ldap_lazy_sem_dec( ldap_lazy_sem_t* ls );
+int ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls );
+
+/*
+ * ldap_lazy_sem_wait is used if a caller is blockable(listener).
+ * Otherwise use ldap_lazy_sem_dec (worker)
+ */
+int
+ldap_lazy_sem_op_submit( ldap_lazy_sem_t* ls )
+{
+ if ( ls == NULL ) return -1;
+
+ /* only worker thread has its thread ctx */
+ if ( ldap_pvt_thread_pool_context() ) {
+ /* worker thread */
+ return ldap_lazy_sem_dec( ls );
+ } else {
+ /* listener */
+ return ldap_lazy_sem_wait( ls );
+ }
+}
+
+/*
+ * test if given semaphore's count is zero.
+ * If 0, the caller is blocked
+ * If not, the count is decremented.
+ */
+int
+ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls )
+{
+ ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
+
+lazy_sem_retry:
+ if ( ls->ls_sem_value <= 0 ) {
+ /* no more avaliable resources */
+ ls->ls_wait = 1;
+ ldap_pvt_thread_cond_wait( &ls->ls_cond, &ls->ls_mutex );
+ goto lazy_sem_retry;
+ } else {
+ /* avaliable resources */
+ ls->ls_sem_value--;
+ }
+
+ ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
+
+ return 0;
+}
+
+/*
+ * decrement the count without blocking
+ * even when the count becomes less than or equal to 0
+ */
+int
+ldap_lazy_sem_dec( ldap_lazy_sem_t* ls )
+{
+ ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
+
+ ls->ls_sem_value--;
+
+ ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
+
+ return 0;
+}
+
+/*
+ * Increment the count by one and test if it is greater or
+ * equal to lazyness. If it is, wake up a blocked thread.
+ */
+int
+ldap_lazy_sem_post( ldap_lazy_sem_t* ls )
+{
+ if( ls == NULL ) return (-1);
+
+ ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
+
+ ls->ls_sem_value++;
+ if ( ls->ls_wait ) {
+ if ( ls->ls_sem_value >= ls->ls_lazy_count ) {
+ ls->ls_wait = 0;
+ ldap_pvt_thread_cond_signal( &ls->ls_cond );
+ }
+ }
+
+ ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
+
+ return 0;
+}
+
int
ldap_pvt_thread_pool_init (
ldap_pvt_thread_pool_t *tpool,
return(0);
}
-#define TID_HASH(tid, hash) do { int i; \
+#define TID_HASH(tid, hash) do { unsigned i; \
unsigned char *ptr = (unsigned char *)&(tid); \
for (i=0, hash=0; i<sizeof(tid); i++) hash += ptr[i]; } while(0)
return(0);
}
ldap_pvt_thread_cond_signal(&pool->ltp_cond);
- if ((pool->ltp_open_count <= 0
-#if 0
- || pool->ltp_pending_count > 1
-#endif
- || pool->ltp_open_count == pool->ltp_active_count)
- && (pool->ltp_max_count <= 0
- || pool->ltp_open_count < pool->ltp_max_count))
+ if (pool->ltp_open_count < pool->ltp_active_count + pool->ltp_pending_count
+ && (pool->ltp_open_count < pool->ltp_max_count ||
+ pool->ltp_max_count <= 0 ))
{
pool->ltp_open_count++;
pool->ltp_starting++;
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
+ ldap_lazy_sem_op_submit( thread_pool_sem );
+#endif
+
if (need_thread) {
int rc;
*/
TID_HASH(thr, hash);
for (rc = hash & (LDAP_MAXTHR-1);
- !TID_EQ(thread_keys[rc].id, tid_zero);
+ !ldap_pvt_thread_equal(thread_keys[rc].id, tid_zero);
rc = (rc+1) & (LDAP_MAXTHR-1));
thread_keys[rc].id = thr;
} else {
}
int
-ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
+ldap_pvt_thread_pool_query ( ldap_pvt_thread_pool_t *tpool, ldap_pvt_thread_pool_param_t param, void *value )
{
- struct ldap_int_thread_pool_s *pool;
- int count;
+ struct ldap_int_thread_pool_s *pool;
+ int count = -1;
- if (tpool == NULL)
- return(-1);
+ if ( tpool == NULL || value == NULL ) {
+ return -1;
+ }
pool = *tpool;
- if (pool == NULL)
- return(0);
+ if ( pool == NULL ) {
+ return 0;
+ }
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
- count = pool->ltp_pending_count + pool->ltp_active_count;
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
- return(count);
+ switch ( param ) {
+ case LDAP_PVT_THREAD_POOL_PARAM_MAX:
+ count = pool->ltp_max_count;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
+ count = pool->ltp_max_pending;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
+ count = pool->ltp_open_count;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
+ count = pool->ltp_starting;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
+ count = pool->ltp_active_count;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
+ count = pool->ltp_pending_count;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
+ count = pool->ltp_pending_count + pool->ltp_active_count;
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
+ break;
+
+ case LDAP_PVT_THREAD_POOL_PARAM_STATE: {
+ static struct {
+ char *name;
+ ldap_int_thread_pool_state_t state;
+ } str2state[] = {
+ { "running", LDAP_INT_THREAD_POOL_RUNNING },
+ { "finishing", LDAP_INT_THREAD_POOL_FINISHING },
+ { "stopping", LDAP_INT_THREAD_POOL_STOPPING },
+ { "pausing", LDAP_INT_THREAD_POOL_PAUSING },
+ { NULL }
+ };
+ int i;
+
+ for ( i = 0; str2state[ i ].name != NULL; i++ ) {
+ if ( str2state[ i ].state == pool->ltp_state ) {
+ break;
+ }
+ }
+ *((char **)value) = str2state[ i ].name;
+ if ( str2state[ i ].name != NULL ) {
+ count = -2;
+ }
+ } break;
+ }
+ ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );
+
+ if ( count > -1 ) {
+ *((int *)value) = count;
+ }
+
+ return ( count == -1 ? -1 : 0 );
+}
+
+/*
+ * wrapper for ldap_pvt_thread_pool_query(), left around
+ * for backwards compatibility
+ */
+int
+ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
+{
+ int rc, count;
+
+ rc = ldap_pvt_thread_pool_query( tpool,
+ LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
+
+ if ( rc == 0 ) {
+ return count;
+ }
+
+ return rc;
}
int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
{
struct ldap_int_thread_pool_s *pool, *pptr;
- long waiting;
ldap_int_thread_ctx_t *ctx;
if (tpool == NULL)
? LDAP_INT_THREAD_POOL_FINISHING
: LDAP_INT_THREAD_POOL_STOPPING;
- ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
+ if ( pool->ltp_open_count ) {
+ ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
+ ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+ }
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
- do {
- ldap_pvt_thread_yield();
- ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
- waiting = pool->ltp_open_count;
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
- } while (waiting > 0);
-
while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
{
LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
LDAP_FREE(pool);
+#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
+ if ( thread_pool_sem ) {
+ LDAP_FREE( thread_pool_sem );
+ }
+#endif
return(0);
}
{
struct ldap_int_thread_pool_s *pool = xpool;
ldap_int_thread_ctx_t *ctx;
- ldap_int_thread_key_t ltc_key[MAXKEYS];
- ldap_pvt_thread_t tid;
+ ldap_int_thread_userctx_t uctx;
int i, keyslot, hash;
if (pool == NULL)
return NULL;
for ( i=0; i<MAXKEYS; i++ ) {
- ltc_key[i].ltk_key = NULL;
+ uctx.ltu_key[i].ltk_key = NULL;
}
- tid = ldap_pvt_thread_self();
+ uctx.ltu_id = ldap_pvt_thread_self();
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
/* store pointer to our keys */
- TID_HASH(tid, hash);
- for (i = hash & (LDAP_MAXTHR-1); !TID_EQ(thread_keys[i].id, tid);
+ TID_HASH(uctx.ltu_id, hash);
+ for (i = hash & (LDAP_MAXTHR-1);
+ !ldap_pvt_thread_equal(thread_keys[i].id, uctx.ltu_id);
i = (i+1) & (LDAP_MAXTHR-1));
- thread_keys[i].ctx = ltc_key;
+ thread_keys[i].ctx = &uctx;
keyslot = i;
while (pool->ltp_state != LDAP_INT_THREAD_POOL_STOPPING) {
* should be like this:
* if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
* check timer, leave thread (break;)
+ *
+ * Just use pthread_cond_timedwait if we want to
+ * check idle time.
*/
- if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
+ if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING
+ || pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING)
+ {
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+ }
continue;
}
pool->ltp_active_count++;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
- ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
+ ctx->ltc_start_routine(&uctx, ctx->ltc_arg);
+#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
+ ldap_lazy_sem_post( thread_pool_sem );
+#endif
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
ldap_int_thread_ctx_s, ltc_next.al);
}
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
}
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
-
- ldap_pvt_thread_yield();
-
- /* if we use an idle timer, here's
- * a good place to update it
- */
-
- ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
}
- for ( i=0; i<MAXKEYS && ltc_key[i].ltk_key; i++ ) {
- if (ltc_key[i].ltk_free)
- ltc_key[i].ltk_free(
- ltc_key[i].ltk_key,
- ltc_key[i].ltk_data );
+ for ( i=0; i<MAXKEYS && uctx.ltu_key[i].ltk_key; i++ ) {
+ if (uctx.ltu_key[i].ltk_free)
+ uctx.ltu_key[i].ltk_free(
+ uctx.ltu_key[i].ltk_key,
+ uctx.ltu_key[i].ltk_data );
}
thread_keys[keyslot].ctx = NULL;
thread_keys[keyslot].id = tid_zero;
pool->ltp_open_count--;
+
+ /* let pool_destroy know we're all done */
+ if (pool->ltp_open_count < 1)
+ ldap_pvt_thread_cond_signal(&pool->ltp_cond);
+
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_exit(NULL);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
/* If someone else has already requested a pause, we have to wait */
- if (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
+ while (pool->ltp_state == LDAP_INT_THREAD_POOL_PAUSING) {
pool->ltp_pending_count++;
pool->ltp_active_count--;
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
void **data,
ldap_pvt_thread_pool_keyfree_t **kfree )
{
- ldap_int_thread_key_t *ctx = xctx;
+ ldap_int_thread_userctx_t *ctx = xctx;
int i;
if ( !ctx || !data ) return EINVAL;
- for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++ ) {
- if ( ctx[i].ltk_key == key ) {
- *data = ctx[i].ltk_data;
- if ( kfree ) *kfree = ctx[i].ltk_free;
+ for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
+ if ( ctx->ltu_key[i].ltk_key == key ) {
+ *data = ctx->ltu_key[i].ltk_data;
+ if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
return 0;
}
}
void *data,
ldap_pvt_thread_pool_keyfree_t *kfree )
{
- ldap_int_thread_key_t *ctx = xctx;
+ ldap_int_thread_userctx_t *ctx = xctx;
int i;
if ( !ctx || !key ) return EINVAL;
for ( i=0; i<MAXKEYS; i++ ) {
- if ( !ctx[i].ltk_key || ctx[i].ltk_key == key ) {
- ctx[i].ltk_key = key;
- ctx[i].ltk_data = data;
- ctx[i].ltk_free = kfree;
+ if ( !ctx->ltu_key[i].ltk_key || ctx->ltu_key[i].ltk_key == key ) {
+ ctx->ltu_key[i].ltk_key = key;
+ ctx->ltu_key[i].ltk_data = data;
+ ctx->ltu_key[i].ltk_free = kfree;
return 0;
}
}
void ldap_pvt_thread_pool_purgekey( void *key )
{
int i, j;
- ldap_int_thread_key_t *ctx;
+ ldap_int_thread_userctx_t *ctx;
for ( i=0; i<LDAP_MAXTHR; i++ ) {
if ( thread_keys[i].ctx ) {
ctx = thread_keys[i].ctx;
for ( j=0; j<MAXKEYS; j++ ) {
- if ( ctx[j].ltk_key == key ) {
- if (ctx[j].ltk_free)
- ctx[j].ltk_free( ctx[j].ltk_key, ctx[j].ltk_data );
- ctx[j].ltk_key = NULL;
- ctx[j].ltk_free = NULL;
+ if ( ctx->ltu_key[j].ltk_key == key ) {
+ if (ctx->ltu_key[j].ltk_free)
+ ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
+ ctx->ltu_key[j].ltk_data );
+ ctx->ltu_key[j].ltk_key = NULL;
+ ctx->ltu_key[j].ltk_free = NULL;
break;
}
}
int i, hash;
tid = ldap_pvt_thread_self();
- if ( TID_EQ( tid, ldap_int_main_tid ))
- return ldap_int_main_thrctx;
+ if ( ldap_pvt_thread_equal( tid, ldap_int_main_tid ))
+ return &ldap_int_main_thrctx;
TID_HASH( tid, hash );
- for (i = hash & (LDAP_MAXTHR-1); !TID_EQ(thread_keys[i].id, tid_zero) &&
- !TID_EQ(thread_keys[i].id, tid); i = (i+1) & (LDAP_MAXTHR-1));
+ for (i = hash & (LDAP_MAXTHR-1);
+ !ldap_pvt_thread_equal(thread_keys[i].id, tid_zero) &&
+ !ldap_pvt_thread_equal(thread_keys[i].id, tid);
+ i = (i+1) & (LDAP_MAXTHR-1));
return thread_keys[i].ctx;
}
void ldap_pvt_thread_pool_context_reset( void *vctx )
{
- ldap_int_thread_key_t *ctx = vctx;
+ ldap_int_thread_userctx_t *ctx = vctx;
int i;
- for ( i=0; i<MAXKEYS && ctx[i].ltk_key; i++) {
- if ( ctx[i].ltk_free )
- ctx[i].ltk_free( ctx[i].ltk_key, ctx[i].ltk_data );
- ctx[i].ltk_key = NULL;
+ for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++) {
+ if ( ctx->ltu_key[i].ltk_free )
+ ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
+ ctx->ltu_key[i].ltk_data );
+ ctx->ltu_key[i].ltk_key = NULL;
}
}
+
+ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
+{
+ ldap_int_thread_userctx_t *ctx = vctx;
+
+ return ctx->ltu_id;
+}
#endif /* LDAP_THREAD_HAVE_TPOOL */