]> git.sur5r.net Git - openldap/commitdiff
a runqueue for periodic thread execution (for syncrepl)
authorJong Hyuk Choi <jongchoi@openldap.org>
Wed, 7 May 2003 02:06:01 +0000 (02:06 +0000)
committerJong Hyuk Choi <jongchoi@openldap.org>
Wed, 7 May 2003 02:06:01 +0000 (02:06 +0000)
include/ldap_rq.h [new file with mode: 0644]
libraries/libldap_r/Makefile.in
libraries/libldap_r/rq.c [new file with mode: 0644]
libraries/libldap_r/tpool.c
servers/slapd/backend.c
servers/slapd/daemon.c
servers/slapd/proto-slap.h
servers/slapd/syncrepl.c
servers/slapd/tools/mimic.c

diff --git a/include/ldap_rq.h b/include/ldap_rq.h
new file mode 100644 (file)
index 0000000..f9082a7
--- /dev/null
@@ -0,0 +1,12 @@
+/* $OpenLDAP$ */
+
+typedef struct re_s {
+       struct timeval next_sched;
+       struct timeval interval;
+       LDAP_STAILQ_ENTRY(re_s) next;
+       void *private;
+} re_t;
+
+typedef struct runqueue_s {
+       LDAP_STAILQ_HEAD(rl, re_s) run_list;
+} runqueue_t;
index 0e5c8b20b2c304166aff8406bc209e831ab547a3..95fe32c76f1170702830a3402a1ac79b2c6efd79 100644 (file)
@@ -19,10 +19,10 @@ XXSRCS    = apitest.c test.c \
        request.c os-ip.c url.c sortctrl.c vlvctrl.c \
        init.c options.c print.c string.c util-int.c schema.c \
        charray.c tls.c os-local.c dnssrv.c utf-8.c utf-8-conv.c
-SRCS   = threads.c rdwr.c tpool.c \
+SRCS   = threads.c rdwr.c tpool.c rq.c \
        thr_posix.c thr_cthreads.c thr_thr.c thr_lwp.c thr_nt.c \
        thr_pth.c thr_stub.c
-OBJS   = threads.lo rdwr.lo tpool.lo  \
+OBJS   = threads.lo rdwr.lo tpool.lo  rq.lo \
        thr_posix.lo thr_cthreads.lo thr_thr.lo thr_lwp.lo thr_nt.lo \
        thr_pth.lo thr_stub.lo \
        bind.lo open.lo result.lo error.lo compare.lo search.lo \
diff --git a/libraries/libldap_r/rq.c b/libraries/libldap_r/rq.c
new file mode 100644 (file)
index 0000000..80f065a
--- /dev/null
@@ -0,0 +1,83 @@
+/* $OpenLDAP$ */
+#include "portable.h"
+
+#include <stdio.h>
+
+#include <ac/stdarg.h>
+#include <ac/stdlib.h>
+#include <ac/string.h>
+#include <ac/time.h>
+#include <ac/errno.h>
+
+#include "ldap-int.h"
+#include "ldap_pvt_thread.h"
+#include "ldap_queue.h"
+#include "ldap_rq.h"
+
+void
+ldap_pvt_runqueue_insert(
+       struct runqueue_s* rq,
+       time_t interval,
+       void *private
+)
+{
+       struct re_s* entry;
+       entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s ));
+       entry->interval.tv_sec = interval;
+       entry->interval.tv_usec = 0;
+       entry->next_sched.tv_sec = time( NULL );
+       entry->next_sched.tv_usec = 0;
+       entry->private = private;
+       LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+}
+
+void
+ldap_pvt_runqueue_next_sched(
+       struct runqueue_s* rq,
+       struct timeval** next_run,
+       void **private
+)
+{
+       struct re_s* entry;
+       entry = LDAP_STAILQ_FIRST( &rq->run_list );
+       if ( entry == NULL ) {
+               *next_run = NULL;
+               *private = NULL;
+       } else {
+               *next_run = &entry->next_sched;
+               *private = entry->private;
+       }
+}
+
+void 
+ldap_pvt_runqueue_resched(
+       struct runqueue_s* rq
+)
+{
+       struct re_s* entry;
+       struct re_s* prev;
+       struct re_s* e;
+
+       entry = LDAP_STAILQ_FIRST( &rq->run_list );
+       if ( entry == NULL ) {
+               return;
+       } else {
+               entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
+               LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next );
+               if ( LDAP_STAILQ_EMPTY( &rq->run_list )) {
+                       LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+               } else {
+                       prev = entry;
+                       LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) {
+                               if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
+                                       if ( prev == entry ) {
+                                               LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+                                       } else {
+                                               LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next );
+                                       }
+                               }
+                               prev = e;
+                       }
+               }
+       }
+}
index 5fc4d709d68872ed56b05a23c853687d153dbbc3..9ee509d9aa87e0caf2025a11a47e3e17ed7deea6 100644 (file)
@@ -559,4 +559,4 @@ void *ldap_pvt_thread_pool_context( )
        return thread_keys[i].ctx;
 }
 
-#endif /* LDAP_HAVE_THREAD_POOL */
+#endif /* LDAP_THREAD_HAVE_TPOOL */
index 6598b451ab357446f6ec84e2fa556a0adb62997e..5f9ab5f5b9f679225e954e20310a962fa173e97c 100644 (file)
@@ -354,20 +354,9 @@ int backend_startup(Backend *be)
 
 #ifdef LDAP_SYNCREPL
                if ( backendDB[i].syncinfo != NULL ) {
-                       int ret;
-                       ret = ldap_pvt_thread_pool_submit( &syncrepl_pool,
-                                       do_syncrepl, (void *) &backendDB[i] );
-                       if ( ret != 0 ) {
-#ifdef NEW_LOGGING
-                               LDAP_LOG( BACKEND, CRIT,
-                                       "syncrepl thread pool submit failed (%d)\n",
-                                       ret, 0, 0 );
-#else
-                               Debug( LDAP_DEBUG_ANY,
-                                       "ldap_pvt_thread_pool_submit failed (%d) \n",
-                                       ret, 0, 0 );
-#endif
-                       }
+                       syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
+                       ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
+                                                       (void *) &backendDB[i] );
                }
 #endif
        }
index e42565af46a7fc541a696d07c6ec4d69ffcafb7d..efb0a932fce58b8e7993d78517d0df25b4664ae2 100644 (file)
 #include "lutil.h"
 #include "slap.h"
 
+#ifdef LDAP_SYNCREPL
+#include "ldap_rq.h"
+#endif
+
 #ifdef HAVE_TCPD
 #include <tcpd.h>
 #define SLAP_STRING_UNKNOWN    STRING_UNKNOWN
@@ -1212,6 +1216,13 @@ slapd_daemon_task(
                struct timeval          zero;
                struct timeval          *tvp;
 
+#ifdef LDAP_SYNCREPL
+               struct timeval          diff;
+               struct timeval          *cat;
+               BackendDB                       *db;
+               time_t                          tdelta = 1;
+#endif
+
                if( emfile ) {
                        now = slap_get_time();
                        connections_timeout_idle( now );
@@ -1285,12 +1296,32 @@ slapd_daemon_task(
 
                at = ldap_pvt_thread_pool_backload(&connection_pool);
 
+#ifdef LDAP_SYNCREPL
+               /* cat is of struct timeval containing the earliest schedule */
+               ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db );
+               if ( cat != NULL ) {
+                       diff.tv_sec = cat->tv_sec - slap_get_time();
+                       if ( diff.tv_sec == 0 )
+                               diff.tv_sec = tdelta;
+               } else {
+                       cat = NULL;
+               }
+#endif
+
 #if defined( HAVE_YIELDING_SELECT ) || defined( NO_THREADS )
                tvp = NULL;
 #else
                tvp = at ? &zero : NULL;
 #endif
 
+#ifdef LDAP_SYNCREPL
+               if ( cat != NULL ) {
+                       if ( tvp == NULL ) {
+                               tvp = &diff;
+                       }
+               }
+#endif
+
                for ( l = 0; slap_listeners[l] != NULL; l++ ) {
                        if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID )
                                continue;
@@ -1354,6 +1385,15 @@ slapd_daemon_task(
                        Debug( LDAP_DEBUG_CONNS, "daemon: select timeout - yielding\n",
                            0, 0, 0 );
 #endif
+
+#ifdef LDAP_SYNCREPL
+                       if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
+                               ldap_pvt_thread_pool_submit( &connection_pool,
+                                                               do_syncrepl, (void *) db );
+                               /* FIXME : reschedule upon do_syncrepl termination */
+                               ldap_pvt_runqueue_resched( &syncrepl_rq );
+                       }
+#endif
                        ldap_pvt_thread_yield();
                        continue;
 
index 711b115023428bab752d71002f20c9c9538f8691..91d81bc4598a50ebc48b26e5f71aa327e4812b4c 100644 (file)
@@ -1123,6 +1123,8 @@ LDAP_SLAPD_F (int) do_extended LDAP_P((Operation *op, SlapReply *rs));
  */
 
 #ifdef LDAP_SYNCREPL
+LDAP_SLAPD_V (struct runqueue_s) syncrepl_rq;
+
 LDAP_SLAPD_F (void) init_syncrepl LDAP_P(());
 LDAP_SLAPD_F (void*) do_syncrepl LDAP_P((void *, void *));
 #endif
index b7cdfa1101ce5992a7ebf0d7cba2d58b0bac0f6f..6dd40777f5d6295b9c7dc2a670856d0e3f3961c3 100644 (file)
@@ -35,6 +35,8 @@
 
 #ifdef LDAP_SYNCREPL
 
+#include "ldap_rq.h"
+
 static Entry*
 syncrepl_message_to_entry ( LDAP *, Operation *, LDAPMessage *,
                Modifications **, int*, struct berval *, struct berval * );
@@ -75,6 +77,8 @@ static AttributeDescription **add_descs_lastmod;
 static AttributeDescription **del_descs;
 static AttributeDescription **del_descs_lastmod;
 
+struct runqueue_s syncrepl_rq;
+
 void
 init_syncrepl()
 {
index c9aec23833024100d61cd128dbf49d72de2d029e..5cf739b2b9ab424ac8987e26ee5752dc2e3d229d 100644 (file)
 
 #include "../slap.h"
 
+#ifdef LDAP_SYNCREPL
+#include "ldap_rq.h"
+#endif
+
 /* needed by WIN32 and back-monitor */
 time_t starttime;
 
@@ -248,6 +252,8 @@ int root_dse_info( Connection *conn, Entry **entry, const char **text )
 }
 
 #ifdef LDAP_SYNCREPL
+struct runqueue_s syncrepl_rq;
+
 void init_syncrepl( )
 {
         return -1;