]> git.sur5r.net Git - openldap/commitdiff
Fixes for multiple threadpool queues
authorHoward Chu <hyc@openldap.org>
Wed, 15 Mar 2017 11:13:09 +0000 (11:13 +0000)
committerHoward Chu <hyc@openldap.org>
Wed, 15 Mar 2017 11:13:09 +0000 (11:13 +0000)
Remove poolq_hash, it wasn't distributing work evenly to the queues.
Just walk through all queues and use the one with smallest
active+pending count. Since pool_retract also relied on the hash,
a different means of locating the thread to retract was needed.
Add pool_submit2 which returns the threadpool task structure,
and record which poolq this task lives on.

include/ldap_pvt_thread.h
include/ldap_rq.h
libraries/libldap_r/tpool.c
servers/slapd/daemon.c
servers/slapd/syncrepl.c

index 3324cdf77693049be4a09dd1ef87009d9283a0a7..246d3b0b20bcb3befe67abb559da363364b3f926 100644 (file)
@@ -227,10 +227,15 @@ ldap_pvt_thread_pool_submit LDAP_P((
        void *arg ));
 
 LDAP_F( int )
-ldap_pvt_thread_pool_retract LDAP_P((
+ldap_pvt_thread_pool_submit2 LDAP_P((
        ldap_pvt_thread_pool_t *pool,
        ldap_pvt_thread_start_t *start,
-       void *arg ));
+       void *arg,
+       void **cookie ));
+
+LDAP_F( int )
+ldap_pvt_thread_pool_retract LDAP_P((
+       void *cookie ));
 
 LDAP_F( int )
 ldap_pvt_thread_pool_maxthreads LDAP_P((
index 15bfbbd6c661c7c61287c4b55ee0e41445d223ba..23bd09dc10998335bb49ad522f989c4ad8e851e8 100644 (file)
@@ -29,6 +29,7 @@ typedef struct re_s {
        void *arg;
        char *tname;
        char *tspec;
+       void *pool_cookie;
 } re_t;
 
 typedef struct runqueue_s {
index 538a223b6fc071608a65e4d6933ab70ef49f453c..b785631213ced2581953076333b6e1ed320244fd 100644 (file)
@@ -90,6 +90,7 @@ typedef struct ldap_int_thread_task_s {
        } ltt_next;
        ldap_pvt_thread_start_t *ltt_start_routine;
        void *ltt_arg;
+       struct ldap_int_thread_poolq_s *ltt_queue;
 } ldap_int_thread_task_t;
 
 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
@@ -337,25 +338,21 @@ ldap_pvt_thread_pool_init (
        return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
 }
 
-static int
-ldap_int_poolq_hash(
-       struct ldap_int_thread_pool_s *pool,
-       void *arg )
+/* Submit a task to be performed by the thread pool */
+int
+ldap_pvt_thread_pool_submit (
+       ldap_pvt_thread_pool_t *tpool,
+       ldap_pvt_thread_start_t *start_routine, void *arg )
 {
-       int i = 0, j;
-       unsigned char *ptr = (unsigned char *)&arg;
-       /* dumb hash of arg to choose a queue */
-       for (j=0; j<sizeof(arg); j++)
-               i += *ptr++;
-       i %= pool->ltp_numqs;
-       return i;
+       return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL );
 }
 
 /* Submit a task to be performed by the thread pool */
 int
-ldap_pvt_thread_pool_submit (
+ldap_pvt_thread_pool_submit2 (
        ldap_pvt_thread_pool_t *tpool,
-       ldap_pvt_thread_start_t *start_routine, void *arg )
+       ldap_pvt_thread_start_t *start_routine, void *arg,
+       void **cookie )
 {
        struct ldap_int_thread_pool_s *pool;
        struct ldap_int_thread_poolq_s *pq;
@@ -371,9 +368,23 @@ ldap_pvt_thread_pool_submit (
        if (pool == NULL)
                return(-1);
 
-       if ( pool->ltp_numqs > 1 )
-               i = ldap_int_poolq_hash( pool, arg );
-       else
+       if ( pool->ltp_numqs > 1 ) {
+               int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count;
+               int min_x = 0, cnt;
+               for ( i = 0; i < pool->ltp_numqs; i++ ) {
+                       /* take first queue that has nothing active */
+                       if ( !pool->ltp_wqs[i]->ltp_active_count ) {
+                               min_x = i;
+                               break;
+                       }
+                       cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count;
+                       if ( cnt < min ) {
+                               min = cnt;
+                               min_x = i;
+                       }
+               }
+               i = min_x;
+       } else
                i = 0;
 
        j = i;
@@ -401,6 +412,9 @@ ldap_pvt_thread_pool_submit (
 
        task->ltt_start_routine = start_routine;
        task->ltt_arg = arg;
+       task->ltt_queue = pq;
+       if ( cookie )
+               *cookie = task;
 
        pq->ltp_pending_count++;
        LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
@@ -475,29 +489,22 @@ no_task( void *ctx, void *arg )
  */
 int
 ldap_pvt_thread_pool_retract (
-       ldap_pvt_thread_pool_t *tpool,
-       ldap_pvt_thread_start_t *start_routine, void *arg )
+       void *cookie )
 {
-       struct ldap_int_thread_pool_s *pool;
+       ldap_int_thread_task_t *task, *ttmp;
        struct ldap_int_thread_poolq_s *pq;
-       ldap_int_thread_task_t *task;
-       int i;
 
-       if (tpool == NULL)
+       if (cookie == NULL)
                return(-1);
 
-       pool = *tpool;
-
-       if (pool == NULL)
+       ttmp = cookie;
+       pq = ttmp->ltt_queue;
+       if (pq == NULL)
                return(-1);
 
-       i = ldap_int_poolq_hash( pool, arg );
-       pq = pool->ltp_wqs[i];
-
        ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
        LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
-               if (task->ltt_start_routine == start_routine &&
-                       task->ltt_arg == arg) {
+               if (task == ttmp) {
                        /* Could LDAP_STAILQ_REMOVE the task, but that
                         * walks ltp_pending_list again to find it.
                         */
index de26b1efe04ddaf5049d8eb804f7ca6d352c7735..160619e857ded8ae94165b07996d839812c5a929 100644 (file)
@@ -2460,8 +2460,8 @@ loop:
                                        ldap_pvt_runqueue_runtask( &slapd_rq, rtask );
                                        ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 );
                                        ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
-                                       ldap_pvt_thread_pool_submit( &connection_pool,
-                                               rtask->routine, (void *) rtask );
+                                       ldap_pvt_thread_pool_submit2( &connection_pool,
+                                               rtask->routine, (void *) rtask, &rtask->pool_cookie );
                                        ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
                                }
                                rtask = ldap_pvt_runqueue_next_sched( &slapd_rq, &cat );
index c020229528af3af19b4793b98534616adb095194..e0dfbc9670941af2dd7b13872cd8389687b7df85 100644 (file)
@@ -5769,8 +5769,7 @@ syncrepl_config( ConfigArgs *c )
                                                                ldap_pvt_runqueue_stoptask( &slapd_rq, re );
                                                                isrunning = 1;
                                                        }
-                                                       if ( ldap_pvt_thread_pool_retract( &connection_pool,
-                                                                       re->routine, re ) > 0 )
+                                                       if ( ldap_pvt_thread_pool_retract( re->pool_cookie ) > 0 )
                                                                isrunning = 0;
 
                                                        ldap_pvt_runqueue_remove( &slapd_rq, re );