X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Flibldap_r%2Ftpool.c;h=b1cb01635151a3b78b48d15c4762a598023e969c;hb=10566c8be384c6435476ec54e842382841cb84b6;hp=44f9544b58e68c27aa015b1f7e195205bee386fb;hpb=f6eacdbbc56be65dc1080ae2527b96399c24418d;p=openldap
diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c
index 44f9544b58..b1cb016351 100644
--- a/libraries/libldap_r/tpool.c
+++ b/libraries/libldap_r/tpool.c
@@ -1,7 +1,7 @@
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software .
*
- * Copyright 1998-2014 The OpenLDAP Foundation.
+ * Copyright 1998-2017 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -66,8 +66,6 @@ typedef struct ldap_int_thread_userctx_s {
/* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
- * Protected by ldap_pvt_thread_pool_mutex except during pauses,
- * when it is read-only (used by pool_purgekey and pool_context).
* Protected by ldap_pvt_thread_pool_mutex.
*/
static struct {
@@ -92,6 +90,7 @@ typedef struct ldap_int_thread_task_s {
} ltt_next;
ldap_pvt_thread_start_t *ltt_start_routine;
void *ltt_arg;
+ struct ldap_int_thread_poolq_s *ltt_queue;
} ldap_int_thread_task_t;
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
@@ -112,7 +111,7 @@ struct ldap_int_thread_poolq_s {
ldap_pvt_thread_cond_t ltp_cond;
/* ltp_pause == 0 ? <p_pending_list : &empty_pending_list,
- * maintaned to reduce work for pool_wrapper()
+ * maintained to reduce work for pool_wrapper()
*/
ldap_int_tpool_plist_t *ltp_work_list;
@@ -128,7 +127,7 @@ struct ldap_int_thread_poolq_s {
int ltp_pending_count; /* Pending + paused + idle tasks */
int ltp_active_count; /* Active, not paused/idle tasks */
- int ltp_open_count; /* Number of threads, negated when ltp_pause */
+ int ltp_open_count; /* Number of threads */
int ltp_starting; /* Currently starting threads */
};
@@ -266,16 +265,22 @@ ldap_pvt_thread_pool_init_q (
max_threads = LDAP_MAXTHR;
rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
- if (rc != 0)
+ if (rc != 0) {
+fail:
+ for (i=0; iltp_wqs[i]->ltp_free);
+ LDAP_FREE(pool->ltp_wqs);
+ LDAP_FREE(pool);
return(rc);
+ }
rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
if (rc != 0)
- return(rc);
+ goto fail;
rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
if (rc != 0)
- return(rc);
+ goto fail;
rem_thr = max_threads % numqs;
rem_pend = max_pending % numqs;
@@ -333,25 +338,21 @@ ldap_pvt_thread_pool_init (
return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
}
-static int
-ldap_int_poolq_hash(
- struct ldap_int_thread_pool_s *pool,
- void *arg )
+/* Submit a task to be performed by the thread pool */
+int
+ldap_pvt_thread_pool_submit (
+ ldap_pvt_thread_pool_t *tpool,
+ ldap_pvt_thread_start_t *start_routine, void *arg )
{
- int i = 0, j;
- unsigned char *ptr = (unsigned char *)&arg;
- /* dumb hash of arg to choose a queue */
- for (j=0; jltp_numqs;
- return i;
+ return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL );
}
/* Submit a task to be performed by the thread pool */
int
-ldap_pvt_thread_pool_submit (
+ldap_pvt_thread_pool_submit2 (
ldap_pvt_thread_pool_t *tpool,
- ldap_pvt_thread_start_t *start_routine, void *arg )
+ ldap_pvt_thread_start_t *start_routine, void *arg,
+ void **cookie )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
@@ -367,9 +368,23 @@ ldap_pvt_thread_pool_submit (
if (pool == NULL)
return(-1);
- if ( pool->ltp_numqs > 1 )
- i = ldap_int_poolq_hash( pool, arg );
- else
+ if ( pool->ltp_numqs > 1 ) {
+ int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count;
+ int min_x = 0, cnt;
+ for ( i = 0; i < pool->ltp_numqs; i++ ) {
+ /* take first queue that has nothing active */
+ if ( !pool->ltp_wqs[i]->ltp_active_count ) {
+ min_x = i;
+ break;
+ }
+ cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count;
+ if ( cnt < min ) {
+ min = cnt;
+ min_x = i;
+ }
+ }
+ i = min_x;
+ } else
i = 0;
j = i;
@@ -397,6 +412,9 @@ ldap_pvt_thread_pool_submit (
task->ltt_start_routine = start_routine;
task->ltt_arg = arg;
+ task->ltt_queue = pq;
+ if ( cookie )
+ *cookie = task;
pq->ltp_pending_count++;
LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
@@ -471,29 +489,22 @@ no_task( void *ctx, void *arg )
*/
int
ldap_pvt_thread_pool_retract (
- ldap_pvt_thread_pool_t *tpool,
- ldap_pvt_thread_start_t *start_routine, void *arg )
+ void *cookie )
{
- struct ldap_int_thread_pool_s *pool;
+ ldap_int_thread_task_t *task, *ttmp;
struct ldap_int_thread_poolq_s *pq;
- ldap_int_thread_task_t *task;
- int i;
- if (tpool == NULL)
+ if (cookie == NULL)
return(-1);
- pool = *tpool;
-
- if (pool == NULL)
+ ttmp = cookie;
+ pq = ttmp->ltt_queue;
+ if (pq == NULL)
return(-1);
- i = ldap_int_poolq_hash( pool, arg );
- pq = pool->ltp_wqs[i];
-
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
- if (task->ltt_start_routine == start_routine &&
- task->ltt_arg == arg) {
+ if (task == ttmp) {
/* Could LDAP_STAILQ_REMOVE the task, but that
* walks ltp_pending_list again to find it.
*/
@@ -1020,6 +1031,8 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
{
ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
pq = ctx->ltu_pq;
+ if ( !pq )
+ return(-1);
}
/* Let pool_unidle() ignore requests for new pauses */
@@ -1084,9 +1097,6 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
if (j != i)
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
- /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
- * and do not finish threads in ldap_pvt_thread_pool_wrapper() */
- pq->ltp_open_count = -pq->ltp_open_count;
/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
pq->ltp_work_list = &empty_pending_list;
@@ -1173,8 +1183,6 @@ ldap_pvt_thread_pool_resume (
pool->ltp_pause = 0;
for (i=0; iltp_numqs; i++) {
pq = pool->ltp_wqs[i];
- if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */
- pq->ltp_open_count = -pq->ltp_open_count;
pq->ltp_work_list = &pq->ltp_pending_list;
ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
}