From 0ef9e6107baf45d29e194442991132df6c190adb Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Tue, 3 Sep 2013 15:06:37 -0700 Subject: [PATCH] More for threadpool queues Allow dynamic reconfig --- libraries/libldap_r/tpool.c | 147 ++++++++++++++++++++++++++++++------ servers/slapd/bconfig.c | 2 + 2 files changed, 125 insertions(+), 24 deletions(-) diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c index eaea58e9a0..87fc0b3187 100644 --- a/libraries/libldap_r/tpool.c +++ b/libraries/libldap_r/tpool.c @@ -32,6 +32,10 @@ #ifndef LDAP_THREAD_HAVE_TPOOL +#ifndef CACHELINE +#define CACHELINE 64 +#endif + /* Thread-specific key with data and optional free function */ typedef struct ldap_int_tpool_key_s { void *ltk_key; @@ -93,6 +97,8 @@ typedef struct ldap_int_thread_task_s { typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t; struct ldap_int_thread_poolq_s { + void *ltp_free; + struct ldap_int_thread_pool_s *ltp_pool; /* protect members below */ @@ -129,7 +135,7 @@ struct ldap_int_thread_poolq_s { struct ldap_int_thread_pool_s { LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next; - struct ldap_int_thread_poolq_s *ltp_wqs; + struct ldap_int_thread_poolq_s **ltp_wqs; /* number of poolqs */ int ltp_numqs; @@ -231,12 +237,29 @@ ldap_pvt_thread_pool_init_q ( *tpool = NULL; pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1, - sizeof(struct ldap_int_thread_pool_s) + - numqs * sizeof(struct ldap_int_thread_poolq_s)); + sizeof(struct ldap_int_thread_pool_s)); if (pool == NULL) return(-1); - pool->ltp_wqs = (struct ldap_int_thread_poolq_s *)(pool+1); + pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *)); + if (pool->ltp_wqs == NULL) { + LDAP_FREE(pool); + return(-1); + } + + for (i=0; i=0; i--) + LDAP_FREE(pool->ltp_wqs[i]->ltp_free); + LDAP_FREE(pool->ltp_wqs); + LDAP_FREE(pool); + return(-1); + } + pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1)); + pool->ltp_wqs[i]->ltp_free = ptr; + } + pool->ltp_numqs = numqs; pool->ltp_conf_max_count = max_threads; if ( !max_threads ) @@ -257,7 +280,7 @@ ldap_pvt_thread_pool_init_q ( rem_thr = max_threads % numqs; rem_pend = max_pending % numqs; for ( i=0; iltp_wqs[i]; + pq = pool->ltp_wqs[i]; pq->ltp_pool = pool; rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex); if (rc != 0) @@ -351,18 +374,18 @@ ldap_pvt_thread_pool_submit ( j = i; while(1) { - ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i].ltp_mutex); - if (pool->ltp_wqs[i].ltp_pending_count < pool->ltp_wqs[i].ltp_max_pending) { + ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex); + if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) { break; } - ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i].ltp_mutex); + ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex); i++; i %= pool->ltp_numqs; if ( i == j ) return -1; } - pq = &pool->ltp_wqs[i]; + pq = pool->ltp_wqs[i]; task = LDAP_SLIST_FIRST(&pq->ltp_free_list); if (task) { LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l); @@ -465,7 +488,7 @@ ldap_pvt_thread_pool_retract ( return(-1); i = ldap_int_poolq_hash( pool, arg ); - pq = &pool->ltp_wqs[i]; + pq = pool->ltp_wqs[i]; ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) @@ -482,6 +505,75 @@ ldap_pvt_thread_pool_retract ( return task != NULL; } +/* Set number of work queues in this pool. Should not be + * more than the number of CPUs. */ +int +ldap_pvt_thread_pool_queues( + ldap_pvt_thread_pool_t *tpool, + int numqs ) +{ + struct ldap_int_thread_pool_s *pool; + struct ldap_int_thread_poolq_s *pq; + int i, rc, rem_thr, rem_pend; + + if (numqs < 1 || tpool == NULL) + return(-1); + + pool = *tpool; + + if (pool == NULL) + return(-1); + + if (numqs < pool->ltp_numqs) { + for (i=numqs; iltp_numqs; i++) + pool->ltp_wqs[i]->ltp_max_count = 0; + } else if (numqs > pool->ltp_numqs) { + struct ldap_int_thread_poolq_s **wqs; + wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *)); + if (wqs == NULL) + return(-1); + pool->ltp_wqs = wqs; + for (i=pool->ltp_numqs; iltp_wqs[i] = NULL; + return(-1); + } + pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1)); + pq->ltp_free = ptr; + pool->ltp_wqs[i] = pq; + pq->ltp_pool = pool; + rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex); + if (rc != 0) + return(rc); + rc = ldap_pvt_thread_cond_init(&pq->ltp_cond); + if (rc != 0) + return(rc); + LDAP_STAILQ_INIT(&pq->ltp_pending_list); + pq->ltp_work_list = &pq->ltp_pending_list; + LDAP_SLIST_INIT(&pq->ltp_free_list); + } + } + rem_thr = pool->ltp_max_count % numqs; + rem_pend = pool->ltp_max_pending % numqs; + for ( i=0; iltp_wqs[i]; + pq->ltp_max_count = pool->ltp_max_count / numqs; + if ( rem_thr ) { + pq->ltp_max_count++; + rem_thr--; + } + pq->ltp_max_pending = pool->ltp_max_pending / numqs; + if ( rem_pend ) { + pq->ltp_max_pending++; + rem_pend--; + } + } + pool->ltp_numqs = numqs; + return 0; +} + /* Set max #threads. value <= 0 means max supported #threads (LDAP_MAXTHR) */ int ldap_pvt_thread_pool_maxthreads( @@ -512,14 +604,12 @@ ldap_pvt_thread_pool_maxthreads( max_threads /= pool->ltp_numqs; for (i=0; iltp_numqs; i++) { - pq = &pool->ltp_wqs[i]; - ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); + pq = pool->ltp_wqs[i]; pq->ltp_max_count = max_threads; if (remthr) { pq->ltp_max_count++; remthr--; } - ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex); } return(0); } @@ -572,7 +662,7 @@ ldap_pvt_thread_pool_query( int i; count = 0; for (i=0; iltp_numqs; i++) { - struct ldap_int_thread_poolq_s *pq = &pool->ltp_wqs[i]; + struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i]; ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); switch(param) { case LDAP_PVT_THREAD_POOL_PARAM_OPEN: @@ -615,7 +705,7 @@ ldap_pvt_thread_pool_query( else { int i; for (i=0; iltp_numqs; i++) - if (pool->ltp_wqs[i].ltp_pending_count) break; + if (pool->ltp_wqs[i]->ltp_pending_count) break; if (iltp_numqs) *((char **)value) = "finishing"; else @@ -706,7 +796,7 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending ) ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); for (i=0; iltp_numqs; i++) { - pq = &pool->ltp_wqs[i]; + pq = pool->ltp_wqs[i]; ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); if (pq->ltp_max_pending > 0) pq->ltp_max_pending = -pq->ltp_max_pending; @@ -753,7 +843,7 @@ ldap_int_thread_pool_wrapper ( ldap_int_tpool_plist_t *work_list; ldap_int_thread_userctx_t ctx, *kctx; unsigned i, keyslot, hash; - int pool_lock = 0; + int pool_lock = 0, freeme = 0; assert(pool != NULL); @@ -869,15 +959,24 @@ ldap_int_thread_pool_wrapper ( ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex); pq->ltp_open_count--; - /* let pool_destroy know we're all done */ - if (pq->ltp_open_count == 0) - ldap_pvt_thread_cond_signal(&pq->ltp_cond); + if (pq->ltp_open_count == 0) { + if (pool->ltp_finishing) + /* let pool_destroy know we're all done */ + ldap_pvt_thread_cond_signal(&pq->ltp_cond); + else + freeme = 1; + } if (pool_lock) ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); else ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex); + if (freeme) { + ldap_pvt_thread_cond_destroy(&pq->ltp_cond); + ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex); + LDAP_FREE(pq->ltp_free); + } ldap_pvt_thread_exit(NULL); return(NULL); } @@ -965,7 +1064,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type ) pool->ltp_active_queues = 0; for (i=0; iltp_numqs; i++) - if (&pool->ltp_wqs[i] == pq) break; + if (pool->ltp_wqs[i] == pq) break; ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); /* temporarily remove ourself from active count */ @@ -973,7 +1072,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type ) j=i; do { - pq = &pool->ltp_wqs[j]; + pq = pool->ltp_wqs[j]; if (j != i) ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); @@ -998,7 +1097,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type ) ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex); /* restore us to active count */ - pool->ltp_wqs[i].ltp_active_count++; + pool->ltp_wqs[i]->ltp_active_count++; assert(pool->ltp_pause == WANT_PAUSE); pool->ltp_pause = PAUSED; @@ -1065,7 +1164,7 @@ ldap_pvt_thread_pool_resume ( assert(pool->ltp_pause == PAUSED); pool->ltp_pause = 0; for (i=0; iltp_numqs; i++) { - pq = &pool->ltp_wqs[i]; + pq = pool->ltp_wqs[i]; if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */ pq->ltp_open_count = -pq->ltp_open_count; pq->ltp_work_list = &pq->ltp_pending_list; diff --git a/servers/slapd/bconfig.c b/servers/slapd/bconfig.c index c760a98e2e..0d62310d8a 100644 --- a/servers/slapd/bconfig.c +++ b/servers/slapd/bconfig.c @@ -1717,6 +1717,8 @@ config_generic(ConfigArgs *c) { c->log, c->cr_msg, 0 ); return 1; } + if ( slapMode & SLAP_SERVER_MODE ) + ldap_pvt_thread_pool_queues(&connection_pool, c->value_int); connection_pool_queues = c->value_int; /* save for reference */ break; -- 2.39.5