X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslurpd%2Ffm.c;h=987163873604c3dcd323aa5041c4792bc46b5fa9;hb=fe3b6d00714d551a4c781ee24627e2586d06459f;hp=e517cb665a56353aa177bbabbd6eb7c7bc53ced0;hpb=0c0f011dc0302348484d0278e2e8f2cbe53d1961;p=openldap diff --git a/servers/slurpd/fm.c b/servers/slurpd/fm.c index e517cb665a..9871638736 100644 --- a/servers/slurpd/fm.c +++ b/servers/slurpd/fm.c @@ -1,5 +1,18 @@ -/* - * Copyright (c) 1996 Regents of the University of Michigan. +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software . + * + * Copyright 1998-2006 The OpenLDAP Foundation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * . + */ +/* Portions Copyright (c) 1996 Regents of the University of Michigan. * All rights reserved. * * Redistribution and use in source and binary forms are permitted @@ -9,42 +22,34 @@ * software without specific prior written permission. This software * is provided ``as is'' without express or implied warranty. */ +/* ACKNOWLEDGEMENTS: + * This work was originally developed by the University of Michigan + * (as part of U-MICH LDAP). + */ /* * fm.c - file management routines. */ -#define DISABLE_BRIDGE #include "portable.h" #include + +#include #include -#include +#include +#include +#include #include "slurp.h" #include "globals.h" -/* - * Externs - */ -extern void do_admin LDAP_P((void)); -extern int file_nonempty LDAP_P(( char * )); -extern int acquire_lock LDAP_P((char *, FILE **, FILE ** )); -extern int relinquish_lock LDAP_P((char *, FILE *, FILE * )); - /* * Forward references */ static char *get_record LDAP_P(( FILE * )); static void populate_queue LDAP_P(( char *f )); -static void set_shutdown LDAP_P((void)); -void do_nothing LDAP_P((void)); - -#ifdef DECL_SYS_ERRLIST -extern char *sys_errlist[]; -#endif /* DECL_SYS_ERRLIST */ - /* @@ -55,32 +60,26 @@ extern char *sys_errlist[]; * - adds items to the internal queue of replication work to do * - signals the replication threads to let them know new work has arrived. */ -void +void * fm( void *arg ) { int rc; + int i; + fd_set readfds; /* Set up our signal handlers: * SIG{TERM,INT,HUP} causes a shutdown - * SIG(STKFLT|USR1) - does nothing, used to wake up sleeping threads. - * SIG(UNUSED|USR2) - causes slurpd to read its administrative interface file. - * (not yet implemented). */ -#ifdef SIGSTKFLT - (void) SIGNAL( SIGSTKFLT, (void *) do_nothing ); -#else - (void) SIGNAL( SIGUSR1, (void *) do_nothing ); + (void) SIGNAL( SIGTERM, slurp_set_shutdown ); + (void) SIGNAL( SIGINT, slurp_set_shutdown ); +#ifdef SIGHUP + (void) SIGNAL( SIGHUP, slurp_set_shutdown ); #endif -#ifdef SIGUNUSED - (void) SIGNAL( SIGUNUSED, (void *) do_admin ); -#else - (void) SIGNAL( SIGUSR2, (void *) do_admin ); +#if defined(SIGBREAK) && defined(HAVE_NT_SERVICE_MANAGER) + (void) SIGNAL( SIGBREAK, do_nothing ); #endif - (void) SIGNAL( SIGTERM, (void *) set_shutdown ); - (void) SIGNAL( SIGINT, (void *) set_shutdown ); - (void) SIGNAL( SIGHUP, (void *) set_shutdown ); if ( sglob->one_shot_mode ) { if ( file_nonempty( sglob->slapd_replogfile )) { @@ -91,7 +90,7 @@ fm( sglob->rq->rq_getcount( sglob->rq, RQ_COUNT_ALL )); printf( "%d replication records to process.\n", sglob->rq->rq_getcount( sglob->rq, RQ_COUNT_NZRC )); - return; + return NULL; } /* * There may be some leftover replication records in our own @@ -102,6 +101,7 @@ fm( populate_queue( sglob->slurpd_replogfile ); } + FD_ZERO( &readfds ); while ( !sglob->slurpd_shutdown ) { if ( file_nonempty( sglob->slapd_replogfile )) { @@ -120,7 +120,13 @@ fm( } } } else { - tsleep( sglob->no_work_interval ); + struct timeval tv; + + FD_SET( sglob->wake_sds[0], &readfds ); + tv.tv_sec = sglob->no_work_interval; + tv.tv_usec = 0; + + rc = select( sglob->wake_sds[0]+1, &readfds, NULL, NULL, &tv ); } /* Garbage-collect queue */ @@ -140,7 +146,14 @@ fm( } } } + sglob->rq->rq_lock( sglob->rq ); /* lock queue */ + ldap_pvt_thread_cond_broadcast( &(sglob->rq->rq_more) ); /* wake repl threads */ + for ( i = 0; i < sglob->num_replicas; i++ ) { + (sglob->replicas[ i ])->ri_wake( sglob->replicas[ i ]); + } + sglob->rq->rq_unlock( sglob->rq ); /* unlock queue */ Debug( LDAP_DEBUG_ARGS, "fm: exiting\n", 0, 0, 0 ); + return NULL; } @@ -149,26 +162,13 @@ fm( /* * Set a global flag which signals that we're shutting down. */ -static void -set_shutdown() +RETSIGTYPE +slurp_set_shutdown(int sig) { - int i; - sglob->slurpd_shutdown = 1; /* set flag */ -#ifdef SIGSTKFLT - pthread_kill( sglob->fm_tid, SIGSTKFLT ); /* wake up file mgr */ -#else - pthread_kill( sglob->fm_tid, SIGUSR1 ); /* wake up file mgr */ -#endif - sglob->rq->rq_lock( sglob->rq ); /* lock queue */ - pthread_cond_broadcast( &(sglob->rq->rq_more) ); /* wake repl threads */ - for ( i = 0; i < sglob->num_replicas; i++ ) { - (sglob->replicas[ i ])->ri_wake( sglob->replicas[ i ]); - } - sglob->rq->rq_unlock( sglob->rq ); /* unlock queue */ - (void) SIGNAL( SIGTERM, (void *) set_shutdown ); /* reinstall handlers */ - (void) SIGNAL( SIGINT, (void *) set_shutdown ); - (void) SIGNAL( SIGHUP, (void *) set_shutdown ); + tcp_write( sglob->wake_sds[1], "0", 1); /* wake up file mgr */ + + (void) SIGNAL_REINSTALL( sig, slurp_set_shutdown ); /* reinstall handlers */ } @@ -177,14 +177,10 @@ set_shutdown() /* * A do-nothing signal handler. */ -void -do_nothing() +RETSIGTYPE +do_nothing(int sig) { -#ifdef SIGSTKFLT - (void) SIGNAL( SIGSTKFLT, (void *) do_nothing ); -#else - (void) SIGNAL( SIGUSR1, (void *) do_nothing ); -#endif + (void) SIGNAL_REINSTALL( sig, do_nothing ); } @@ -200,7 +196,6 @@ populate_queue( ) { FILE *fp, *lfp; - Rq *rq = sglob->rq; char *p; if ( acquire_lock( f, &fp, &lfp ) < 0 ) { @@ -218,8 +213,7 @@ populate_queue( Debug( LDAP_DEBUG_ANY, "error: can't seek to offset %ld in file \"%s\"\n", sglob->srpos, f, 0 ); - return; - } + } else { while (( p = get_record( fp )) != NULL ) { if ( sglob->rq->rq_add( sglob->rq, p ) < 0 ) { char *t; @@ -232,9 +226,10 @@ populate_queue( p, 0, 0 ); } free( p ); - pthread_yield(); + ldap_pvt_thread_yield(); } sglob->srpos = ftell( fp ); + } (void) relinquish_lock( f, fp, lfp ); } @@ -259,13 +254,13 @@ get_record( while (( fgets( line, sizeof(line), fp ) != NULL ) && (( len = strlen( line )) > 1 )) { - while ( lcur + len + 1 > lmax ) { - lmax += BUFSIZ; - buf = (char *) ch_realloc( buf, lmax ); - } - strcpy( buf + lcur, line ); - lcur += len; + + while ( lcur + len + 1 > lmax ) { + lmax += BUFSIZ; + buf = (char *) ch_realloc( buf, lmax ); + } + strcpy( buf + lcur, line ); + lcur += len; } return( buf ); } -