From: Howard Chu Date: Sat, 26 Mar 2005 09:52:04 +0000 (+0000) Subject: ITS#3607 add automatic BDB recovery X-Git-Tag: OPENLDAP_AC_BP~1062 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=d7396583d5bac01fda89da592824b1c8179984f4;p=openldap ITS#3607 add automatic BDB recovery --- diff --git a/servers/slapd/back-bdb/alock.c b/servers/slapd/back-bdb/alock.c new file mode 100644 index 0000000000..82ee0f3680 --- /dev/null +++ b/servers/slapd/back-bdb/alock.c @@ -0,0 +1,596 @@ +/* alock.c - access lock library */ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software <http://www.openldap.org/>. + * + * Copyright 2005 The OpenLDAP Foundation. + * Portions Copyright 2004-2005 Symas Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * <http://www.OpenLDAP.org/license.html>. + */ +/* ACKNOWLEDGEMENTS: + * This work was initially developed by Matthew Backes at Symas + * Corporation for inclusion in OpenLDAP Software. 
+ */ + +#include "portable.h" +#include "alock.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +static int +alock_grab_lock ( int fd, int slot ) +{ + int res; + +#ifdef HAVE_LOCKF + res = lseek (fd, (off_t) (ALOCK_SLOT_SIZE * slot), SEEK_SET); + if (res == -1) return -1; + res = lockf (fd, F_LOCK, (off_t) ALOCK_SLOT_SIZE); +#else +# ifdef HAVE_FCNTL + struct flock lock_info; + (void) memset ((void *) &lock_info, 0, sizeof (struct flock)); + + lock_info.l_type = F_WRLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = (off_t) (ALOCK_SLOT_SIZE * slot); + lock_info.l_len = (off_t) ALOCK_SLOT_SIZE; + + res = fcntl (fd, F_SETLKW, &lock_info); +# else +# error libalock needs lockf or fcntl +# endif +#endif + if (res == -1) { + assert (errno != EDEADLK); + return -1; + } + return 0; +} + +static int +alock_release_lock ( int fd, int slot ) +{ + int res; + +#ifdef HAVE_LOCKF + res = lseek (fd, (off_t) (ALOCK_SLOT_SIZE * slot), SEEK_SET); + if (res == -1) return -1; + res = lockf (fd, F_ULOCK, (off_t) ALOCK_SLOT_SIZE); + if (res == -1) return -1; +#else +# ifdef HAVE_FCNTL + struct flock lock_info; + (void) memset ((void *) &lock_info, 0, sizeof (struct flock)); + + lock_info.l_type = F_UNLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = (off_t) (ALOCK_SLOT_SIZE * slot); + lock_info.l_len = (off_t) ALOCK_SLOT_SIZE; + + res = fcntl (fd, F_SETLKW, &lock_info); + if (res == -1) return -1; +# else +# error libalock needs lockf or fcntl +# endif +#endif + + return 0; +} + +static int +alock_test_lock ( int fd, int slot ) +{ + int res; + +#ifdef HAVE_LOCKF + res = lseek (fd, (off_t) (ALOCK_SLOT_SIZE * slot), SEEK_SET); + if (res == -1) return -1; + + res = lockf (fd, F_TEST, (off_t) ALOCK_SLOT_SIZE); + if (res == -1) { + if (errno == EACCES) { + return ALOCK_LOCKED; + } else { + return -1; + } + } +#else +# ifdef HAVE_FCNTL + struct flock lock_info; + (void) memset ((void *) &lock_info, 0, sizeof (struct flock)); + + 
lock_info.l_type = F_WRLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = (off_t) (ALOCK_SLOT_SIZE * slot); + lock_info.l_len = (off_t) ALOCK_SLOT_SIZE; + + res = fcntl (fd, F_GETLK, &lock_info); + if (res == -1) return -1; + + if (lock_info.l_type != F_UNLCK) return ALOCK_LOCKED; +# else +# error libalock needs lockf or fcntl +# endif +#endif + + return 0; +} + +/* Read a 64bit LE value */ +static unsigned long int +alock_read_iattr ( unsigned char * bufptr ) +{ + unsigned long int val = 0; + int count; + + assert (bufptr != NULL); + + bufptr += sizeof (unsigned long int); + for (count=0; count <= sizeof (unsigned long int); ++count) { + val <<= 8; + val += (unsigned long int) *bufptr--; + } + + return val; +} + +/* Write a 64bit LE value */ +static void +alock_write_iattr ( unsigned char * bufptr, + unsigned long int val ) +{ + int count; + + assert (bufptr != NULL); + + for (count=0; count < 8; ++count) { + *bufptr++ = (unsigned char) (val & 0xff); + val >>= 8; + } +} + +static int +alock_read_slot ( alock_info_t * info, + alock_slot_t * slot_data ) +{ + unsigned char slotbuf [ALOCK_SLOT_SIZE]; + int res, size, size_total, err; + + assert (info != NULL); + assert (slot_data != NULL); + assert (info->al_slot > 0); + + res = lseek (info->al_fd, + (off_t) (ALOCK_SLOT_SIZE * info->al_slot), + SEEK_SET); + if (res == -1) return -1; + + size_total = 0; + while (size_total < ALOCK_SLOT_SIZE) { + size = read (info->al_fd, + slotbuf + size_total, + ALOCK_SLOT_SIZE - size_total); + if (size == 0) return -1; + if (size < 0) { + err = errno; + if (err != EINTR && err != EAGAIN) return -1; + } else { + size_total += size; + } + } + + if (alock_read_iattr (slotbuf) != ALOCK_MAGIC) { + return 1; + } + slot_data->al_lock = alock_read_iattr (slotbuf+8); + slot_data->al_stamp = alock_read_iattr (slotbuf+16); + slot_data->al_pid = alock_read_iattr (slotbuf+24); + + if (slot_data->al_appname) free (slot_data->al_appname); + slot_data->al_appname = calloc (1, 
ALOCK_MAX_APPNAME); + strncpy (slot_data->al_appname, slotbuf+32, ALOCK_MAX_APPNAME-1); + (slot_data->al_appname) [ALOCK_MAX_APPNAME-1] = '\0'; + + return 0; +} + +static int +alock_write_slot ( alock_info_t * info, + alock_slot_t * slot_data ) +{ + unsigned char slotbuf [ALOCK_SLOT_SIZE]; + int res, size, size_total, err; + + assert (info != NULL); + assert (slot_data != NULL); + assert (info->al_slot > 0); + + (void) memset ((void *) slotbuf, 0, ALOCK_SLOT_SIZE); + + alock_write_iattr (slotbuf, ALOCK_MAGIC); + assert (alock_read_iattr (slotbuf) == ALOCK_MAGIC); + alock_write_iattr (slotbuf+8, slot_data->al_lock); + alock_write_iattr (slotbuf+16, slot_data->al_stamp); + alock_write_iattr (slotbuf+24, slot_data->al_pid); + + strncpy (slotbuf+32, slot_data->al_appname, ALOCK_MAX_APPNAME-1); + slotbuf[ALOCK_SLOT_SIZE-1] = '\0'; + + res = lseek (info->al_fd, + (off_t) (ALOCK_SLOT_SIZE * info->al_slot), + SEEK_SET); + if (res == -1) return -1; + + size_total = 0; + while (size_total < ALOCK_SLOT_SIZE) { + size = write (info->al_fd, + slotbuf + size_total, + ALOCK_SLOT_SIZE - size_total); + if (size == 0) return -1; + if (size < 0) { + err = errno; + if (err != EINTR && err != EAGAIN) return -1; + } else { + size_total += size; + } + } + + return 0; +} + +static int +alock_query_slot ( alock_info_t * info ) +{ + int res; + alock_slot_t slot_data; + + assert (info != NULL); + assert (info->al_slot > 0); + + (void) memset ((void *) &slot_data, 0, sizeof (alock_slot_t)); + alock_read_slot (info, &slot_data); + if (slot_data.al_lock == ALOCK_UNLOCKED) return ALOCK_UNLOCKED; + + if (slot_data.al_appname != NULL) free (slot_data.al_appname); + slot_data.al_appname = NULL; + + res = alock_test_lock (info->al_fd, info->al_slot); + if (res < 0) return -1; + if (res > 0) { + if (slot_data.al_lock == ALOCK_UNIQUE) { + return ALOCK_UNIQUE; + } else { + return ALOCK_LOCKED; + } + } + + return ALOCK_DIRTY; +} + +int +alock_open ( alock_info_t * info, + const char * appname, + const 
char * envdir, + int locktype ) +{ + struct stat statbuf; + alock_info_t scan_info; + alock_slot_t slot_data; + char * filename; + struct timeval tv; + int res, max_slot; + int dirty_count, live_count; + + assert (info != NULL); + assert (appname != NULL); + assert (envdir != NULL); + assert (locktype >= 1 && locktype <= 2); + + res = gettimeofday (&tv, NULL); + if (res == -1) return ALOCK_UNSTABLE; + + slot_data.al_lock = locktype; + slot_data.al_stamp = tv.tv_sec; + slot_data.al_pid = getpid(); + slot_data.al_appname = calloc (1, ALOCK_MAX_APPNAME); + strncpy (slot_data.al_appname, appname, ALOCK_MAX_APPNAME-1); + slot_data.al_appname [ALOCK_MAX_APPNAME-1] = '\0'; + + filename = calloc (1, strlen (envdir) + strlen ("/alock") + 1); + strcpy (filename, envdir); + strcat (filename, "/alock"); + info->al_fd = open (filename, O_CREAT|O_RDWR, 0666); + free (filename); + if (info->al_fd < 0) { + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + info->al_slot = 0; + + res = alock_grab_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + + res = fstat (info->al_fd, &statbuf); + if (res == -1) { + close (info->al_fd); + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + + max_slot = (statbuf.st_size + ALOCK_SLOT_SIZE - 1) / ALOCK_SLOT_SIZE; + dirty_count = 0; + live_count = 0; + scan_info.al_fd = info->al_fd; + for (scan_info.al_slot = 1; + scan_info.al_slot < max_slot; + ++ scan_info.al_slot) { + if (scan_info.al_slot != info->al_slot) { + res = alock_query_slot (&scan_info); + + if (res == ALOCK_UNLOCKED + && info->al_slot == 0) { + info->al_slot = scan_info.al_slot; + + } else if (res == ALOCK_LOCKED) { + ++live_count; + + } else if (res == ALOCK_UNIQUE + && locktype == ALOCK_UNIQUE) { + close (info->al_fd); + free (slot_data.al_appname); + return ALOCK_BUSY; + + } else if (res == ALOCK_DIRTY) { + ++dirty_count; + + } else if (res == -1) { + close (info->al_fd); + free 
(slot_data.al_appname); + return ALOCK_UNSTABLE; + + } + } + } + + if (dirty_count && live_count) { + close (info->al_fd); + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + + if (info->al_slot == 0) info->al_slot = max_slot + 1; + res = alock_grab_lock (info->al_fd, + info->al_slot); + if (res == -1) { + close (info->al_fd); + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + res = alock_write_slot (info, &slot_data); + free (slot_data.al_appname); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + res = alock_release_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + if (dirty_count) return ALOCK_RECOVER; + return ALOCK_CLEAN; +} + +int +alock_scan ( alock_info_t * info ) +{ + struct stat statbuf; + alock_info_t scan_info; + int res, max_slot; + int dirty_count, live_count; + + assert (info != NULL); + + scan_info.al_fd = info->al_fd; + + res = alock_grab_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + res = fstat (info->al_fd, &statbuf); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + max_slot = (statbuf.st_size + ALOCK_SLOT_SIZE - 1) / ALOCK_SLOT_SIZE; + dirty_count = 0; + live_count = 0; + for (scan_info.al_slot = 1; + scan_info.al_slot < max_slot; + ++ scan_info.al_slot) { + if (scan_info.al_slot != info->al_slot) { + res = alock_query_slot (&scan_info); + + if (res == ALOCK_LOCKED) { + ++live_count; + + } else if (res == ALOCK_DIRTY) { + ++dirty_count; + + } else if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + + } + } + } + + res = alock_release_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + if (dirty_count) { + if (live_count) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } else { + return ALOCK_RECOVER; + } + } + + return ALOCK_CLEAN; +} + +int +alock_close ( alock_info_t * info ) +{ + alock_slot_t slot_data; + int 
res; + + (void) memset ((void *) &slot_data, 0, sizeof(alock_slot_t)); + + res = alock_grab_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + /* mark our slot as clean */ + res = alock_read_slot (info, &slot_data); + if (res == -1) { + close (info->al_fd); + if (slot_data.al_appname != NULL) + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + slot_data.al_lock = ALOCK_UNLOCKED; + res = alock_write_slot (info, &slot_data); + if (res == -1) { + close (info->al_fd); + if (slot_data.al_appname != NULL) + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + if (slot_data.al_appname != NULL) { + free (slot_data.al_appname); + slot_data.al_appname = NULL; + } + + res = alock_release_lock (info->al_fd, info->al_slot); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + res = alock_release_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + res = close (info->al_fd); + if (res == -1) return ALOCK_UNSTABLE; + + return ALOCK_CLEAN; +} + +int +alock_recover ( alock_info_t * info ) +{ + struct stat statbuf; + alock_slot_t slot_data; + alock_info_t scan_info; + int res, max_slot; + + assert (info != NULL); + + scan_info.al_fd = info->al_fd; + + (void) memset ((void *) &slot_data, 0, sizeof(alock_slot_t)); + + res = alock_grab_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + res = fstat (info->al_fd, &statbuf); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + max_slot = (statbuf.st_size + ALOCK_SLOT_SIZE - 1) / ALOCK_SLOT_SIZE; + for (scan_info.al_slot = 1; + scan_info.al_slot < max_slot; + ++ scan_info.al_slot) { + if (scan_info.al_slot != info->al_slot) { + res = alock_query_slot (&scan_info); + + if (res == ALOCK_LOCKED + || res == ALOCK_UNIQUE) { + /* recovery attempt on an active db? 
*/ + close (info->al_fd); + return ALOCK_UNSTABLE; + + } else if (res == ALOCK_DIRTY) { + /* mark it clean */ + res = alock_read_slot (&scan_info, &slot_data); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + slot_data.al_lock = ALOCK_UNLOCKED; + res = alock_write_slot (&scan_info, &slot_data); + if (res == -1) { + close (info->al_fd); + if (slot_data.al_appname != NULL) + free (slot_data.al_appname); + return ALOCK_UNSTABLE; + } + if (slot_data.al_appname != NULL) { + free (slot_data.al_appname); + slot_data.al_appname = NULL; + } + + } else if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + + } + } + } + + res = alock_release_lock (info->al_fd, 0); + if (res == -1) { + close (info->al_fd); + return ALOCK_UNSTABLE; + } + + return ALOCK_CLEAN; +} diff --git a/servers/slapd/back-bdb/alock.h b/servers/slapd/back-bdb/alock.h new file mode 100644 index 0000000000..902077de96 --- /dev/null +++ b/servers/slapd/back-bdb/alock.h @@ -0,0 +1,69 @@ +/* alock.h - access lock header */ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software . + * + * Copyright 2005 The OpenLDAP Foundation. + * Portions Copyright 2004-2005 Symas Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * . + */ +/* ACKNOWLEDGEMENTS: + * This work was initially developed by Matthew Backes at Symas + * Corporation for inclusion in OpenLDAP Software. 
+ */ + +#ifndef _ALOCK_H_ +#define _ALOCK_H_ + +#include "portable.h" +#include +#include + +/* environment states (all the slots together) */ +#define ALOCK_CLEAN (0) +#define ALOCK_RECOVER (1) +#define ALOCK_BUSY (2) +#define ALOCK_UNSTABLE (3) + +/* lock user types and states */ +#define ALOCK_UNLOCKED (0) +#define ALOCK_LOCKED (1) +#define ALOCK_UNIQUE (2) +#define ALOCK_DIRTY (3) + +/* constants */ +#define ALOCK_SLOT_SIZE (1024) +#define ALOCK_SLOT_IATTRS (4) +#define ALOCK_MAX_APPNAME (ALOCK_SLOT_SIZE - 8 * ALOCK_SLOT_IATTRS) +#define ALOCK_MAGIC (0x12345678) + +LDAP_BEGIN_DECL + +typedef struct alock_info { + int al_fd; + int al_slot; +} alock_info_t; + +typedef struct alock_slot { + unsigned int al_lock; + time_t al_stamp; + pid_t al_pid; + char * al_appname; +} alock_slot_t; + +extern int alock_open LDAP_P(( alock_info_t * info, const char * appname, + const char * envdir, int locktype )); +extern int alock_scan LDAP_P(( alock_info_t * info )); +extern int alock_close LDAP_P(( alock_info_t * info )); +extern int alock_recover LDAP_P(( alock_info_t * info )); + +LDAP_END_DECL + +#endif diff --git a/servers/slapd/back-bdb/back-bdb.h b/servers/slapd/back-bdb/back-bdb.h index 44517010fb..eb8cb02080 100644 --- a/servers/slapd/back-bdb/back-bdb.h +++ b/servers/slapd/back-bdb/back-bdb.h @@ -20,6 +20,7 @@ #include #include "slap.h" #include +#include "alock.h" LDAP_BEGIN_DECL @@ -189,6 +190,7 @@ struct bdb_info { bdb_idl_cache_entry_t *bi_idl_lru_tail; ldap_pvt_thread_rdwr_t bi_idl_tree_rwlock; ldap_pvt_thread_mutex_t bi_idl_tree_lrulock; + alock_info_t bi_alock_info; }; #define bi_id2entry bi_databases[BDB_ID2ENTRY] diff --git a/servers/slapd/back-bdb/init.c b/servers/slapd/back-bdb/init.c index f66f1fd690..962aa47f39 100644 --- a/servers/slapd/back-bdb/init.c +++ b/servers/slapd/back-bdb/init.c @@ -24,6 +24,7 @@ #include "back-bdb.h" #include #include +#include "alock.h" static const struct bdbi_database { char *file; @@ -121,13 +122,21 @@ bdb_db_open( 
BackendDB *be ) if ( !( slapMode & SLAP_TOOL_QUICK )) flags |= DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN; -#if 0 - /* Never do automatic recovery, must perform it manually. - * Otherwise restarting with gentlehup will corrupt the - * database. - */ - if( !(slapMode & SLAP_TOOL_MODE) ) flags |= DB_RECOVER; -#endif + rc = alock_open( &bdb->bi_alock_info, "slapd", bdb->bi_dbenv_home, + slapMode & SLAP_TOOL_READONLY ? ALOCK_LOCKED : ALOCK_UNIQUE ); + if( rc == ALOCK_RECOVER ) { + Debug( LDAP_DEBUG_ANY, + "bdb_db_open: alock_open: recovery required\n", 0, 0, 0 ); + flags |= DB_RECOVER; + } else if( rc == ALOCK_BUSY ) { + Debug( LDAP_DEBUG_ANY, + "bdb_db_open: alock_open: database in use\n", 0, 0, 0 ); + return -1; + } else if( rc != ALOCK_CLEAN ) { + Debug( LDAP_DEBUG_ANY, + "bdb_db_open: alock_open: database unstable\n", 0, 0, 0 ); + return -1; + } /* If a key was set, use shared memory for the BDB environment */ if ( bdb->bi_shm_key ) { @@ -250,6 +259,14 @@ bdb_db_open( BackendDB *be ) db_strerror(rc), rc, 0 ); return rc; } + if( flags & DB_RECOVER ) { + rc = alock_recover (&bdb->bi_alock_info); + if( rc != 0 ) { + Debug( LDAP_DEBUG_ANY, + "bdb_db_open: unable to alock_recover\n", 0, 0, 0 ); + return -1; + } + } flags = DB_THREAD | bdb->bi_db_opflags; @@ -438,6 +455,13 @@ bdb_db_destroy( BackendDB *be ) } } + rc = alock_close( &bdb->bi_alock_info ); + if( rc != 0 ) { + Debug( LDAP_DEBUG_ANY, + "bdb_db_destroy: alock_close failed\n", 0, 0, 0 ); + return -1; + } + if( bdb->bi_dbenv_home ) ch_free( bdb->bi_dbenv_home ); ldap_pvt_thread_rdwr_destroy ( &bdb->bi_cache.c_rwlock );