X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Flibldap_r%2Ftpool.c;h=b1cb01635151a3b78b48d15c4762a598023e969c;hb=10566c8be384c6435476ec54e842382841cb84b6;hp=44f9544b58e68c27aa015b1f7e195205bee386fb;hpb=1f6f4f4f281fd7da7b66c05606b66b029d87032f;p=openldap diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c index 44f9544b58..b1cb016351 100644 --- a/libraries/libldap_r/tpool.c +++ b/libraries/libldap_r/tpool.c @@ -1,7 +1,7 @@ /* $OpenLDAP$ */ /* This work is part of OpenLDAP Software . * - * Copyright 1998-2014 The OpenLDAP Foundation. + * Copyright 1998-2017 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -66,8 +66,6 @@ typedef struct ldap_int_thread_userctx_s { /* Simple {thread ID -> context} hash table; key=ctx->ltu_id. - * Protected by ldap_pvt_thread_pool_mutex except during pauses, - * when it is read-only (used by pool_purgekey and pool_context). * Protected by ldap_pvt_thread_pool_mutex. */ static struct { @@ -92,6 +90,7 @@ typedef struct ldap_int_thread_task_s { } ltt_next; ldap_pvt_thread_start_t *ltt_start_routine; void *ltt_arg; + struct ldap_int_thread_poolq_s *ltt_queue; } ldap_int_thread_task_t; typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t; @@ -112,7 +111,7 @@ struct ldap_int_thread_poolq_s { ldap_pvt_thread_cond_t ltp_cond; /* ltp_pause == 0 ? <p_pending_list : &empty_pending_list, - * maintaned to reduce work for pool_wrapper() + * maintained to reduce work for pool_wrapper() */ ldap_int_tpool_plist_t *ltp_work_list; @@ -128,7 +127,7 @@ struct ldap_int_thread_poolq_s { int ltp_pending_count; /* Pending + paused + idle tasks */ int ltp_active_count; /* Active, not paused/idle tasks */ - int ltp_open_count; /* Number of threads, negated when ltp_pause */ + int ltp_open_count; /* Number of threads */ int ltp_starting; /* Currently starting threads */ }; @@ -266,16 +265,22 @@ ldap_pvt_thread_pool_init_q ( max_threads = LDAP_MAXTHR; rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex); - if (rc != 0) + if (rc != 0) { +fail: + for (i=0; iltp_wqs[i]->ltp_free); + LDAP_FREE(pool->ltp_wqs); + LDAP_FREE(pool); return(rc); + } rc = ldap_pvt_thread_cond_init(&pool->ltp_cond); if (rc != 0) - return(rc); + goto fail; rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond); if (rc != 0) - return(rc); + goto fail; rem_thr = max_threads % numqs; rem_pend = max_pending % numqs; @@ -333,25 +338,21 @@ ldap_pvt_thread_pool_init ( return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 ); } -static int -ldap_int_poolq_hash( - struct ldap_int_thread_pool_s *pool, - void *arg ) +/* Submit a task to be performed by the thread pool */ +int +ldap_pvt_thread_pool_submit ( + ldap_pvt_thread_pool_t *tpool, + ldap_pvt_thread_start_t *start_routine, void *arg ) { - int i = 0, j; - unsigned char *ptr = (unsigned char *)&arg; - /* dumb hash of arg to choose a queue */ - for (j=0; jltp_numqs; - return i; + return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL ); } /* Submit a task to be performed by the thread pool */ int -ldap_pvt_thread_pool_submit ( +ldap_pvt_thread_pool_submit2 ( ldap_pvt_thread_pool_t *tpool, - ldap_pvt_thread_start_t *start_routine, void *arg ) + ldap_pvt_thread_start_t *start_routine, void *arg, + void **cookie ) { struct ldap_int_thread_pool_s *pool; struct ldap_int_thread_poolq_s *pq; @@ -367,9 +368,23 @@ ldap_pvt_thread_pool_submit ( if (pool == NULL) return(-1); - if ( pool->ltp_numqs > 1 ) - i = ldap_int_poolq_hash( pool, arg ); - else + if ( pool->ltp_numqs > 1 ) { + int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count; + int min_x = 0, cnt; + for ( i = 0; i < pool->ltp_numqs; i++ ) { + /* take first queue that has nothing active */ + if ( !pool->ltp_wqs[i]->ltp_active_count ) { + min_x = i; + break; + } + cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count; + if ( cnt < min ) { + min = cnt; + min_x = i; + } + } + i = min_x; + } else i = 0; j = i; @@ -397,6 +412,9 @@ ldap_pvt_thread_pool_submit ( task->ltt_start_routine = start_routine; task->ltt_arg = arg; + task->ltt_queue = pq; + if ( cookie ) + *cookie = task; pq->ltp_pending_count++; LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q); @@ -471,29 +489,22 @@ no_task( void *ctx, void *arg ) */ int ldap_pvt_thread_pool_retract ( - ldap_pvt_thread_pool_t *tpool, - ldap_pvt_thread_start_t *start_routine, void *arg ) + void *cookie ) { - struct ldap_int_thread_pool_s *pool; + ldap_int_thread_task_t *task, *ttmp; struct ldap_int_thread_poolq_s *pq; - ldap_int_thread_task_t *task; - int i; - if (tpool == NULL) + if (cookie == NULL) return(-1); - pool = *tpool; - - if (pool == NULL) + ttmp = cookie; + pq = ttmp->ltt_queue; + if (pq == NULL) return(-1); - i = ldap_int_poolq_hash( pool, arg ); - pq = pool->ltp_wqs[i]; - ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) - if (task->ltt_start_routine == start_routine && - task->ltt_arg == arg) { + if (task == ttmp) { /* Could LDAP_STAILQ_REMOVE the task, but that * walks ltp_pending_list again to find it. */ @@ -1020,6 +1031,8 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type ) { ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context(); pq = ctx->ltu_pq; + if ( !pq ) + return(-1); } /* Let pool_unidle() ignore requests for new pauses */ @@ -1084,9 +1097,6 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type ) if (j != i) ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); - /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test, - * and do not finish threads in ldap_pvt_thread_pool_wrapper() */ - pq->ltp_open_count = -pq->ltp_open_count; /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */ pq->ltp_work_list = &empty_pending_list; @@ -1173,8 +1183,6 @@ ldap_pvt_thread_pool_resume ( pool->ltp_pause = 0; for (i=0; iltp_numqs; i++) { pq = pool->ltp_wqs[i]; - if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */ - pq->ltp_open_count = -pq->ltp_open_count; pq->ltp_work_list = &pq->ltp_pending_list; ldap_pvt_thread_cond_broadcast(&pq->ltp_cond); }