]> git.sur5r.net Git - openldap/commitdiff
New sorted-dup subpage support
authorHoward Chu <hyc@symas.com>
Fri, 16 Sep 2011 23:51:32 +0000 (16:51 -0700)
committerHoward Chu <hyc@symas.com>
Sat, 17 Sep 2011 10:33:07 +0000 (03:33 -0700)
Instead of converting directly to a subDB when the first duplicate
item is seen for a key, convert to a subpage instead. Allow the
subpage to grow up to the overflow limit, then convert to a subDB.
This saves a significant amount of space in a typical slapd index
database.

Currently we don't convert back to the smaller form if items are
later deleted. Probably could do that with some hysteresis, e.g.,
convert back from subDB to subpage when the size drops below
(overflow limit/2). Maybe later.

libraries/libmdb/mdb.c

index 112936a3655af1f780eaefdf4ecb1ccac31e27c1..8a877c744f6f90ffc7864a77f0cab46c5e0cfc13 100644 (file)
@@ -512,6 +512,7 @@ typedef struct MDB_page {
                pgno_t          p_pgno; /**< page number */
                void *          p_next; /**< for in-memory list of freed structs */
        } mp_p;
+       uint16_t        mp_pad;
 /**    @defgroup mdb_page      Page Flags
  *     @ingroup internal
  *     Flags for the page headers.
@@ -523,8 +524,9 @@ typedef struct MDB_page {
 #define        P_META           0x08           /**< meta page */
 #define        P_DIRTY          0x10           /**< dirty page */
 #define        P_LEAF2          0x20           /**< for #MDB_DUPFIXED records */
+#define        P_SUBP           0x40           /**< for #MDB_DUPSORT sub-pages */
 /** @} */
-       uint32_t        mp_flags;               /**< @ref mdb_page */
+       uint16_t        mp_flags;               /**< @ref mdb_page */
 #define mp_lower       mp_pb.pb.pb_lower
 #define mp_upper       mp_pb.pb.pb_upper
 #define mp_pages       mp_pb.pb_pages
@@ -566,6 +568,8 @@ typedef struct MDB_page {
 #define IS_BRANCH(p)    F_ISSET((p)->mp_flags, P_BRANCH)
        /** Test if a page is an overflow page */
 #define IS_OVERFLOW(p)  F_ISSET((p)->mp_flags, P_OVERFLOW)
+       /** Test if a page is a sub page */
+#define IS_SUBP(p)      F_ISSET((p)->mp_flags, P_SUBP)
 
        /** The number of overflow pages needed to store the given size. */
 #define OVPAGES(size, psize)   ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
@@ -666,9 +670,6 @@ typedef struct MDB_db {
        /** Handle for the default DB. */
 #define        MAIN_DBI        1
 
-       /** Identify a data item as a valid sub-DB record */
-#define        MDB_SUBDATA     0x8200
-
        /** Meta page content. */
 typedef struct MDB_meta {
                /** Stamp identifying this as an MDB data file. It must be set
@@ -873,6 +874,7 @@ static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
 static int  mdb_node_add(MDB_cursor *mc, indx_t indx,
                            MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags);
 static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
+static void mdb_node_shrink(MDB_page *mp, indx_t indx);
 static int     mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
 static int  mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
 static size_t  mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
@@ -2561,8 +2563,8 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
 
        nkeys = NUMKEYS(mp);
 
-       DPRINTF("searching %u keys in %s page %zu",
-           nkeys, IS_LEAF(mp) ? "leaf" : "branch",
+       DPRINTF("searching %u keys in %s %spage %zu",
+           nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
            mp->mp_pgno);
 
        assert(nkeys > 0);
@@ -2984,6 +2986,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
                DPUTS("=====> move to next sibling page");
                if (mdb_cursor_sibling(mc, 1) != MDB_SUCCESS) {
                        mc->mc_flags |= C_EOF;
+                       mc->mc_flags &= ~C_INITIALIZED;
                        return MDB_NOTFOUND;
                }
                mp = mc->mc_pg[mc->mc_top];
@@ -3113,6 +3116,10 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                MDB_val nodekey;
 
                mp = mc->mc_pg[mc->mc_top];
+               if (!NUMKEYS(mp)) {
+                       mc->mc_ki[mc->mc_top] = 0;
+                       return MDB_NOTFOUND;
+               }
                if (mp->mp_flags & P_LEAF2) {
                        nodekey.mv_size = mc->mc_db->md_pad;
                        nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size);
@@ -3170,6 +3177,11 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                                return MDB_NOTFOUND;
                        }
                }
+               if (!mc->mc_top) {
+                       /* There are no other pages */
+                       mc->mc_ki[mc->mc_top] = 0;
+                       return MDB_NOTFOUND;
+               }
        }
 
        rc = mdb_page_search(mc, key, 0);
@@ -3257,9 +3269,11 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
        int              rc;
        MDB_node        *leaf;
 
-       rc = mdb_page_search(mc, NULL, 0);
-       if (rc != MDB_SUCCESS)
-               return rc;
+       if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+               rc = mdb_page_search(mc, NULL, 0);
+               if (rc != MDB_SUCCESS)
+                       return rc;
+       }
        assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
 
        leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
@@ -3302,9 +3316,11 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
        lkey.mv_size = MAXKEYSIZE+1;
        lkey.mv_data = NULL;
 
-       rc = mdb_page_search(mc, &lkey, 0);
-       if (rc != MDB_SUCCESS)
-               return rc;
+       if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+               rc = mdb_page_search(mc, &lkey, 0);
+               if (rc != MDB_SUCCESS)
+                       return rc;
+       }
        assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
 
        leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
@@ -3488,12 +3504,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
 {
        MDB_node        *leaf;
        MDB_val xdata, *rdata, dkey;
+       MDB_page        *fp;
        MDB_db dummy;
-       char dbuf[PAGESIZE];
        int do_sub = 0;
        size_t nsize;
-       DKBUF;
        int rc, rc2;
+       char pbuf[PAGESIZE];
+       char dbuf[MAXKEYSIZE+1];
+       DKBUF;
 
        if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
                return EACCES;
@@ -3564,37 +3582,111 @@ top:
                if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
                        /* Was a single item before, must convert now */
                        if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+                               /* Just overwrite the current item */
+                               if (flags == MDB_CURRENT)
+                                       goto current;
+
+                               /* create a fake page for the dup items */
                                dkey.mv_size = NODEDSZ(leaf);
-                               dkey.mv_data = dbuf;
-                               memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
+                               dkey.mv_data = NODEDATA(leaf);
                                /* data matches, ignore it */
                                if (!mc->mc_dbx->md_dcmp(data, &dkey))
                                        return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
-                               memset(&dummy, 0, sizeof(dummy));
+                               memcpy(dbuf, dkey.mv_data, dkey.mv_size);
+                               dkey.mv_data = dbuf;
+                               fp = (MDB_page *)pbuf;
+                               fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+                               fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
+                               fp->mp_lower = PAGEHDRSZ;
+                               fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size;
                                if (mc->mc_db->md_flags & MDB_DUPFIXED) {
-                                       dummy.md_pad = data->mv_size;
-                                       dummy.md_flags = MDB_DUPFIXED;
-                                       if (mc->mc_db->md_flags & MDB_INTEGERDUP)
-                                               dummy.md_flags |= MDB_INTEGERKEY;
+                                       fp->mp_flags |= P_LEAF2;
+                                       fp->mp_pad = data->mv_size;
+                               } else {
+                                       fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE +
+                                               (dkey.mv_size & 1) + (data->mv_size & 1);
                                }
-                               dummy.md_flags |= MDB_SUBDATA;
-                               dummy.md_root = P_INVALID;
-                               if (dkey.mv_size == sizeof(MDB_db)) {
-                                       memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
+                               mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+                               do_sub = 1;
+                               rdata = &xdata;
+                               xdata.mv_size = fp->mp_upper;
+                               xdata.mv_data = pbuf;
+                               flags |= F_DUPDATA;
+                               goto new_sub;
+                       }
+                       if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
+                               /* See if we need to convert from fake page to subDB */
+                               MDB_page *mp;
+                               unsigned int offset;
+                               unsigned int i;
+
+                               fp = NODEDATA(leaf);
+                               if (flags == MDB_CURRENT) {
+                                       fp->mp_flags |= P_DIRTY;
+                                       fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+                                       mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
+                                       flags |= F_DUPDATA;
                                        goto put_sub;
                                }
+                               if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+                                       offset = fp->mp_pad;
+                               } else {
+                                       offset = NODESIZE + sizeof(indx_t) + data->mv_size;
+                               }
+                               offset += offset & 1;
+                               if (NODEDSZ(leaf) + offset >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
+                                       /* yes, convert it */
+                                       dummy.md_flags = 0;
+                                       if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+                                               dummy.md_pad = fp->mp_pad;
+                                               dummy.md_flags = MDB_DUPFIXED;
+                                               if (mc->mc_db->md_flags & MDB_INTEGERDUP)
+                                                       dummy.md_flags |= MDB_INTEGERKEY;
+                                       }
+                                       dummy.md_depth = 1;
+                                       dummy.md_branch_pages = 0;
+                                       dummy.md_leaf_pages = 1;
+                                       dummy.md_overflow_pages = 0;
+                                       dummy.md_entries = NUMKEYS(fp);
+                                       rdata = &xdata;
+                                       xdata.mv_size = sizeof(MDB_db);
+                                       xdata.mv_data = &dummy;
+                                       mp = mdb_page_alloc(mc, 1);
+                                       if (!mp)
+                                               return ENOMEM;
+                                       offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf);
+                                       flags |= F_DUPDATA|F_SUBDATA;
+                                       dummy.md_root = mp->mp_pgno;
+                               } else {
+                                       /* no, just grow it */
+                                       rdata = &xdata;
+                                       xdata.mv_size = NODEDSZ(leaf) + offset;
+                                       xdata.mv_data = pbuf;
+                                       mp = (MDB_page *)pbuf;
+                                       mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+                                       flags |= F_DUPDATA;
+                               }
+                               mp->mp_flags = fp->mp_flags | P_DIRTY;
+                               mp->mp_pad   = fp->mp_pad;
+                               mp->mp_lower = fp->mp_lower;
+                               mp->mp_upper = fp->mp_upper + offset;
+                               if (IS_LEAF2(fp)) {
+                                       memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
+                               } else {
+                                       nsize = NODEDSZ(leaf) - fp->mp_upper;
+                                       memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize);
+                                       for (i=0; i<NUMKEYS(fp); i++)
+                                               mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
+                               }
                                mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
                                do_sub = 1;
-                               rdata = &xdata;
-                               xdata.mv_size = sizeof(MDB_db);
-                               xdata.mv_data = &dummy;
-                               /* new sub-DB, must fully init xcursor */
-                               if (flags == MDB_CURRENT)
-                                       flags = 0;
                                goto new_sub;
                        }
+                       /* data is on sub-DB, just store it */
+                       flags |= F_DUPDATA|F_SUBDATA;
                        goto put_sub;
                }
+current:
                /* same size, just replace it */
                if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
                        NODEDSZ(leaf) == data->mv_size) {
@@ -3621,9 +3713,9 @@ new_sub:
                mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
        else {
                /* Remember if we just added a subdatabase */
-               if (flags & F_SUBDATA) {
+               if (flags & (F_SUBDATA|F_DUPDATA)) {
                        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
-                       leaf->mn_flags |= F_SUBDATA;
+                       leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA));
                }
 
                /* Now store the actual data in the child DB. Note that we're
@@ -3633,27 +3725,33 @@ new_sub:
                 */
                if (do_sub) {
                        MDB_db *db;
-                       leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+                       int xflags;
 put_sub:
-                       if (flags != MDB_CURRENT)
-                               mdb_xcursor_init1(mc, leaf);
                        xdata.mv_size = 0;
                        xdata.mv_data = "";
-                       if (flags == MDB_NODUPDATA)
-                               flags = MDB_NOOVERWRITE;
+                       if (flags & MDB_CURRENT) {
+                               xflags = MDB_CURRENT;
+                       } else {
+                               mdb_xcursor_init1(mc, leaf);
+                               xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0;
+                       }
                        /* converted, write the original data first */
                        if (dkey.mv_size) {
-                               rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags);
+                               rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
                                if (rc)
                                        return rc;
-                               leaf->mn_flags |= F_DUPDATA;
                        }
-                       rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags);
-                       db = NODEDATA(leaf);
-                       assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
-                       memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+                       rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
+                       if (flags & F_SUBDATA) {
+                               db = NODEDATA(leaf);
+                               memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+                       }
                }
-               mc->mc_db->md_entries++;
+               /* sub-writes might have failed so check rc again.
+                * Don't increment count if we just replaced an existing item.
+                */
+               if (!rc && !(flags & MDB_CURRENT))
+                       mc->mc_db->md_entries++;
        }
 done:
        return rc;
@@ -3679,48 +3777,68 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
 
        if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
                if (flags != MDB_NODUPDATA) {
+                       if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
+                               mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
+                       }
                        rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
                        /* If sub-DB still has entries, we're done */
                        if (mc->mc_xcursor->mx_db.md_root != P_INVALID) {
-                               MDB_db *db = NODEDATA(leaf);
-                               assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
-                               memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+                               if (leaf->mn_flags & F_SUBDATA) {
+                                       /* update subDB info */
+                                       MDB_db *db = NODEDATA(leaf);
+                                       memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+                               } else {
+                                       /* shrink fake page */
+                                       mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+                               }
                                mc->mc_db->md_entries--;
                                return rc;
                        }
                        /* otherwise fall thru and delete the sub-DB */
                }
 
-               /* add all the child DB's pages to the free list */
-               rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
-               if (rc == MDB_SUCCESS) {
-                       MDB_node *ni;
-                       MDB_cursor *mx;
-                       unsigned int i;
+               if (leaf->mn_flags & F_SUBDATA) {
+                       /* add all the child DB's pages to the free list */
+                       rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
+                       if (rc == MDB_SUCCESS) {
+                               MDB_node *ni;
+                               MDB_cursor *mx;
+                               unsigned int i;
 
-                       mx = &mc->mc_xcursor->mx_cursor;
-                       mc->mc_db->md_entries -=
-                               mx->mc_db->md_entries;
-
-                       mdb_cursor_pop(mx);
-                       while (mx->mc_snum > 1) {
-                               for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
-                                       MDB_page *mp;
-                                       pgno_t pg;
-                                       ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
-                                       pg = NODEPGNO(ni);
-                                       if ((rc = mdb_page_get(mc->mc_txn, pg, &mp)))
-                                               return rc;
-                                       /* free it */
-                                       mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
+                               mx = &mc->mc_xcursor->mx_cursor;
+                               mc->mc_db->md_entries -=
+                                       mx->mc_db->md_entries;
+
+                               mdb_cursor_pop(mx);
+                               while (mx->mc_snum > 0) {
+                                       for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
+                                               pgno_t pg;
+                                               ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
+                                               pg = NODEPGNO(ni);
+                                               /* free it */
+                                               mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
+                                       }
+                                       if (!mx->mc_top)
+                                               break;
+                                       rc = mdb_cursor_sibling(mx, 1);
+                                       if (rc) {
+                                               /* no more siblings, go back to beginning
+                                                * of previous level. (stack was already popped
+                                                * by mdb_cursor_sibling)
+                                                */
+                                               for (i=1; i<mx->mc_top; i++) {
+                                                       pgno_t pg;
+                                                       ni = NODEPTR(mx->mc_pg[i-1],0);
+                                                       pg = NODEPGNO(ni);
+                                                       if ((rc = mdb_page_get(mc->mc_txn, pg, &mx->mc_pg[i])))
+                                                               break;
+                                               }
+                                       }
                                }
-                               rc = mdb_cursor_sibling(mx, 1);
-                               if (rc)
-                                       break;
+                               /* free it */
+                               mdb_midl_append(mc->mc_txn->mt_free_pgs,
+                                       mx->mc_db->md_root);
                        }
-                       /* free it */
-                       mdb_midl_append(mc->mc_txn->mt_free_pgs,
-                               mx->mc_db->md_root);
                }
        }
 
@@ -3839,8 +3957,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
 
        assert(mp->mp_upper >= mp->mp_lower);
 
-       DPRINTF("add to %s page %zu index %i, data size %zu key size %zu [%s]",
+       DPRINTF("add to %s %spage %zu index %i, data size %zu key size %zu [%s]",
            IS_LEAF(mp) ? "leaf" : "branch",
+               IS_SUBP(mp) ? "sub-" : "",
            mp->mp_pgno, indx, data ? data->mv_size : 0,
                key ? key->mv_size : 0, key ? DKEY(key) : NULL);
 
@@ -3991,6 +4110,58 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
        mp->mp_upper += sz;
 }
 
+/** Compact the main page after deleting a node on a subpage.
+ * @param[in] mp The main page to operate on.
+ * @param[in] indx The index of the subpage on the main page.
+ */
+static void
+mdb_node_shrink(MDB_page *mp, indx_t indx)
+{
+       MDB_node *node;
+       MDB_page *sp, *xp;
+       char *base;
+       int osize, nsize;
+       int delta;
+       indx_t           i, numkeys, ptr;
+
+       node = NODEPTR(mp, indx);
+       sp = (MDB_page *)NODEDATA(node);
+       osize = NODEDSZ(node);
+
+       delta = sp->mp_upper - sp->mp_lower;
+       SETDSZ(node, osize - delta);
+       xp = (MDB_page *)((char *)sp + delta);
+
+       /* shift subpage upward */
+       if (IS_LEAF2(sp)) {
+               nsize = NUMKEYS(sp) * sp->mp_pad;
+               memmove(METADATA(xp), METADATA(sp), nsize);
+       } else {
+               int i;
+               nsize = osize - sp->mp_upper;
+               numkeys = NUMKEYS(sp);
+               for (i=numkeys-1; i>=0; i--)
+                       xp->mp_ptrs[i] = sp->mp_ptrs[i] - delta;
+       }
+       xp->mp_upper = sp->mp_lower;
+       xp->mp_lower = sp->mp_lower;
+       xp->mp_flags = sp->mp_flags;
+       xp->mp_pad = sp->mp_pad;
+       xp->mp_pgno = mp->mp_pgno;
+
+       /* shift lower nodes upward */
+       ptr = mp->mp_ptrs[indx];
+       numkeys = NUMKEYS(mp);
+       for (i = 0; i < numkeys; i++) {
+               if (mp->mp_ptrs[i] <= ptr)
+                       mp->mp_ptrs[i] += delta;
+       }
+
+       base = (char *)mp + mp->mp_upper;
+       memmove(base + delta, base, ptr - mp->mp_upper + NODESIZE + NODEKSZ(node));
+       mp->mp_upper += delta;
+}
+
 /** Initial setup of a sorted-dups cursor.
  * Sorted duplicates are implemented as a sub-database for the given key.
  * The duplicate data items are actually keys of the sub-database.
@@ -4026,18 +4197,41 @@ mdb_xcursor_init0(MDB_cursor *mc)
 static void
 mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
 {
-       MDB_db *db = NODEDATA(node);
        MDB_xcursor *mx = mc->mc_xcursor;
-       assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
-       mx->mx_db = *db;
+
+       if (node->mn_flags & F_SUBDATA) {
+               MDB_db *db = NODEDATA(node);
+               mx->mx_db = *db;
+               mx->mx_cursor.mc_snum = 0;
+               mx->mx_cursor.mc_flags = 0;
+       } else {
+               MDB_page *fp = NODEDATA(node);
+               mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad;
+               mx->mx_db.md_flags = 0;
+               mx->mx_db.md_depth = 1;
+               mx->mx_db.md_branch_pages = 0;
+               mx->mx_db.md_leaf_pages = 1;
+               mx->mx_db.md_overflow_pages = 0;
+               mx->mx_db.md_entries = NUMKEYS(fp);
+               mx->mx_db.md_root = fp->mp_pgno;
+               mx->mx_cursor.mc_snum = 1;
+               mx->mx_cursor.mc_flags = C_INITIALIZED;
+               mx->mx_cursor.mc_top = 0;
+               mx->mx_cursor.mc_pg[0] = fp;
+               mx->mx_cursor.mc_ki[0] = 0;
+               if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+                       mx->mx_db.md_flags = MDB_DUPFIXED;
+                       mx->mx_db.md_pad = fp->mp_pad;
+                       if (mc->mc_db->md_flags & MDB_INTEGERDUP)
+                               mx->mx_db.md_flags |= MDB_INTEGERKEY;
+               }
+       }
        DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi,
-               db->md_root);
+               mx->mx_db.md_root);
        if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY))
                mx->mx_dbx.md_dirty = 1;
        mx->mx_dbx.md_name.mv_data = NODEKEY(node);
        mx->mx_dbx.md_name.mv_size = node->mn_ksize;
-       mx->mx_cursor.mc_snum = 0;
-       mx->mx_cursor.mc_flags = 0;
        if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t))
                mx->mx_dbx.md_cmp = mdb_cmp_long;
 }
@@ -4468,6 +4662,7 @@ mdb_rebalance(MDB_cursor *mc)
        if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
                return mdb_node_move(&mn, mc);
        else { /* FIXME: if (has_enough_room()) */
+               mc->mc_flags &= ~C_INITIALIZED;
                if (mc->mc_ki[ptop] == 0)
                        return mdb_page_merge(&mn, mc);
                else