#ifndef LDAP_THREAD_HAVE_TPOOL
-typedef enum ldap_int_thread_pool_state_e {
- LDAP_INT_THREAD_POOL_RUNNING,
- LDAP_INT_THREAD_POOL_FINISHING,
- LDAP_INT_THREAD_POOL_STOPPING
-} ldap_int_thread_pool_state_t;
-
/* Thread-specific key with data and optional free function */
typedef struct ldap_int_tpool_key_s {
void *ltk_key;
/* Max number of threads */
#define LDAP_MAXTHR 1024 /* must be a power of 2 */
+/* (Theoretical) max number of pending requests */
+#define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */
+
/* Context: thread ID and thread-specific key/data pairs */
typedef struct ldap_int_thread_userctx_s {
ldap_pvt_thread_t ltu_id;
LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ltp_pending_list;
LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
- ldap_int_thread_pool_state_t ltp_state;
-
+ /* The pool is finishing, waiting for its threads to close.
+ * They close when ltp_pending_list is done. pool_submit()
+ * rejects new tasks. ltp_max_pending = -(its old value).
+ */
+ int ltp_finishing;
+
/* Some active task needs to be the sole active task.
- * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
+ * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
*/
- volatile sig_atomic_t ltp_pause;
+ volatile sig_atomic_t ltp_pause;
+
+ /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
+ int ltp_max_count;
+
+ /* Max number of pending + paused requests, negated when ltp_finishing */
+ int ltp_max_pending;
- int ltp_max_count; /* Max number of threads in pool, or 0 */
- int ltp_max_pending; /* Max pending or paused requests, or 0 */
int ltp_pending_count; /* Pending or paused requests */
int ltp_active_count; /* Active, not paused requests */
int ltp_open_count; /* Number of threads */
if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
max_threads = 0;
- if (max_pending < 0)
- max_pending = 0;
+ if (! (1 <= max_pending && max_pending <= MAX_PENDING))
+ max_pending = MAX_PENDING;
*tpool = NULL;
pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
return(rc);
ldap_int_has_thread_pool = 1;
- pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
pool->ltp_max_count = max_threads;
pool->ltp_max_pending = max_pending;
LDAP_STAILQ_INIT(&pool->ltp_pending_list);
return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
- if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
- || (pool->ltp_max_pending
- && pool->ltp_pending_count >= pool->ltp_max_pending))
+
+ if (pool->ltp_pending_count >= pool->ltp_max_pending)
goto failed;
task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
count = pool->ltp_max_pending;
+ if (count < 0)
+ count = -count;
+ if (count == MAX_PENDING)
+ count = 0;
break;
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
break;
- case LDAP_PVT_THREAD_POOL_PARAM_STATE: {
- static struct {
- char *name;
- ldap_int_thread_pool_state_t state;
- } str2state[] = {
- { "running", LDAP_INT_THREAD_POOL_RUNNING },
- { "finishing", LDAP_INT_THREAD_POOL_FINISHING },
- { "stopping", LDAP_INT_THREAD_POOL_STOPPING },
- { NULL }
- };
- int i;
-
- if ( pool->ltp_pause ) {
- *((char **)value) = "pausing";
- } else {
- for ( i = 0; str2state[ i ].name != NULL; i++ ) {
- if ( str2state[ i ].state == pool->ltp_state ) {
- break;
- }
- }
- *((char **)value) = str2state[ i ].name;
- }
- if ( *((char **)value) != NULL ) {
- count = -2;
- }
- } break;
+ case LDAP_PVT_THREAD_POOL_PARAM_STATE:
+ *((char **)value) =
+ pool->ltp_pause ? "pausing" :
+ !pool->ltp_finishing ? "running" :
+ pool->ltp_pending_count ? "finishing" : "stopping";
+ break;
case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
break;
return rc;
}
+static void
+flush_pending_list( struct ldap_int_thread_pool_s *pool )
+{
+ ldap_int_thread_task_t *task;
+
+ while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
+ LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
+ LDAP_FREE(task);
+ }
+}
+
/* Destroy the pool after making its threads finish */
int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
if (pool != pptr) return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
- pool->ltp_state = run_pending
- ? LDAP_INT_THREAD_POOL_FINISHING
- : LDAP_INT_THREAD_POOL_STOPPING;
+
+ pool->ltp_finishing = 1;
+ if (pool->ltp_max_pending > 0)
+ pool->ltp_max_pending = -pool->ltp_max_pending;
+ if (!run_pending)
+ flush_pending_list(pool);
while (pool->ltp_open_count) {
if (!pool->ltp_pause)
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
- while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
- {
- LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
- LDAP_FREE(task);
- }
+ flush_pending_list(pool);
while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
{
while (pool->ltp_pause)
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
- if (pool->ltp_state == LDAP_INT_THREAD_POOL_STOPPING)
- break;
-
task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
if (task == NULL) {
- if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
+ if (pool->ltp_finishing)
break;
if (pool->ltp_open_count >
* check idle time.
*/
- assert(pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING);
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
continue;
}
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_pause = 0;
- if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
+ if (!pool->ltp_finishing)
ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(0);