From: Howard Chu Date: Wed, 15 Mar 2017 11:13:09 +0000 (+0000) Subject: Fixes for multiple threadpool queues X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=e12ca8b6fed6b8a2526c5c8ee820bf5aa942b59d;p=openldap Fixes for multiple threadpool queues Remove poolq_hash, it wasn't distributing work evenly to the queues. Just walk through all queues and use the one with smallest active+pending count. Since pool_retract also relied on the hash, a different means of locating the thread to retract was needed. Add pool_submit2 which returns the threadpool task structure, and record which poolq this task lives on. --- diff --git a/include/ldap_pvt_thread.h b/include/ldap_pvt_thread.h index 3324cdf776..246d3b0b20 100644 --- a/include/ldap_pvt_thread.h +++ b/include/ldap_pvt_thread.h @@ -227,10 +227,15 @@ ldap_pvt_thread_pool_submit LDAP_P(( void *arg )); LDAP_F( int ) -ldap_pvt_thread_pool_retract LDAP_P(( +ldap_pvt_thread_pool_submit2 LDAP_P(( ldap_pvt_thread_pool_t *pool, ldap_pvt_thread_start_t *start, - void *arg )); + void *arg, + void **cookie )); + +LDAP_F( int ) +ldap_pvt_thread_pool_retract LDAP_P(( + void *cookie )); LDAP_F( int ) ldap_pvt_thread_pool_maxthreads LDAP_P(( diff --git a/include/ldap_rq.h b/include/ldap_rq.h index 15bfbbd6c6..23bd09dc10 100644 --- a/include/ldap_rq.h +++ b/include/ldap_rq.h @@ -29,6 +29,7 @@ typedef struct re_s { void *arg; char *tname; char *tspec; + void *pool_cookie; } re_t; typedef struct runqueue_s { diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c index 538a223b6f..b785631213 100644 --- a/libraries/libldap_r/tpool.c +++ b/libraries/libldap_r/tpool.c @@ -90,6 +90,7 @@ typedef struct ldap_int_thread_task_s { } ltt_next; ldap_pvt_thread_start_t *ltt_start_routine; void *ltt_arg; + struct ldap_int_thread_poolq_s *ltt_queue; } ldap_int_thread_task_t; typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t; @@ -337,25 +338,21 @@ ldap_pvt_thread_pool_init ( return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 ); } -static int -ldap_int_poolq_hash( - struct ldap_int_thread_pool_s *pool, - void *arg ) +/* Submit a task to be performed by the thread pool */ +int +ldap_pvt_thread_pool_submit ( + ldap_pvt_thread_pool_t *tpool, + ldap_pvt_thread_start_t *start_routine, void *arg ) { - int i = 0, j; - unsigned char *ptr = (unsigned char *)&arg; - /* dumb hash of arg to choose a queue */ - for (j=0; jltp_numqs; - return i; + return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL ); } /* Submit a task to be performed by the thread pool */ int -ldap_pvt_thread_pool_submit ( +ldap_pvt_thread_pool_submit2 ( ldap_pvt_thread_pool_t *tpool, - ldap_pvt_thread_start_t *start_routine, void *arg ) + ldap_pvt_thread_start_t *start_routine, void *arg, + void **cookie ) { struct ldap_int_thread_pool_s *pool; struct ldap_int_thread_poolq_s *pq; @@ -371,9 +368,23 @@ ldap_pvt_thread_pool_submit ( if (pool == NULL) return(-1); - if ( pool->ltp_numqs > 1 ) - i = ldap_int_poolq_hash( pool, arg ); - else + if ( pool->ltp_numqs > 1 ) { + int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count; + int min_x = 0, cnt; + for ( i = 0; i < pool->ltp_numqs; i++ ) { + /* take first queue that has nothing active */ + if ( !pool->ltp_wqs[i]->ltp_active_count ) { + min_x = i; + break; + } + cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count; + if ( cnt < min ) { + min = cnt; + min_x = i; + } + } + i = min_x; + } else i = 0; j = i; @@ -401,6 +412,9 @@ ldap_pvt_thread_pool_submit ( task->ltt_start_routine = start_routine; task->ltt_arg = arg; + task->ltt_queue = pq; + if ( cookie ) + *cookie = task; pq->ltp_pending_count++; LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q); @@ -475,29 +489,22 @@ no_task( void *ctx, void *arg ) */ int ldap_pvt_thread_pool_retract ( - ldap_pvt_thread_pool_t *tpool, - ldap_pvt_thread_start_t *start_routine, void *arg ) + void *cookie ) { - struct ldap_int_thread_pool_s *pool; + ldap_int_thread_task_t *task, *ttmp; struct ldap_int_thread_poolq_s *pq; - ldap_int_thread_task_t *task; - int i; - if (tpool == NULL) + if (cookie == NULL) return(-1); - pool = *tpool; - - if (pool == NULL) + ttmp = cookie; + pq = ttmp->ltt_queue; + if (pq == NULL) return(-1); - i = ldap_int_poolq_hash( pool, arg ); - pq = pool->ltp_wqs[i]; - ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) - if (task->ltt_start_routine == start_routine && - task->ltt_arg == arg) { + if (task == ttmp) { /* Could LDAP_STAILQ_REMOVE the task, but that * walks ltp_pending_list again to find it. */ diff --git a/servers/slapd/daemon.c b/servers/slapd/daemon.c index de26b1efe0..160619e857 100644 --- a/servers/slapd/daemon.c +++ b/servers/slapd/daemon.c @@ -2460,8 +2460,8 @@ loop: ldap_pvt_runqueue_runtask( &slapd_rq, rtask ); ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 ); ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - ldap_pvt_thread_pool_submit( &connection_pool, - rtask->routine, (void *) rtask ); + ldap_pvt_thread_pool_submit2( &connection_pool, + rtask->routine, (void *) rtask, &rtask->pool_cookie ); ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); } rtask = ldap_pvt_runqueue_next_sched( &slapd_rq, &cat ); diff --git a/servers/slapd/syncrepl.c b/servers/slapd/syncrepl.c index c020229528..e0dfbc9670 100644 --- a/servers/slapd/syncrepl.c +++ b/servers/slapd/syncrepl.c @@ -5769,8 +5769,7 @@ syncrepl_config( ConfigArgs *c ) ldap_pvt_runqueue_stoptask( &slapd_rq, re ); isrunning = 1; } - if ( ldap_pvt_thread_pool_retract( &connection_pool, - re->routine, re ) > 0 ) + if ( ldap_pvt_thread_pool_retract( re->pool_cookie ) > 0 ) isrunning = 0; ldap_pvt_runqueue_remove( &slapd_rq, re );