--- /dev/null
+/* $OpenLDAP$ */
+
+typedef struct re_s {
+ struct timeval next_sched;
+ struct timeval interval;
+ LDAP_STAILQ_ENTRY(re_s) next;
+ void *private;
+} re_t;
+
+typedef struct runqueue_s {
+ LDAP_STAILQ_HEAD(rl, re_s) run_list;
+} runqueue_t;
request.c os-ip.c url.c sortctrl.c vlvctrl.c \
init.c options.c print.c string.c util-int.c schema.c \
charray.c tls.c os-local.c dnssrv.c utf-8.c utf-8-conv.c
-SRCS = threads.c rdwr.c tpool.c \
+SRCS = threads.c rdwr.c tpool.c rq.c \
thr_posix.c thr_cthreads.c thr_thr.c thr_lwp.c thr_nt.c \
thr_pth.c thr_stub.c
-OBJS = threads.lo rdwr.lo tpool.lo \
+OBJS = threads.lo rdwr.lo tpool.lo rq.lo \
thr_posix.lo thr_cthreads.lo thr_thr.lo thr_lwp.lo thr_nt.lo \
thr_pth.lo thr_stub.lo \
bind.lo open.lo result.lo error.lo compare.lo search.lo \
--- /dev/null
+/* $OpenLDAP$ */
+#include "portable.h"
+
+#include <stdio.h>
+
+#include <ac/stdarg.h>
+#include <ac/stdlib.h>
+#include <ac/string.h>
+#include <ac/time.h>
+#include <ac/errno.h>
+
+#include "ldap-int.h"
+#include "ldap_pvt_thread.h"
+#include "ldap_queue.h"
+#include "ldap_rq.h"
+
+void
+ldap_pvt_runqueue_insert(
+ struct runqueue_s* rq,
+ time_t interval,
+ void *private
+)
+{
+ struct re_s* entry;
+ entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s ));
+ entry->interval.tv_sec = interval;
+ entry->interval.tv_usec = 0;
+ entry->next_sched.tv_sec = time( NULL );
+ entry->next_sched.tv_usec = 0;
+ entry->private = private;
+ LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+}
+
+void
+ldap_pvt_runqueue_next_sched(
+ struct runqueue_s* rq,
+ struct timeval** next_run,
+ void **private
+)
+{
+ struct re_s* entry;
+ entry = LDAP_STAILQ_FIRST( &rq->run_list );
+ if ( entry == NULL ) {
+ *next_run = NULL;
+ *private = NULL;
+ } else {
+ *next_run = &entry->next_sched;
+ *private = entry->private;
+ }
+}
+
+void
+ldap_pvt_runqueue_resched(
+ struct runqueue_s* rq
+)
+{
+ struct re_s* entry;
+ struct re_s* prev;
+ struct re_s* e;
+
+ entry = LDAP_STAILQ_FIRST( &rq->run_list );
+ if ( entry == NULL ) {
+ return;
+ } else {
+ entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
+ LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next );
+ if ( LDAP_STAILQ_EMPTY( &rq->run_list )) {
+ LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+ } else {
+ prev = entry;
+ LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) {
+ if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
+ if ( prev == entry ) {
+ LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
+ } else {
+ LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next );
+ }
+ }
+ prev = e;
+ }
+ }
+ }
+}
return thread_keys[i].ctx;
}
-#endif /* LDAP_HAVE_THREAD_POOL */
+#endif /* LDAP_THREAD_HAVE_TPOOL */
#ifdef LDAP_SYNCREPL
if ( backendDB[i].syncinfo != NULL ) {
- int ret;
- ret = ldap_pvt_thread_pool_submit( &syncrepl_pool,
- do_syncrepl, (void *) &backendDB[i] );
- if ( ret != 0 ) {
-#ifdef NEW_LOGGING
- LDAP_LOG( BACKEND, CRIT,
- "syncrepl thread pool submit failed (%d)\n",
- ret, 0, 0 );
-#else
- Debug( LDAP_DEBUG_ANY,
- "ldap_pvt_thread_pool_submit failed (%d) \n",
- ret, 0, 0 );
-#endif
- }
+ syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
+ ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
+ (void *) &backendDB[i] );
}
#endif
}
#include "lutil.h"
#include "slap.h"
+#ifdef LDAP_SYNCREPL
+#include "ldap_rq.h"
+#endif
+
#ifdef HAVE_TCPD
#include <tcpd.h>
#define SLAP_STRING_UNKNOWN STRING_UNKNOWN
struct timeval zero;
struct timeval *tvp;
+#ifdef LDAP_SYNCREPL
+ struct timeval diff;
+ struct timeval *cat;
+ BackendDB *db;
+ time_t tdelta = 1;
+#endif
+
if( emfile ) {
now = slap_get_time();
connections_timeout_idle( now );
at = ldap_pvt_thread_pool_backload(&connection_pool);
+#ifdef LDAP_SYNCREPL
+ /* cat is of struct timeval containing the earliest schedule */
+ ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db );
+ if ( cat != NULL ) {
+ diff.tv_sec = cat->tv_sec - slap_get_time();
+ if ( diff.tv_sec == 0 )
+ diff.tv_sec = tdelta;
+ } else {
+ cat = NULL;
+ }
+#endif
+
#if defined( HAVE_YIELDING_SELECT ) || defined( NO_THREADS )
tvp = NULL;
#else
tvp = at ? &zero : NULL;
#endif
+#ifdef LDAP_SYNCREPL
+ if ( cat != NULL ) {
+ if ( tvp == NULL ) {
+ tvp = &diff;
+ }
+ }
+#endif
+
for ( l = 0; slap_listeners[l] != NULL; l++ ) {
if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID )
continue;
Debug( LDAP_DEBUG_CONNS, "daemon: select timeout - yielding\n",
0, 0, 0 );
#endif
+
+#ifdef LDAP_SYNCREPL
+ if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
+ ldap_pvt_thread_pool_submit( &connection_pool,
+ do_syncrepl, (void *) db );
+ /* FIXME : reschedule upon do_syncrepl termination */
+ ldap_pvt_runqueue_resched( &syncrepl_rq );
+ }
+#endif
ldap_pvt_thread_yield();
continue;
*/
#ifdef LDAP_SYNCREPL
+LDAP_SLAPD_V (struct runqueue_s) syncrepl_rq;
+
LDAP_SLAPD_F (void) init_syncrepl LDAP_P(());
LDAP_SLAPD_F (void*) do_syncrepl LDAP_P((void *, void *));
#endif
#ifdef LDAP_SYNCREPL
+#include "ldap_rq.h"
+
static Entry*
syncrepl_message_to_entry ( LDAP *, Operation *, LDAPMessage *,
Modifications **, int*, struct berval *, struct berval * );
static AttributeDescription **del_descs;
static AttributeDescription **del_descs_lastmod;
+struct runqueue_s syncrepl_rq;
+
void
init_syncrepl()
{
#include "../slap.h"
+#ifdef LDAP_SYNCREPL
+#include "ldap_rq.h"
+#endif
+
/* needed by WIN32 and back-monitor */
time_t starttime;
}
#ifdef LDAP_SYNCREPL
+struct runqueue_s syncrepl_rq;
+
void init_syncrepl( )
{
return -1;