]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Fix cursor_set for dups
[openldap] / libraries / libmdb / mdb.c
index 6faa1f5bdd5ecfffe1e80d45839583f803188be1..45945b21e3c16ae0ae4796d93de9cee1ae719b2e 100644 (file)
@@ -29,7 +29,6 @@
  */
 #include <sys/types.h>
 #include <sys/stat.h>
-#include <sys/queue.h>
 #include <sys/param.h>
 #include <sys/uio.h>
 #include <sys/mman.h>
@@ -110,6 +109,7 @@ typedef struct MDB_txbody {
        pthread_mutex_t mtb_mutex;
        ULONG           mtb_txnid;
        uint32_t        mtb_numreaders;
+       uint32_t        mtb_me_toggle;
 } MDB_txbody;
 
 typedef struct MDB_txninfo {
@@ -120,6 +120,7 @@ typedef struct MDB_txninfo {
 #define mti_mutex      mt1.mtb.mtb_mutex
 #define mti_txnid      mt1.mtb.mtb_txnid
 #define mti_numreaders mt1.mtb.mtb_numreaders
+#define mti_me_toggle  mt1.mtb.mtb_me_toggle
                char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
        } mt1;
        union {
@@ -138,7 +139,7 @@ typedef struct MDB_page {           /* represents a page of storage */
 #define        mp_pgno         mp_p.p_pgno
        union padded {
                pgno_t          p_pgno;         /* page number */
-               void *          p_pad;
+               void *          p_align;        /* for IL32P64 */
        } mp_p;
 #define        P_BRANCH         0x01           /* branch page */
 #define        P_LEAF           0x02           /* leaf page */
@@ -229,25 +230,19 @@ static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int pa
 static int             mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
 
 typedef struct MDB_ppage {                                     /* ordered list of pages */
-       SLIST_ENTRY(MDB_ppage)   mp_entry;
        MDB_page                *mp_page;
        unsigned int    mp_ki;          /* cursor index on page */
 } MDB_ppage;
-SLIST_HEAD(page_stack, MDB_ppage);
 
-/* FIXME: tree depth is mostly bounded, we should just
- * use a fixed array and avoid malloc/pointer chasing
- */
-#define CURSOR_EMPTY(c)                 SLIST_EMPTY(&(c)->mc_stack)
-#define CURSOR_TOP(c)           SLIST_FIRST(&(c)->mc_stack)
-#define CURSOR_POP(c)           SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
-#define CURSOR_PUSH(c,p)        SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
+#define CURSOR_TOP(c)           (&(c)->mc_stack[(c)->mc_snum-1])
+#define CURSOR_PARENT(c)        (&(c)->mc_stack[(c)->mc_snum-2])
 
 struct MDB_xcursor;
 
 struct MDB_cursor {
        MDB_txn         *mc_txn;
-       struct page_stack        mc_stack;              /* stack of parent pages */
+       MDB_ppage       mc_stack[32];           /* stack of parent pages */
+       unsigned int    mc_snum;                /* number of pushed pages */
        MDB_dbi         mc_dbi;
        short           mc_initialized; /* 1 if initialized */
        short           mc_eof;         /* 1 if end is reached */
@@ -267,6 +262,7 @@ typedef struct MDB_node {
        unsigned int    mn_ksize:12;                    /* key size */
 #define F_BIGDATA       0x01                   /* data put on overflow page */
 #define F_SUBDATA       0x02                   /* data is a sub-database */
+#define F_DUPDATA       0x04                   /* data has duplicates */
        char            mn_data[1];
 } MDB_node;
 
@@ -311,9 +307,9 @@ struct MDB_env {
        int                     me_fd;
        int                     me_lfd;
        int                     me_mfd;                 /* just for writing the meta pages */
-       uint16_t        me_flags;
-       uint16_t                me_db_toggle;
-       unsigned int    me_psize;
+#define        MDB_FATAL_ERROR 0x80000000U
+       uint32_t        me_flags;
+       uint32_t        me_extrapad;    /* unused for now */
        unsigned int    me_maxreaders;
        unsigned int    me_numdbs;
        unsigned int    me_maxdbs;
@@ -325,6 +321,9 @@ struct MDB_env {
        MDB_txn         *me_txn;                /* current write transaction */
        size_t          me_mapsize;
        off_t           me_size;                /* current file size */
+       pgno_t          me_maxpg;               /* me_mapsize / me_psize */
+       unsigned int    me_psize;
+       unsigned int    me_db_toggle;
        MDB_dbx         *me_dbxs;               /* array */
        MDB_db          *me_dbs[2];
        MDB_oldpages *me_pghead;
@@ -455,23 +454,50 @@ mdb_version(int *maj, int *min, int *pat)
        return MDB_VERSION_STRING;
 }
 
-int
-mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
+static const char *errstr[] = {
+       "MDB_KEYEXIST: Key/data pair already exists",
+       "MDB_NOTFOUND: No matching key/data pair found",
+       "MDB_PAGE_NOTFOUND: Requested page not found",
+       "MDB_CORRUPTED: Located page was wrong type",
+       "MDB_PANIC: Update of meta page failed",
+       "MDB_VERSION_MISMATCH: Database environment version mismatch"
+};
+
+char *
+mdb_strerror(int err)
 {
-       return txn->mt_dbxs[dbi].md_cmp(a, b);
+       if (!err)
+               return ("Successful return: 0");
+
+       if (err >= MDB_KEYEXIST && err <= MDB_VERSION_MISMATCH)
+               return (char *)errstr[err - MDB_KEYEXIST];
+
+       return strerror(err);
 }
 
-static int
-_mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2)
+int
+mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 {
+       if (txn->mt_dbxs[dbi].md_cmp)
+               return txn->mt_dbxs[dbi].md_cmp(a, b);
+
        if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY
 #if __BYTE_ORDER == __LITTLE_ENDIAN
                |MDB_INTEGERKEY
 #endif
        ))
-               return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
+               return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size);
        else
-               return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
+               return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
+}
+
+int
+mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
+{
+       if (txn->mt_dbxs[dbi].md_dcmp)
+               return txn->mt_dbxs[dbi].md_dcmp(a, b);
+
+       return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
 }
 
 /* Allocate new page(s) for writing */
@@ -568,6 +594,11 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
        }
        }
 
+       if (pgno == P_INVALID) {
+               /* DB size is maxed out */
+               if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg)
+                       return NULL;
+       }
        if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL)
                return NULL;
        dp->h.md_num = num;
@@ -633,6 +664,10 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
        MDB_txn *txn;
        int rc, toggle;
 
+       if (env->me_flags & MDB_FATAL_ERROR) {
+               DPRINTF("mdb_txn_begin: environment had fatal error, must shutdown!");
+               return MDB_PANIC;
+       }
        if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) {
                DPRINTF("calloc: %s", strerror(errno));
                return ENOMEM;
@@ -679,6 +714,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 
        txn->mt_env = env;
 
+       toggle = env->me_txns->mti_me_toggle;
        if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) {
                mdb_txn_abort(txn);
                return rc;
@@ -763,9 +799,8 @@ mdb_txn_commit(MDB_txn *txn)
        env = txn->mt_env;
 
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
-               DPRINTF("attempt to commit read-only transaction");
                mdb_txn_abort(txn);
-               return EPERM;
+               return MDB_SUCCESS;
        }
 
        if (txn != env->me_txn) {
@@ -849,11 +884,12 @@ mdb_txn_commit(MDB_txn *txn)
        /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
         */
        next = 0;
+       i = 1;
        do {
                n = 0;
                done = 1;
                size = 0;
-               for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
+               for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
                        dp = txn->mt_u.dirty_list[i].mptr;
                        if (dp->p.mp_pgno != next) {
                                if (n) {
@@ -914,8 +950,9 @@ mdb_txn_commit(MDB_txn *txn)
                mdb_txn_abort(txn);
                return n;
        }
-       env->me_txn = NULL;
 
+done:
+       env->me_txn = NULL;
        /* update the DB tables */
        {
                int toggle = !env->me_db_toggle;
@@ -939,10 +976,6 @@ mdb_txn_commit(MDB_txn *txn)
 
        pthread_mutex_unlock(&env->me_txns->mti_wmutex);
        free(txn);
-       txn = NULL;
-
-done:
-       mdb_txn_abort(txn);
 
        return MDB_SUCCESS;
 }
@@ -1036,19 +1069,23 @@ static int
 mdb_env_write_meta(MDB_txn *txn)
 {
        MDB_env *env;
-       MDB_meta        meta;
+       MDB_meta        meta, metab;
        off_t off;
-       int rc, len;
+       int rc, len, toggle;
        char *ptr;
 
        assert(txn != NULL);
        assert(txn->mt_env != NULL);
 
+       toggle = !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE);
        DPRINTF("writing meta page %d for root page %lu",
-               !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE), txn->mt_dbs[MAIN_DBI].md_root);
+               toggle, txn->mt_dbs[MAIN_DBI].md_root);
 
        env = txn->mt_env;
 
+       metab.mm_txnid = env->me_metas[toggle]->mm_txnid;
+       metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg;
+
        ptr = (char *)&meta;
        off = offsetof(MDB_meta, mm_dbs[0].md_depth);
        len = sizeof(MDB_meta) - off;
@@ -1059,15 +1096,25 @@ mdb_env_write_meta(MDB_txn *txn)
        meta.mm_last_pg = txn->mt_next_pgno - 1;
        meta.mm_txnid = txn->mt_txnid;
 
-       if (!F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE))
+       if (toggle)
                off += env->me_psize;
        off += PAGEHDRSZ;
 
-       rc = pwrite(env->me_fd, ptr, len, off);
+       /* Write to the SYNC fd */
+       rc = pwrite(env->me_mfd, ptr, len, off);
        if (rc != len) {
                DPRINTF("write failed, disk error?");
+               /* On a failure, the pagecache still contains the new data.
+                * Write some old data back, to prevent it from being used.
+                * Use the non-SYNC fd; we know it will fail anyway.
+                */
+               meta.mm_last_pg = metab.mm_last_pg;
+               meta.mm_txnid = metab.mm_txnid;
+               rc = pwrite(env->me_fd, ptr, len, off);
+               env->me_flags |= MDB_FATAL_ERROR;
                return errno;
        }
+       txn->mt_env->me_txns->mti_me_toggle = toggle;
 
        return MDB_SUCCESS;
 }
@@ -1079,13 +1126,13 @@ mdb_env_read_meta(MDB_env *env, int *which)
 
        assert(env != NULL);
 
-       if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
+       if (which)
+               toggle = *which;
+       else if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
                toggle = 1;
 
        if (env->me_meta != env->me_metas[toggle])
                env->me_meta = env->me_metas[toggle];
-       if (which)
-               *which = toggle;
 
        DPRINTF("Using meta page %d", toggle);
 
@@ -1183,7 +1230,9 @@ mdb_env_open2(MDB_env *env, unsigned int flags)
        }
        env->me_psize = meta.mm_psize;
 
-       p = (MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)env->me_map;
+       env->me_maxpg = env->me_mapsize / env->me_psize;
+
+       p = (MDB_page *)env->me_map;
        env->me_metas[0] = METADATA(p);
        env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
 
@@ -1219,6 +1268,8 @@ mdb_env_share_locks(MDB_env *env)
        struct flock lock_info;
 
        env->me_txns->mti_txnid = env->me_meta->mm_txnid;
+       if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
+               env->me_txns->mti_me_toggle = 1;
 
        memset((void *)&lock_info, 0, sizeof(lock_info));
        lock_info.l_type = F_RDLCK;
@@ -1292,6 +1343,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
                env->me_txns->mti_magic = MDB_MAGIC;
                env->me_txns->mti_txnid = 0;
                env->me_txns->mti_numreaders = 0;
+               env->me_txns->mti_me_toggle = 0;
 
        } else {
                if (env->me_txns->mti_magic != MDB_MAGIC) {
@@ -1450,10 +1502,7 @@ mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key,
                nodekey.mv_size = node->mn_ksize;
                nodekey.mv_data = NODEKEY(node);
 
-               if (txn->mt_dbxs[dbi].md_cmp)
-                       rc = txn->mt_dbxs[dbi].md_cmp(key, &nodekey);
-               else
-                       rc = _mdb_cmp(txn, dbi, key, &nodekey);
+               rc = mdb_cmp(txn, dbi, key, &nodekey);
 
                if (IS_LEAF(mp))
                        DPRINTF("found leaf index %u [%.*s], rc = %i",
@@ -1490,12 +1539,12 @@ cursor_pop_page(MDB_cursor *cursor)
 {
        MDB_ppage       *top;
 
-       top = CURSOR_TOP(cursor);
-       CURSOR_POP(cursor);
-
-       DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
+       if (cursor->mc_snum) {
+               top = CURSOR_TOP(cursor);
+               cursor->mc_snum--;
 
-       free(top);
+               DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
+       }
 }
 
 static MDB_ppage *
@@ -1505,10 +1554,9 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
 
        DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor);
 
-       if ((ppage = calloc(1, sizeof(MDB_ppage))) == NULL)
-               return NULL;
+       ppage = &cursor->mc_stack[cursor->mc_snum++];
        ppage->mp_page = mp;
-       CURSOR_PUSH(cursor, ppage);
+       ppage->mp_ki = 0;
        return ppage;
 }
 
@@ -1546,7 +1594,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
        int rc;
 
        if (cursor && cursor_push_page(cursor, mp) == NULL)
-               return MDB_FAIL;
+               return ENOMEM;
 
        while (IS_BRANCH(mp)) {
                unsigned int     i = 0;
@@ -1583,12 +1631,12 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 
                mpp->mp_parent = mp;
                if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL)
-                       return MDB_FAIL;
+                       return MDB_PAGE_NOTFOUND;
                mpp->mp_pi = i;
                mpp->mp_page = mp;
 
                if (cursor && cursor_push_page(cursor, mp) == NULL)
-                       return MDB_FAIL;
+                       return ENOMEM;
 
                if (modify) {
                        MDB_dhead *dh = ((MDB_dhead *)mp)-1;
@@ -1605,7 +1653,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
        if (!IS_LEAF(mp)) {
                DPRINTF("internal error, index points to a %02X page!?",
                    mp->mp_flags);
-               return MDB_FAIL;
+               return MDB_CORRUPTED;
        }
 
        DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno,
@@ -1643,7 +1691,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
        }
 
        if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL)
-               return MDB_FAIL;
+               return MDB_PAGE_NOTFOUND;
 
        DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
 
@@ -1687,9 +1735,9 @@ mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
        memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
        if ((omp = mdb_get_page(txn, pgno)) == NULL) {
                DPRINTF("read overflow page %lu failed", pgno);
-               return MDB_FAIL;
+               return MDB_PAGE_NOTFOUND;
        }
-       data->mv_data = omp;
+       data->mv_data = METADATA(omp);
 
        return MDB_SUCCESS;
 }
@@ -1719,7 +1767,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
        leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
        if (leaf && exact) {
                /* Return first duplicate data item */
-               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        MDB_xcursor mx;
 
                        mdb_xcursor_init0(txn, dbi, &mx);
@@ -1742,13 +1790,13 @@ mdb_sibling(MDB_cursor *cursor, int move_right)
 {
        int              rc;
        MDB_node        *indx;
-       MDB_ppage       *parent, *top;
+       MDB_ppage       *parent;
        MDB_page        *mp;
 
-       top = CURSOR_TOP(cursor);
-       if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) {
+       if (cursor->mc_snum < 2) {
                return MDB_NOTFOUND;            /* root has no siblings */
        }
+       parent = CURSOR_PARENT(cursor);
 
        DPRINTF("parent page is page %lu, index %u",
            parent->mp_page->mp_pgno, parent->mp_ki);
@@ -1773,7 +1821,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right)
 
        indx = NODEPTR(parent->mp_page, parent->mp_ki);
        if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL)
-               return MDB_FAIL;
+               return MDB_PAGE_NOTFOUND;
 #if 0
        mp->parent = parent->mp_page;
        mp->parent_index = parent->mp_ki;
@@ -1810,17 +1858,18 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
 
        assert(cursor->mc_initialized);
 
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
        if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-               if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
+               leaf = NODEPTR(mp, top->mp_ki);
+               if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
                        if (op != MDB_NEXT || rc == MDB_SUCCESS)
                                return rc;
                }
        }
 
-       top = CURSOR_TOP(cursor);
-       mp = top->mp_page;
-
        DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
 
        if (top->mp_ki + 1 >= NUMKEYS(mp)) {
@@ -1841,12 +1890,14 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
        assert(IS_LEAF(mp));
        leaf = NODEPTR(mp, top->mp_ki);
 
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+       }
        if (data) {
                if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
                        return rc;
 
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1866,17 +1917,18 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
 
        assert(cursor->mc_initialized);
 
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
        if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-               if (op == MDB_PREV || op == MDB_PREV_DUP) {
+               leaf = NODEPTR(mp, top->mp_ki);
+               if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
                        if (op != MDB_PREV || rc == MDB_SUCCESS)
                                return rc;
                }
        }
 
-       top = CURSOR_TOP(cursor);
-       mp = top->mp_page;
-
        DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
 
        if (top->mp_ki == 0)  {
@@ -1899,12 +1951,14 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o
        assert(IS_LEAF(mp));
        leaf = NODEPTR(mp, top->mp_ki);
 
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+       }
        if (data) {
                if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
                        return rc;
 
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc != MDB_SUCCESS)
                                return rc;
@@ -1927,8 +1981,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        assert(key);
        assert(key->mv_size > 0);
 
-       while (CURSOR_TOP(cursor) != NULL)
-               cursor_pop_page(cursor);
+       cursor->mc_snum = 0;
 
        rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp);
        if (rc != MDB_SUCCESS)
@@ -1956,12 +2009,11 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        cursor->mc_initialized = 1;
        cursor->mc_eof = 0;
 
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+       }
        if (data) {
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
-                       return rc;
-
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        if (op == MDB_SET || op == MDB_SET_RANGE) {
                                rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        } else {
@@ -1978,6 +2030,19 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
                                if (rc != MDB_SUCCESS)
                                        return rc;
                        }
+               } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) {
+                       MDB_val d2;
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
+                               return rc;
+                       rc = mdb_dcmp(cursor->mc_txn, cursor->mc_dbi, data, &d2);
+                       if (rc) {
+                               if (op == MDB_GET_BOTH || rc > 0)
+                                       return MDB_NOTFOUND;
+                       }
+
+               } else {
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                               return rc;
                }
        }
 
@@ -1998,8 +2063,7 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        MDB_pageparent  mpp;
        MDB_node        *leaf;
 
-       while (CURSOR_TOP(cursor) != NULL)
-               cursor_pop_page(cursor);
+       cursor->mc_snum = 0;
 
        rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp);
        if (rc != MDB_SUCCESS)
@@ -2011,14 +2075,14 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        cursor->mc_eof = 0;
 
        if (data) {
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
-                       return rc;
-
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc)
                                return rc;
+               } else {
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                               return rc;
                }
        }
        return mdb_set_key(leaf, key);
@@ -2033,8 +2097,7 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        MDB_node        *leaf;
        MDB_val lkey;
 
-       while (CURSOR_TOP(cursor) != NULL)
-               cursor_pop_page(cursor);
+       cursor->mc_snum = 0;
 
        lkey.mv_size = MAXKEYSIZE+1;
        lkey.mv_data = NULL;
@@ -2052,14 +2115,14 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        top->mp_ki = NUMKEYS(top->mp_page) - 1;
 
        if (data) {
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
-                       return rc;
-
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+               if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
                        rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
                        if (rc)
                                return rc;
+               } else {
+                       if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                               return rc;
                }
        }
 
@@ -2078,7 +2141,7 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        switch (op) {
        case MDB_GET_BOTH:
        case MDB_GET_BOTH_RANGE:
-               if (data == NULL) {
+               if (data == NULL || cursor->mc_xcursor == NULL) {
                        rc = EINVAL;
                        break;
                }
@@ -2087,7 +2150,7 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        case MDB_SET_RANGE:
                if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
                        rc = EINVAL;
-               } else if (op != MDB_SET_RANGE)
+               } else if (op == MDB_SET_RANGE)
                        rc = mdb_cursor_set(cursor, key, data, op, NULL);
                else
                        rc = mdb_cursor_set(cursor, key, data, op, &exact);
@@ -2211,7 +2274,7 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
                            data->mv_size);
                        node_size += sizeof(pgno_t);
                        if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL)
-                               return MDB_FAIL;
+                               return ENOMEM;
                        DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
                        flags |= F_BIGDATA;
                } else {
@@ -2331,7 +2394,7 @@ mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
        mx->mx_dbxs[dbn+1].md_dirty = 0;
        mx->mx_txn.mt_numdbs = dbn+2;
 
-       SLIST_INIT(&mx->mx_cursor.mc_stack);
+       mx->mx_cursor.mc_snum = 0;
        mx->mx_cursor.mc_txn = &mx->mx_txn;
        mx->mx_cursor.mc_dbi = dbn+1;
 }
@@ -2386,7 +2449,6 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
                size += sizeof(MDB_xcursor);
 
        if ((cursor = calloc(1, size)) != NULL) {
-               SLIST_INIT(&cursor->mc_stack);
                cursor->mc_dbi = dbi;
                cursor->mc_txn = txn;
                if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
@@ -2407,16 +2469,25 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 int
 mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
 {
+       MDB_ppage       *top;
+       MDB_node        *leaf;
+
        if (mc == NULL || countp == NULL)
                return EINVAL;
 
        if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
                return EINVAL;
 
-       if (!mc->mc_xcursor->mx_cursor.mc_initialized)
-               return EINVAL;
+       top = CURSOR_TOP(mc);
+       leaf = NODEPTR(top->mp_page, top->mp_ki);
+       if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+               *countp = 1;
+       } else {
+               if (!mc->mc_xcursor->mx_cursor.mc_initialized)
+                       return EINVAL;
 
-       *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+               *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+       }
        return MDB_SUCCESS;
 }
 
@@ -2424,14 +2495,6 @@ void
 mdb_cursor_close(MDB_cursor *cursor)
 {
        if (cursor != NULL) {
-               while(!CURSOR_EMPTY(cursor))
-                       cursor_pop_page(cursor);
-               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_fini(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor);
-                       while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor))
-                               cursor_pop_page(&cursor->mc_xcursor->mx_cursor);
-               }
-
                free(cursor);
        }
 }
@@ -2646,7 +2709,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                        DPRINTF("collapsing root page!");
                        txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
                        if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL)
-                               return MDB_FAIL;
+                               return MDB_PAGE_NOTFOUND;
                        txn->mt_dbs[dbi].md_depth--;
                        txn->mt_dbs[dbi].md_branch_pages--;
                } else
@@ -2672,7 +2735,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                DPRINTF("reading right neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
                if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
-                       return MDB_FAIL;
+                       return MDB_PAGE_NOTFOUND;
                npp.mp_pi = mpp->mp_pi + 1;
                si = 0;
                di = NUMKEYS(mpp->mp_page);
@@ -2682,7 +2745,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                DPRINTF("reading left neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
                if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
-                       return MDB_FAIL;
+                       return MDB_PAGE_NOTFOUND;
                npp.mp_pi = mpp->mp_pi - 1;
                si = NUMKEYS(npp.mp_page) - 1;
                di = 0;
@@ -2769,7 +2832,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                return MDB_NOTFOUND;
        }
 
-       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+       if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                MDB_xcursor mx;
                MDB_pageparent mp2;
 
@@ -2784,6 +2847,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                        if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) {
                                memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
                                        sizeof(MDB_db));
+                               txn->mt_dbs[dbi].md_entries--;
                                return rc;
                        }
                        /* otherwise fall thru and delete the sub-DB */
@@ -2797,24 +2861,21 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                                unsigned int i;
 
                                cursor_pop_page(&mx.mx_cursor);
-                               top = CURSOR_TOP(&mx.mx_cursor);
-                               if (top != NULL) {
-                                       parent = SLIST_NEXT(top, mp_entry);
-                                       while (parent != NULL) {
+                               if (mx.mx_cursor.mc_snum) {
+                                       top = CURSOR_TOP(&mx.mx_cursor);
+                                       while (mx.mx_cursor.mc_snum > 1) {
+                                               parent = CURSOR_PARENT(&mx.mx_cursor);
                                                for (i=0; i<NUMKEYS(top->mp_page); i++) {
                                                        ni = NODEPTR(top->mp_page, i);
                                                        mdb_midl_insert(txn->mt_free_pgs, ni->mn_pgno);
                                                }
-                                               if (parent) {
-                                                       parent->mp_ki++;
-                                                       if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
-                                                               cursor_pop_page(&mx.mx_cursor);
-                                                               top = CURSOR_TOP(&mx.mx_cursor);
-                                                               parent = SLIST_NEXT(top, mp_entry);
-                                                       } else {
-                                                               ni = NODEPTR(parent->mp_page, parent->mp_ki);
-                                                               top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno);
-                                                       }
+                                               parent->mp_ki++;
+                                               if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
+                                                       cursor_pop_page(&mx.mx_cursor);
+                                                       top = parent;
+                                               } else {
+                                                       ni = NODEPTR(parent->mp_page, parent->mp_ki);
+                                                       top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno);
                                                }
                                        }
                                }
@@ -2861,7 +2922,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
 
        if (mdp->h.md_parent == NULL) {
                if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL)
-                       return MDB_FAIL;
+                       return ENOMEM;
                mdp->h.md_pi = 0;
                mdp->h.md_parent = &pdp->p;
                txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno;
@@ -2869,23 +2930,23 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
                txn->mt_dbs[dbi].md_depth++;
 
                /* Add left (implicit) pointer. */
-               if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
-                   mdp->p.mp_pgno, 0) != MDB_SUCCESS)
-                       return MDB_FAIL;
+               if ((rc = mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
+                   mdp->p.mp_pgno, 0)) != MDB_SUCCESS)
+                       return rc;
        } else {
                DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
        }
 
        /* Create a right sibling. */
        if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL)
-               return MDB_FAIL;
+               return ENOMEM;
        rdp->h.md_parent = mdp->h.md_parent;
        rdp->h.md_pi = mdp->h.md_pi + 1;
        DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
 
        /* Move half of the keys to the right sibling. */
        if ((copy = malloc(txn->mt_env->me_psize)) == NULL)
-               return MDB_FAIL;
+               return ENOMEM;
        memcpy(copy, &mdp->p, txn->mt_env->me_psize);
        memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ);
        mdp->p.mp_lower = PAGEHDRSZ;
@@ -2927,7 +2988,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
        }
        if (rc != MDB_SUCCESS) {
                free(copy);
-               return MDB_FAIL;
+               return rc;
        }
 
        for (i = j = 0; i <= NUMKEYS(copy); j++) {
@@ -2994,26 +3055,46 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
        unsigned int     ki;
        MDB_node        *leaf;
        MDB_pageparent  mpp;
-       MDB_val xdata, *rdata;
+       MDB_val xdata, *rdata, dkey;
        MDB_db dummy;
+       char dbuf[PAGESIZE];
+       int do_sub = 0;
 
        DPRINTF("==> put key %.*s, size %zu, data size %zu",
                (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
 
+       dkey.mv_size = 0;
        mpp.mp_parent = NULL;
        mpp.mp_pi = 0;
        rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
        if (rc == MDB_SUCCESS) {
                leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
                if (leaf && exact) {
-                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
-                               goto put_sub;
-                       }
                        if (flags == MDB_NOOVERWRITE) {
                                DPRINTF("duplicate key %.*s",
                                    (int)key->mv_size, (char *)key->mv_data);
                                return MDB_KEYEXIST;
                        }
+                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                               /* Was a single item before, must convert now */
+                               if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+                                       dkey.mv_size = NODEDSZ(leaf);
+                                       memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
+                                       memset(&dummy, 0, sizeof(dummy));
+                                       dummy.md_root = P_INVALID;
+                                       if (dkey.mv_size == sizeof(MDB_db)) {
+                                               memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
+                                               goto put_sub;
+                                       }
+                                       mdb_del_node(mpp.mp_page, ki);
+                                       do_sub = 1;
+                                       rdata = &xdata;
+                                       xdata.mv_size = sizeof(MDB_db);
+                                       xdata.mv_data = &dummy;
+                                       goto new_sub;
+                               }
+                               goto put_sub;
+                       }
                        /* same size, just replace it */
                        if (NODEDSZ(leaf) == data->mv_size) {
                                memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
@@ -3035,6 +3116,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
                mpp.mp_page = &dp->p;
                txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno;
                txn->mt_dbs[dbi].md_depth++;
+               txn->mt_dbxs[dbi].md_dirty = 1;
                ki = 0;
        }
        else
@@ -3044,20 +3126,9 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("there are %u keys, should insert new key at index %i",
                NUMKEYS(mpp.mp_page), ki);
 
-       /* For sorted dups, the data item at this level is a DB record
-        * for a child DB; the actual data elements are stored as keys
-        * in the child DB.
-        */
-       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
-               rdata = &xdata;
-               xdata.mv_size = sizeof(MDB_db);
-               xdata.mv_data = &dummy;
-               memset(&dummy, 0, sizeof(dummy));
-               dummy.md_root = P_INVALID;
-       } else {
-               rdata = data;
-       }
+       rdata = data;
 
+new_sub:
        if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) {
                rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
        } else {
@@ -3068,8 +3139,6 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
        if (rc != MDB_SUCCESS)
                txn->mt_flags |= MDB_TXN_ERROR;
        else {
-               txn->mt_dbs[dbi].md_entries++;
-
                /* Remember if we just added a subdatabase */
                if (flags & F_SUBDATA) {
                        leaf = NODEPTR(mpp.mp_page, ki);
@@ -3081,7 +3150,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
                 * size limits on dupdata. The actual data fields of the child
                 * DB are all zero size.
                 */
-               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               if (do_sub) {
                        MDB_xcursor mx;
 
                        leaf = NODEPTR(mpp.mp_page, ki);
@@ -3092,11 +3161,19 @@ put_sub:
                        xdata.mv_data = "";
                        if (flags == MDB_NODUPDATA)
                                flags = MDB_NOOVERWRITE;
+                       /* converted, write the original data first */
+                       if (dkey.mv_size) {
+                               dkey.mv_data = dbuf;
+                               rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
+                               if (rc) return rc;
+                               leaf->mn_flags |= F_DUPDATA;
+                       }
                        rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
                        mdb_xcursor_fini(txn, dbi, &mx);
                        memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
                                sizeof(MDB_db));
                }
+               txn->mt_dbs[dbi].md_entries++;
        }
 
 done:
@@ -3127,6 +3204,19 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        return mdb_put0(txn, dbi, key, data, flags);
 }
 
+int
+mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
+{
+#define        CHANGEABLE      (MDB_NOSYNC)
+       if ((flag & CHANGEABLE) != flag)
+               return EINVAL;
+       if (onoff)
+               env->me_flags |= flag;
+       else
+               env->me_flags &= ~flag;
+       return MDB_SUCCESS;
+}
+
 int
 mdb_env_get_flags(MDB_env *env, unsigned int *arg)
 {
@@ -3224,6 +3314,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
                txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty;
                memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
                *dbi = txn->mt_numdbs;
+               txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
+               txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
                txn->mt_numdbs++;
        }