]> git.sur5r.net Git - openldap/commitdiff
runqueue update
authorJong Hyuk Choi <jongchoi@openldap.org>
Wed, 7 May 2003 22:29:26 +0000 (22:29 +0000)
committerJong Hyuk Choi <jongchoi@openldap.org>
Wed, 7 May 2003 22:29:26 +0000 (22:29 +0000)
include/ldap_rq.h
libraries/libldap_r/rq.c
servers/slapd/backend.c
servers/slapd/daemon.c
servers/slapd/syncrepl.c
servers/slapd/tools/mimic.c

index f9082a7eb3831b18e73e192799eb343e2749e760..3344f902e1e9c83af972e26abda001490983098a 100644 (file)
@@ -1,12 +1,62 @@
 /* $OpenLDAP$ */
 
+#ifdef LDAP_SYNCREPL
+
 typedef struct re_s {
        struct timeval next_sched;
        struct timeval interval;
-       LDAP_STAILQ_ENTRY(re_s) next;
+       LDAP_STAILQ_ENTRY(re_s) tnext; /* it includes running */
+       LDAP_STAILQ_ENTRY(re_s) rnext;
        void *private;
 } re_t;
 
 typedef struct runqueue_s {
+       LDAP_STAILQ_HEAD(l, re_s) task_list;
        LDAP_STAILQ_HEAD(rl, re_s) run_list;
+       ldap_pvt_thread_mutex_t rq_mutex;
 } runqueue_t;
+
+LDAP_F( void )
+ldap_pvt_runqueue_insert(
+       struct runqueue_s* rq,
+       time_t interval,
+       void *private
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_remove(
+       struct runqueue_s* rq,
+       struct re_s* entry
+);
+
+LDAP_F( struct re_s* )
+ldap_pvt_runqueue_next_sched(
+       struct runqueue_s* rq,
+       struct timeval** next_run
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_runtask(
+       struct runqueue_s* rq,
+       struct re_s* entry
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_stoptask(
+       struct runqueue_s* rq,
+       struct re_s* entry
+);
+
+LDAP_F( int )
+ldap_pvt_runqueue_isrunning(
+       struct runqueue_s* rq,
+       struct re_s* entry
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_resched(
+       struct runqueue_s* rq,
+       struct re_s* entry
+);
+
+#endif
index 80f065a418145f17bc34cd1a7408a6ecf31b9081..4cf59c199252a20795790fa796f77d153af9bee3 100644 (file)
@@ -14,6 +14,8 @@
 #include "ldap_queue.h"
 #include "ldap_rq.h"
 
+#ifdef LDAP_SYNCREPL
+
 void
 ldap_pvt_runqueue_insert(
        struct runqueue_s* rq,
@@ -22,62 +24,123 @@ ldap_pvt_runqueue_insert(
 )
 {
        struct re_s* entry;
+
        entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s ));
        entry->interval.tv_sec = interval;
        entry->interval.tv_usec = 0;
        entry->next_sched.tv_sec = time( NULL );
        entry->next_sched.tv_usec = 0;
        entry->private = private;
-       LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+       LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
 }
 
 void
+ldap_pvt_runqueue_remove(
+       struct runqueue_s* rq,
+       struct re_s* entry
+)
+{
+       struct re_s* e;
+
+       LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
+               if ( e == entry)
+                       break;
+       }
+
+       assert ( e == entry );
+
+       LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
+
+       ch_free( entry );
+
+}
+
+struct re_s*
 ldap_pvt_runqueue_next_sched(
        struct runqueue_s* rq,
-       struct timeval** next_run,
-       void **private
+       struct timeval** next_run
 )
 {
        struct re_s* entry;
-       entry = LDAP_STAILQ_FIRST( &rq->run_list );
+
+       entry = LDAP_STAILQ_FIRST( &rq->task_list );
        if ( entry == NULL ) {
                *next_run = NULL;
-               *private = NULL;
+               return NULL;
        } else {
                *next_run = &entry->next_sched;
-               *private = entry->private;
+               return entry;
+       }
+}
+
+void
+ldap_pvt_runqueue_runtask(
+       struct runqueue_s* rq,
+       struct re_s* entry
+)
+{
+       LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, rnext );
+}
+
+void
+ldap_pvt_runqueue_stoptask(
+       struct runqueue_s* rq,
+       struct re_s* entry
+)
+{
+       LDAP_STAILQ_REMOVE( &rq->run_list, entry, re_s, rnext );
+}
+
+int
+ldap_pvt_runqueue_isrunning(
+       struct runqueue_s* rq,
+       struct re_s* entry
+)
+{
+       struct re_s* e;
+
+       LDAP_STAILQ_FOREACH( e, &rq->run_list, rnext ) {
+               if ( e == entry ) {
+                       return 1;
+               }
        }
+       return 0;
 }
 
 void 
 ldap_pvt_runqueue_resched(
-       struct runqueue_s* rq
+       struct runqueue_s* rq,
+       struct re_s* entry
 )
 {
-       struct re_s* entry;
        struct re_s* prev;
        struct re_s* e;
 
-       entry = LDAP_STAILQ_FIRST( &rq->run_list );
-       if ( entry == NULL ) {
-               return;
+       LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
+               if ( e == entry )
+                       break;
+       }
+
+       assert ( e == entry );
+
+       LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
+
+       entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
+       if ( LDAP_STAILQ_EMPTY( &rq->task_list )) {
+               LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
        } else {
-               entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
-               LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next );
-               if ( LDAP_STAILQ_EMPTY( &rq->run_list )) {
-                       LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
-               } else {
-                       prev = entry;
-                       LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) {
-                               if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
-                                       if ( prev == entry ) {
-                                               LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
-                                       } else {
-                                               LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next );
-                                       }
+               prev = NULL;
+               LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
+                       if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
+                               if ( prev == NULL ) {
+                                       LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
+                               } else {
+                                       LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext );
                                }
-                               prev = e;
                        }
+                       prev = e;
                }
        }
 }
+
+#endif
index 5f9ab5f5b9f679225e954e20310a962fa173e97c..0075e9921c492c5fc2647c19e32133136b56925c 100644 (file)
@@ -18,6 +18,8 @@
 #include "lutil.h"
 #include "lber_pvt.h"
 
+#include "ldap_rq.h"
+
 #ifdef LDAP_SLAPI
 #include "slapi.h"
 #endif
@@ -331,6 +333,12 @@ int backend_startup(Backend *be)
                }
        }
 
+#ifdef LDAP_SYNCREPL
+       ldap_pvt_thread_mutex_init( &syncrepl_rq.rq_mutex );
+       LDAP_STAILQ_INIT( &syncrepl_rq.task_list );
+       LDAP_STAILQ_INIT( &syncrepl_rq.run_list );
+#endif
+
        /* open each backend database */
        for( i = 0; i < nBackendDB; i++ ) {
                /* append global access controls */
@@ -355,8 +363,10 @@ int backend_startup(Backend *be)
 #ifdef LDAP_SYNCREPL
                if ( backendDB[i].syncinfo != NULL ) {
                        syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
+                       ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
                        ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
                                                        (void *) &backendDB[i] );
+                       ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
                }
 #endif
        }
index 147f1fc819b8d65a9b489f7e9a62fd37880baf94..571b8979d22dfe04109090aa28bf12894b0567da 100644 (file)
@@ -1219,8 +1219,8 @@ slapd_daemon_task(
 #ifdef LDAP_SYNCREPL
                struct timeval          diff;
                struct timeval          *cat;
-               BackendDB                       *db;
                time_t                          tdelta = 1;
+               struct re_s*            rtask;
 #endif
 
                now = slap_get_time();
@@ -1298,13 +1298,13 @@ slapd_daemon_task(
 
 #ifdef LDAP_SYNCREPL
                /* cat is of struct timeval containing the earliest schedule */
-               ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db );
+               ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+               rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
+               ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
                if ( cat != NULL ) {
                        diff.tv_sec = cat->tv_sec - slap_get_time();
                        if ( diff.tv_sec == 0 )
                                diff.tv_sec = tdelta;
-               } else {
-                       cat = NULL;
                }
 #endif
 
@@ -1387,11 +1387,19 @@ slapd_daemon_task(
 #endif
 
 #ifdef LDAP_SYNCREPL
-                       if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
+                       ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+                       rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
+                       if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
+                               ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
+                               ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+                       } else if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
+                               ldap_pvt_runqueue_runtask( &syncrepl_rq, rtask );
+                               ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
+                               ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
                                ldap_pvt_thread_pool_submit( &connection_pool,
-                                                               do_syncrepl, (void *) db );
-                               /* FIXME : reschedule upon do_syncrepl termination */
-                               ldap_pvt_runqueue_resched( &syncrepl_rq );
+                                                               do_syncrepl, (void *) rtask );
+                       } else {
+                               ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
                        }
 #endif
                        ldap_pvt_thread_yield();
index 6dd40777f5d6295b9c7dc2a670856d0e3f3961c3..261287c5eb4abebc3628bbc597c4e74825f531c5 100644 (file)
@@ -118,7 +118,8 @@ do_syncrepl(
        void    *ctx,
        void    *arg )
 {
-       Backend *be = arg;
+       struct re_s* rtask = arg;
+       Backend *be = rtask->private;
        syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
 
        SlapReply       rs = {REP_RESULT};
@@ -634,7 +635,6 @@ do_syncrepl(
                Debug( LDAP_DEBUG_ANY,
                        "do_syncrepl : unknown result\n", 0, 0, 0 );
 #endif
-               return NULL;
        }
 
 done:
@@ -646,6 +646,16 @@ done:
        if ( res )
                ldap_msgfree( res );
        ldap_unbind( ld );
+
+       ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+       ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask );
+       if ( si->type == LDAP_SYNC_REFRESH_ONLY ) {
+               ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
+       } else {
+               ldap_pvt_runqueue_remove( &syncrepl_rq, rtask );
+       }
+       ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+
        return NULL;
 }
 
index 5cf739b2b9ab424ac8987e26ee5752dc2e3d229d..a9325af5f2671dd471d84e50c5b3eaae99bbfe14 100644 (file)
@@ -256,11 +256,11 @@ struct runqueue_s syncrepl_rq;
 
 void init_syncrepl( )
 {
-        return -1;
+       return;
 }
 
 void* do_syncrepl( void *ctx, void *arg )
 {
-                return -1;
+       return NULL;
 }
 #endif