X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Flibldap_r%2Ftpool.c;h=140e1af55a4de75211b14abd535d253bafbd62cb;hb=a4f0fc9941adcdab8a414eef05fecfa561097d8e;hp=a82222cb1480ebbbebd4f1d9a4f983343faf0ba0;hpb=650aaee3a51254ea2068f3cf5c201be191f17710;p=openldap diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c index a82222cb14..140e1af55a 100644 --- a/libraries/libldap_r/tpool.c +++ b/libraries/libldap_r/tpool.c @@ -1,7 +1,7 @@ /* $OpenLDAP$ */ /* This work is part of OpenLDAP Software . * - * Copyright 1998-2008 The OpenLDAP Foundation. + * Copyright 1998-2012 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -50,6 +50,9 @@ typedef struct ldap_int_tpool_key_s { /* (Theoretical) max number of pending requests */ #define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */ +/* pool->ltp_pause values */ +enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 }; + /* Context: thread ID and thread-specific key/data pairs */ typedef struct ldap_int_thread_userctx_s { ldap_pvt_thread_t ltu_id; @@ -100,6 +103,11 @@ struct ldap_int_thread_pool_s { /* ltp_active_count <= 1 && ltp_pause */ ldap_pvt_thread_cond_t ltp_pcond; + /* ltp_pause == 0 ? <p_pending_list : &empty_pending_list, + * maintaned to reduce work for pool_wrapper() + */ + ldap_int_tpool_plist_t *ltp_work_list; + /* pending tasks, and unused task objects */ ldap_int_tpool_plist_t ltp_pending_list; LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list; @@ -120,24 +128,19 @@ struct ldap_int_thread_pool_s { /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */ int ltp_max_count; - /* Max number of pending + paused requests, negated when ltp_finishing */ + /* Max pending + paused + idle tasks, negated when ltp_finishing */ int ltp_max_pending; - int ltp_pending_count; /* Pending or paused requests */ - int ltp_active_count; /* Active, not paused requests */ + int ltp_pending_count; /* Pending + paused + idle tasks */ + int ltp_active_count; /* Active, not paused/idle tasks */ int ltp_open_count; /* Number of threads, negated when ltp_pause */ int ltp_starting; /* Currenlty starting threads */ - /* - * State maintained to reduce the time ltp_mutex must be locked - * in ldap_pvt_thread_pool_(). + /* >0 if paused or we may open a thread, <0 if we should close a thread. + * Updated when ltp_ change. + * Maintained to reduce the time ltp_mutex must be locked in + * ldap_pvt_thread_pool_(). */ - - /* ltp_pause == 0 ? <p_pending_list : &empty_pending_list */ - ldap_int_tpool_plist_t *ltp_work_list; - - /* >0 if paused or we may open a thread, <0 if we should close a thread. */ - /* Updated when ltp_ change. */ int ltp_vary_open_count; # define SET_VARY_OPEN_COUNT(pool) \ ((pool)->ltp_vary_open_count = \ @@ -234,41 +237,12 @@ ldap_pvt_thread_pool_init ( LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next); ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex); -#if 0 - /* THIS WILL NOT WORK on some systems. If the process - * forks after starting a thread, there is no guarantee - * that the thread will survive the fork. For example, - * slapd forks in order to daemonize, and does so after - * calling ldap_pvt_thread_pool_init. On some systems, - * this initial thread does not run in the child process, - * but ltp_open_count == 1, so two things happen: - * 1) the first client connection fails, and 2) when - * slapd is kill'ed, it never terminates since it waits - * for all worker threads to exit. */ - - /* start up one thread, just so there is one. no need to - * lock the mutex right now, since no threads are running. + /* Start no threads just yet. That can break if the process forks + * later, as slapd does in order to daemonize. On at least POSIX, + * only the forking thread would survive in the child. Yet fork() + * can't unlock/clean up other threads' locks and data structures, + * unless pthread_atfork() handlers have been set up to do so. */ - pool->ltp_open_count++; - SET_VARY_OPEN_COUNT(pool); - - ldap_pvt_thread_t thr; - rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool ); - - if( rc != 0) { - /* couldn't start one? then don't start any */ - ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex); - LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, - ldap_int_thread_pool_s, ltp_next); - ldap_int_has_thread_pool = 0; - ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex); - ldap_pvt_thread_cond_destroy(&pool->ltp_pcond); - ldap_pvt_thread_cond_destroy(&pool->ltp_cond); - ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex); - LDAP_FREE(pool); - return(-1); - } -#endif *tpool = pool; return(0); @@ -373,6 +347,47 @@ ldap_pvt_thread_pool_submit ( return(-1); } +static void * +no_task( void *ctx, void *arg ) +{ + return NULL; +} + +/* Cancel a pending task that was previously submitted. + * Return 1 if the task was successfully cancelled, 0 if + * not found, -1 for invalid parameters + */ +int +ldap_pvt_thread_pool_retract ( + ldap_pvt_thread_pool_t *tpool, + ldap_pvt_thread_start_t *start_routine, void *arg ) +{ + struct ldap_int_thread_pool_s *pool; + ldap_int_thread_task_t *task; + + if (tpool == NULL) + return(-1); + + pool = *tpool; + + if (pool == NULL) + return(-1); + + ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); + LDAP_STAILQ_FOREACH(task, &pool->ltp_pending_list, ltt_next.q) + if (task->ltt_start_routine == start_routine && + task->ltt_arg == arg) { + /* Could LDAP_STAILQ_REMOVE the task, but that + * walks ltp_pending_list again to find it. + */ + task->ltt_start_routine = no_task; + task->ltt_arg = NULL; + break; + } + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); + return task != NULL; +} + /* Set max #threads. value <= 0 means max supported #threads (LDAP_MAXTHR) */ int ldap_pvt_thread_pool_maxthreads( @@ -450,7 +465,7 @@ ldap_pvt_thread_pool_query( break; case LDAP_PVT_THREAD_POOL_PARAM_PAUSING: - count = pool->ltp_pause; + count = (pool->ltp_pause != 0); break; case LDAP_PVT_THREAD_POOL_PARAM_PENDING: @@ -500,7 +515,7 @@ ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool ) struct ldap_int_thread_pool_s *pool; if ( tpool != NULL && (pool = *tpool) != NULL ) { - rc = pool->ltp_pause; + rc = (pool->ltp_pause != 0); } return rc; @@ -525,17 +540,6 @@ ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool ) return rc; } -static void -flush_pending_list( struct ldap_int_thread_pool_s *pool ) -{ - ldap_int_thread_task_t *task; - - while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) { - LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q); - LDAP_FREE(task); - } -} - /* Destroy the pool after making its threads finish */ int ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending ) @@ -566,17 +570,20 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending ) SET_VARY_OPEN_COUNT(pool); if (pool->ltp_max_pending > 0) pool->ltp_max_pending = -pool->ltp_max_pending; - if (!run_pending) - flush_pending_list(pool); + + if (!run_pending) { + while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) { + LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q); + LDAP_FREE(task); + } + pool->ltp_pending_count = 0; + } while (pool->ltp_open_count) { if (!pool->ltp_pause) ldap_pvt_thread_cond_broadcast(&pool->ltp_cond); ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); } - ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - - flush_pending_list(pool); while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL) { @@ -584,6 +591,7 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending ) LDAP_FREE(task); } + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); ldap_pvt_thread_cond_destroy(&pool->ltp_pcond); ldap_pvt_thread_cond_destroy(&pool->ltp_cond); ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex); @@ -600,6 +608,7 @@ ldap_int_thread_pool_wrapper ( { struct ldap_int_thread_pool_s *pool = xpool; ldap_int_thread_task_t *task; + ldap_int_tpool_plist_t *work_list; ldap_int_thread_userctx_t ctx, *kctx; unsigned i, keyslot, hash; @@ -632,48 +641,56 @@ ldap_int_thread_pool_wrapper ( ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex); pool->ltp_starting--; + pool->ltp_active_count++; for (;;) { - task = LDAP_STAILQ_FIRST(pool->ltp_work_list); + work_list = pool->ltp_work_list; /* help the compiler a bit */ + task = LDAP_STAILQ_FIRST(work_list); if (task == NULL) { /* paused or no pending tasks */ - if (pool->ltp_vary_open_count < 0) { - /* not paused, and either finishing or too many - * threads running (can happen if ltp_max_count - * was reduced) so let this thread die. - */ - break; + if (--(pool->ltp_active_count) < 2) { + /* Notify pool_pause it is the sole active thread. */ + ldap_pvt_thread_cond_signal(&pool->ltp_pcond); } - /* we could check an idle timer here, and let the - * thread die if it has been inactive for a while. - * only die if there are other open threads (i.e., - * always have at least one thread open). the check - * should be like this: - * if (pool->ltp_open_count > 1 && pool->ltp_starting == 0) - * check timer, wait if ltp_pause, leave thread (break;) - * - * Just use pthread_cond_timedwait if we want to - * check idle time. - */ + do { + if (pool->ltp_vary_open_count < 0) { + /* Not paused, and either finishing or too many + * threads running (can happen if ltp_max_count + * was reduced). Let this thread die. + */ + goto done; + } - ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); - continue; + /* We could check an idle timer here, and let the + * thread die if it has been inactive for a while. + * Only die if there are other open threads (i.e., + * always have at least one thread open). + * The check should be like this: + * if (pool->ltp_open_count>1 && pool->ltp_starting==0) + * check timer, wait if ltp_pause, leave thread; + * + * Just use pthread_cond_timedwait() if we want to + * check idle time. + */ + ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); + + work_list = pool->ltp_work_list; + task = LDAP_STAILQ_FIRST(work_list); + } while (task == NULL); + + pool->ltp_active_count++; } - LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q); + LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q); pool->ltp_pending_count--; - pool->ltp_active_count++; ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); task->ltt_start_routine(&ctx, task->ltt_arg); ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, ltt_next.l); - pool->ltp_active_count--; - /* let pool_pause know when it is the sole active thread */ - if (pool->ltp_active_count < 2) - ldap_pvt_thread_cond_signal(&pool->ltp_pcond); } + done: assert(!pool->ltp_pause); /* thread_keys writable, ltp_open_count >= 0 */ @@ -697,12 +714,21 @@ ldap_int_thread_pool_wrapper ( return(NULL); } -/* Pause the pool. Return when all other threads are paused. */ -int -ldap_pvt_thread_pool_pause ( - ldap_pvt_thread_pool_t *tpool ) +/* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()). arg=PAUSE_ARG + * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*. + */ +#define GO_IDLE 8 +#define GO_UNIDLE 16 +#define CHECK_PAUSE 32 /* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */ +#define DO_PAUSE 64 /* CHECK_PAUSE; pause the pool */ +#define PAUSE_ARG(a) \ + ((a) | ((a) & (GO_IDLE|GO_UNIDLE) ? GO_IDLE-1 : CHECK_PAUSE)) + +static int +handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type ) { struct ldap_int_thread_pool_s *pool; + int ret = 0, pause, max_ltp_pause; if (tpool == NULL) return(-1); @@ -712,37 +738,97 @@ ldap_pvt_thread_pool_pause ( if (pool == NULL) return(0); + if (pause_type == CHECK_PAUSE && !pool->ltp_pause) + return(0); + + /* Let pool_unidle() ignore requests for new pauses */ + max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED; + ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); - /* If someone else has already requested a pause, we have to wait */ - if (pool->ltp_pause) { + pause = pool->ltp_pause; /* NOT_PAUSED, WANT_PAUSE or PAUSED */ + + /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */ + pause_type -= pause; + + if (pause_type & GO_IDLE) { pool->ltp_pending_count++; pool->ltp_active_count--; - /* let the other pool_pause() know when it can proceed */ - if (pool->ltp_active_count < 2) + if (pause && pool->ltp_active_count < 2) { + /* Tell the task waiting to DO_PAUSE it can proceed */ ldap_pvt_thread_cond_signal(&pool->ltp_pcond); - do { - ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); - } while (pool->ltp_pause); + } + } + + if (pause_type & GO_UNIDLE) { + /* Wait out pause if any, then cancel GO_IDLE */ + if (pause > max_ltp_pause) { + ret = 1; + do { + ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); + } while (pool->ltp_pause > max_ltp_pause); + } pool->ltp_pending_count--; pool->ltp_active_count++; } - /* Wait for everyone else to pause or finish */ - pool->ltp_pause = 1; - /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test, - * and do not finish threads in ldap_pvt_thread_pool_wrapper() */ - pool->ltp_open_count = -pool->ltp_open_count; - SET_VARY_OPEN_COUNT(pool); - /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */ - pool->ltp_work_list = &empty_pending_list; - - while (pool->ltp_active_count > 1) { - ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex); + if (pause_type & DO_PAUSE) { + /* Tell everyone else to pause or finish, then await that */ + ret = 0; + assert(!pool->ltp_pause); + pool->ltp_pause = WANT_PAUSE; + /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test, + * and do not finish threads in ldap_pvt_thread_pool_wrapper() */ + pool->ltp_open_count = -pool->ltp_open_count; + SET_VARY_OPEN_COUNT(pool); + /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */ + pool->ltp_work_list = &empty_pending_list; + /* Wait for this task to become the sole active task */ + while (pool->ltp_active_count > 1) { + ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex); + } + assert(pool->ltp_pause == WANT_PAUSE); + pool->ltp_pause = PAUSED; } ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - return(0); + return(ret); +} + +/* Consider this task idle: It will not block pool_pause() in other tasks. */ +void +ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool ) +{ + handle_pause(tpool, PAUSE_ARG(GO_IDLE)); +} + +/* Cancel pool_idle(). If the pool is paused, wait it out first. */ +void +ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool ) +{ + handle_pause(tpool, PAUSE_ARG(GO_UNIDLE)); +} + +/* + * If a pause was requested, wait for it. If several threads + * are waiting to pause, let through one or more pauses. + * The calling task must be active, not idle. + * Return 1 if we waited, 0 if not, -1 at parameter error. + */ +int +ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool ) +{ + return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE)); +} + +/* + * Pause the pool. The calling task must be active, not idle. + * Return when all other tasks are paused or idle. + */ +int +ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool ) +{ + return handle_pause(tpool, PAUSE_ARG(DO_PAUSE)); } /* End a pause */ @@ -762,15 +848,14 @@ ldap_pvt_thread_pool_resume ( ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); - assert(pool->ltp_pause); + assert(pool->ltp_pause == PAUSED); pool->ltp_pause = 0; if (pool->ltp_open_count <= 0) /* true when paused, but be paranoid */ pool->ltp_open_count = -pool->ltp_open_count; SET_VARY_OPEN_COUNT(pool); pool->ltp_work_list = &pool->ltp_pending_list; - if (!pool->ltp_finishing) - ldap_pvt_thread_cond_broadcast(&pool->ltp_cond); + ldap_pvt_thread_cond_broadcast(&pool->ltp_cond); ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); return(0); @@ -907,7 +992,7 @@ void *ldap_pvt_thread_pool_context( ) void *ctx = NULL; ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx ); - return ctx ? ctx : &ldap_int_main_thrctx; + return ctx ? ctx : (void *) &ldap_int_main_thrctx; } /*