]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Use O_DSYNC on metafd.
[openldap] / libraries / libmdb / mdb.c
index 667d44c5f9484d0097e6cfb84be1108f58ed4e79..26712a8455f428d7553791f03005e68b2f178f81 100644 (file)
 #include <time.h>
 #include <unistd.h>
 #include <pthread.h>
-#include <endian.h>
 
 #include "mdb.h"
 
 #define ULONG          unsigned long
 typedef ULONG          pgno_t;
 
-#include "idl.h"
+#include "midl.h"
 
 #ifndef DEBUG
 #define DEBUG 1
@@ -84,40 +83,51 @@ typedef uint16_t     indx_t;
 #define DEFAULT_MAPSIZE        1048576
 
 /* Lock descriptor stuff */
-#define RXBODY \
-       ULONG           mr_txnid; \
-       pid_t           mr_pid; \
-       pthread_t       mr_tid
-typedef struct MDB_rxbody {
-       RXBODY;
-} MDB_rxbody;
-
 #ifndef CACHELINE
 #define CACHELINE      64      /* most CPUs. Itanium uses 128 */
 #endif
 
+typedef struct MDB_rxbody {
+       ULONG           mrb_txnid;
+       pid_t           mrb_pid;
+       pthread_t       mrb_tid;
+} MDB_rxbody;
+
 typedef struct MDB_reader {
-       RXBODY;
-       /* cache line alignment */
-       char pad[CACHELINE-sizeof(MDB_rxbody)];
+       union {
+               MDB_rxbody mrx;
+#define        mr_txnid        mru.mrx.mrb_txnid
+#define        mr_pid  mru.mrx.mrb_pid
+#define        mr_tid  mru.mrx.mrb_tid
+               /* cache line alignment */
+               char pad[(sizeof(MDB_rxbody)+CACHELINE-1) & ~(CACHELINE-1)];
+       } mru;
 } MDB_reader;
 
-#define        TXBODY \
-       uint32_t        mt_magic;       \
-       uint32_t        mt_version;     \
-       pthread_mutex_t mt_mutex;       \
-       ULONG           mt_txnid;       \
-       uint32_t        mt_numreaders
 typedef struct MDB_txbody {
-       TXBODY;
+       uint32_t        mtb_magic;
+       uint32_t        mtb_version;
+       pthread_mutex_t mtb_mutex;
+       ULONG           mtb_txnid;
+       uint32_t        mtb_numreaders;
 } MDB_txbody;
 
 typedef struct MDB_txninfo {
-       TXBODY;
-       char pad[CACHELINE-sizeof(MDB_txbody)];
-       pthread_mutex_t mt_wmutex;
-       char pad2[CACHELINE-sizeof(pthread_mutex_t)];
-       MDB_reader      mt_readers[1];
+       union {
+               MDB_txbody mtb;
+#define mti_magic      mt1.mtb.mtb_magic
+#define mti_version    mt1.mtb.mtb_version
+#define mti_mutex      mt1.mtb.mtb_mutex
+#define mti_txnid      mt1.mtb.mtb_txnid
+#define mti_numreaders mt1.mtb.mtb_numreaders
+               char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
+       } mt1;
+       union {
+               pthread_mutex_t mt2_wmutex;
+#define mti_wmutex     mt2.mt2_wmutex
+               char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
+       } mt2;
+       MDB_reader      mti_readers[1];
 } MDB_txninfo;
 
 /* Common header for all page types. Overflow pages
@@ -135,6 +145,7 @@ typedef struct MDB_page {           /* represents a page of storage */
 #define        P_OVERFLOW       0x04           /* overflow page */
 #define        P_META           0x08           /* meta page */
 #define        P_DIRTY          0x10           /* dirty page */
+#define        P_LEAF2          0x20           /* DB with small, fixed size keys and no data */
        uint32_t        mp_flags;
 #define mp_lower       mp_pb.pb.pb_lower
 #define mp_upper       mp_pb.pb.pb_upper
@@ -145,6 +156,10 @@ typedef struct MDB_page {          /* represents a page of storage */
                        indx_t          pb_upper;               /* upper bound of free space */
                } pb;
                uint32_t        pb_pages;       /* number of overflow pages */
+               struct {
+                       indx_t  pb_ksize;       /* on a LEAF2 page */
+                       indx_t  pb_numkeys;
+               } pb2;
        } mp_pb;
        indx_t          mp_ptrs[1];             /* dynamic size */
 } MDB_page;
@@ -223,6 +238,9 @@ typedef struct MDB_ppage {                                  /* ordered list of pages */
 } MDB_ppage;
 SLIST_HEAD(page_stack, MDB_ppage);
 
+/* FIXME: tree depth is mostly bounded, we should just
+ * use a fixed array and avoid malloc/pointer chasing
+ */
 #define CURSOR_EMPTY(c)                 SLIST_EMPTY(&(c)->mc_stack)
 #define CURSOR_TOP(c)           SLIST_FIRST(&(c)->mc_stack)
 #define CURSOR_POP(c)           SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
@@ -239,7 +257,6 @@ struct MDB_cursor {
        struct MDB_xcursor      *mc_xcursor;
 };
 
-#define METAHASHLEN     offsetof(MDB_meta, mm_hash)
 #define METADATA(p)     ((void *)((char *)p + PAGEHDRSZ))
 
 typedef struct MDB_node {
@@ -279,8 +296,8 @@ struct MDB_txn {
        MDB_db          *mt_dbs;
        unsigned int    mt_numdbs;
 
-#define MDB_TXN_RDONLY          0x01           /* read-only transaction */
-#define MDB_TXN_ERROR           0x02           /* an error has occurred */
+#define MDB_TXN_RDONLY         0x01            /* read-only transaction */
+#define MDB_TXN_ERROR          0x02            /* an error has occurred */
 #define MDB_TXN_METOGGLE       0x04            /* used meta page 1 */
        unsigned int    mt_flags;
 };
@@ -296,7 +313,10 @@ typedef struct MDB_xcursor {
 struct MDB_env {
        int                     me_fd;
        int                     me_lfd;
-       uint32_t        me_flags;
+       int                     me_mfd;                 /* just for writing the meta pages */
+       uint16_t        me_flags;
+       uint16_t                me_db_toggle;
+       unsigned int    me_psize;
        unsigned int    me_maxreaders;
        unsigned int    me_numdbs;
        unsigned int    me_maxdbs;
@@ -308,8 +328,6 @@ struct MDB_env {
        MDB_txn         *me_txn;                /* current write transaction */
        size_t          me_mapsize;
        off_t           me_size;                /* current file size */
-       unsigned int    me_psize;
-       int                     me_db_toggle;
        MDB_dbx         *me_dbxs;               /* array */
        MDB_db          *me_dbs[2];
        MDB_oldpages *me_pghead;
@@ -328,7 +346,6 @@ struct MDB_env {
 #define NODEDSZ(node)   ((node)->mn_dsize)
 
 #define MDB_COMMIT_PAGES        64     /* max number of pages to write in one commit */
-#define MDB_MAXCACHE_DEF        1024   /* max number of pages to keep in cache  */
 
 static int  mdb_search_page_root(MDB_txn *txn,
                            MDB_dbi dbi, MDB_val *key,
@@ -339,9 +356,9 @@ static int  mdb_search_page(MDB_txn *txn,
                            MDB_cursor *cursor, int modify,
                            MDB_pageparent *mpp);
 
-static int  mdbenv_read_header(MDB_env *env, MDB_meta *meta);
-static int  mdbenv_read_meta(MDB_env *env, int *which);
-static int  mdbenv_write_meta(MDB_txn *txn);
+static int  mdb_env_read_header(MDB_env *env, MDB_meta *meta);
+static int  mdb_env_read_meta(MDB_env *env, int *which);
+static int  mdb_env_write_meta(MDB_txn *txn);
 static MDB_page *mdb_get_page(MDB_txn *txn, pgno_t pgno);
 
 static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
@@ -386,7 +403,7 @@ static int           mdb_cursor_last(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data);
 
 static void            mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
-static void            mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db);
+static void            mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node);
 static void            mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
 
 static size_t           mdb_leaf_size(MDB_env *env, MDB_val *key,
@@ -431,6 +448,15 @@ memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
        return *p1 - *p2;
 }
 
+char *
+mdb_version(int *maj, int *min, int *pat)
+{
+       *maj = MDB_VERSION_MAJOR;
+       *min = MDB_VERSION_MINOR;
+       *pat = MDB_VERSION_PATCH;
+       return MDB_VERSION_STRING;
+}
+
 int
 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 {
@@ -440,7 +466,11 @@ mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 static int
 _mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2)
 {
-       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_REVERSEKEY))
+       if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+               |MDB_INTEGERKEY
+#endif
+       ))
                return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
        else
                return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
@@ -505,11 +535,11 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
        }
        if (txn->mt_env->me_pghead) {
                unsigned int i;
-               for (i=0; i<txn->mt_env->me_txns->mt_numreaders; i++) {
-                       ULONG mr = txn->mt_env->me_txns->mt_readers[i].mr_txnid;
+               for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
+                       ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
                        if (!mr) continue;
                        if (mr < oldest)
-                               oldest = txn->mt_env->me_txns->mt_readers[i].mr_txnid;
+                               oldest = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
                }
                if (oldest > txn->mt_env->me_pghead->mo_txnid) {
                        MDB_oldpages *mop = txn->mt_env->me_pghead;
@@ -570,7 +600,7 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
                if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
                        return ENOMEM;
                DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno);
-               mdb_idl_insert(txn->mt_free_pgs, mp->mp_pgno);
+               mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno);
                pgno = dp->p.mp_pgno;
                memcpy(&dp->p, mp, txn->mt_env->me_psize);
                mp = &dp->p;
@@ -586,11 +616,11 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
 }
 
 int
-mdbenv_sync(MDB_env *env)
+mdb_env_sync(MDB_env *env, int force)
 {
        int rc = 0;
-       if (!F_ISSET(env->me_flags, MDB_NOSYNC)) {
-               if (fsync(env->me_fd))
+       if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
+               if (fdatasync(env->me_fd))
                        rc = errno;
        }
        return rc;
@@ -617,32 +647,32 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
                }
                STAILQ_INIT(txn->mt_u.dirty_queue);
 
-               pthread_mutex_lock(&env->me_txns->mt_wmutex);
-               env->me_txns->mt_txnid++;
+               pthread_mutex_lock(&env->me_txns->mti_wmutex);
+               env->me_txns->mti_txnid++;
                txn->mt_free_pgs = env->me_free_pgs;
                txn->mt_free_pgs[0] = 0;
        }
 
-       txn->mt_txnid = env->me_txns->mt_txnid;
+       txn->mt_txnid = env->me_txns->mti_txnid;
        if (rdonly) {
                MDB_reader *r = pthread_getspecific(env->me_txkey);
                if (!r) {
                        unsigned int i;
-                       pthread_mutex_lock(&env->me_txns->mt_mutex);
-                       for (i=0; i<env->me_txns->mt_numreaders; i++)
-                               if (env->me_txns->mt_readers[i].mr_pid == 0)
+                       pthread_mutex_lock(&env->me_txns->mti_mutex);
+                       for (i=0; i<env->me_txns->mti_numreaders; i++)
+                               if (env->me_txns->mti_readers[i].mr_pid == 0)
                                        break;
                        if (i == env->me_maxreaders) {
                                pthread_mutex_unlock(&env->me_txns->mti_mutex);
                                return ENOSPC;
                        }
-                       env->me_txns->mt_readers[i].mr_pid = getpid();
-                       env->me_txns->mt_readers[i].mr_tid = pthread_self();
-                       r = &env->me_txns->mt_readers[i];
+                       env->me_txns->mti_readers[i].mr_pid = getpid();
+                       env->me_txns->mti_readers[i].mr_tid = pthread_self();
+                       r = &env->me_txns->mti_readers[i];
                        pthread_setspecific(env->me_txkey, r);
-                       if (i >= env->me_txns->mt_numreaders)
-                               env->me_txns->mt_numreaders = i+1;
-                       pthread_mutex_unlock(&env->me_txns->mt_mutex);
+                       if (i >= env->me_txns->mti_numreaders)
+                               env->me_txns->mti_numreaders = i+1;
+                       pthread_mutex_unlock(&env->me_txns->mti_mutex);
                }
                r->mr_txnid = txn->mt_txnid;
                txn->mt_u.reader = r;
@@ -652,7 +682,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 
        txn->mt_env = env;
 
-       if ((rc = mdbenv_read_meta(env, &toggle)) != MDB_SUCCESS) {
+       if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) {
                mdb_txn_abort(txn);
                return rc;
        }
@@ -714,10 +744,10 @@ mdb_txn_abort(MDB_txn *txn)
                }
 
                env->me_txn = NULL;
-               env->me_txns->mt_txnid--;
+               env->me_txns->mti_txnid--;
                for (i=2; i<env->me_numdbs; i++)
                        env->me_dbxs[i].md_dirty = 0;
-               pthread_mutex_unlock(&env->me_txns->mt_wmutex);
+               pthread_mutex_unlock(&env->me_txns->mti_wmutex);
        }
 
        free(txn);
@@ -889,9 +919,8 @@ mdb_txn_commit(MDB_txn *txn)
                free(dp);
        }
 
-       if ((n = mdbenv_sync(env)) != 0 ||
-           (n = mdbenv_write_meta(txn)) != MDB_SUCCESS ||
-           (n = mdbenv_sync(env)) != 0) {
+       if ((n = mdb_env_sync(env, 0)) != 0 ||
+           (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) {
                mdb_txn_abort(txn);
                return n;
        }
@@ -918,7 +947,7 @@ mdb_txn_commit(MDB_txn *txn)
                free(txn->mt_dbs);
        }
 
-       pthread_mutex_unlock(&env->me_txns->mt_wmutex);
+       pthread_mutex_unlock(&env->me_txns->mti_wmutex);
        free(txn->mt_u.dirty_queue);
        free(txn);
        txn = NULL;
@@ -930,7 +959,7 @@ done:
 }
 
 static int
-mdbenv_read_header(MDB_env *env, MDB_meta *meta)
+mdb_env_read_header(MDB_env *env, MDB_meta *meta)
 {
        char             page[PAGESIZE];
        MDB_page        *p;
@@ -975,7 +1004,7 @@ mdbenv_read_header(MDB_env *env, MDB_meta *meta)
 }
 
 static int
-mdbenv_init_meta(MDB_env *env, MDB_meta *meta)
+mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
 {
        MDB_page *p, *q;
        MDB_meta *m;
@@ -990,10 +1019,7 @@ mdbenv_init_meta(MDB_env *env, MDB_meta *meta)
        meta->mm_psize = psize;
        meta->mm_last_pg = 1;
        meta->mm_flags = env->me_flags & 0xffff;
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-       /* freeDB keys are pgno_t's, must compare in int order */
-       meta->mm_flags |= MDB_REVERSEKEY;
-#endif
+       meta->mm_flags |= MDB_INTEGERKEY;
        meta->mm_dbs[0].md_root = P_INVALID;
        meta->mm_dbs[1].md_root = P_INVALID;
 
@@ -1018,7 +1044,7 @@ mdbenv_init_meta(MDB_env *env, MDB_meta *meta)
 }
 
 static int
-mdbenv_write_meta(MDB_txn *txn)
+mdb_env_write_meta(MDB_txn *txn)
 {
        MDB_env *env;
        MDB_meta        meta;
@@ -1029,7 +1055,8 @@ mdbenv_write_meta(MDB_txn *txn)
        assert(txn != NULL);
        assert(txn->mt_env != NULL);
 
-       DPRINTF("writing meta page for root page %lu", txn->mt_dbs[MAIN_DBI].md_root);
+       DPRINTF("writing meta page %d for root page %lu",
+               !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE), txn->mt_dbs[MAIN_DBI].md_root);
 
        env = txn->mt_env;
 
@@ -1047,8 +1074,7 @@ mdbenv_write_meta(MDB_txn *txn)
                off += env->me_psize;
        off += PAGEHDRSZ;
 
-       lseek(env->me_fd, off, SEEK_SET);
-       rc = write(env->me_fd, ptr, len);
+       rc = pwrite(env->me_fd, ptr, len, off);
        if (rc != len) {
                DPRINTF("write failed, disk error?");
                return errno;
@@ -1058,7 +1084,7 @@ mdbenv_write_meta(MDB_txn *txn)
 }
 
 static int
-mdbenv_read_meta(MDB_env *env, int *which)
+mdb_env_read_meta(MDB_env *env, int *which)
 {
        int toggle = 0;
 
@@ -1078,7 +1104,7 @@ mdbenv_read_meta(MDB_env *env, int *which)
 }
 
 int
-mdbenv_create(MDB_env **env)
+mdb_env_create(MDB_env **env)
 {
        MDB_env *e;
 
@@ -1089,12 +1115,13 @@ mdbenv_create(MDB_env **env)
        e->me_maxdbs = 2;
        e->me_fd = -1;
        e->me_lfd = -1;
+       e->me_mfd = -1;
        *env = e;
        return MDB_SUCCESS;
 }
 
 int
-mdbenv_set_mapsize(MDB_env *env, size_t size)
+mdb_env_set_mapsize(MDB_env *env, size_t size)
 {
        if (env->me_map)
                return EINVAL;
@@ -1103,21 +1130,21 @@ mdbenv_set_mapsize(MDB_env *env, size_t size)
 }
 
 int
-mdbenv_set_maxdbs(MDB_env *env, int dbs)
+mdb_env_set_maxdbs(MDB_env *env, int dbs)
 {
        env->me_maxdbs = dbs;
        return MDB_SUCCESS;
 }
 
 int
-mdbenv_set_maxreaders(MDB_env *env, int readers)
+mdb_env_set_maxreaders(MDB_env *env, int readers)
 {
        env->me_maxreaders = readers;
        return MDB_SUCCESS;
 }
 
 int
-mdbenv_get_maxreaders(MDB_env *env, int *readers)
+mdb_env_get_maxreaders(MDB_env *env, int *readers)
 {
        if (!env || !readers)
                return EINVAL;
@@ -1125,8 +1152,8 @@ mdbenv_get_maxreaders(MDB_env *env, int *readers)
        return MDB_SUCCESS;
 }
 
-int
-mdbenv_open2(MDB_env *env, unsigned int flags)
+static int
+mdb_env_open2(MDB_env *env, unsigned int flags)
 {
        int i, newenv = 0;
        MDB_meta meta;
@@ -1136,7 +1163,7 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
 
        memset(&meta, 0, sizeof(meta));
 
-       if ((i = mdbenv_read_header(env, &meta)) != 0) {
+       if ((i = mdb_env_read_header(env, &meta)) != 0) {
                if (i != ENOENT)
                        return i;
                DPRINTF("new mdbenv");
@@ -1159,7 +1186,7 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
                meta.mm_mapsize = env->me_mapsize;
                if (flags & MDB_FIXEDMAP)
                        meta.mm_address = env->me_map;
-               i = mdbenv_init_meta(env, &meta);
+               i = mdb_env_init_meta(env, &meta);
                if (i != MDB_SUCCESS) {
                        munmap(env->me_map, env->me_mapsize);
                        return i;
@@ -1171,7 +1198,7 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
        env->me_metas[0] = METADATA(p);
        env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
 
-       if ((i = mdbenv_read_meta(env, NULL)) != 0)
+       if ((i = mdb_env_read_meta(env, NULL)) != 0)
                return i;
 
        DPRINTF("opened database version %u, pagesize %u",
@@ -1187,7 +1214,7 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
 }
 
 static void
-mdbenv_reader_dest(void *ptr)
+mdb_env_reader_dest(void *ptr)
 {
        MDB_reader *reader = ptr;
 
@@ -1198,11 +1225,11 @@ mdbenv_reader_dest(void *ptr)
 
 /* downgrade the exclusive lock on the region back to shared */
 static void
-mdbenv_share_locks(MDB_env *env)
+mdb_env_share_locks(MDB_env *env)
 {
        struct flock lock_info;
 
-       env->me_txns->mt_txnid = env->me_meta->mm_txnid;
+       env->me_txns->mti_txnid = env->me_meta->mm_txnid;
 
        memset((void *)&lock_info, 0, sizeof(lock_info));
        lock_info.l_type = F_RDLCK;
@@ -1213,7 +1240,7 @@ mdbenv_share_locks(MDB_env *env)
 }
 
 static int
-mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
+mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
 {
        int rc;
        off_t size, rsize;
@@ -1266,23 +1293,26 @@ mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
                pthread_mutexattr_t mattr;
 
                pthread_mutexattr_init(&mattr);
-               pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
-               pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
-               pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
-               env->me_txns->mt_version = MDB_VERSION;
-               env->me_txns->mt_magic = MDB_MAGIC;
-               env->me_txns->mt_txnid = 0;
-               env->me_txns->mt_numreaders = 0;
+               rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+               if (rc) {
+                       goto fail;
+               }
+               pthread_mutex_init(&env->me_txns->mti_mutex, &mattr);
+               pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr);
+               env->me_txns->mti_version = MDB_VERSION;
+               env->me_txns->mti_magic = MDB_MAGIC;
+               env->me_txns->mti_txnid = 0;
+               env->me_txns->mti_numreaders = 0;
 
        } else {
-               if (env->me_txns->mt_magic != MDB_MAGIC) {
+               if (env->me_txns->mti_magic != MDB_MAGIC) {
                        DPRINTF("lock region has invalid magic");
                        rc = EINVAL;
                        goto fail;
                }
-               if (env->me_txns->mt_version != MDB_VERSION) {
+               if (env->me_txns->mti_version != MDB_VERSION) {
                        DPRINTF("lock region is version %u, expected version %u",
-                               env->me_txns->mt_version, MDB_VERSION);
+                               env->me_txns->mti_version, MDB_VERSION);
                        rc = MDB_VERSION_MISMATCH;
                        goto fail;
                }
@@ -1295,6 +1325,7 @@ mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
 
 fail:
        close(env->me_lfd);
+       env->me_lfd = -1;
        return rc;
 
 }
@@ -1302,7 +1333,7 @@ fail:
 #define LOCKNAME       "/lock.mdb"
 #define DATANAME       "/data.mdb"
 int
-mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
+mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
 {
        int             oflags, rc, len, excl;
        char *lpath, *dpath;
@@ -1315,7 +1346,7 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
        sprintf(lpath, "%s" LOCKNAME, path);
        sprintf(dpath, "%s" DATANAME, path);
 
-       rc = mdbenv_setup_locks(env, lpath, mode, &excl);
+       rc = mdb_env_setup_locks(env, lpath, mode, &excl);
        if (rc)
                goto leave;
 
@@ -1324,18 +1355,25 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
        else
                oflags = O_RDWR | O_CREAT;
 
-       if ((env->me_fd = open(dpath, oflags, mode)) == -1)
-               return errno;
+       if ((env->me_fd = open(dpath, oflags, mode)) == -1) {
+               rc = errno;
+               goto leave;
+       }
+
+       if ((rc = mdb_env_open2(env, flags)) == MDB_SUCCESS) {
+               /* synchronous fd for meta writes */
+               if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
+                       oflags |= O_DSYNC;
+               if ((env->me_mfd = open(dpath, oflags, mode)) == -1) {
+                       rc = errno;
+                       goto leave;
+               }
 
-       if ((rc = mdbenv_open2(env, flags)) != MDB_SUCCESS) {
-               close(env->me_fd);
-               env->me_fd = -1;
-       } else {
                env->me_path = strdup(path);
                DPRINTF("opened dbenv %p", (void *) env);
-               pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
+               pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
                if (excl)
-                       mdbenv_share_locks(env);
+                       mdb_env_share_locks(env);
                env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
                env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
                env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
@@ -1343,12 +1381,22 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
        }
 
 leave:
+       if (rc) {
+               if (env->me_fd >= 0) {
+                       close(env->me_fd);
+                       env->me_fd = -1;
+               }
+               if (env->me_lfd >= 0) {
+                       close(env->me_lfd);
+                       env->me_lfd = -1;
+               }
+       }
        free(lpath);
        return rc;
 }
 
 void
-mdbenv_close(MDB_env *env)
+mdb_env_close(MDB_env *env)
 {
        if (env == NULL)
                return;
@@ -1358,12 +1406,20 @@ mdbenv_close(MDB_env *env)
        free(env->me_dbxs);
        free(env->me_path);
 
+       pthread_key_delete(env->me_txkey);
+
        if (env->me_map) {
                munmap(env->me_map, env->me_mapsize);
        }
+       close(env->me_mfd);
        close(env->me_fd);
        if (env->me_txns) {
+               pid_t pid = getpid();
                size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+               int i;
+               for (i=0; i<env->me_txns->mti_numreaders; i++)
+                       if (env->me_txns->mti_readers[i].mr_pid == pid)
+                               env->me_txns->mti_readers[i].mr_pid = 0;
                munmap(env->me_txns, size);
        }
        close(env->me_lfd);
@@ -1659,7 +1715,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
        assert(data);
        DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data);
 
-       if (txn == NULL || dbi >= txn->mt_numdbs)
+       if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
                return EINVAL;
 
        if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
@@ -1676,7 +1732,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
                        MDB_xcursor mx;
 
                        mdb_xcursor_init0(txn, dbi, &mx);
-                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       mdb_xcursor_init1(txn, dbi, &mx, leaf);
                        rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1799,7 +1855,7 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
                        return rc;
 
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1821,7 +1877,7 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
 
        if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
                if (op == MDB_PREV || op == MDB_PREV_DUP) {
-                       rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
+                       rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
                        if (op != MDB_PREV || rc == MDB_SUCCESS)
                                return rc;
                }
@@ -1857,7 +1913,7 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
                        return rc;
 
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1914,7 +1970,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
                        return rc;
 
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        if (op == MDB_SET || op == MDB_SET_RANGE) {
                                rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        } else {
@@ -1968,7 +2024,7 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
                        return rc;
 
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc)
                                return rc;
@@ -2009,7 +2065,7 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
                        return rc;
 
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc)
                                return rc;
@@ -2290,21 +2346,32 @@ mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
 }
 
 static void
-mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node)
 {
+       MDB_db *db = NODEDATA(node);
+       MDB_dbi dbn;
        mx->mx_dbs[0] = txn->mt_dbs[0];
        mx->mx_dbs[1] = txn->mt_dbs[1];
        if (dbi > 1) {
                mx->mx_dbs[2] = txn->mt_dbs[dbi];
-               mx->mx_dbs[3] = *db;
+               dbn = 3;
        } else {
-               mx->mx_dbs[2] = *db;
+               dbn = 2;
        }
+       mx->mx_dbs[dbn] = *db;
+       mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
+       mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
+       mx->mx_txn.mt_next_pgno = txn->mt_next_pgno;
+       mx->mx_txn.mt_oldest = txn->mt_oldest;
+       mx->mx_txn.mt_u = txn->mt_u;
 }
 
 static void
 mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
 {
+       txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
+       txn->mt_oldest = mx->mx_txn.mt_oldest;
+       txn->mt_u = mx->mx_txn.mt_u;
        txn->mt_dbs[0] = mx->mx_dbs[0];
        txn->mt_dbs[1] = mx->mx_dbs[1];
        txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
@@ -2345,6 +2412,23 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
        return MDB_SUCCESS;
 }
 
+/* Return the count of duplicate data items for the current key */
+int
+mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
+{
+       if (mc == NULL || countp == NULL)
+               return EINVAL;
+
+       if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
+               return EINVAL;
+
+       if (!mc->mc_xcursor->mx_cursor.mc_initialized)
+               return EINVAL;
+
+       *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+       return MDB_SUCCESS;
+}
+
 void
 mdb_cursor_close(MDB_cursor *cursor)
 {
@@ -2646,7 +2730,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_no
                ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize);
                for (i=0; i<ovpages; i++) {
                        DPRINTF("freed ov page %lu", pg);
-                       mdb_idl_insert(txn->mt_free_pgs, pg);
+                       mdb_midl_insert(txn->mt_free_pgs, pg);
                        pg++;
                }
        }
@@ -2673,7 +2757,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
 
        assert(key != NULL);
 
-       if (txn == NULL || dbi >= txn->mt_numdbs)
+       if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
                return EINVAL;
 
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
@@ -2699,7 +2783,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                MDB_pageparent mp2;
 
                mdb_xcursor_init0(txn, dbi, &mx);
-               mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+               mdb_xcursor_init1(txn, dbi, &mx, leaf);
                if (flags == MDB_DEL_DUP) {
                        rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL, 0);
                        mdb_xcursor_fini(txn, dbi, &mx);
@@ -2728,7 +2812,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                                        while (parent != NULL) {
                                                for (i=0; i<NUMKEYS(top->mp_page); i++) {
                                                        ni = NODEPTR(top->mp_page, i);
-                                                       mdb_idl_insert(txn->mt_free_pgs, ni->mn_pgno);
+                                                       mdb_midl_insert(txn->mt_free_pgs, ni->mn_pgno);
                                                }
                                                if (parent) {
                                                        parent->mp_ki++;
@@ -2743,7 +2827,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                                                }
                                        }
                                }
-                               mdb_idl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root);
+                               mdb_midl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root);
                        }
                }
        }
@@ -2983,11 +3067,11 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
                rdata = data;
        }
 
-       if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
-               rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
+       if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) {
+               rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
        } else {
                /* There is room already in this leaf page. */
-               rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, data, 0, 0);
+               rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, rdata, 0, 0);
        }
 
        if (rc != MDB_SUCCESS)
@@ -3012,7 +3096,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
                        leaf = NODEPTR(mpp.mp_page, ki);
 put_sub:
                        mdb_xcursor_init0(txn, dbi, &mx);
-                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       mdb_xcursor_init1(txn, dbi, &mx, leaf);
                        xdata.mv_size = 0;
                        xdata.mv_data = "";
                        if (flags == MDB_NODUPDATA)
@@ -3035,7 +3119,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        assert(key != NULL);
        assert(data != NULL);
 
-       if (txn == NULL || dbi >= txn->mt_numdbs)
+       if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
                return EINVAL;
 
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
@@ -3053,7 +3137,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 }
 
 int
-mdbenv_get_flags(MDB_env *env, unsigned int *arg)
+mdb_env_get_flags(MDB_env *env, unsigned int *arg)
 {
        if (!env || !arg)
                return EINVAL;
@@ -3063,7 +3147,7 @@ mdbenv_get_flags(MDB_env *env, unsigned int *arg)
 }
 
 int
-mdbenv_get_path(MDB_env *env, const char **arg)
+mdb_env_get_path(MDB_env *env, const char **arg)
 {
        if (!env || !arg)
                return EINVAL;
@@ -3085,7 +3169,7 @@ mdb_stat0(MDB_env *env, MDB_db *db, MDB_stat *arg)
        return MDB_SUCCESS;
 }
 int
-mdbenv_stat(MDB_env *env, MDB_stat *arg)
+mdb_env_stat(MDB_env *env, MDB_stat *arg)
 {
        if (env == NULL || arg == NULL)
                return EINVAL;
@@ -3103,6 +3187,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
        /* main DB? */
        if (!name) {
                *dbi = MAIN_DBI;
+               if (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY))
+                       txn->mt_dbs[MAIN_DBI].md_flags |= (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY));
                return MDB_SUCCESS;
        }
 
@@ -3171,3 +3257,30 @@ void mdb_close(MDB_txn *txn, MDB_dbi dbi)
        txn->mt_dbxs[dbi].md_name.mv_size = 0;
        free(ptr);
 }
+
+int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
+{
+       if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+               return EINVAL;
+
+       txn->mt_dbxs[dbi].md_cmp = cmp;
+       return MDB_SUCCESS;
+}
+
+int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
+{
+       if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+               return EINVAL;
+
+       txn->mt_dbxs[dbi].md_dcmp = cmp;
+       return MDB_SUCCESS;
+}
+
+int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel)
+{
+       if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+               return EINVAL;
+
+       txn->mt_dbxs[dbi].md_rel = rel;
+       return MDB_SUCCESS;
+}