/* not paused and something to do for pool_<wrapper/pause/destroy>() */
ldap_pvt_thread_cond_t ltp_cond;
- /* ltp_active_count <= 1 && ltp_pause */
- ldap_pvt_thread_cond_t ltp_pcond;
+ /* Some active task needs to be the sole active task.
+ * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
+ */
+ volatile sig_atomic_t ltp_pause;
/* ltp_pause == 0 ? <p_pending_list : &empty_pending_list,
* maintaned to reduce work for pool_wrapper()
/* protect members below, and protect thread_keys[] during pauses */
ldap_pvt_thread_mutex_t ltp_mutex;
+ /* ltp_active_count <= 1 && ltp_pause */
+ ldap_pvt_thread_cond_t ltp_pcond;
+
+ /* number of active queues */
+ int ltp_active_queues;
+
/* The pool is finishing, waiting for its threads to close.
* They close when ltp_pending_list is done. pool_submit()
* rejects new tasks. ltp_max_pending = -(its old value).
if (rc != 0)
return(rc);
+ rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
+ if (rc != 0)
+ return(rc);
+
rem_thr = max_threads % numqs;
rem_pend = max_pending % numqs;
for ( i=0; i<numqs; i++ ) {
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
- if (rc != 0)
- return(rc);
- rc = ldap_pvt_thread_cond_init(&pq->ltp_pcond);
if (rc != 0)
return(rc);
LDAP_STAILQ_INIT(&pq->ltp_pending_list);
if (pool->ltp_max_pending > 0)
pool->ltp_max_pending = -pool->ltp_max_pending;
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_FREE(task);
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
- ldap_pvt_thread_cond_destroy(&pq->ltp_pcond);
ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
}
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
LDAP_FREE(pool);
*tpool = NULL;
work_list = pq->ltp_work_list; /* help the compiler a bit */
task = LDAP_STAILQ_FIRST(work_list);
if (task == NULL) { /* paused or no pending tasks */
- if (--(pq->ltp_active_count) < 2) {
- /* Notify pool_pause it is the sole active thread. */
- ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
+ if (--(pq->ltp_active_count) < 1) {
+ ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+ if (--(pool->ltp_active_queues) < 1) {
+ /* Notify pool_pause it is the sole active thread. */
+ ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
+ }
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
}
do {
/* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
pause_type -= pause;
- if (pause_type & GO_IDLE) {
+ if (!(pause_type & DO_PAUSE))
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+
+ if (pause_type & GO_IDLE) {
+ int do_pool = 0;
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq->ltp_pending_count++;
pq->ltp_active_count--;
if (pause && pq->ltp_active_count < 1) {
- /* Tell the task waiting to DO_PAUSE it can proceed */
- ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
+ do_pool = 1;
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ if (do_pool) {
+ if (!(pause_type & DO_PAUSE))
+ ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+ pool->ltp_active_queues--;
+ if (pool->ltp_active_queues < 1)
+ /* Tell the task waiting to DO_PAUSE it can proceed */
+ ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
+ if (!(pause_type & DO_PAUSE))
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ }
}
if (pause_type & GO_UNIDLE) {
- ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+ if (pause_type & DO_PAUSE)
+ ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* Wait out pause if any, then cancel GO_IDLE */
if (pause > max_ltp_pause) {
ret = 1;
do {
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
- } while (pool->ltp_pause > max_ltp_pause);
+ } while (pq->ltp_pause > max_ltp_pause);
}
pq->ltp_pending_count--;
pq->ltp_active_count++;
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
+ if (pause_type & DO_PAUSE)
+ ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
}
if (pause_type & DO_PAUSE) {
ret = 0;
assert(!pool->ltp_pause);
pool->ltp_pause = WANT_PAUSE;
+ pool->ltp_active_queues = 0;
for (i=0; i<pool->ltp_numqs; i++)
if (&pool->ltp_wqs[i] == pq) break;
j=i;
do {
+ pq = &pool->ltp_wqs[j];
if (j != i)
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
+
+ pq->ltp_pause = WANT_PAUSE;
+
/* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
* and do not finish threads in ldap_pvt_thread_pool_wrapper() */
pq->ltp_open_count = -pq->ltp_open_count;
/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
pq->ltp_work_list = &empty_pending_list;
- /* Wait for this task to become the sole active task */
- while (pq->ltp_active_count > 0) {
- ldap_pvt_thread_cond_wait(&pq->ltp_pcond, &pq->ltp_mutex);
- }
+
+ if (pq->ltp_active_count > 0)
+ pool->ltp_active_queues++;
+
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
if (pool->ltp_numqs > 1) {
j++;
}
} while (j != i);
+ /* Wait for this task to become the sole active task */
+ while (pool->ltp_active_queues > 0)
+ ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
+
/* restore us to active count */
pool->ltp_wqs[i].ltp_active_count++;
assert(pool->ltp_pause == WANT_PAUSE);
pool->ltp_pause = PAUSED;
+ for (i=0; i<pool->ltp_numqs; i++)
+ pool->ltp_wqs[i].ltp_pause = PAUSED;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
}
pq->ltp_open_count = -pq->ltp_open_count;
pq->ltp_work_list = &pq->ltp_pending_list;
+ pq->ltp_pause = 0;
ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}