/* $OpenLDAP$ */
+#ifdef LDAP_SYNCREPL
+
typedef struct re_s {
struct timeval next_sched;
struct timeval interval;
- LDAP_STAILQ_ENTRY(re_s) next;
+ LDAP_STAILQ_ENTRY(re_s) tnext; /* it includes running */
+ LDAP_STAILQ_ENTRY(re_s) rnext;
void *private;
} re_t;
typedef struct runqueue_s {
+ LDAP_STAILQ_HEAD(l, re_s) task_list;
LDAP_STAILQ_HEAD(rl, re_s) run_list;
+ ldap_pvt_thread_mutex_t rq_mutex;
} runqueue_t;
+
+LDAP_F( void )
+ldap_pvt_runqueue_insert(
+ struct runqueue_s* rq,
+ time_t interval,
+ void *private
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_remove(
+ struct runqueue_s* rq,
+ struct re_s* entry
+);
+
+LDAP_F( struct re_s* )
+ldap_pvt_runqueue_next_sched(
+ struct runqueue_s* rq,
+ struct timeval** next_run
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_runtask(
+ struct runqueue_s* rq,
+ struct re_s* entry
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_stoptask(
+ struct runqueue_s* rq,
+ struct re_s* entry
+);
+
+LDAP_F( int )
+ldap_pvt_runqueue_isrunning(
+ struct runqueue_s* rq,
+ struct re_s* entry
+);
+
+LDAP_F( void )
+ldap_pvt_runqueue_resched(
+ struct runqueue_s* rq,
+ struct re_s* entry
+);
+
+#endif
#include "ldap_queue.h"
#include "ldap_rq.h"
+#ifdef LDAP_SYNCREPL
+
void
ldap_pvt_runqueue_insert(
struct runqueue_s* rq,
)
{
struct re_s* entry;
+
entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s ));
entry->interval.tv_sec = interval;
entry->interval.tv_usec = 0;
entry->next_sched.tv_sec = time( NULL );
entry->next_sched.tv_usec = 0;
entry->private = private;
- LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+ LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
}
void
+ldap_pvt_runqueue_remove(
+ struct runqueue_s* rq,
+ struct re_s* entry
+)
+{
+ struct re_s* e;
+
+ LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
+ if ( e == entry)
+ break;
+ }
+
+ assert ( e == entry );
+
+ LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
+
+ ch_free( entry );
+
+}
+
+struct re_s*
ldap_pvt_runqueue_next_sched(
struct runqueue_s* rq,
- struct timeval** next_run,
- void **private
+ struct timeval** next_run
)
{
struct re_s* entry;
- entry = LDAP_STAILQ_FIRST( &rq->run_list );
+
+ entry = LDAP_STAILQ_FIRST( &rq->task_list );
if ( entry == NULL ) {
*next_run = NULL;
- *private = NULL;
+ return NULL;
} else {
*next_run = &entry->next_sched;
- *private = entry->private;
+ return entry;
+ }
+}
+
+void
+ldap_pvt_runqueue_runtask(
+ struct runqueue_s* rq,
+ struct re_s* entry
+)
+{
+ LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, rnext );
+}
+
+void
+ldap_pvt_runqueue_stoptask(
+ struct runqueue_s* rq,
+ struct re_s* entry
+)
+{
+ LDAP_STAILQ_REMOVE( &rq->run_list, entry, re_s, rnext );
+}
+
+int
+ldap_pvt_runqueue_isrunning(
+ struct runqueue_s* rq,
+ struct re_s* entry
+)
+{
+ struct re_s* e;
+
+ LDAP_STAILQ_FOREACH( e, &rq->run_list, rnext ) {
+ if ( e == entry ) {
+ return 1;
+ }
}
+ return 0;
}
void
ldap_pvt_runqueue_resched(
- struct runqueue_s* rq
+ struct runqueue_s* rq,
+ struct re_s* entry
)
{
- struct re_s* entry;
struct re_s* prev;
struct re_s* e;
- entry = LDAP_STAILQ_FIRST( &rq->run_list );
- if ( entry == NULL ) {
- return;
+ LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
+ if ( e == entry )
+ break;
+ }
+
+ assert ( e == entry );
+
+ LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
+
+ entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
+ if ( LDAP_STAILQ_EMPTY( &rq->task_list )) {
+ LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
} else {
- entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
- LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next );
- if ( LDAP_STAILQ_EMPTY( &rq->run_list )) {
- LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
- } else {
- prev = entry;
- LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) {
- if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
- if ( prev == entry ) {
- LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
- } else {
- LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next );
- }
+ prev = NULL;
+ LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
+ if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
+ if ( prev == NULL ) {
+ LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
+ } else {
+ LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext );
}
- prev = e;
}
+ prev = e;
}
}
}
+
+#endif
#include "lutil.h"
#include "lber_pvt.h"
+#include "ldap_rq.h"
+
#ifdef LDAP_SLAPI
#include "slapi.h"
#endif
}
}
+#ifdef LDAP_SYNCREPL
+ ldap_pvt_thread_mutex_init( &syncrepl_rq.rq_mutex );
+ LDAP_STAILQ_INIT( &syncrepl_rq.task_list );
+ LDAP_STAILQ_INIT( &syncrepl_rq.run_list );
+#endif
+
/* open each backend database */
for( i = 0; i < nBackendDB; i++ ) {
/* append global access controls */
#ifdef LDAP_SYNCREPL
if ( backendDB[i].syncinfo != NULL ) {
syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
(void *) &backendDB[i] );
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
#endif
}
#ifdef LDAP_SYNCREPL
struct timeval diff;
struct timeval *cat;
- BackendDB *db;
time_t tdelta = 1;
+ struct re_s* rtask;
#endif
now = slap_get_time();
#ifdef LDAP_SYNCREPL
/* cat is of struct timeval containing the earliest schedule */
- ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db );
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+ rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
if ( cat != NULL ) {
diff.tv_sec = cat->tv_sec - slap_get_time();
if ( diff.tv_sec == 0 )
diff.tv_sec = tdelta;
- } else {
- cat = NULL;
}
#endif
#endif
#ifdef LDAP_SYNCREPL
- if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+ rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
+ if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
+ ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+ } else if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
+ ldap_pvt_runqueue_runtask( &syncrepl_rq, rtask );
+ ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
ldap_pvt_thread_pool_submit( &connection_pool,
- do_syncrepl, (void *) db );
- /* FIXME : reschedule upon do_syncrepl termination */
- ldap_pvt_runqueue_resched( &syncrepl_rq );
+ do_syncrepl, (void *) rtask );
+ } else {
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
#endif
ldap_pvt_thread_yield();
void *ctx,
void *arg )
{
- Backend *be = arg;
+ struct re_s* rtask = arg;
+ Backend *be = rtask->private;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
SlapReply rs = {REP_RESULT};
Debug( LDAP_DEBUG_ANY,
"do_syncrepl : unknown result\n", 0, 0, 0 );
#endif
- return NULL;
}
done:
if ( res )
ldap_msgfree( res );
ldap_unbind( ld );
+
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+ ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask );
+ if ( si->type == LDAP_SYNC_REFRESH_ONLY ) {
+ ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
+ } else {
+ ldap_pvt_runqueue_remove( &syncrepl_rq, rtask );
+ }
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+
return NULL;
}
void init_syncrepl( )
{
- return -1;
+ return;
}
void* do_syncrepl( void *ctx, void *arg )
{
- return -1;
+ return NULL;
}
#endif