]> git.sur5r.net Git - openldap/commitdiff
Fix 2e3bc39fa94f21d692d8e94183f57aef9122c487
authorHoward Chu <hyc@symas.com>
Mon, 12 Sep 2011 03:54:32 +0000 (20:54 -0700)
committerHoward Chu <hyc@symas.com>
Mon, 12 Sep 2011 05:17:10 +0000 (22:17 -0700)
Various mistakes when converting from previous data structures.
Add a few more debug asserts/sanity checks.
Split all "if (foo) return" constructs to separate lines to allow
easier breakpoint setting.

Add mtest6 for checking mdb_split() behavior. This needs to be
expanded to check rebalance/merge cases too.

libraries/libmdb/.gitignore
libraries/libmdb/Makefile
libraries/libmdb/mdb.c
libraries/libmdb/mtest6.c [new file with mode: 0644]

index 7455ec9c11cee126775f8dde7937d5c1e1035d6f..9da29839c476142beb338780929df5e033a96126 100644 (file)
@@ -1,5 +1,5 @@
 mtest
-mtest[23]
+mtest[23456]
 testdb
 mdb_stat
 *.[ao]
index 3973979e878b8a25221d36df6b7c0f184beebfc8..e49e275638aabf2e14770057ca480092b569f814 100644 (file)
@@ -26,6 +26,7 @@ mtest2:       mtest2.o libmdb.a
 mtest3:        mtest3.o libmdb.a
 mtest4:        mtest4.o libmdb.a
 mtest5:        mtest5.o libmdb.a
+mtest6:        mtest6.o libmdb.a
 
 mdb.o: mdb.c mdb.h midl.h
        $(CC) $(CFLAGS) -fPIC $(CPPFLAGS) -c mdb.c
index b392826d07effad746b7ecadc8d00271b46b595c..806d493bb1fc9680821458f8f85e72fbdfa5bb40 100644 (file)
@@ -177,7 +177,7 @@ typedef ULONG               pgno_t;
 #elif DEBUG
        /**     Print a debug message with printf formatting. */
 # define DPRINTF(fmt, ...)     /**< Requires 2 or more args */ \
-       fprintf(stderr, "%s:%d:(%p) " fmt "\n", __func__, __LINE__, pthread_self(), __VA_ARGS__)
+       fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__)
 #else
 # define DPRINTF(fmt, ...)     ((void) 0)
 #endif
@@ -615,6 +615,9 @@ typedef struct MDB_db {
        /** Handle for the default DB. */
 #define        MAIN_DBI        1
 
+       /** Identify a data item as a valid sub-DB record */
+#define        MDB_SUBDATA     0x8200
+
        /** Meta page content. */
 typedef struct MDB_meta {
                /** Stamp identifying this as an MDB data file. It must be set
@@ -702,6 +705,10 @@ struct MDB_cursor {
        MDB_txn         *mc_txn;
        /** The database handle this cursor operates on */
        MDB_dbi         mc_dbi;
+       /** The database record for this cursor */
+       MDB_db          *mc_db;
+       /** The database auxiliary record for this cursor */
+       MDB_dbx         *mc_dbx;
        unsigned short  mc_snum;        /**< number of pushed pages */
        unsigned short  mc_top;         /**< index of top page, mc_snum-1 */
        unsigned int    mc_flags;
@@ -720,20 +727,10 @@ struct MDB_cursor {
 typedef struct MDB_xcursor {
        /** A sub-cursor for traversing the Dup DB */
        MDB_cursor mx_cursor;
-       /** A fake transaction struct for pointing to our own table
-        *      of DB info.
-        */
-       MDB_txn mx_txn;
-       /**     Our private DB information tables. Slots 0 and 1 are always
-        *      copies of the corresponding slots in the main transaction. These
-        *      hold the FREEDB and MAINDB, respectively. If the main cursor is
-        *      on a sub-database, that will be copied to slot 2, and the duplicate
-        *      database info will be in slot 3. If the main cursor is on the MAINDB
-        *      then the duplicate DB info will be in slot 2 and slot 3 will be unused.
-        */
-       MDB_dbx mx_dbxs[4];
-       /** MDB_db table */
-       MDB_db  mx_dbs[4];
+       /** The database record for this Dup DB */
+       MDB_db  mx_db;
+       /**     The auxiliary DB record for this Dup DB */
+       MDB_dbx mx_dbx;
 } MDB_xcursor;
 
        /** A set of pages freed by an earlier transaction. */
@@ -828,10 +825,9 @@ static int mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_curso
 static int     mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data);
 static int     mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data);
 
+static void    mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi);
 static void    mdb_xcursor_init0(MDB_cursor *mc);
 static void    mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node);
-static void    mdb_xcursor_init2(MDB_cursor *mc);
-static void    mdb_xcursor_fini(MDB_cursor *mc);
 
 static size_t  mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
 static size_t  mdb_branch_size(MDB_env *env, MDB_val *key);
@@ -957,10 +953,7 @@ mdb_alloc_page(MDB_cursor *mc, int num)
                        MDB_node *leaf;
                        ULONG *kptr, oldest;
 
-                       m2.mc_txn = txn;
-                       m2.mc_dbi = FREE_DBI;
-                       m2.mc_snum = 0;
-                       m2.mc_flags = 0;
+                       mdb_cursor_init(&m2, txn, FREE_DBI);
                        mdb_search_page(&m2, NULL, 0);
                        leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
                        kptr = (ULONG *)NODEKEY(leaf);
@@ -1034,8 +1027,10 @@ mdb_alloc_page(MDB_cursor *mc, int num)
 
        if (pgno == P_INVALID) {
                /* DB size is maxed out */
-               if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg)
+               if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg) {
+                       assert(txn->mt_next_pgno + num < txn->mt_env->me_maxpg);
                        return NULL;
+               }
        }
        if (txn->mt_env->me_dpages && num == 1) {
                np = txn->mt_env->me_dpages;
@@ -1330,9 +1325,7 @@ mdb_txn_commit(MDB_txn *txn)
        DPRINTF("committing txn %lu %p on mdbenv %p, root page %lu",
            txn->mt_txnid, txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
 
-       mc.mc_txn = txn;
-       mc.mc_dbi = FREE_DBI;
-       mc.mc_flags = 0;
+       mdb_cursor_init(&mc, txn, FREE_DBI);
 
        /* should only be one record now */
        if (env->me_pghead) {
@@ -1401,8 +1394,7 @@ mdb_txn_commit(MDB_txn *txn)
                MDB_val data;
                data.mv_size = sizeof(MDB_db);
 
-               mc.mc_dbi = MAIN_DBI;
-               mc.mc_flags = 0;
+               mdb_cursor_init(&mc, txn, MAIN_DBI);
                for (i = 2; i < txn->mt_numdbs; i++) {
                        if (txn->mt_dbxs[i].md_dirty) {
                                data.mv_data = &txn->mt_dbs[i];
@@ -1446,7 +1438,7 @@ mdb_txn_commit(MDB_txn *txn)
                                return n;
                        }
                }
-               done = 1;;
+               done = 1;
 #else
                struct iovec     iov[MDB_COMMIT_PAGES];
                n = 0;
@@ -1778,7 +1770,8 @@ mdb_env_create(MDB_env **env)
        MDB_env *e;
 
        e = calloc(1, sizeof(MDB_env));
-       if (!e) return ENOMEM;
+       if (!e)
+               return ENOMEM;
 
        e->me_maxreaders = DEFAULT_READERS;
        e->me_maxdbs = 2;
@@ -2399,9 +2392,9 @@ mdb_search_node(MDB_cursor *mc, MDB_val *key, int *exactp)
 
        low = IS_LEAF(mp) ? 0 : 1;
        high = nkeys - 1;
-       cmp = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_cmp;
+       cmp = mc->mc_dbx->md_cmp;
        if (IS_LEAF2(mp)) {
-               nodekey.mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               nodekey.mv_size = mc->mc_db->md_pad;
                node = NODEPTR(mp, 0);  /* fake */
        }
        while (low <= high) {
@@ -2474,8 +2467,10 @@ cursor_push_page(MDB_cursor *mc, MDB_page *mp)
        DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno,
                mc->mc_dbi, (void *) mc);
 
-       if (mc->mc_snum >= CURSOR_STACK)
+       if (mc->mc_snum >= CURSOR_STACK) {
+               assert(mc->mc_snum < CURSOR_STACK);
                return ENOMEM;
+       }
 
        mc->mc_top = mc->mc_snum++;
        mc->mc_pg[mc->mc_top] = mp;
@@ -2588,7 +2583,7 @@ mdb_search_page(MDB_cursor *mc, MDB_val *key, int modify)
                DPUTS("transaction has failed, must abort");
                return EINVAL;
        } else
-               root = mc->mc_txn->mt_dbs[mc->mc_dbi].md_root;
+               root = mc->mc_db->md_root;
 
        if (root == P_INVALID) {                /* Tree is empty. */
                DPUTS("tree is empty");
@@ -2606,19 +2601,18 @@ mdb_search_page(MDB_cursor *mc, MDB_val *key, int modify)
 
        if (modify) {
                /* For sub-databases, update main root first */
-               if (mc->mc_dbi > MAIN_DBI && !mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty) {
+               if (mc->mc_dbi > MAIN_DBI && !mc->mc_dbx->md_dirty) {
                        MDB_cursor mc2;
-                       mc2.mc_txn = mc->mc_txn;
-                       mc2.mc_dbi = MAIN_DBI;
-                       rc = mdb_search_page(&mc2, &mc->mc_txn->mt_dbxs[mc->mc_dbi].md_name, 1);
+                       mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI);
+                       rc = mdb_search_page(&mc2, &mc->mc_dbx->md_name, 1);
                        if (rc)
                                return rc;
-                       mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = 1;
+                       mc->mc_dbx->md_dirty = 1;
                }
                if (!F_ISSET(mc->mc_pg[0]->mp_flags, P_DIRTY)) {
                        if ((rc = mdb_touch(mc)))
                                return rc;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = mc->mc_pg[0]->mp_pgno;
+                       mc->mc_db->md_root = mc->mc_pg[0]->mp_pgno;
                }
        }
 
@@ -2671,9 +2665,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
                return EINVAL;
        }
 
-       mc.mc_txn = txn;
-       mc.mc_dbi = dbi;
-       mc.mc_flags = 0;
+       mdb_cursor_init(&mc, txn, dbi);
        if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                mc.mc_xcursor = &mx;
                mdb_xcursor_init0(&mc);
@@ -2687,36 +2679,34 @@ static int
 mdb_sibling(MDB_cursor *mc, int move_right)
 {
        int              rc;
-       unsigned int    ptop;
        MDB_node        *indx;
        MDB_page        *mp;
 
        if (mc->mc_snum < 2) {
                return MDB_NOTFOUND;            /* root has no siblings */
        }
-       ptop = mc->mc_top-1;
 
+       cursor_pop_page(mc);
        DPRINTF("parent page is page %lu, index %u",
-               mc->mc_pg[ptop]->mp_pgno, mc->mc_ki[ptop]);
+               mc->mc_pg[mc->mc_top]->mp_pgno, mc->mc_ki[mc->mc_top]);
 
-       cursor_pop_page(mc);
-       if (move_right ? (mc->mc_ki[ptop] + 1u >= NUMKEYS(mc->mc_pg[ptop]))
-                      : (mc->mc_ki[ptop] == 0)) {
+       if (move_right ? (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mc->mc_pg[mc->mc_top]))
+                      : (mc->mc_ki[mc->mc_top] == 0)) {
                DPRINTF("no more keys left, moving to %s sibling",
                    move_right ? "right" : "left");
                if ((rc = mdb_sibling(mc, move_right)) != MDB_SUCCESS)
                        return rc;
        } else {
                if (move_right)
-                       mc->mc_ki[ptop]++;
+                       mc->mc_ki[mc->mc_top]++;
                else
-                       mc->mc_ki[ptop]--;
+                       mc->mc_ki[mc->mc_top]--;
                DPRINTF("just moving to %s index key %u",
-                   move_right ? "right" : "left", mc->mc_ki[ptop]);
+                   move_right ? "right" : "left", mc->mc_ki[mc->mc_top]);
        }
-       assert(IS_BRANCH(mc->mc_pg[ptop]));
+       assert(IS_BRANCH(mc->mc_pg[mc->mc_top]));
 
-       indx = NODEPTR(mc->mc_pg[ptop], mc->mc_ki[ptop]);
+       indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
        if ((rc = mdb_get_page(mc->mc_txn, NODEPGNO(indx), &mp)))
                return rc;;
 
@@ -2740,7 +2730,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
 
        mp = mc->mc_pg[mc->mc_top];
 
-       if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) {
+       if (mc->mc_db->md_flags & MDB_DUPSORT) {
                leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
                if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                        if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
@@ -2772,7 +2762,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
            mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]);
 
        if (IS_LEAF2(mp)) {
-               key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               key->mv_size = mc->mc_db->md_pad;
                key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size);
                return MDB_SUCCESS;
        }
@@ -2809,7 +2799,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
 
        mp = mc->mc_pg[mc->mc_top];
 
-       if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) {
+       if (mc->mc_db->md_flags & MDB_DUPSORT) {
                leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
                if (op == MDB_PREV || op == MDB_PREV_DUP) {
                        if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
@@ -2844,7 +2834,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
            mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]);
 
        if (IS_LEAF2(mp)) {
-               key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               key->mv_size = mc->mc_db->md_pad;
                key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size);
                return MDB_SUCCESS;
        }
@@ -2887,13 +2877,13 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                MDB_val nodekey;
 
                if (mc->mc_pg[mc->mc_top]->mp_flags & P_LEAF2) {
-                       nodekey.mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+                       nodekey.mv_size = mc->mc_db->md_pad;
                        nodekey.mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], 0, nodekey.mv_size);
                } else {
                        leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
                        MDB_SET_KEY(leaf, &nodekey);
                }
-               rc = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_cmp(key, &nodekey);
+               rc = mc->mc_dbx->md_cmp(key, &nodekey);
                if (rc == 0) {
                        /* Probably happens rarely, but first node on the page
                         * was the one we wanted.
@@ -2915,7 +2905,7 @@ set1:
                                        leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
                                        MDB_SET_KEY(leaf, &nodekey);
                                }
-                               rc = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_cmp(key, &nodekey);
+                               rc = mc->mc_dbx->md_cmp(key, &nodekey);
                                if (rc == 0) {
                                        /* last node was the one we wanted */
                                        mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top])-1;
@@ -2969,7 +2959,7 @@ set3:
        mc->mc_flags &= ~C_EOF;
 
        if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
-               key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               key->mv_size = mc->mc_db->md_pad;
                key->mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], key->mv_size);
                return MDB_SUCCESS;
        }
@@ -2997,7 +2987,7 @@ set3:
                        MDB_val d2;
                        if ((rc = mdb_read_data(mc->mc_txn, leaf, &d2)) != MDB_SUCCESS)
                                return rc;
-                       rc = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dcmp(data, &d2);
+                       rc = mc->mc_dbx->md_dcmp(data, &d2);
                        if (rc) {
                                if (op == MDB_GET_BOTH || rc > 0)
                                        return MDB_NOTFOUND;
@@ -3032,8 +3022,10 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
        mc->mc_flags |= C_INITIALIZED;
        mc->mc_flags &= ~C_EOF;
 
+       mc->mc_ki[mc->mc_top] = 0;
+
        if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
-               key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               key->mv_size = mc->mc_db->md_pad;
                key->mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], 0, key->mv_size);
                return MDB_SUCCESS;
        }
@@ -3077,7 +3069,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
        mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
 
        if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
-               key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               key->mv_size = mc->mc_db->md_pad;
                key->mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], key->mv_size);
                return MDB_SUCCESS;
        }
@@ -3126,7 +3118,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                break;
        case MDB_GET_MULTIPLE:
                if (data == NULL ||
-                       !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPFIXED) ||
+                       !(mc->mc_db->md_flags & MDB_DUPFIXED) ||
                        !(mc->mc_flags & C_INITIALIZED)) {
                        rc = EINVAL;
                        break;
@@ -3138,7 +3130,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                goto fetchm;
        case MDB_NEXT_MULTIPLE:
                if (data == NULL ||
-                       !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPFIXED)) {
+                       !(mc->mc_db->md_flags & MDB_DUPFIXED)) {
                        rc = EINVAL;
                        break;
                }
@@ -3152,7 +3144,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
 fetchm:
                                mx = &mc->mc_xcursor->mx_cursor;
                                data->mv_size = NUMKEYS(mx->mc_pg[mx->mc_top]) *
-                                       mx->mc_txn->mt_dbs[mx->mc_dbi].md_pad;
+                                       mx->mc_db->md_pad;
                                data->mv_data = METADATA(mx->mc_pg[mx->mc_top]);
                                mx->mc_ki[mx->mc_top] = NUMKEYS(mx->mc_pg[mx->mc_top])-1;
                        } else {
@@ -3181,7 +3173,7 @@ fetchm:
                break;
        case MDB_FIRST_DUP:
                if (data == NULL ||
-                       !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) ||
+                       !(mc->mc_db->md_flags & MDB_DUPSORT) ||
                        !(mc->mc_flags & C_INITIALIZED) ||
                        !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
                        rc = EINVAL;
@@ -3194,7 +3186,7 @@ fetchm:
                break;
        case MDB_LAST_DUP:
                if (data == NULL ||
-                       !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) ||
+                       !(mc->mc_db->md_flags & MDB_DUPSORT) ||
                        !(mc->mc_flags & C_INITIALIZED) ||
                        !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
                        rc = EINVAL;
@@ -3216,20 +3208,21 @@ mdb_cursor_touch(MDB_cursor *mc)
 {
        int rc;
 
-       if (mc->mc_dbi > MAIN_DBI && !mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty) {
+       if (mc->mc_dbi > MAIN_DBI && !mc->mc_dbx->md_dirty) {
                MDB_cursor mc2;
-               mc2.mc_txn = mc->mc_txn;
-               mc2.mc_dbi = MAIN_DBI;
-               rc = mdb_search_page(&mc2, &mc->mc_txn->mt_dbxs[mc->mc_dbi].md_name, 1);
-               if (rc) return rc;
-               mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = 1;
+               mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI);
+               rc = mdb_search_page(&mc2, &mc->mc_dbx->md_name, 1);
+               if (rc)
+                        return rc;
+               mc->mc_dbx->md_dirty = 1;
        }
        for (mc->mc_top = 0; mc->mc_top < mc->mc_snum; mc->mc_top++) {
                if (!F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY)) {
                        rc = mdb_touch(mc);
-                       if (rc) return rc;
+                       if (rc)
+                               return rc;
                        if (!mc->mc_top) {
-                               mc->mc_txn->mt_dbs[mc->mc_dbi].md_root =
+                               mc->mc_db->md_root =
                                        mc->mc_pg[mc->mc_top]->mp_pgno;
                        }
                }
@@ -3263,7 +3256,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                if (!(mc->mc_flags & C_INITIALIZED))
                        return EINVAL;
                rc = MDB_SUCCESS;
-       } else if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_root == P_INVALID) {
+       } else if (mc->mc_db->md_root == P_INVALID) {
                MDB_page *np;
                /* new database, write a root leaf page */
                DPUTS("allocating new root leaf page");
@@ -3272,10 +3265,10 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                }
                mc->mc_snum = 0;
                cursor_push_page(mc, np);
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = np->mp_pgno;
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth++;
-               mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = 1;
-               if ((mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
+               mc->mc_db->md_root = np->mp_pgno;
+               mc->mc_db->md_depth++;
+               mc->mc_dbx->md_dirty = 1;
+               if ((mc->mc_db->md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
                        == MDB_DUPFIXED)
                        np->mp_flags |= P_LEAF2;
                mc->mc_flags |= C_INITIALIZED;
@@ -3296,14 +3289,15 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
 
        /* Cursor is positioned, now make sure all pages are writable */
        rc2 = mdb_cursor_touch(mc);
-       if (rc2) return rc2;
+       if (rc2)
+               return rc2;
 
 top:
        /* The key already exists */
        if (rc == MDB_SUCCESS) {
                /* there's only a key anyway, so this is a no-op */
                if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
-                       unsigned int ksize = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+                       unsigned int ksize = mc->mc_db->md_pad;
                        if (key->mv_size != ksize)
                                return EINVAL;
                        if (flags == MDB_CURRENT) {
@@ -3316,22 +3310,23 @@ top:
                leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
 
                /* DB has dups? */
-               if (F_ISSET(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags, MDB_DUPSORT)) {
+               if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
                        /* Was a single item before, must convert now */
                        if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                                dkey.mv_size = NODEDSZ(leaf);
                                dkey.mv_data = dbuf;
                                memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
                                /* data matches, ignore it */
-                               if (!mdb_dcmp(mc->mc_txn, mc->mc_dbi, data, &dkey))
+                               if (!mc->mc_dbx->md_dcmp(data, &dkey))
                                        return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
                                memset(&dummy, 0, sizeof(dummy));
-                               if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPFIXED) {
+                               if (mc->mc_db->md_flags & MDB_DUPFIXED) {
                                        dummy.md_pad = data->mv_size;
                                        dummy.md_flags = MDB_DUPFIXED;
-                                       if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_INTEGERDUP)
+                                       if (mc->mc_db->md_flags & MDB_INTEGERDUP)
                                                dummy.md_flags |= MDB_INTEGERKEY;
                                }
+                               dummy.md_flags |= MDB_SUBDATA;
                                dummy.md_root = P_INVALID;
                                if (dkey.mv_size == sizeof(MDB_db)) {
                                        memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
@@ -3386,11 +3381,10 @@ new_sub:
                 * DB are all zero size.
                 */
                if (do_sub) {
+                       MDB_db *db;
                        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
 put_sub:
-                       if (flags == MDB_CURRENT)
-                               mdb_xcursor_init2(mc);
-                       else
+                       if (flags != MDB_CURRENT)
                                mdb_xcursor_init1(mc, leaf);
                        xdata.mv_size = 0;
                        xdata.mv_data = "";
@@ -3399,16 +3393,16 @@ put_sub:
                        /* converted, write the original data first */
                        if (dkey.mv_size) {
                                rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags);
-                               if (rc) return rc;
+                               if (rc)
+                                       return rc;
                                leaf->mn_flags |= F_DUPDATA;
                        }
                        rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags);
-                       mdb_xcursor_fini(mc);
-                       memcpy(NODEDATA(leaf),
-                               &mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi],
-                               sizeof(MDB_db));
+                       db = NODEDATA(leaf);
+                       assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
+                       memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
                }
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries++;
+               mc->mc_db->md_entries++;
        }
 done:
        return rc;
@@ -3427,22 +3421,20 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
                return EINVAL;
 
        rc = mdb_cursor_touch(mc);
-       if (rc) return rc;
+       if (rc)
+               return rc;
 
        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
 
        if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                if (flags != MDB_NODUPDATA) {
-                       mdb_xcursor_init2(mc);
                        rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
-                       mdb_xcursor_fini(mc);
                        /* If sub-DB still has entries, we're done */
-                       if (mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_root
-                               != P_INVALID) {
-                               memcpy(NODEDATA(leaf),
-                                       &mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi],
-                                       sizeof(MDB_db));
-                               mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries--;
+                       if (mc->mc_xcursor->mx_db.md_root != P_INVALID) {
+                               MDB_db *db = NODEDATA(leaf);
+                               assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
+                               memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+                               mc->mc_db->md_entries--;
                                return rc;
                        }
                        /* otherwise fall thru and delete the sub-DB */
@@ -3456,26 +3448,28 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
                        unsigned int i;
 
                        mx = &mc->mc_xcursor->mx_cursor;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries -=
-                               mx->mc_txn->mt_dbs[mx->mc_dbi].md_entries;
+                       mc->mc_db->md_entries -=
+                               mx->mc_db->md_entries;
 
                        cursor_pop_page(mx);
-                       if (mx->mc_snum) {
-                               while (mx->mc_snum > 1) {
-                                       for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
-                                               pgno_t pg;
-                                               ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
-                                               pg = NODEPGNO(ni);
-                                               /* free it */
-                                               mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
-                                       }
-                                       rc = mdb_sibling(mx, 1);
-                                       if (rc) break;
+                       while (mx->mc_snum > 1) {
+                               for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
+                                       MDB_page *mp;
+                                       pgno_t pg;
+                                       ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
+                                       pg = NODEPGNO(ni);
+                                       if ((rc = mdb_get_page(mc->mc_txn, pg, &mp)))
+                                               return rc;
+                                       /* free it */
+                                       mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
                                }
+                               rc = mdb_sibling(mx, 1);
+                               if (rc)
+                                       break;
                        }
                        /* free it */
                        mdb_midl_append(mc->mc_txn->mt_free_pgs,
-                               mx->mc_txn->mt_dbs[mx->mc_dbi].md_root);
+                               mx->mc_db->md_root);
                }
        }
 
@@ -3498,11 +3492,11 @@ mdb_new_page(MDB_cursor *mc, uint32_t flags, int num)
        np->mp_upper = mc->mc_txn->mt_env->me_psize;
 
        if (IS_BRANCH(np))
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_branch_pages++;
+               mc->mc_db->md_branch_pages++;
        else if (IS_LEAF(np))
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_leaf_pages++;
+               mc->mc_db->md_leaf_pages++;
        else if (IS_OVERFLOW(np)) {
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_overflow_pages += num;
+               mc->mc_db->md_overflow_pages += num;
                np->mp_pages = num;
        }
 
@@ -3560,7 +3554,7 @@ mdb_add_node(MDB_cursor *mc, indx_t indx,
 
        if (IS_LEAF2(mp)) {
                /* Move higher keys up one slot. */
-               int ksize = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad, dif;
+               int ksize = mc->mc_db->md_pad, dif;
                char *ptr = LEAF2KEY(mp, indx, ksize);
                dif = NUMKEYS(mp) - indx;
                if (dif > 0)
@@ -3703,29 +3697,17 @@ static void
 mdb_xcursor_init0(MDB_cursor *mc)
 {
        MDB_xcursor *mx = mc->mc_xcursor;
-       MDB_dbi dbn;
-
-       mx->mx_txn = *mc->mc_txn;
-       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
-       mx->mx_txn.mt_dbs = mx->mx_dbs;
-       mx->mx_dbxs[0] = mc->mc_txn->mt_dbxs[0];
-       mx->mx_dbxs[1] = mc->mc_txn->mt_dbxs[1];
-       if (mc->mc_dbi > 1) {
-               mx->mx_dbxs[2] = mc->mc_txn->mt_dbxs[mc->mc_dbi];
-               dbn = 2;
-       } else {
-               dbn = 1;
-       }
-       mx->mx_dbxs[dbn+1].md_parent = dbn;
-       mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
-       mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
-       mx->mx_dbxs[dbn+1].md_dirty = 0;
-       mx->mx_txn.mt_numdbs = dbn+2;
-       mx->mx_txn.mt_u = mc->mc_txn->mt_u;
 
        mx->mx_cursor.mc_xcursor = NULL;
-       mx->mx_cursor.mc_txn = &mx->mx_txn;
-       mx->mx_cursor.mc_dbi = dbn+1;
+       mx->mx_cursor.mc_txn = mc->mc_txn;
+       mx->mx_cursor.mc_db = &mx->mx_db;
+       mx->mx_cursor.mc_dbx = &mx->mx_dbx;
+       mx->mx_cursor.mc_dbi = mc->mc_dbi+1;
+       mx->mx_dbx.md_parent = mc->mc_dbi;
+       mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp;
+       mx->mx_dbx.md_dcmp = NULL;
+       mx->mx_dbx.md_rel = mc->mc_dbx->md_rel;
+       mx->mx_dbx.md_dirty = 0;
 }
 
 static void
@@ -3733,57 +3715,27 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
 {
        MDB_db *db = NODEDATA(node);
        MDB_xcursor *mx = mc->mc_xcursor;
-       MDB_dbi dbn;
-       mx->mx_dbs[0] = mc->mc_txn->mt_dbs[0];
-       mx->mx_dbs[1] = mc->mc_txn->mt_dbs[1];
-       if (mc->mc_dbi > 1) {
-               mx->mx_dbs[2] = mc->mc_txn->mt_dbs[mc->mc_dbi];
-               mx->mx_dbxs[2].md_dirty = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty;
-               dbn = 3;
-       } else {
-               dbn = 2;
-       }
-       DPRINTF("Sub-db %u for db %u root page %lu", dbn, mc->mc_dbi, db->md_root);
-       mx->mx_dbs[dbn] = *db;
+       assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
+       mx->mx_db = *db;
+       DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi,
+               db->md_root);
        if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY))
-               mx->mx_dbxs[dbn].md_dirty = 1;
-       mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
-       mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
-       mx->mx_txn.mt_next_pgno = mc->mc_txn->mt_next_pgno;
+               mx->mx_dbx.md_dirty = 1;
+       mx->mx_dbx.md_name.mv_data = NODEKEY(node);
+       mx->mx_dbx.md_name.mv_size = node->mn_ksize;
        mx->mx_cursor.mc_snum = 0;
        mx->mx_cursor.mc_flags = 0;
 }
 
 static void
-mdb_xcursor_init2(MDB_cursor *mc)
-{
-       MDB_xcursor *mx = mc->mc_xcursor;
-       MDB_dbi dbn;
-       mx->mx_dbs[0] = mc->mc_txn->mt_dbs[0];
-       mx->mx_dbs[1] = mc->mc_txn->mt_dbs[1];
-       if (mc->mc_dbi > 1) {
-               mx->mx_dbs[2] = mc->mc_txn->mt_dbs[mc->mc_dbi];
-               mx->mx_dbxs[2].md_dirty = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty;
-               dbn = 3;
-       } else {
-               dbn = 2;
-       }
-       DPRINTF("Sub-db %u for db %u root page %lu", dbn, mc->mc_dbi,
-               mx->mx_dbs[dbn].md_root);
-       mx->mx_txn.mt_next_pgno = mc->mc_txn->mt_next_pgno;
-}
-
-static void
-mdb_xcursor_fini(MDB_cursor *mc)
+mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi)
 {
-       MDB_xcursor *mx = mc->mc_xcursor;
-       mc->mc_txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
-       mc->mc_txn->mt_dbs[0] = mx->mx_dbs[0];
-       mc->mc_txn->mt_dbs[1] = mx->mx_dbs[1];
-       if (mc->mc_dbi > 1) {
-               mc->mc_txn->mt_dbs[mc->mc_dbi] = mx->mx_dbs[2];
-               mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = mx->mx_dbxs[2].md_dirty;
-       }
+       mc->mc_dbi = dbi;
+       mc->mc_txn = txn;
+       mc->mc_db = &txn->mt_dbs[dbi];
+       mc->mc_dbx = &txn->mt_dbxs[dbi];
+       mc->mc_snum = 0;
+       mc->mc_flags = 0;
 }
 
 int
@@ -3798,9 +3750,8 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
        if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
                size += sizeof(MDB_xcursor);
 
-       if ((mc = calloc(1, size)) != NULL) {
-               mc->mc_dbi = dbi;
-               mc->mc_txn = txn;
+       if ((mc = malloc(size)) != NULL) {
+               mdb_cursor_init(mc, txn, dbi);
                if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                        MDB_xcursor *mx = (MDB_xcursor *)(mc + 1);
                        mc->mc_xcursor = mx;
@@ -3824,7 +3775,7 @@ mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
        if (mc == NULL || countp == NULL)
                return EINVAL;
 
-       if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
+       if (!(mc->mc_db->md_flags & MDB_DUPSORT))
                return EINVAL;
 
        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
@@ -3834,7 +3785,7 @@ mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
                if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED))
                        return EINVAL;
 
-               *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+               *countp = mc->mc_xcursor->mx_db.md_entries;
        }
        return MDB_SUCCESS;
 }
@@ -3905,27 +3856,30 @@ mdb_move_node(MDB_cursor *csrc, MDB_cursor *cdst)
        /* Mark src and dst as dirty. */
        if ((rc = mdb_touch(csrc)) ||
            (rc = mdb_touch(cdst)))
-               return rc;;
+               return rc;
 
        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
                srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);        /* fake */
-               key.mv_size = csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_pad;
+               key.mv_size = csrc->mc_db->md_pad;
                key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
                data.mv_size = 0;
                data.mv_data = NULL;
        } else {
+               srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
                if (csrc->mc_ki[csrc->mc_top] == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
                        unsigned int snum = csrc->mc_snum;
+                       MDB_node *s2;
                        /* must find the lowest key below src */
                        mdb_search_page_root(csrc, NULL, 0);
-                       srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
+                       s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
+                       key.mv_size = NODEKSZ(s2);
+                       key.mv_data = NODEKEY(s2);
                        csrc->mc_snum = snum--;
                        csrc->mc_top = snum;
                } else {
-                       srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
+                       key.mv_size = NODEKSZ(srcnode);
+                       key.mv_data = NODEKEY(srcnode);
                }
-               key.mv_size = NODEKSZ(srcnode);
-               key.mv_data = NODEKEY(srcnode);
                data.mv_size = NODEDSZ(srcnode);
                data.mv_data = NODEDATA(srcnode);
        }
@@ -3952,9 +3906,9 @@ mdb_move_node(MDB_cursor *csrc, MDB_cursor *cdst)
        if (csrc->mc_ki[csrc->mc_top] == 0) {
                if (csrc->mc_ki[csrc->mc_top-1] != 0) {
                        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
-                               key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
+                               key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
                        } else {
-                               srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
+                               srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
                                key.mv_size = NODEKSZ(srcnode);
                                key.mv_data = NODEKEY(srcnode);
                        }
@@ -4017,7 +3971,7 @@ mdb_merge(MDB_cursor *csrc, MDB_cursor *cdst)
         */
        j = NUMKEYS(cdst->mc_pg[cdst->mc_top]);
        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
-               key.mv_size = csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_pad;
+               key.mv_size = csrc->mc_db->md_pad;
                key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]);
                for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
                        rc = mdb_add_node(cdst, j, &key, NULL, 0, 0);
@@ -4053,9 +4007,9 @@ mdb_merge(MDB_cursor *csrc, MDB_cursor *cdst)
 
        mdb_midl_append(csrc->mc_txn->mt_free_pgs, csrc->mc_pg[csrc->mc_top]->mp_pgno);
        if (IS_LEAF(csrc->mc_pg[csrc->mc_top]))
-               csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_leaf_pages--;
+               csrc->mc_db->md_leaf_pages--;
        else
-               csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_branch_pages--;
+               csrc->mc_db->md_branch_pages--;
        cursor_pop_page(csrc);
 
        return mdb_rebalance(csrc);
@@ -4068,6 +4022,8 @@ mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst)
 
        cdst->mc_txn = csrc->mc_txn;
        cdst->mc_dbi = csrc->mc_dbi;
+       cdst->mc_db  = csrc->mc_db;
+       cdst->mc_dbx = csrc->mc_dbx;
        cdst->mc_snum = csrc->mc_snum;
        cdst->mc_top = csrc->mc_top;
        cdst->mc_flags = csrc->mc_flags;
@@ -4082,7 +4038,6 @@ static int
 mdb_rebalance(MDB_cursor *mc)
 {
        MDB_node        *node;
-       MDB_page        *root;
        int rc;
        unsigned int ptop;
        MDB_cursor      mn;
@@ -4100,18 +4055,20 @@ mdb_rebalance(MDB_cursor *mc)
        if (mc->mc_snum < 2) {
                if (NUMKEYS(mc->mc_pg[mc->mc_top]) == 0) {
                        DPUTS("tree is completely empty");
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = P_INVALID;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth = 0;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_leaf_pages = 0;
+                       mc->mc_db->md_root = P_INVALID;
+                       mc->mc_db->md_depth = 0;
+                       mc->mc_db->md_leaf_pages = 0;
                        mdb_midl_append(mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno);
+                       mc->mc_snum = 0;
                } else if (IS_BRANCH(mc->mc_pg[mc->mc_top]) && NUMKEYS(mc->mc_pg[mc->mc_top]) == 1) {
                        DPUTS("collapsing root page!");
                        mdb_midl_append(mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno);
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = NODEPGNO(NODEPTR(mc->mc_pg[mc->mc_top], 0));
-                       if ((rc = mdb_get_page(mc->mc_txn, mc->mc_txn->mt_dbs[mc->mc_dbi].md_root, &root)))
+                       mc->mc_db->md_root = NODEPGNO(NODEPTR(mc->mc_pg[mc->mc_top], 0));
+                       if ((rc = mdb_get_page(mc->mc_txn, mc->mc_db->md_root,
+                               &mc->mc_pg[mc->mc_top])))
                                return rc;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth--;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_branch_pages--;
+                       mc->mc_db->md_depth--;
+                       mc->mc_db->md_branch_pages--;
                } else
                        DPUTS("root page doesn't need rebalancing");
                return MDB_SUCCESS;
@@ -4191,8 +4148,8 @@ mdb_del0(MDB_cursor *mc, MDB_node *leaf)
                        pg++;
                }
        }
-       mdb_del_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad);
-       mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries--;
+       mdb_del_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_db->md_pad);
+       mc->mc_db->md_entries--;
        rc = mdb_rebalance(mc);
        if (rc != MDB_SUCCESS)
                mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
@@ -4226,9 +4183,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                return EINVAL;
        }
 
-       mc.mc_txn = txn;
-       mc.mc_dbi = dbi;
-       mc.mc_flags = 0;
+       mdb_cursor_init(&mc, txn, dbi);
        if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                mc.mc_xcursor = &mx;
                mdb_xcursor_init0(&mc);
@@ -4287,17 +4242,17 @@ mdb_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
                mc->mc_ki[1] = mc->mc_ki[0];
                mc->mc_pg[0] = pp;
                mc->mc_ki[0] = 0;
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = pp->mp_pgno;
-               DPRINTF("root split! new root = %lu", pp->mp_pgno);
-               mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth++;
+               mc->mc_db->md_root = pp->mp_pgno;
+               DPRINTF("root split! new root = %zu", pp->mp_pgno);
+               mc->mc_db->md_depth++;
 
                /* Add left (implicit) pointer. */
                if ((rc = mdb_add_node(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != MDB_SUCCESS) {
                        /* undo the pre-push */
                        mc->mc_pg[0] = mc->mc_pg[1];
                        mc->mc_ki[0] = mc->mc_ki[1];
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = mp->mp_pgno;
-                       mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth--;
+                       mc->mc_db->md_root = mp->mp_pgno;
+                       mc->mc_db->md_depth--;
                        return rc;
                }
                mc->mc_snum = 2;
@@ -4326,7 +4281,7 @@ mdb_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
                /* Move half of the keys to the right sibling */
                copy = NULL;
                x = mc->mc_ki[mc->mc_top] - split_indx;
-               ksize = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
+               ksize = mc->mc_db->md_pad;
                split = LEAF2KEY(mp, split_indx, ksize);
                rsize = (nkeys - split_indx) * ksize;
                lsize = (nkeys - split_indx) * sizeof(indx_t);
@@ -4549,10 +4504,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags)
                return EINVAL;
 
-       mc.mc_txn = txn;
-       mc.mc_dbi = dbi;
-       mc.mc_snum = 0;
-       mc.mc_flags = 0;
+       mdb_cursor_init(&mc, txn, dbi);
        if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                mc.mc_xcursor = &mx;
                mdb_xcursor_init0(&mc);
@@ -4701,9 +4653,7 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
                memset(&dummy, 0, sizeof(dummy));
                dummy.md_root = P_INVALID;
                dummy.md_flags = flags & 0xffff;
-               mc.mc_txn = txn;
-               mc.mc_dbi = MAIN_DBI;
-               mc.mc_flags = 0;
+               mdb_cursor_init(&mc, txn, MAIN_DBI);
                rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA);
                dirty = 1;
        }
diff --git a/libraries/libmdb/mtest6.c b/libraries/libmdb/mtest6.c
new file mode 100644 (file)
index 0000000..36ad267
--- /dev/null
@@ -0,0 +1,131 @@
+/* mtest6.c - memory-mapped database tester/toy */
+/*
+ * Copyright 2011 Howard Chu, Symas Corp.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+
+/* Tests for DB splits and merges */
+#define _XOPEN_SOURCE 500              /* srandom(), random() */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include "mdb.h"
+
+char dkbuf[1024];
+
+int main(int argc,char * argv[])
+{
+       int i = 0, j = 0, rc;
+       MDB_env *env;
+       MDB_dbi dbi;
+       MDB_val key, data;
+       MDB_txn *txn;
+       MDB_stat mst;
+       MDB_cursor *cursor;
+       int count;
+       int *values;
+       long kval;
+       char *sval;
+
+       srandom(time(NULL));
+
+       rc = mdb_env_create(&env);
+       rc = mdb_env_set_mapsize(env, 10485760);
+       rc = mdb_env_set_maxdbs(env, 4);
+       rc = mdb_env_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664);
+       rc = mdb_txn_begin(env, 0, &txn);
+       rc = mdb_open(txn, "id2", MDB_CREATE|MDB_INTEGERKEY, &dbi);
+       rc = mdb_cursor_open(txn, dbi, &cursor);
+       rc = mdb_stat(txn, dbi, &mst);
+
+       sval = calloc(1, mst.ms_psize / 4);
+       key.mv_size = sizeof(long);
+       key.mv_data = &kval;
+       data.mv_size = mst.ms_psize / 4 - 30;
+       data.mv_data = sval;
+
+       printf("Adding 12 values, should yield 3 splits\n");
+       for (i=0;i<12;i++) {
+               kval = i*5;
+               sprintf(sval, "%08x", kval);
+               rc = mdb_cursor_put(cursor, &key, &data, MDB_NOOVERWRITE);
+       }
+       printf("Adding 12 more values, should yield 3 splits\n");
+       for (i=0;i<12;i++) {
+               kval = i*5+4;
+               sprintf(sval, "%08x", kval);
+               rc = mdb_cursor_put(cursor, &key, &data, MDB_NOOVERWRITE);
+       }
+       printf("Adding 12 more values, should yield 3 splits\n");
+       for (i=0;i<12;i++) {
+               kval = i*5+1;
+               sprintf(sval, "%08x", kval);
+               rc = mdb_cursor_put(cursor, &key, &data, MDB_NOOVERWRITE);
+       }
+       rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST);
+
+       do {
+               printf("key: %p %s, data: %p %.*s\n",
+                       key.mv_data,  mdb_dkey(&key, dkbuf),
+                       data.mv_data, (int) data.mv_size, (char *) data.mv_data);
+       } while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0);
+       mdb_cursor_close(cursor);
+       mdb_txn_commit(txn);
+
+#if 0
+       j=0;
+
+       for (i= count - 1; i > -1; i-= (random()%5)) {
+               j++;
+               txn=NULL;
+               rc = mdb_txn_begin(env, 0, &txn);
+               sprintf(kval, "%03x", values[i & ~0x0f]);
+               sprintf(sval, "%03x %d foo bar", values[i], values[i]);
+               key.mv_size = sizeof(int);
+               key.mv_data = kval;
+               data.mv_size = sizeof(sval);
+               data.mv_data = sval;
+               rc = mdb_del(txn, dbi, &key, &data);
+               if (rc) {
+                       j--;
+                       mdb_txn_abort(txn);
+               } else {
+                       rc = mdb_txn_commit(txn);
+               }
+       }
+       free(values);
+       printf("Deleted %d values\n", j);
+
+       rc = mdb_env_stat(env, &mst);
+       rc = mdb_txn_begin(env, 1, &txn);
+       rc = mdb_cursor_open(txn, dbi, &cursor);
+       printf("Cursor next\n");
+       while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
+               printf("key: %.*s, data: %.*s\n",
+                       (int) key.mv_size,  (char *) key.mv_data,
+                       (int) data.mv_size, (char *) data.mv_data);
+       }
+       printf("Cursor prev\n");
+       while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_PREV)) == 0) {
+               printf("key: %.*s, data: %.*s\n",
+                       (int) key.mv_size,  (char *) key.mv_data,
+                       (int) data.mv_size, (char *) data.mv_data);
+       }
+       mdb_cursor_close(cursor);
+       mdb_close(txn, dbi);
+
+       mdb_txn_abort(txn);
+#endif
+       mdb_env_close(env);
+
+       return 0;
+}