pgno_t p_pgno; /**< page number */
void * p_next; /**< for in-memory list of freed structs */
} mp_p;
+ uint16_t mp_pad;
/** @defgroup mdb_page Page Flags
* @ingroup internal
* Flags for the page headers.
#define P_META 0x08 /**< meta page */
#define P_DIRTY 0x10 /**< dirty page */
#define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */
+#define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */
/** @} */
- uint32_t mp_flags; /**< @ref mdb_page */
+ uint16_t mp_flags; /**< @ref mdb_page */
#define mp_lower mp_pb.pb.pb_lower
#define mp_upper mp_pb.pb.pb_upper
#define mp_pages mp_pb.pb_pages
#define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH)
/** Test if a page is an overflow page */
#define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW)
+ /** Test if a page is a sub page */
+#define IS_SUBP(p) F_ISSET((p)->mp_flags, P_SUBP)
/** The number of overflow pages needed to store the given size. */
#define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
/** Handle for the default DB. */
#define MAIN_DBI 1
- /** Identify a data item as a valid sub-DB record */
-#define MDB_SUBDATA 0x8200
-
/** Meta page content. */
typedef struct MDB_meta {
/** Stamp identifying this as an MDB data file. It must be set
static int mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags);
static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
+static void mdb_node_shrink(MDB_page *mp, indx_t indx);
static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
nkeys = NUMKEYS(mp);
- DPRINTF("searching %u keys in %s page %zu",
- nkeys, IS_LEAF(mp) ? "leaf" : "branch",
+ DPRINTF("searching %u keys in %s %spage %zu",
+ nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno);
assert(nkeys > 0);
DPUTS("=====> move to next sibling page");
if (mdb_cursor_sibling(mc, 1) != MDB_SUCCESS) {
mc->mc_flags |= C_EOF;
+ mc->mc_flags &= ~C_INITIALIZED;
return MDB_NOTFOUND;
}
mp = mc->mc_pg[mc->mc_top];
MDB_val nodekey;
mp = mc->mc_pg[mc->mc_top];
+ if (!NUMKEYS(mp)) {
+ mc->mc_ki[mc->mc_top] = 0;
+ return MDB_NOTFOUND;
+ }
if (mp->mp_flags & P_LEAF2) {
nodekey.mv_size = mc->mc_db->md_pad;
nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size);
return MDB_NOTFOUND;
}
}
+ if (!mc->mc_top) {
+ /* There are no other pages */
+ mc->mc_ki[mc->mc_top] = 0;
+ return MDB_NOTFOUND;
+ }
}
rc = mdb_page_search(mc, key, 0);
int rc;
MDB_node *leaf;
- rc = mdb_page_search(mc, NULL, 0);
- if (rc != MDB_SUCCESS)
- return rc;
+ if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+ rc = mdb_page_search(mc, NULL, 0);
+ if (rc != MDB_SUCCESS)
+ return rc;
+ }
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
lkey.mv_size = MAXKEYSIZE+1;
lkey.mv_data = NULL;
- rc = mdb_page_search(mc, &lkey, 0);
- if (rc != MDB_SUCCESS)
- return rc;
+ if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+ rc = mdb_page_search(mc, &lkey, 0);
+ if (rc != MDB_SUCCESS)
+ return rc;
+ }
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
{
MDB_node *leaf;
MDB_val xdata, *rdata, dkey;
+ MDB_page *fp;
MDB_db dummy;
- char dbuf[PAGESIZE];
int do_sub = 0;
size_t nsize;
- DKBUF;
int rc, rc2;
+ char pbuf[PAGESIZE];
+ char dbuf[MAXKEYSIZE+1];
+ DKBUF;
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
/* Was a single item before, must convert now */
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ /* Just overwrite the current item */
+ if (flags == MDB_CURRENT)
+ goto current;
+
+ /* create a fake page for the dup items */
dkey.mv_size = NODEDSZ(leaf);
- dkey.mv_data = dbuf;
- memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
+ dkey.mv_data = NODEDATA(leaf);
/* data matches, ignore it */
if (!mc->mc_dbx->md_dcmp(data, &dkey))
return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
- memset(&dummy, 0, sizeof(dummy));
+ memcpy(dbuf, dkey.mv_data, dkey.mv_size);
+ dkey.mv_data = dbuf;
+ fp = (MDB_page *)pbuf;
+ fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+ fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
+ fp->mp_lower = PAGEHDRSZ;
+ fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
- dummy.md_pad = data->mv_size;
- dummy.md_flags = MDB_DUPFIXED;
- if (mc->mc_db->md_flags & MDB_INTEGERDUP)
- dummy.md_flags |= MDB_INTEGERKEY;
+ fp->mp_flags |= P_LEAF2;
+ fp->mp_pad = data->mv_size;
+ } else {
+ fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE +
+ (dkey.mv_size & 1) + (data->mv_size & 1);
}
- dummy.md_flags |= MDB_SUBDATA;
- dummy.md_root = P_INVALID;
- if (dkey.mv_size == sizeof(MDB_db)) {
- memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
+ mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+ do_sub = 1;
+ rdata = &xdata;
+ xdata.mv_size = fp->mp_upper;
+ xdata.mv_data = pbuf;
+ flags |= F_DUPDATA;
+ goto new_sub;
+ }
+ if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
+ /* See if we need to convert from fake page to subDB */
+ MDB_page *mp;
+ unsigned int offset;
+ unsigned int i;
+
+ fp = NODEDATA(leaf);
+ if (flags == MDB_CURRENT) {
+ fp->mp_flags |= P_DIRTY;
+ fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+ mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
+ flags |= F_DUPDATA;
goto put_sub;
}
+ if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+ offset = fp->mp_pad;
+ } else {
+ offset = NODESIZE + sizeof(indx_t) + data->mv_size;
+ }
+ offset += offset & 1;
+ if (NODEDSZ(leaf) + offset >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
+ /* yes, convert it */
+ dummy.md_flags = 0;
+ if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+ dummy.md_pad = fp->mp_pad;
+ dummy.md_flags = MDB_DUPFIXED;
+ if (mc->mc_db->md_flags & MDB_INTEGERDUP)
+ dummy.md_flags |= MDB_INTEGERKEY;
+ }
+ dummy.md_depth = 1;
+ dummy.md_branch_pages = 0;
+ dummy.md_leaf_pages = 1;
+ dummy.md_overflow_pages = 0;
+ dummy.md_entries = NUMKEYS(fp);
+ rdata = &xdata;
+ xdata.mv_size = sizeof(MDB_db);
+ xdata.mv_data = &dummy;
+ mp = mdb_page_alloc(mc, 1);
+ if (!mp)
+ return ENOMEM;
+ offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf);
+ flags |= F_DUPDATA|F_SUBDATA;
+ dummy.md_root = mp->mp_pgno;
+ } else {
+ /* no, just grow it */
+ rdata = &xdata;
+ xdata.mv_size = NODEDSZ(leaf) + offset;
+ xdata.mv_data = pbuf;
+ mp = (MDB_page *)pbuf;
+ mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+ flags |= F_DUPDATA;
+ }
+ mp->mp_flags = fp->mp_flags | P_DIRTY;
+ mp->mp_pad = fp->mp_pad;
+ mp->mp_lower = fp->mp_lower;
+ mp->mp_upper = fp->mp_upper + offset;
+ if (IS_LEAF2(fp)) {
+ memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
+ } else {
+ nsize = NODEDSZ(leaf) - fp->mp_upper;
+ memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize);
+ for (i=0; i<NUMKEYS(fp); i++)
+ mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
+ }
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
do_sub = 1;
- rdata = &xdata;
- xdata.mv_size = sizeof(MDB_db);
- xdata.mv_data = &dummy;
- /* new sub-DB, must fully init xcursor */
- if (flags == MDB_CURRENT)
- flags = 0;
goto new_sub;
}
+ /* data is on sub-DB, just store it */
+ flags |= F_DUPDATA|F_SUBDATA;
goto put_sub;
}
+current:
/* same size, just replace it */
if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
NODEDSZ(leaf) == data->mv_size) {
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else {
/* Remember if we just added a subdatabase */
- if (flags & F_SUBDATA) {
+ if (flags & (F_SUBDATA|F_DUPDATA)) {
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
- leaf->mn_flags |= F_SUBDATA;
+ leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA));
}
/* Now store the actual data in the child DB. Note that we're
*/
if (do_sub) {
MDB_db *db;
- leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+ int xflags;
put_sub:
- if (flags != MDB_CURRENT)
- mdb_xcursor_init1(mc, leaf);
xdata.mv_size = 0;
xdata.mv_data = "";
- if (flags == MDB_NODUPDATA)
- flags = MDB_NOOVERWRITE;
+ if (flags & MDB_CURRENT) {
+ xflags = MDB_CURRENT;
+ } else {
+ mdb_xcursor_init1(mc, leaf);
+ xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0;
+ }
/* converted, write the original data first */
if (dkey.mv_size) {
- rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags);
+ rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
return rc;
- leaf->mn_flags |= F_DUPDATA;
}
- rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags);
- db = NODEDATA(leaf);
- assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
- memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+ rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
+ if (flags & F_SUBDATA) {
+ db = NODEDATA(leaf);
+ memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+ }
}
- mc->mc_db->md_entries++;
+ /* sub-writes might have failed so check rc again.
+ * Don't increment count if we just replaced an existing item.
+ */
+ if (!rc && !(flags & MDB_CURRENT))
+ mc->mc_db->md_entries++;
}
done:
return rc;
if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (flags != MDB_NODUPDATA) {
+ if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
+ mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
+ }
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
/* If sub-DB still has entries, we're done */
if (mc->mc_xcursor->mx_db.md_root != P_INVALID) {
- MDB_db *db = NODEDATA(leaf);
- assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
- memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+ if (leaf->mn_flags & F_SUBDATA) {
+ /* update subDB info */
+ MDB_db *db = NODEDATA(leaf);
+ memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
+ } else {
+ /* shrink fake page */
+ mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+ }
mc->mc_db->md_entries--;
return rc;
}
/* otherwise fall thru and delete the sub-DB */
}
- /* add all the child DB's pages to the free list */
- rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
- if (rc == MDB_SUCCESS) {
- MDB_node *ni;
- MDB_cursor *mx;
- unsigned int i;
+ if (leaf->mn_flags & F_SUBDATA) {
+ /* add all the child DB's pages to the free list */
+ rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
+ if (rc == MDB_SUCCESS) {
+ MDB_node *ni;
+ MDB_cursor *mx;
+ unsigned int i;
- mx = &mc->mc_xcursor->mx_cursor;
- mc->mc_db->md_entries -=
- mx->mc_db->md_entries;
-
- mdb_cursor_pop(mx);
- while (mx->mc_snum > 1) {
- for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
- MDB_page *mp;
- pgno_t pg;
- ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
- pg = NODEPGNO(ni);
- if ((rc = mdb_page_get(mc->mc_txn, pg, &mp)))
- return rc;
- /* free it */
- mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
+ mx = &mc->mc_xcursor->mx_cursor;
+ mc->mc_db->md_entries -=
+ mx->mc_db->md_entries;
+
+ mdb_cursor_pop(mx);
+ while (mx->mc_snum > 0) {
+ for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
+ pgno_t pg;
+ ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
+ pg = NODEPGNO(ni);
+ /* free it */
+ mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
+ }
+ if (!mx->mc_top)
+ break;
+ rc = mdb_cursor_sibling(mx, 1);
+ if (rc) {
+ /* no more siblings, go back to beginning
+ * of previous level. (stack was already popped
+ * by mdb_cursor_sibling)
+ */
+ for (i=1; i<mx->mc_top; i++) {
+ pgno_t pg;
+ ni = NODEPTR(mx->mc_pg[i-1],0);
+ pg = NODEPGNO(ni);
+ if ((rc = mdb_page_get(mc->mc_txn, pg, &mx->mc_pg[i])))
+ break;
+ }
+ }
}
- rc = mdb_cursor_sibling(mx, 1);
- if (rc)
- break;
+ /* free it */
+ mdb_midl_append(mc->mc_txn->mt_free_pgs,
+ mx->mc_db->md_root);
}
- /* free it */
- mdb_midl_append(mc->mc_txn->mt_free_pgs,
- mx->mc_db->md_root);
}
}
assert(mp->mp_upper >= mp->mp_lower);
- DPRINTF("add to %s page %zu index %i, data size %zu key size %zu [%s]",
+ DPRINTF("add to %s %spage %zu index %i, data size %zu key size %zu [%s]",
IS_LEAF(mp) ? "leaf" : "branch",
+ IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno, indx, data ? data->mv_size : 0,
key ? key->mv_size : 0, key ? DKEY(key) : NULL);
mp->mp_upper += sz;
}
+/** Compact the main page after deleting a node on a subpage.
+ * @param[in] mp The main page to operate on.
+ * @param[in] indx The index of the subpage on the main page.
+ */
+static void
+mdb_node_shrink(MDB_page *mp, indx_t indx)
+{
+ MDB_node *node;
+ MDB_page *sp, *xp;
+ char *base;
+ int osize, nsize;
+ int delta;
+ indx_t i, numkeys, ptr;
+
+ node = NODEPTR(mp, indx);
+ sp = (MDB_page *)NODEDATA(node);
+ osize = NODEDSZ(node);
+
+ delta = sp->mp_upper - sp->mp_lower;
+ SETDSZ(node, osize - delta);
+ xp = (MDB_page *)((char *)sp + delta);
+
+ /* shift subpage upward */
+ if (IS_LEAF2(sp)) {
+ nsize = NUMKEYS(sp) * sp->mp_pad;
+ memmove(METADATA(xp), METADATA(sp), nsize);
+ } else {
+ int i;
+ nsize = osize - sp->mp_upper;
+ numkeys = NUMKEYS(sp);
+ for (i=numkeys-1; i>=0; i--)
+ xp->mp_ptrs[i] = sp->mp_ptrs[i] - delta;
+ }
+ xp->mp_upper = sp->mp_lower;
+ xp->mp_lower = sp->mp_lower;
+ xp->mp_flags = sp->mp_flags;
+ xp->mp_pad = sp->mp_pad;
+ xp->mp_pgno = mp->mp_pgno;
+
+ /* shift lower nodes upward */
+ ptr = mp->mp_ptrs[indx];
+ numkeys = NUMKEYS(mp);
+ for (i = 0; i < numkeys; i++) {
+ if (mp->mp_ptrs[i] <= ptr)
+ mp->mp_ptrs[i] += delta;
+ }
+
+ base = (char *)mp + mp->mp_upper;
+ memmove(base + delta, base, ptr - mp->mp_upper + NODESIZE + NODEKSZ(node));
+ mp->mp_upper += delta;
+}
+
/** Initial setup of a sorted-dups cursor.
* Sorted duplicates are implemented as a sub-database for the given key.
* The duplicate data items are actually keys of the sub-database.
static void
mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
{
- MDB_db *db = NODEDATA(node);
MDB_xcursor *mx = mc->mc_xcursor;
- assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
- mx->mx_db = *db;
+
+ if (node->mn_flags & F_SUBDATA) {
+ MDB_db *db = NODEDATA(node);
+ mx->mx_db = *db;
+ mx->mx_cursor.mc_snum = 0;
+ mx->mx_cursor.mc_flags = 0;
+ } else {
+ MDB_page *fp = NODEDATA(node);
+ mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad;
+ mx->mx_db.md_flags = 0;
+ mx->mx_db.md_depth = 1;
+ mx->mx_db.md_branch_pages = 0;
+ mx->mx_db.md_leaf_pages = 1;
+ mx->mx_db.md_overflow_pages = 0;
+ mx->mx_db.md_entries = NUMKEYS(fp);
+ mx->mx_db.md_root = fp->mp_pgno;
+ mx->mx_cursor.mc_snum = 1;
+ mx->mx_cursor.mc_flags = C_INITIALIZED;
+ mx->mx_cursor.mc_top = 0;
+ mx->mx_cursor.mc_pg[0] = fp;
+ mx->mx_cursor.mc_ki[0] = 0;
+ if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+ mx->mx_db.md_flags = MDB_DUPFIXED;
+ mx->mx_db.md_pad = fp->mp_pad;
+ if (mc->mc_db->md_flags & MDB_INTEGERDUP)
+ mx->mx_db.md_flags |= MDB_INTEGERKEY;
+ }
+ }
DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi,
- db->md_root);
+ mx->mx_db.md_root);
if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY))
mx->mx_dbx.md_dirty = 1;
mx->mx_dbx.md_name.mv_data = NODEKEY(node);
mx->mx_dbx.md_name.mv_size = node->mn_ksize;
- mx->mx_cursor.mc_snum = 0;
- mx->mx_cursor.mc_flags = 0;
if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t))
mx->mx_dbx.md_cmp = mdb_cmp_long;
}
if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
return mdb_node_move(&mn, mc);
else { /* FIXME: if (has_enough_room()) */
+ mc->mc_flags &= ~C_INITIALIZED;
if (mc->mc_ki[ptop] == 0)
return mdb_page_merge(&mn, mc);
else