#ifndef LDAP_THREAD_HAVE_TPOOL
+#ifndef CACHELINE
+#define CACHELINE 64
+#endif
+
/* Thread-specific key with data and optional free function */
typedef struct ldap_int_tpool_key_s {
void *ltk_key;
/* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
* Protected by ldap_pvt_thread_pool_mutex except during pauses,
* when it is read-only (used by pool_purgekey and pool_context).
- * Protected by tpool->ltp_mutex during pauses.
+ * Protected by ldap_pvt_thread_pool_mutex.
*/
static struct {
ldap_int_thread_userctx_t *ctx;
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
struct ldap_int_thread_poolq_s {
+ void *ltp_free;
+
struct ldap_int_thread_pool_s *ltp_pool;
- /* protect members below, and protect thread_keys[] during pauses */
+ /* protect members below */
ldap_pvt_thread_mutex_t ltp_mutex;
- /* not paused and something to do for pool_<wrapper/pause/destroy>() */
+ /* not paused and something to do for pool_<wrapper/pause/destroy>()
+ * Used for normal pool operation, to synch between submitter and
+ * worker threads. Not used for pauses. In normal operation multiple
+ * queues can rendezvous without acquiring the main pool lock.
+ */
ldap_pvt_thread_cond_t ltp_cond;
- /* ltp_active_count <= 1 && ltp_pause */
- ldap_pvt_thread_cond_t ltp_pcond;
-
/* ltp_pause == 0 ? <p_pending_list : &empty_pending_list,
* maintaned to reduce work for pool_wrapper()
*/
ldap_int_tpool_plist_t ltp_pending_list;
LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
- /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
+ /* Max number of threads in this queue */
int ltp_max_count;
/* Max pending + paused + idle tasks, negated when ltp_finishing */
struct ldap_int_thread_pool_s {
LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
- struct ldap_int_thread_poolq_s *ltp_wqs;
+ struct ldap_int_thread_poolq_s **ltp_wqs;
/* number of poolqs */
int ltp_numqs;
- /* protect members below, and protect thread_keys[] during pauses */
+ /* protect members below */
ldap_pvt_thread_mutex_t ltp_mutex;
+ /* paused and waiting for resume
+ * When a pause is in effect all workers switch to waiting on
+ * this cond instead of their per-queue cond.
+ */
+ ldap_pvt_thread_cond_t ltp_cond;
+
+ /* ltp_active_queues < 1 && ltp_pause */
+ ldap_pvt_thread_cond_t ltp_pcond;
+
+ /* number of active queues */
+ int ltp_active_queues;
+
/* The pool is finishing, waiting for its threads to close.
* They close when ltp_pending_list is done. pool_submit()
* rejects new tasks. ltp_max_pending = -(its old value).
*tpool = NULL;
pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
- sizeof(struct ldap_int_thread_pool_s) +
- numqs * sizeof(struct ldap_int_thread_poolq_s));
+ sizeof(struct ldap_int_thread_pool_s));
if (pool == NULL) return(-1);
- pool->ltp_wqs = (struct ldap_int_thread_poolq_s *)(pool+1);
+ pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *));
+ if (pool->ltp_wqs == NULL) {
+ LDAP_FREE(pool);
+ return(-1);
+ }
+
+ for (i=0; i<numqs; i++) {
+ char *ptr = LDAP_MALLOC(sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
+ if (ptr == NULL) {
+ for (--i; i>=0; i--)
+ LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
+ LDAP_FREE(pool->ltp_wqs);
+ LDAP_FREE(pool);
+ return(-1);
+ }
+ pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
+ pool->ltp_wqs[i]->ltp_free = ptr;
+ }
+
pool->ltp_numqs = numqs;
pool->ltp_conf_max_count = max_threads;
if ( !max_threads )
if (rc != 0)
return(rc);
+ rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
+ if (rc != 0)
+ return(rc);
+
+ rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
+ if (rc != 0)
+ return(rc);
+
rem_thr = max_threads % numqs;
rem_pend = max_pending % numqs;
for ( i=0; i<numqs; i++ ) {
- pq = &pool->ltp_wqs[i];
+ pq = pool->ltp_wqs[i];
pq->ltp_pool = pool;
rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
- if (rc != 0)
- return(rc);
- rc = ldap_pvt_thread_cond_init(&pq->ltp_pcond);
if (rc != 0)
return(rc);
LDAP_STAILQ_INIT(&pq->ltp_pending_list);
j = i;
while(1) {
- ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i].ltp_mutex);
- if (pool->ltp_wqs[i].ltp_pending_count < pool->ltp_wqs[i].ltp_max_pending) {
+ ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex);
+ if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) {
break;
}
- ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i].ltp_mutex);
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex);
i++;
i %= pool->ltp_numqs;
if ( i == j )
return -1;
}
- pq = &pool->ltp_wqs[i];
+ pq = pool->ltp_wqs[i];
task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
if (task) {
LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
pq->ltp_pending_count++;
LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
+ if (pool->ltp_pause)
+ goto done;
+
/* should we open (create) a thread? */
if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
pq->ltp_open_count < pq->ltp_max_count)
{
- if (pool->ltp_pause)
- goto done;
-
pq->ltp_starting++;
pq->ltp_open_count++;
return(-1);
i = ldap_int_poolq_hash( pool, arg );
- pq = &pool->ltp_wqs[i];
+ pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
return task != NULL;
}
+/* Set number of work queues in this pool. Should not be
+ * more than the number of CPUs. */
+int
+ldap_pvt_thread_pool_queues(
+ ldap_pvt_thread_pool_t *tpool,
+ int numqs )
+{
+ struct ldap_int_thread_pool_s *pool;
+ struct ldap_int_thread_poolq_s *pq;
+ int i, rc, rem_thr, rem_pend;
+
+ if (numqs < 1 || tpool == NULL)
+ return(-1);
+
+ pool = *tpool;
+
+ if (pool == NULL)
+ return(-1);
+
+ if (numqs < pool->ltp_numqs) {
+ for (i=numqs; i<pool->ltp_numqs; i++)
+ pool->ltp_wqs[i]->ltp_max_count = 0;
+ } else if (numqs > pool->ltp_numqs) {
+ struct ldap_int_thread_poolq_s **wqs;
+ wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *));
+ if (wqs == NULL)
+ return(-1);
+ pool->ltp_wqs = wqs;
+ for (i=pool->ltp_numqs; i<numqs; i++) {
+ char *ptr = LDAP_MALLOC(sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
+ if (ptr == NULL) {
+ for (; i<numqs; i++)
+ pool->ltp_wqs[i] = NULL;
+ return(-1);
+ }
+ pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
+ pq->ltp_free = ptr;
+ pool->ltp_wqs[i] = pq;
+ pq->ltp_pool = pool;
+ rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
+ if (rc != 0)
+ return(rc);
+ rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
+ if (rc != 0)
+ return(rc);
+ LDAP_STAILQ_INIT(&pq->ltp_pending_list);
+ pq->ltp_work_list = &pq->ltp_pending_list;
+ LDAP_SLIST_INIT(&pq->ltp_free_list);
+ }
+ }
+ rem_thr = pool->ltp_max_count % numqs;
+ rem_pend = pool->ltp_max_pending % numqs;
+ for ( i=0; i<numqs; i++ ) {
+ pq = pool->ltp_wqs[i];
+ pq->ltp_max_count = pool->ltp_max_count / numqs;
+ if ( rem_thr ) {
+ pq->ltp_max_count++;
+ rem_thr--;
+ }
+ pq->ltp_max_pending = pool->ltp_max_pending / numqs;
+ if ( rem_pend ) {
+ pq->ltp_max_pending++;
+ rem_pend--;
+ }
+ }
+ pool->ltp_numqs = numqs;
+ return 0;
+}
+
/* Set max #threads. value <= 0 means max supported #threads (LDAP_MAXTHR) */
int
ldap_pvt_thread_pool_maxthreads(
max_threads /= pool->ltp_numqs;
for (i=0; i<pool->ltp_numqs; i++) {
- pq = &pool->ltp_wqs[i];
- ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
+ pq = pool->ltp_wqs[i];
pq->ltp_max_count = max_threads;
if (remthr) {
pq->ltp_max_count++;
remthr--;
}
- ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
return(0);
}
int i;
count = 0;
for (i=0; i<pool->ltp_numqs; i++) {
- struct ldap_int_thread_poolq_s *pq = &pool->ltp_wqs[i];
+ struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
switch(param) {
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
else {
int i;
for (i=0; i<pool->ltp_numqs; i++)
- if (pool->ltp_wqs[i].ltp_pending_count) break;
+ if (pool->ltp_wqs[i]->ltp_pending_count) break;
if (i<pool->ltp_numqs)
*((char **)value) = "finishing";
else
if (pool->ltp_max_pending > 0)
pool->ltp_max_pending = -pool->ltp_max_pending;
+ ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
for (i=0; i<pool->ltp_numqs; i++) {
- pq = &pool->ltp_wqs[i];
+ pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
if (pq->ltp_max_pending > 0)
pq->ltp_max_pending = -pq->ltp_max_pending;
}
while (pq->ltp_open_count) {
- if (!pool->ltp_pause)
- ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
+ ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
}
LDAP_FREE(task);
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
- ldap_pvt_thread_cond_destroy(&pq->ltp_pcond);
ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
}
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
+ ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
LDAP_FREE(pool);
*tpool = NULL;
ldap_int_tpool_plist_t *work_list;
ldap_int_thread_userctx_t ctx, *kctx;
unsigned i, keyslot, hash;
+ int pool_lock = 0, freeme = 0;
assert(pool != NULL);
ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
- ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
-
- /* thread_keys[] is read-only when paused */
- while (pool->ltp_pause)
- ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
+ if (pool->ltp_pause) {
+ ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+ /* thread_keys[] is read-only when paused */
+ while (pool->ltp_pause)
+ ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ }
/* find a key slot to give this thread ID and store a
* pointer to our keys there; start at the thread ID
thread_keys[keyslot].ctx = &ctx;
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
+ ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq->ltp_starting--;
pq->ltp_active_count++;
work_list = pq->ltp_work_list; /* help the compiler a bit */
task = LDAP_STAILQ_FIRST(work_list);
if (task == NULL) { /* paused or no pending tasks */
- if (--(pq->ltp_active_count) < 2) {
- /* Notify pool_pause it is the sole active thread. */
- ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
+ if (--(pq->ltp_active_count) < 1) {
+ if (pool->ltp_pause) {
+ ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+ pool_lock = 1;
+ if (--(pool->ltp_active_queues) < 1) {
+ /* Notify pool_pause it is the sole active thread. */
+ ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
+ }
+ }
}
do {
* Just use pthread_cond_timedwait() if we want to
* check idle time.
*/
- ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
+ if (pool_lock) {
+ ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+ if (!pool->ltp_pause) {
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
+ pool_lock = 0;
+ }
+ } else
+ ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
work_list = pq->ltp_work_list;
task = LDAP_STAILQ_FIRST(work_list);
} while (task == NULL);
+ if (pool_lock) {
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
+ pool_lock = 0;
+ }
pq->ltp_active_count++;
}
}
done:
- assert(!pool->ltp_pause); /* thread_keys writable, ltp_open_count >= 0 */
+ ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
- /* The ltp_mutex lock protects ctx->ltu_key from pool_purgekey()
+ /* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
* during this call, since it prevents new pauses. */
ldap_pvt_thread_pool_context_reset(&ctx);
- ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
pq->ltp_open_count--;
- /* let pool_destroy know we're all done */
- if (pq->ltp_open_count == 0)
- ldap_pvt_thread_cond_signal(&pq->ltp_cond);
+ if (pq->ltp_open_count == 0) {
+ if (pool->ltp_finishing)
+ /* let pool_destroy know we're all done */
+ ldap_pvt_thread_cond_signal(&pq->ltp_cond);
+ else
+ freeme = 1;
+ }
- ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ if (pool_lock)
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ else
+ ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ if (freeme) {
+ ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
+ ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
+ LDAP_FREE(pq->ltp_free);
+ }
ldap_pvt_thread_exit(NULL);
return(NULL);
}
pause_type -= pause;
if (pause_type & GO_IDLE) {
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ int do_pool = 0;
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq->ltp_pending_count++;
pq->ltp_active_count--;
if (pause && pq->ltp_active_count < 1) {
- /* Tell the task waiting to DO_PAUSE it can proceed */
- ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
+ do_pool = 1;
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ if (do_pool) {
+ pool->ltp_active_queues--;
+ if (pool->ltp_active_queues < 1)
+ /* Tell the task waiting to DO_PAUSE it can proceed */
+ ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
+ }
}
if (pause_type & GO_UNIDLE) {
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
- ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* Wait out pause if any, then cancel GO_IDLE */
if (pause > max_ltp_pause) {
ret = 1;
do {
- ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
+ ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
} while (pool->ltp_pause > max_ltp_pause);
}
+ ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq->ltp_pending_count--;
pq->ltp_active_count++;
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
ret = 0;
assert(!pool->ltp_pause);
pool->ltp_pause = WANT_PAUSE;
+ pool->ltp_active_queues = 0;
for (i=0; i<pool->ltp_numqs; i++)
- if (&pool->ltp_wqs[i] == pq) break;
+ if (pool->ltp_wqs[i] == pq) break;
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* temporarily remove ourself from active count */
j=i;
do {
+ pq = pool->ltp_wqs[j];
if (j != i)
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
+
/* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
* and do not finish threads in ldap_pvt_thread_pool_wrapper() */
pq->ltp_open_count = -pq->ltp_open_count;
/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
pq->ltp_work_list = &empty_pending_list;
- /* Wait for this task to become the sole active task */
- while (pq->ltp_active_count > 0) {
- ldap_pvt_thread_cond_wait(&pq->ltp_pcond, &pq->ltp_mutex);
- }
+
+ if (pq->ltp_active_count > 0)
+ pool->ltp_active_queues++;
+
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
if (pool->ltp_numqs > 1) {
j++;
}
} while (j != i);
+ /* Wait for this task to become the sole active task */
+ while (pool->ltp_active_queues > 0)
+ ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
+
/* restore us to active count */
- pool->ltp_wqs[i].ltp_active_count++;
+ pool->ltp_wqs[i]->ltp_active_count++;
assert(pool->ltp_pause == WANT_PAUSE);
pool->ltp_pause = PAUSED;
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
}
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(ret);
}
return(0);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
-
assert(pool->ltp_pause == PAUSED);
pool->ltp_pause = 0;
for (i=0; i<pool->ltp_numqs; i++) {
- pq = &pool->ltp_wqs[i];
- ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
+ pq = pool->ltp_wqs[i];
if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */
pq->ltp_open_count = -pq->ltp_open_count;
pq->ltp_work_list = &pq->ltp_pending_list;
-
ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
- ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
-
+ ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(0);
}
assert ( key != NULL );
+ ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
for ( i=0; i<LDAP_MAXTHR; i++ ) {
ctx = thread_keys[i].ctx;
if ( ctx && ctx != DELETED_THREAD_CTX ) {
}
}
}
+ ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
}
/*