From 45776bff04651074655474c0615f790dc1af35d3 Mon Sep 17 00:00:00 2001 From: Jong Hyuk Choi Date: Wed, 7 May 2003 02:06:01 +0000 Subject: [PATCH] a runqueue for periodic thread execution (for syncrepl) --- include/ldap_rq.h | 12 +++++ libraries/libldap_r/Makefile.in | 4 +- libraries/libldap_r/rq.c | 83 +++++++++++++++++++++++++++++++++ libraries/libldap_r/tpool.c | 2 +- servers/slapd/backend.c | 17 ++----- servers/slapd/daemon.c | 40 ++++++++++++++++ servers/slapd/proto-slap.h | 2 + servers/slapd/syncrepl.c | 4 ++ servers/slapd/tools/mimic.c | 6 +++ 9 files changed, 153 insertions(+), 17 deletions(-) create mode 100644 include/ldap_rq.h create mode 100644 libraries/libldap_r/rq.c diff --git a/include/ldap_rq.h b/include/ldap_rq.h new file mode 100644 index 0000000000..f9082a7eb3 --- /dev/null +++ b/include/ldap_rq.h @@ -0,0 +1,12 @@ +/* $OpenLDAP$ */ + +typedef struct re_s { + struct timeval next_sched; + struct timeval interval; + LDAP_STAILQ_ENTRY(re_s) next; + void *private; +} re_t; + +typedef struct runqueue_s { + LDAP_STAILQ_HEAD(rl, re_s) run_list; +} runqueue_t; diff --git a/libraries/libldap_r/Makefile.in b/libraries/libldap_r/Makefile.in index 0e5c8b20b2..95fe32c76f 100644 --- a/libraries/libldap_r/Makefile.in +++ b/libraries/libldap_r/Makefile.in @@ -19,10 +19,10 @@ XXSRCS = apitest.c test.c \ request.c os-ip.c url.c sortctrl.c vlvctrl.c \ init.c options.c print.c string.c util-int.c schema.c \ charray.c tls.c os-local.c dnssrv.c utf-8.c utf-8-conv.c -SRCS = threads.c rdwr.c tpool.c \ +SRCS = threads.c rdwr.c tpool.c rq.c \ thr_posix.c thr_cthreads.c thr_thr.c thr_lwp.c thr_nt.c \ thr_pth.c thr_stub.c -OBJS = threads.lo rdwr.lo tpool.lo \ +OBJS = threads.lo rdwr.lo tpool.lo rq.lo \ thr_posix.lo thr_cthreads.lo thr_thr.lo thr_lwp.lo thr_nt.lo \ thr_pth.lo thr_stub.lo \ bind.lo open.lo result.lo error.lo compare.lo search.lo \ diff --git a/libraries/libldap_r/rq.c b/libraries/libldap_r/rq.c new file mode 100644 index 0000000000..80f065a418 --- /dev/null +++ b/libraries/libldap_r/rq.c @@ -0,0 +1,83 @@ +/* $OpenLDAP$ */ +#include "portable.h" + +#include + +#include +#include +#include +#include +#include + +#include "ldap-int.h" +#include "ldap_pvt_thread.h" +#include "ldap_queue.h" +#include "ldap_rq.h" + +void +ldap_pvt_runqueue_insert( + struct runqueue_s* rq, + time_t interval, + void *private +) +{ + struct re_s* entry; + entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s )); + entry->interval.tv_sec = interval; + entry->interval.tv_usec = 0; + entry->next_sched.tv_sec = time( NULL ); + entry->next_sched.tv_usec = 0; + entry->private = private; + LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next ); +} + +void +ldap_pvt_runqueue_next_sched( + struct runqueue_s* rq, + struct timeval** next_run, + void **private +) +{ + struct re_s* entry; + entry = LDAP_STAILQ_FIRST( &rq->run_list ); + if ( entry == NULL ) { + *next_run = NULL; + *private = NULL; + } else { + *next_run = &entry->next_sched; + *private = entry->private; + } +} + +void +ldap_pvt_runqueue_resched( + struct runqueue_s* rq +) +{ + struct re_s* entry; + struct re_s* prev; + struct re_s* e; + + entry = LDAP_STAILQ_FIRST( &rq->run_list ); + if ( entry == NULL ) { + return; + } else { + entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec; + LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next ); + if ( LDAP_STAILQ_EMPTY( &rq->run_list )) { + LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next ); + } else { + prev = entry; + LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) { + if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) { + if ( prev == entry ) { + LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next ); + } else { + LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next ); + } + } + prev = e; + } + } + } +} diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c index 5fc4d709d6..9ee509d9aa 100644 --- a/libraries/libldap_r/tpool.c +++ b/libraries/libldap_r/tpool.c @@ -559,4 +559,4 @@ void *ldap_pvt_thread_pool_context( ) return thread_keys[i].ctx; } -#endif /* LDAP_HAVE_THREAD_POOL */ +#endif /* LDAP_THREAD_HAVE_TPOOL */ diff --git a/servers/slapd/backend.c b/servers/slapd/backend.c index 6598b451ab..5f9ab5f5b9 100644 --- a/servers/slapd/backend.c +++ b/servers/slapd/backend.c @@ -354,20 +354,9 @@ int backend_startup(Backend *be) #ifdef LDAP_SYNCREPL if ( backendDB[i].syncinfo != NULL ) { - int ret; - ret = ldap_pvt_thread_pool_submit( &syncrepl_pool, - do_syncrepl, (void *) &backendDB[i] ); - if ( ret != 0 ) { -#ifdef NEW_LOGGING - LDAP_LOG( BACKEND, CRIT, - "syncrepl thread pool submit failed (%d)\n", - ret, 0, 0 ); -#else - Debug( LDAP_DEBUG_ANY, - "ldap_pvt_thread_pool_submit failed (%d) \n", - ret, 0, 0 ); -#endif - } + syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo; + ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval, + (void *) &backendDB[i] ); } #endif } diff --git a/servers/slapd/daemon.c b/servers/slapd/daemon.c index e42565af46..efb0a932fc 100644 --- a/servers/slapd/daemon.c +++ b/servers/slapd/daemon.c @@ -20,6 +20,10 @@ #include "lutil.h" #include "slap.h" +#ifdef LDAP_SYNCREPL +#include "ldap_rq.h" +#endif + #ifdef HAVE_TCPD #include #define SLAP_STRING_UNKNOWN STRING_UNKNOWN @@ -1212,6 +1216,13 @@ slapd_daemon_task( struct timeval zero; struct timeval *tvp; +#ifdef LDAP_SYNCREPL + struct timeval diff; + struct timeval *cat; + BackendDB *db; + time_t tdelta = 1; +#endif + if( emfile ) { now = slap_get_time(); connections_timeout_idle( now ); @@ -1285,12 +1296,32 @@ slapd_daemon_task( at = ldap_pvt_thread_pool_backload(&connection_pool); +#ifdef LDAP_SYNCREPL + /* cat is of struct timeval containing the earliest schedule */ + ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db ); + if ( cat != NULL ) { + diff.tv_sec = cat->tv_sec - slap_get_time(); + if ( diff.tv_sec == 0 ) + diff.tv_sec = tdelta; + } else { + cat = NULL; + } +#endif + #if defined( HAVE_YIELDING_SELECT ) || defined( NO_THREADS ) tvp = NULL; #else tvp = at ? &zero : NULL; #endif +#ifdef LDAP_SYNCREPL + if ( cat != NULL ) { + if ( tvp == NULL ) { + tvp = &diff; + } + } +#endif + for ( l = 0; slap_listeners[l] != NULL; l++ ) { if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID ) continue; @@ -1354,6 +1385,15 @@ slapd_daemon_task( Debug( LDAP_DEBUG_CONNS, "daemon: select timeout - yielding\n", 0, 0, 0 ); #endif + +#ifdef LDAP_SYNCREPL + if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) { + ldap_pvt_thread_pool_submit( &connection_pool, + do_syncrepl, (void *) db ); + /* FIXME : reschedule upon do_syncrepl termination */ + ldap_pvt_runqueue_resched( &syncrepl_rq ); + } +#endif ldap_pvt_thread_yield(); continue; diff --git a/servers/slapd/proto-slap.h b/servers/slapd/proto-slap.h index 711b115023..91d81bc459 100644 --- a/servers/slapd/proto-slap.h +++ b/servers/slapd/proto-slap.h @@ -1123,6 +1123,8 @@ LDAP_SLAPD_F (int) do_extended LDAP_P((Operation *op, SlapReply *rs)); */ #ifdef LDAP_SYNCREPL +LDAP_SLAPD_V (struct runqueue_s) syncrepl_rq; + LDAP_SLAPD_F (void) init_syncrepl LDAP_P(()); LDAP_SLAPD_F (void*) do_syncrepl LDAP_P((void *, void *)); #endif diff --git a/servers/slapd/syncrepl.c b/servers/slapd/syncrepl.c index b7cdfa1101..6dd40777f5 100644 --- a/servers/slapd/syncrepl.c +++ b/servers/slapd/syncrepl.c @@ -35,6 +35,8 @@ #ifdef LDAP_SYNCREPL +#include "ldap_rq.h" + static Entry* syncrepl_message_to_entry ( LDAP *, Operation *, LDAPMessage *, Modifications **, int*, struct berval *, struct berval * ); @@ -75,6 +77,8 @@ static AttributeDescription **add_descs_lastmod; static AttributeDescription **del_descs; static AttributeDescription **del_descs_lastmod; +struct runqueue_s syncrepl_rq; + void init_syncrepl() { diff --git a/servers/slapd/tools/mimic.c b/servers/slapd/tools/mimic.c index c9aec23833..5cf739b2b9 100644 --- a/servers/slapd/tools/mimic.c +++ b/servers/slapd/tools/mimic.c @@ -13,6 +13,10 @@ #include "../slap.h" +#ifdef LDAP_SYNCREPL +#include "ldap_rq.h" +#endif + /* needed by WIN32 and back-monitor */ time_t starttime; @@ -248,6 +252,8 @@ int root_dse_info( Connection *conn, Entry **entry, const char **text ) } #ifdef LDAP_SYNCREPL +struct runqueue_s syncrepl_rq; + void init_syncrepl( ) { return -1; -- 2.39.5