From 580ae073e173019f6334d711ac537fc7df092af1 Mon Sep 17 00:00:00 2001 From: Jong Hyuk Choi Date: Wed, 7 May 2003 22:29:26 +0000 Subject: [PATCH] runqueue update --- include/ldap_rq.h | 52 ++++++++++++++++- libraries/libldap_r/rq.c | 113 ++++++++++++++++++++++++++++-------- servers/slapd/backend.c | 10 ++++ servers/slapd/daemon.c | 24 +++++--- servers/slapd/syncrepl.c | 14 ++++- servers/slapd/tools/mimic.c | 4 +- 6 files changed, 179 insertions(+), 38 deletions(-) diff --git a/include/ldap_rq.h b/include/ldap_rq.h index f9082a7eb3..3344f902e1 100644 --- a/include/ldap_rq.h +++ b/include/ldap_rq.h @@ -1,12 +1,62 @@ /* $OpenLDAP$ */ +#ifdef LDAP_SYNCREPL + typedef struct re_s { struct timeval next_sched; struct timeval interval; - LDAP_STAILQ_ENTRY(re_s) next; + LDAP_STAILQ_ENTRY(re_s) tnext; /* it includes running */ + LDAP_STAILQ_ENTRY(re_s) rnext; void *private; } re_t; typedef struct runqueue_s { + LDAP_STAILQ_HEAD(l, re_s) task_list; LDAP_STAILQ_HEAD(rl, re_s) run_list; + ldap_pvt_thread_mutex_t rq_mutex; } runqueue_t; + +LDAP_F( void ) +ldap_pvt_runqueue_insert( + struct runqueue_s* rq, + time_t interval, + void *private +); + +LDAP_F( void ) +ldap_pvt_runqueue_remove( + struct runqueue_s* rq, + struct re_s* entry +); + +LDAP_F( struct re_s* ) +ldap_pvt_runqueue_next_sched( + struct runqueue_s* rq, + struct timeval** next_run +); + +LDAP_F( void ) +ldap_pvt_runqueue_runtask( + struct runqueue_s* rq, + struct re_s* entry +); + +LDAP_F( void ) +ldap_pvt_runqueue_stoptask( + struct runqueue_s* rq, + struct re_s* entry +); + +LDAP_F( int ) +ldap_pvt_runqueue_isrunning( + struct runqueue_s* rq, + struct re_s* entry +); + +LDAP_F( void ) +ldap_pvt_runqueue_resched( + struct runqueue_s* rq, + struct re_s* entry +); + +#endif diff --git a/libraries/libldap_r/rq.c b/libraries/libldap_r/rq.c index 80f065a418..4cf59c1992 100644 --- a/libraries/libldap_r/rq.c +++ b/libraries/libldap_r/rq.c @@ -14,6 +14,8 @@ #include "ldap_queue.h" #include "ldap_rq.h" +#ifdef LDAP_SYNCREPL + void ldap_pvt_runqueue_insert( struct runqueue_s* rq, @@ -22,62 +24,123 @@ ldap_pvt_runqueue_insert( ) { struct re_s* entry; + entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s )); entry->interval.tv_sec = interval; entry->interval.tv_usec = 0; entry->next_sched.tv_sec = time( NULL ); entry->next_sched.tv_usec = 0; entry->private = private; - LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next ); + LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext ); } void +ldap_pvt_runqueue_remove( + struct runqueue_s* rq, + struct re_s* entry +) +{ + struct re_s* e; + + LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) { + if ( e == entry) + break; + } + + assert ( e == entry ); + + LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext ); + + ch_free( entry ); + +} + +struct re_s* ldap_pvt_runqueue_next_sched( struct runqueue_s* rq, - struct timeval** next_run, - void **private + struct timeval** next_run ) { struct re_s* entry; - entry = LDAP_STAILQ_FIRST( &rq->run_list ); + + entry = LDAP_STAILQ_FIRST( &rq->task_list ); if ( entry == NULL ) { *next_run = NULL; - *private = NULL; + return NULL; } else { *next_run = &entry->next_sched; - *private = entry->private; + return entry; + } +} + +void +ldap_pvt_runqueue_runtask( + struct runqueue_s* rq, + struct re_s* entry +) +{ + LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, rnext ); +} + +void +ldap_pvt_runqueue_stoptask( + struct runqueue_s* rq, + struct re_s* entry +) +{ + LDAP_STAILQ_REMOVE( &rq->run_list, entry, re_s, rnext ); +} + +int +ldap_pvt_runqueue_isrunning( + struct runqueue_s* rq, + struct re_s* entry +) +{ + struct re_s* e; + + LDAP_STAILQ_FOREACH( e, &rq->run_list, rnext ) { + if ( e == entry ) { + return 1; + } } + return 0; } void ldap_pvt_runqueue_resched( - struct runqueue_s* rq + struct runqueue_s* rq, + struct re_s* entry ) { - struct re_s* entry; struct re_s* prev; struct re_s* e; - entry = LDAP_STAILQ_FIRST( &rq->run_list ); - if ( entry == NULL ) { - return; + LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) { + if ( e == entry ) + break; + } + + assert ( e == entry ); + + LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext ); + + entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec; + if ( LDAP_STAILQ_EMPTY( &rq->task_list )) { + LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext ); } else { - entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec; - LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next ); - if ( LDAP_STAILQ_EMPTY( &rq->run_list )) { - LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next ); - } else { - prev = entry; - LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) { - if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) { - if ( prev == entry ) { - LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next ); - } else { - LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next ); - } + prev = NULL; + LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) { + if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) { + if ( prev == NULL ) { + LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext ); + } else { + LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext ); } - prev = e; } + prev = e; } } } + +#endif diff --git a/servers/slapd/backend.c b/servers/slapd/backend.c index 5f9ab5f5b9..0075e9921c 100644 --- a/servers/slapd/backend.c +++ b/servers/slapd/backend.c @@ -18,6 +18,8 @@ #include "lutil.h" #include "lber_pvt.h" +#include "ldap_rq.h" + #ifdef LDAP_SLAPI #include "slapi.h" #endif @@ -331,6 +333,12 @@ int backend_startup(Backend *be) } } +#ifdef LDAP_SYNCREPL + ldap_pvt_thread_mutex_init( &syncrepl_rq.rq_mutex ); + LDAP_STAILQ_INIT( &syncrepl_rq.task_list ); + LDAP_STAILQ_INIT( &syncrepl_rq.run_list ); +#endif + /* open each backend database */ for( i = 0; i < nBackendDB; i++ ) { /* append global access controls */ @@ -355,8 +363,10 @@ int backend_startup(Backend *be) #ifdef LDAP_SYNCREPL if ( backendDB[i].syncinfo != NULL ) { syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo; + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval, (void *) &backendDB[i] ); + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); } #endif } diff --git a/servers/slapd/daemon.c b/servers/slapd/daemon.c index 147f1fc819..571b8979d2 100644 --- a/servers/slapd/daemon.c +++ b/servers/slapd/daemon.c @@ -1219,8 +1219,8 @@ slapd_daemon_task( #ifdef LDAP_SYNCREPL struct timeval diff; struct timeval *cat; - BackendDB *db; time_t tdelta = 1; + struct re_s* rtask; #endif now = slap_get_time(); @@ -1298,13 +1298,13 @@ slapd_daemon_task( #ifdef LDAP_SYNCREPL /* cat is of struct timeval containing the earliest schedule */ - ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db ); + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); + rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat ); + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); if ( cat != NULL ) { diff.tv_sec = cat->tv_sec - slap_get_time(); if ( diff.tv_sec == 0 ) diff.tv_sec = tdelta; - } else { - cat = NULL; } #endif @@ -1387,11 +1387,19 @@ slapd_daemon_task( #endif #ifdef LDAP_SYNCREPL - if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) { + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); + rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat ); + if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) { + ldap_pvt_runqueue_resched( &syncrepl_rq, rtask ); + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); + } else if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) { + ldap_pvt_runqueue_runtask( &syncrepl_rq, rtask ); + ldap_pvt_runqueue_resched( &syncrepl_rq, rtask ); + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); ldap_pvt_thread_pool_submit( &connection_pool, - do_syncrepl, (void *) db ); - /* FIXME : reschedule upon do_syncrepl termination */ - ldap_pvt_runqueue_resched( &syncrepl_rq ); + do_syncrepl, (void *) rtask ); + } else { + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); } #endif ldap_pvt_thread_yield(); diff --git a/servers/slapd/syncrepl.c b/servers/slapd/syncrepl.c index 6dd40777f5..261287c5eb 100644 --- a/servers/slapd/syncrepl.c +++ b/servers/slapd/syncrepl.c @@ -118,7 +118,8 @@ do_syncrepl( void *ctx, void *arg ) { - Backend *be = arg; + struct re_s* rtask = arg; + Backend *be = rtask->private; syncinfo_t *si = ( syncinfo_t * ) be->syncinfo; SlapReply rs = {REP_RESULT}; @@ -634,7 +635,6 @@ do_syncrepl( Debug( LDAP_DEBUG_ANY, "do_syncrepl : unknown result\n", 0, 0, 0 ); #endif - return NULL; } done: @@ -646,6 +646,16 @@ done: if ( res ) ldap_msgfree( res ); ldap_unbind( ld ); + + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); + ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask ); + if ( si->type == LDAP_SYNC_REFRESH_ONLY ) { + ldap_pvt_runqueue_resched( &syncrepl_rq, rtask ); + } else { + ldap_pvt_runqueue_remove( &syncrepl_rq, rtask ); + } + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); + return NULL; } diff --git a/servers/slapd/tools/mimic.c b/servers/slapd/tools/mimic.c index 5cf739b2b9..a9325af5f2 100644 --- a/servers/slapd/tools/mimic.c +++ b/servers/slapd/tools/mimic.c @@ -256,11 +256,11 @@ struct runqueue_s syncrepl_rq; void init_syncrepl( ) { - return -1; + return; } void* do_syncrepl( void *ctx, void *arg ) { - return -1; + return NULL; } #endif -- 2.39.5