]> git.sur5r.net Git - openldap/commitdiff
ITS#3607 add automatic BDB recovery
authorHoward Chu <hyc@openldap.org>
Sat, 26 Mar 2005 09:52:04 +0000 (09:52 +0000)
committerHoward Chu <hyc@openldap.org>
Sat, 26 Mar 2005 09:52:04 +0000 (09:52 +0000)
servers/slapd/back-bdb/alock.c [new file with mode: 0644]
servers/slapd/back-bdb/alock.h [new file with mode: 0644]
servers/slapd/back-bdb/back-bdb.h
servers/slapd/back-bdb/init.c

diff --git a/servers/slapd/back-bdb/alock.c b/servers/slapd/back-bdb/alock.c
new file mode 100644 (file)
index 0000000..82ee0f3
--- /dev/null
@@ -0,0 +1,596 @@
+/* alock.c - access lock library */
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
+ *
+ * Copyright 2005 The OpenLDAP Foundation.
+ * Portions Copyright 2004-2005 Symas Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+/* ACKNOWLEDGEMENTS:
+ * This work was initially developed by Matthew Backes at Symas
+ * Corporation for inclusion in OpenLDAP Software.
+ */
+
+#include "portable.h"
+#include "alock.h"
+
+#include <ac/stdlib.h>
+#include <ac/string.h>
+#include <ac/unistd.h>
+#include <ac/errno.h>
+#include <ac/assert.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static int
+alock_grab_lock ( int fd, int slot )
+{
+       int res;
+       
+#ifdef HAVE_LOCKF
+       res = lseek (fd, (off_t) (ALOCK_SLOT_SIZE * slot), SEEK_SET);
+       if (res == -1) return -1;
+       res = lockf (fd, F_LOCK, (off_t) ALOCK_SLOT_SIZE);
+#else
+# ifdef HAVE_FCNTL
+       struct flock lock_info;
+       (void) memset ((void *) &lock_info, 0, sizeof (struct flock));
+
+       lock_info.l_type = F_WRLCK;
+       lock_info.l_whence = SEEK_SET;
+       lock_info.l_start = (off_t) (ALOCK_SLOT_SIZE * slot);
+       lock_info.l_len = (off_t) ALOCK_SLOT_SIZE;
+
+       res = fcntl (fd, F_SETLKW, &lock_info);
+# else
+#   error libalock needs lockf or fcntl
+# endif
+#endif
+       if (res == -1) {
+               assert (errno != EDEADLK);
+               return -1;
+       }
+       return 0;
+}
+
+static int
+alock_release_lock ( int fd, int slot )
+{
+       int res;
+       
+#ifdef HAVE_LOCKF
+       res = lseek (fd, (off_t) (ALOCK_SLOT_SIZE * slot), SEEK_SET);
+       if (res == -1) return -1;
+       res = lockf (fd, F_ULOCK, (off_t) ALOCK_SLOT_SIZE);
+       if (res == -1) return -1;
+#else
+# ifdef HAVE_FCNTL
+       struct flock lock_info;
+       (void) memset ((void *) &lock_info, 0, sizeof (struct flock));
+
+       lock_info.l_type = F_UNLCK;
+       lock_info.l_whence = SEEK_SET;
+       lock_info.l_start = (off_t) (ALOCK_SLOT_SIZE * slot);
+       lock_info.l_len = (off_t) ALOCK_SLOT_SIZE;
+
+       res = fcntl (fd, F_SETLKW, &lock_info);
+       if (res == -1) return -1;
+# else
+#    error libalock needs lockf or fcntl
+# endif
+#endif
+
+       return 0;
+}
+
+static int
+alock_test_lock ( int fd, int slot )
+{
+       int res;
+
+#ifdef HAVE_LOCKF
+       res = lseek (fd, (off_t) (ALOCK_SLOT_SIZE * slot), SEEK_SET);
+       if (res == -1) return -1;
+
+       res = lockf (fd, F_TEST, (off_t) ALOCK_SLOT_SIZE);
+       if (res == -1) {
+               if (errno == EACCES) { 
+                       return ALOCK_LOCKED;
+               } else {
+                       return -1;
+               }
+       }
+#else
+# ifdef HAVE_FCNTL
+       struct flock lock_info;
+       (void) memset ((void *) &lock_info, 0, sizeof (struct flock));
+
+       lock_info.l_type = F_WRLCK;
+       lock_info.l_whence = SEEK_SET;
+       lock_info.l_start = (off_t) (ALOCK_SLOT_SIZE * slot);
+       lock_info.l_len = (off_t) ALOCK_SLOT_SIZE;
+
+       res = fcntl (fd, F_GETLK, &lock_info);
+       if (res == -1) return -1;
+
+       if (lock_info.l_type != F_UNLCK) return ALOCK_LOCKED;
+# else
+#    error libalock needs lockf or fcntl
+# endif
+#endif
+       
+       return 0;
+}
+
+/* Read a 64bit LE value */
+static unsigned long int
+alock_read_iattr ( unsigned char * bufptr )
+{
+       unsigned long int val = 0;
+       int count;
+
+       assert (bufptr != NULL);
+
+       bufptr += sizeof (unsigned long int);
+       for (count=0; count <= sizeof (unsigned long int); ++count) {
+               val <<= 8;
+               val += (unsigned long int) *bufptr--;
+       }
+
+       return val;
+}
+
+/* Write a 64bit LE value */
+static void
+alock_write_iattr ( unsigned char * bufptr,
+                   unsigned long int val )
+{
+       int count;
+
+       assert (bufptr != NULL);
+
+       for (count=0; count < 8; ++count) {
+               *bufptr++ = (unsigned char) (val & 0xff);
+               val >>= 8;
+       }
+}
+
+static int
+alock_read_slot ( alock_info_t * info,
+                 alock_slot_t * slot_data )
+{
+       unsigned char slotbuf [ALOCK_SLOT_SIZE];
+       int res, size, size_total, err;
+
+       assert (info != NULL);
+       assert (slot_data != NULL);
+       assert (info->al_slot > 0);
+
+       res = lseek (info->al_fd, 
+                    (off_t) (ALOCK_SLOT_SIZE * info->al_slot), 
+                    SEEK_SET);
+       if (res == -1) return -1;
+
+       size_total = 0;
+       while (size_total < ALOCK_SLOT_SIZE) {
+               size = read (info->al_fd, 
+                            slotbuf + size_total, 
+                            ALOCK_SLOT_SIZE - size_total);
+               if (size == 0) return -1;
+               if (size < 0) {
+                       err = errno;
+                       if (err != EINTR && err != EAGAIN) return -1;
+               } else {
+                       size_total += size;
+               }
+       }
+       
+       if (alock_read_iattr (slotbuf) != ALOCK_MAGIC) {
+               return 1;
+       }
+       slot_data->al_lock  = alock_read_iattr (slotbuf+8);
+       slot_data->al_stamp = alock_read_iattr (slotbuf+16);
+       slot_data->al_pid   = alock_read_iattr (slotbuf+24);
+
+       if (slot_data->al_appname) free (slot_data->al_appname);
+       slot_data->al_appname = calloc (1, ALOCK_MAX_APPNAME);
+       strncpy (slot_data->al_appname, slotbuf+32, ALOCK_MAX_APPNAME-1);
+       (slot_data->al_appname) [ALOCK_MAX_APPNAME-1] = '\0';
+
+       return 0;
+}
+
+static int
+alock_write_slot ( alock_info_t * info,
+                  alock_slot_t * slot_data )
+{
+       unsigned char slotbuf [ALOCK_SLOT_SIZE];
+       int res, size, size_total, err;
+
+       assert (info != NULL);
+       assert (slot_data != NULL);
+       assert (info->al_slot > 0);
+
+       (void) memset ((void *) slotbuf, 0, ALOCK_SLOT_SIZE);
+       
+       alock_write_iattr (slotbuf,    ALOCK_MAGIC);
+       assert (alock_read_iattr (slotbuf) == ALOCK_MAGIC);
+       alock_write_iattr (slotbuf+8,  slot_data->al_lock);
+       alock_write_iattr (slotbuf+16, slot_data->al_stamp);
+       alock_write_iattr (slotbuf+24, slot_data->al_pid);
+
+       strncpy (slotbuf+32, slot_data->al_appname, ALOCK_MAX_APPNAME-1);
+       slotbuf[ALOCK_SLOT_SIZE-1] = '\0';
+
+       res = lseek (info->al_fd, 
+                    (off_t) (ALOCK_SLOT_SIZE * info->al_slot),
+                    SEEK_SET);
+       if (res == -1) return -1;
+
+       size_total = 0;
+       while (size_total < ALOCK_SLOT_SIZE) {
+               size = write (info->al_fd, 
+                             slotbuf + size_total, 
+                             ALOCK_SLOT_SIZE - size_total);
+               if (size == 0) return -1;
+               if (size < 0) {
+                       err = errno;
+                       if (err != EINTR && err != EAGAIN) return -1;
+               } else {
+                       size_total += size;
+               }
+       }
+       
+       return 0;
+}
+
+static int
+alock_query_slot ( alock_info_t * info )
+{
+       int res;
+       alock_slot_t slot_data;
+
+       assert (info != NULL);
+       assert (info->al_slot > 0);
+       
+       (void) memset ((void *) &slot_data, 0, sizeof (alock_slot_t));
+       alock_read_slot (info, &slot_data);
+       if (slot_data.al_lock == ALOCK_UNLOCKED) return ALOCK_UNLOCKED;
+
+       if (slot_data.al_appname != NULL) free (slot_data.al_appname);
+       slot_data.al_appname = NULL;
+
+       res = alock_test_lock (info->al_fd, info->al_slot);
+       if (res < 0) return -1;
+       if (res > 0) {
+               if (slot_data.al_lock == ALOCK_UNIQUE) {
+                       return ALOCK_UNIQUE;
+               } else {
+                       return ALOCK_LOCKED;
+               }
+       }
+       
+       return ALOCK_DIRTY;
+}
+
+int 
+alock_open ( alock_info_t * info,
+            const char * appname,
+            const char * envdir,
+            int locktype )
+{
+       struct stat statbuf;
+       alock_info_t scan_info;
+       alock_slot_t slot_data;
+       char * filename;
+       struct timeval tv;
+       int res, max_slot;
+       int dirty_count, live_count;
+
+       assert (info != NULL);
+       assert (appname != NULL);
+       assert (envdir != NULL);
+       assert (locktype >= 1 && locktype <= 2);
+
+       res = gettimeofday (&tv, NULL);
+       if (res == -1) return ALOCK_UNSTABLE;
+
+       slot_data.al_lock = locktype;
+       slot_data.al_stamp = tv.tv_sec;
+       slot_data.al_pid = getpid();
+       slot_data.al_appname = calloc (1, ALOCK_MAX_APPNAME);
+       strncpy (slot_data.al_appname, appname, ALOCK_MAX_APPNAME-1);
+       slot_data.al_appname [ALOCK_MAX_APPNAME-1] = '\0';
+
+       filename = calloc (1, strlen (envdir) + strlen ("/alock") + 1);
+       strcpy (filename, envdir);
+       strcat (filename, "/alock");
+       info->al_fd = open (filename, O_CREAT|O_RDWR, 0666);
+       free (filename);
+       if (info->al_fd < 0) {
+               free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+       info->al_slot = 0;
+
+       res = alock_grab_lock (info->al_fd, 0);
+       if (res == -1) { 
+               close (info->al_fd);
+               free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+
+       res = fstat (info->al_fd, &statbuf);
+       if (res == -1) { 
+               close (info->al_fd);
+               free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+
+       max_slot = (statbuf.st_size + ALOCK_SLOT_SIZE - 1) / ALOCK_SLOT_SIZE;
+       dirty_count = 0;
+       live_count = 0;
+       scan_info.al_fd = info->al_fd;
+       for (scan_info.al_slot = 1; 
+            scan_info.al_slot < max_slot;
+            ++ scan_info.al_slot) {
+               if (scan_info.al_slot != info->al_slot) {
+                       res = alock_query_slot (&scan_info);
+
+                       if (res == ALOCK_UNLOCKED
+                           && info->al_slot == 0) {
+                               info->al_slot = scan_info.al_slot;
+
+                       } else if (res == ALOCK_LOCKED) {
+                               ++live_count;
+
+                       } else if (res == ALOCK_UNIQUE
+                               && locktype == ALOCK_UNIQUE) {
+                               close (info->al_fd);
+                               free (slot_data.al_appname);
+                               return ALOCK_BUSY;
+
+                       } else if (res == ALOCK_DIRTY) {
+                               ++dirty_count;
+
+                       } else if (res == -1) {
+                               close (info->al_fd);
+                               free (slot_data.al_appname);
+                               return ALOCK_UNSTABLE;
+
+                       }
+               }
+       }
+
+       if (dirty_count && live_count) {
+               close (info->al_fd);
+               free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+       
+       if (info->al_slot == 0) info->al_slot = max_slot + 1;
+       res = alock_grab_lock (info->al_fd,
+                              info->al_slot);
+       if (res == -1) { 
+               close (info->al_fd);
+               free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+       res = alock_write_slot (info, &slot_data);
+       free (slot_data.al_appname);
+       if (res == -1) { 
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       res = alock_release_lock (info->al_fd, 0);
+       if (res == -1) { 
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+       
+       if (dirty_count) return ALOCK_RECOVER;
+       return ALOCK_CLEAN;
+}
+
+int 
+alock_scan ( alock_info_t * info )
+{
+       struct stat statbuf;
+       alock_info_t scan_info;
+       int res, max_slot;
+       int dirty_count, live_count;
+
+       assert (info != NULL);
+
+       scan_info.al_fd = info->al_fd;
+
+       res = alock_grab_lock (info->al_fd, 0);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       res = fstat (info->al_fd, &statbuf);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       max_slot = (statbuf.st_size + ALOCK_SLOT_SIZE - 1) / ALOCK_SLOT_SIZE;
+       dirty_count = 0;
+       live_count = 0;
+       for (scan_info.al_slot = 1; 
+            scan_info.al_slot < max_slot;
+            ++ scan_info.al_slot) {
+               if (scan_info.al_slot != info->al_slot) {
+                       res = alock_query_slot (&scan_info);
+
+                       if (res == ALOCK_LOCKED) {
+                               ++live_count;
+                               
+                       } else if (res == ALOCK_DIRTY) {
+                               ++dirty_count;
+
+                       } else if (res == -1) {
+                               close (info->al_fd);
+                               return ALOCK_UNSTABLE;
+
+                       }
+               }
+       }
+
+       res = alock_release_lock (info->al_fd, 0);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       if (dirty_count) {
+               if (live_count) {
+                       close (info->al_fd);
+                       return ALOCK_UNSTABLE;
+               } else {
+                       return ALOCK_RECOVER;
+               }
+       }
+       
+       return ALOCK_CLEAN;
+}
+
+int
+alock_close ( alock_info_t * info )
+{
+       alock_slot_t slot_data;
+       int res;
+
+       (void) memset ((void *) &slot_data, 0, sizeof(alock_slot_t));
+
+       res = alock_grab_lock (info->al_fd, 0);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       /* mark our slot as clean */
+       res = alock_read_slot (info, &slot_data);
+       if (res == -1) {
+               close (info->al_fd);
+               if (slot_data.al_appname != NULL) 
+                       free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+       slot_data.al_lock = ALOCK_UNLOCKED;
+       res = alock_write_slot (info, &slot_data);
+       if (res == -1) {
+               close (info->al_fd);
+               if (slot_data.al_appname != NULL) 
+                       free (slot_data.al_appname);
+               return ALOCK_UNSTABLE;
+       }
+       if (slot_data.al_appname != NULL) {
+               free (slot_data.al_appname);
+               slot_data.al_appname = NULL;
+       }
+
+       res = alock_release_lock (info->al_fd, info->al_slot);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+       res = alock_release_lock (info->al_fd, 0);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       res = close (info->al_fd);
+       if (res == -1) return ALOCK_UNSTABLE;
+       
+       return ALOCK_CLEAN;
+}
+
+int 
+alock_recover ( alock_info_t * info )
+{
+       struct stat statbuf;
+       alock_slot_t slot_data;
+       alock_info_t scan_info;
+       int res, max_slot;
+
+       assert (info != NULL);
+
+       scan_info.al_fd = info->al_fd;
+
+       (void) memset ((void *) &slot_data, 0, sizeof(alock_slot_t));
+
+       res = alock_grab_lock (info->al_fd, 0);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       res = fstat (info->al_fd, &statbuf);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       max_slot = (statbuf.st_size + ALOCK_SLOT_SIZE - 1) / ALOCK_SLOT_SIZE;
+       for (scan_info.al_slot = 1; 
+            scan_info.al_slot < max_slot;
+            ++ scan_info.al_slot) {
+               if (scan_info.al_slot != info->al_slot) {
+                       res = alock_query_slot (&scan_info);
+
+                       if (res == ALOCK_LOCKED
+                           || res == ALOCK_UNIQUE) {
+                               /* recovery attempt on an active db? */
+                               close (info->al_fd);
+                               return ALOCK_UNSTABLE;
+                               
+                       } else if (res == ALOCK_DIRTY) {
+                               /* mark it clean */
+                               res = alock_read_slot (&scan_info, &slot_data);
+                               if (res == -1) {
+                                       close (info->al_fd);
+                                       return ALOCK_UNSTABLE;
+                               }
+                               slot_data.al_lock = ALOCK_UNLOCKED;
+                               res = alock_write_slot (&scan_info, &slot_data);
+                               if (res == -1) {
+                                       close (info->al_fd);
+                                       if (slot_data.al_appname != NULL) 
+                                               free (slot_data.al_appname);
+                                       return ALOCK_UNSTABLE;
+                               }
+                               if (slot_data.al_appname != NULL) {
+                                       free (slot_data.al_appname);
+                                       slot_data.al_appname = NULL;
+                               }
+                               
+                       } else if (res == -1) {
+                               close (info->al_fd);
+                               return ALOCK_UNSTABLE;
+
+                       }
+               }
+       }
+
+       res = alock_release_lock (info->al_fd, 0);
+       if (res == -1) {
+               close (info->al_fd);
+               return ALOCK_UNSTABLE;
+       }
+
+       return ALOCK_CLEAN;
+}
diff --git a/servers/slapd/back-bdb/alock.h b/servers/slapd/back-bdb/alock.h
new file mode 100644 (file)
index 0000000..902077d
--- /dev/null
@@ -0,0 +1,69 @@
+/* alock.h - access lock header */
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
+ *
+ * Copyright 2005 The OpenLDAP Foundation.
+ * Portions Copyright 2004-2005 Symas Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+/* ACKNOWLEDGEMENTS:
+ * This work was initially developed by Matthew Backes at Symas
+ * Corporation for inclusion in OpenLDAP Software.
+ */
+
+#ifndef _ALOCK_H_
+#define _ALOCK_H_
+
+#include "portable.h"
+#include <ac/time.h>
+#include <ac/unistd.h>
+
+/* environment states (all the slots together) */
+#define ALOCK_CLEAN            (0)
+#define ALOCK_RECOVER  (1)
+#define ALOCK_BUSY             (2)
+#define ALOCK_UNSTABLE (3)
+
+/* lock user types and states */
+#define ALOCK_UNLOCKED (0)
+#define ALOCK_LOCKED   (1)
+#define ALOCK_UNIQUE   (2)
+#define ALOCK_DIRTY            (3)
+
+/* constants */
+#define ALOCK_SLOT_SIZE                (1024)
+#define ALOCK_SLOT_IATTRS      (4)
+#define ALOCK_MAX_APPNAME      (ALOCK_SLOT_SIZE - 8 * ALOCK_SLOT_IATTRS)
+#define ALOCK_MAGIC                    (0x12345678)
+
+LDAP_BEGIN_DECL
+
+typedef struct alock_info {
+       int al_fd;
+       int al_slot;
+} alock_info_t;
+
+typedef struct alock_slot {
+       unsigned int al_lock;
+       time_t al_stamp;
+       pid_t al_pid;
+       char * al_appname;
+} alock_slot_t;
+
+extern int alock_open LDAP_P(( alock_info_t * info, const char * appname,
+       const char * envdir, int locktype ));
+extern int alock_scan LDAP_P(( alock_info_t * info ));
+extern int alock_close LDAP_P(( alock_info_t * info ));
+extern int alock_recover LDAP_P(( alock_info_t * info ));
+
+LDAP_END_DECL
+
+#endif
index 44517010fbf96dc4a03df63506efe0e59928b83e..eb8cb0208069431998b559c5661614beb26a5a2f 100644 (file)
@@ -20,6 +20,7 @@
 #include <portable.h>
 #include "slap.h"
 #include <db.h>
+#include "alock.h"
 
 LDAP_BEGIN_DECL
 
@@ -189,6 +190,7 @@ struct bdb_info {
        bdb_idl_cache_entry_t   *bi_idl_lru_tail;
        ldap_pvt_thread_rdwr_t bi_idl_tree_rwlock;
        ldap_pvt_thread_mutex_t bi_idl_tree_lrulock;
+       alock_info_t    bi_alock_info;
 };
 
 #define bi_id2entry    bi_databases[BDB_ID2ENTRY]
index f66f1fd69001d8371beeba54cda0667bf424561e..962aa47f3930c1296422ad6a6a100792d5723f7c 100644 (file)
@@ -24,6 +24,7 @@
 #include "back-bdb.h"
 #include <lutil.h>
 #include <ldap_rq.h>
+#include "alock.h"
 
 static const struct bdbi_database {
        char *file;
@@ -121,13 +122,21 @@ bdb_db_open( BackendDB *be )
        if ( !( slapMode & SLAP_TOOL_QUICK ))
                flags |= DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN;
        
-#if 0
-       /* Never do automatic recovery, must perform it manually.
-        * Otherwise restarting with gentlehup will corrupt the
-        * database.
-        */
-       if( !(slapMode & SLAP_TOOL_MODE) ) flags |= DB_RECOVER;
-#endif
+       rc = alock_open( &bdb->bi_alock_info, "slapd", bdb->bi_dbenv_home,
+               slapMode & SLAP_TOOL_READONLY ?  ALOCK_LOCKED : ALOCK_UNIQUE );
+       if( rc == ALOCK_RECOVER ) {
+               Debug( LDAP_DEBUG_ANY,
+                       "bdb_db_open: alock_open: recovery required\n", 0, 0, 0 );
+               flags |= DB_RECOVER;
+       } else if( rc == ALOCK_BUSY ) {
+               Debug( LDAP_DEBUG_ANY,
+                  "bdb_db_open: alock_open: database in use\n", 0, 0, 0 );
+               return -1;
+       } else if( rc != ALOCK_CLEAN ) {
+               Debug( LDAP_DEBUG_ANY,
+                  "bdb_db_open: alock_open: database unstable\n", 0, 0, 0 );
+               return -1;
+       }
 
        /* If a key was set, use shared memory for the BDB environment */
        if ( bdb->bi_shm_key ) {
@@ -250,6 +259,14 @@ bdb_db_open( BackendDB *be )
                        db_strerror(rc), rc, 0 );
                return rc;
        }
+       if( flags & DB_RECOVER ) {
+               rc = alock_recover (&bdb->bi_alock_info);
+               if( rc != 0 ) {
+                       Debug( LDAP_DEBUG_ANY,
+                          "bdb_db_open: unable to alock_recover\n", 0, 0, 0 );
+                       return -1;
+               }
+       }
 
        flags = DB_THREAD | bdb->bi_db_opflags;
 
@@ -438,6 +455,13 @@ bdb_db_destroy( BackendDB *be )
                }
        }
 
+       rc = alock_close( &bdb->bi_alock_info );
+       if( rc != 0 ) {
+               Debug( LDAP_DEBUG_ANY,
+                       "bdb_db_destroy: alock_close failed\n", 0, 0, 0 );
+               return -1;
+       }
+
        if( bdb->bi_dbenv_home ) ch_free( bdb->bi_dbenv_home );
 
        ldap_pvt_thread_rdwr_destroy ( &bdb->bi_cache.c_rwlock );