*/
#include <sys/types.h>
#include <sys/stat.h>
-#include <sys/queue.h>
#include <sys/param.h>
#include <sys/uio.h>
#include <sys/mman.h>
#define DEBUG 1
#endif
-#if DEBUG && defined(__GNUC__)
-# define DPRINTF(fmt, ...) \
- fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__)
+#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
+# define DPRINTF (void) /* Vararg macros may be unsupported */
+#elif DEBUG
+# define DPRINTF(fmt, ...) /* Requires 2 or more args */ \
+ fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, __VA_ARGS__)
#else
-# define DPRINTF(...) ((void) 0)
+# define DPRINTF(fmt, ...) ((void) 0)
#endif
+#define DPUTS(arg) DPRINTF("%s", arg)
#define PAGESIZE 4096
#define MDB_MINKEYS 4
#define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH)
#define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW)
-#define OVPAGES(size, psize) (PAGEHDRSZ + size + psize - 1) / psize;
+#define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
typedef struct MDB_db {
uint32_t md_pad;
static int mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
typedef struct MDB_ppage { /* ordered list of pages */
- SLIST_ENTRY(MDB_ppage) mp_entry;
MDB_page *mp_page;
unsigned int mp_ki; /* cursor index on page */
} MDB_ppage;
-SLIST_HEAD(page_stack, MDB_ppage);
-/* FIXME: tree depth is mostly bounded, we should just
- * use a fixed array and avoid malloc/pointer chasing
- */
-#define CURSOR_EMPTY(c) SLIST_EMPTY(&(c)->mc_stack)
-#define CURSOR_TOP(c) SLIST_FIRST(&(c)->mc_stack)
-#define CURSOR_POP(c) SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
-#define CURSOR_PUSH(c,p) SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
+#define CURSOR_TOP(c) (&(c)->mc_stack[(c)->mc_snum-1])
+#define CURSOR_PARENT(c) (&(c)->mc_stack[(c)->mc_snum-2])
struct MDB_xcursor;
struct MDB_cursor {
MDB_txn *mc_txn;
- struct page_stack mc_stack; /* stack of parent pages */
+ MDB_ppage mc_stack[32]; /* stack of parent pages */
+ unsigned int mc_snum; /* number of pushed pages */
MDB_dbi mc_dbi;
short mc_initialized; /* 1 if initialized */
short mc_eof; /* 1 if end is reached */
struct MDB_xcursor *mc_xcursor;
};
-#define METADATA(p) ((void *)((char *)p + PAGEHDRSZ))
+#define METADATA(p) ((void *)((char *)(p) + PAGEHDRSZ))
typedef struct MDB_node {
#define mn_pgno mn_p.np_pgno
unsigned int mn_ksize:12; /* key size */
#define F_BIGDATA 0x01 /* data put on overflow page */
#define F_SUBDATA 0x02 /* data is a sub-database */
+#define F_DUPDATA 0x04 /* data has duplicates */
char mn_data[1];
} MDB_node;
int
mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
{
- return txn->mt_dbxs[dbi].md_cmp(a, b);
-}
+ if (txn->mt_dbxs[dbi].md_cmp)
+ return txn->mt_dbxs[dbi].md_cmp(a, b);
-static int
-_mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2)
-{
if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY
#if __BYTE_ORDER == __LITTLE_ENDIAN
|MDB_INTEGERKEY
#endif
))
- return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
+ return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size);
else
- return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
+ return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
+}
+
+int
+mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
+{
+ if (txn->mt_dbxs[dbi].md_dcmp)
+ return txn->mt_dbxs[dbi].md_dcmp(a, b);
+
+ return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
}
/* Allocate new page(s) for writing */
int rc, toggle;
if (env->me_flags & MDB_FATAL_ERROR) {
- DPRINTF("mdb_txn_begin: environment had fatal error, must shutdown!");
+ DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!");
return MDB_PANIC;
}
if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) {
}
if (txn != env->me_txn) {
- DPRINTF("attempt to commit unknown transaction");
+ DPUTS("attempt to commit unknown transaction");
mdb_txn_abort(txn);
return EINVAL;
}
if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
- DPRINTF("error flag is set, can't commit");
+ DPUTS("error flag is set, can't commit");
mdb_txn_abort(txn);
return EINVAL;
}
/* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
*/
next = 0;
+ i = 1;
do {
n = 0;
done = 1;
size = 0;
- for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
+ for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
dp = txn->mt_u.dirty_list[i].mptr;
if (dp->p.mp_pgno != next) {
if (n) {
if (rc != size) {
n = errno;
if (rc > 0)
- DPRINTF("short write, filesystem full?");
+ DPUTS("short write, filesystem full?");
else
DPRINTF("writev: %s", strerror(errno));
mdb_txn_abort(txn);
if (rc != size) {
n = errno;
if (rc > 0)
- DPRINTF("short write, filesystem full?");
+ DPUTS("short write, filesystem full?");
else
DPRINTF("writev: %s", strerror(errno));
mdb_txn_abort(txn);
m = METADATA(p);
if (m->mm_magic != MDB_MAGIC) {
- DPRINTF("meta has invalid magic");
+ DPUTS("meta has invalid magic");
return EINVAL;
}
int rc;
unsigned int psize;
- DPRINTF("writing new meta page");
+ DPUTS("writing new meta page");
psize = sysconf(_SC_PAGE_SIZE);
meta->mm_magic = MDB_MAGIC;
/* Write to the SYNC fd */
rc = pwrite(env->me_mfd, ptr, len, off);
if (rc != len) {
- DPRINTF("write failed, disk error?");
+ DPUTS("write failed, disk error?");
/* On a failure, the pagecache still contains the new data.
* Write some old data back, to prevent it from being used.
* Use the non-SYNC fd; we know it will fail anyway.
if ((i = mdb_env_read_header(env, &meta)) != 0) {
if (i != ENOENT)
return i;
- DPRINTF("new mdbenv");
+ DPUTS("new mdbenv");
newenv = 1;
}
} else {
if (env->me_txns->mti_magic != MDB_MAGIC) {
- DPRINTF("lock region has invalid magic");
+ DPUTS("lock region has invalid magic");
rc = EINVAL;
goto fail;
}
nodekey.mv_size = node->mn_ksize;
nodekey.mv_data = NODEKEY(node);
- if (txn->mt_dbxs[dbi].md_cmp)
- rc = txn->mt_dbxs[dbi].md_cmp(key, &nodekey);
- else
- rc = _mdb_cmp(txn, dbi, key, &nodekey);
+ rc = mdb_cmp(txn, dbi, key, &nodekey);
if (IS_LEAF(mp))
DPRINTF("found leaf index %u [%.*s], rc = %i",
{
MDB_ppage *top;
- top = CURSOR_TOP(cursor);
- CURSOR_POP(cursor);
-
- DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
+ if (cursor->mc_snum) {
+ top = CURSOR_TOP(cursor);
+ cursor->mc_snum--;
- free(top);
+ DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
+ }
}
static MDB_ppage *
DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor);
- if ((ppage = calloc(1, sizeof(MDB_ppage))) == NULL)
- return NULL;
+ ppage = &cursor->mc_stack[cursor->mc_snum++];
ppage->mp_page = mp;
- CURSOR_PUSH(cursor, ppage);
+ ppage->mp_ki = 0;
return ppage;
}
* committed root page.
*/
if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
- DPRINTF("transaction has failed, must abort");
+ DPUTS("transaction has failed, must abort");
return EINVAL;
} else
root = txn->mt_dbs[dbi].md_root;
if (root == P_INVALID) { /* Tree is empty. */
- DPRINTF("tree is empty");
+ DPUTS("tree is empty");
return MDB_NOTFOUND;
}
DPRINTF("read overflow page %lu failed", pgno);
return MDB_PAGE_NOTFOUND;
}
- data->mv_data = omp;
+ data->mv_data = METADATA(omp);
return MDB_SUCCESS;
}
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
if (leaf && exact) {
/* Return first duplicate data item */
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
MDB_xcursor mx;
mdb_xcursor_init0(txn, dbi, &mx);
{
int rc;
MDB_node *indx;
- MDB_ppage *parent, *top;
+ MDB_ppage *parent;
MDB_page *mp;
- top = CURSOR_TOP(cursor);
- if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) {
+ if (cursor->mc_snum < 2) {
return MDB_NOTFOUND; /* root has no siblings */
}
+ parent = CURSOR_PARENT(cursor);
DPRINTF("parent page is page %lu, index %u",
parent->mp_page->mp_pgno, parent->mp_ki);
assert(cursor->mc_initialized);
+ top = CURSOR_TOP(cursor);
+ mp = top->mp_page;
+
if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
+ leaf = NODEPTR(mp, top->mp_ki);
+ if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
if (op != MDB_NEXT || rc == MDB_SUCCESS)
return rc;
}
}
- top = CURSOR_TOP(cursor);
- mp = top->mp_page;
-
DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
if (top->mp_ki + 1 >= NUMKEYS(mp)) {
- DPRINTF("=====> move to next sibling page");
+ DPUTS("=====> move to next sibling page");
if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
cursor->mc_eof = 1;
return MDB_NOTFOUND;
assert(IS_LEAF(mp));
leaf = NODEPTR(mp, top->mp_ki);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ }
if (data) {
if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
return rc;
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc != MDB_SUCCESS)
return rc;
assert(cursor->mc_initialized);
+ top = CURSOR_TOP(cursor);
+ mp = top->mp_page;
+
if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- if (op == MDB_PREV || op == MDB_PREV_DUP) {
+ leaf = NODEPTR(mp, top->mp_ki);
+ if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
if (op != MDB_PREV || rc == MDB_SUCCESS)
return rc;
}
}
- top = CURSOR_TOP(cursor);
- mp = top->mp_page;
-
DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
if (top->mp_ki == 0) {
- DPRINTF("=====> move to prev sibling page");
+ DPUTS("=====> move to prev sibling page");
if (mdb_sibling(cursor, 0) != MDB_SUCCESS) {
return MDB_NOTFOUND;
}
assert(IS_LEAF(mp));
leaf = NODEPTR(mp, top->mp_ki);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ }
if (data) {
if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
return rc;
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc != MDB_SUCCESS)
return rc;
assert(key);
assert(key->mv_size > 0);
- while (CURSOR_TOP(cursor) != NULL)
- cursor_pop_page(cursor);
+ cursor->mc_snum = 0;
rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp);
if (rc != MDB_SUCCESS)
}
if (leaf == NULL) {
- DPRINTF("===> inexact leaf not found, goto sibling");
+ DPUTS("===> inexact leaf not found, goto sibling");
if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS)
return rc; /* no entries matched */
top = CURSOR_TOP(cursor);
cursor->mc_initialized = 1;
cursor->mc_eof = 0;
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ }
if (data) {
- MDB_val d2;
- if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
- return rc;
-
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (op == MDB_SET || op == MDB_SET_RANGE) {
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
} else {
if (rc != MDB_SUCCESS)
return rc;
}
+ } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) {
+ MDB_val d2;
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
+ return rc;
+ rc = mdb_dcmp(cursor->mc_txn, cursor->mc_dbi, data, &d2);
+ if (rc) {
+ if (op == MDB_GET_BOTH || rc > 0)
+ return MDB_NOTFOUND;
+ }
+
} else {
- *data = d2;
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+ return rc;
}
-
}
rc = mdb_set_key(leaf, key);
MDB_pageparent mpp;
MDB_node *leaf;
- while (CURSOR_TOP(cursor) != NULL)
- cursor_pop_page(cursor);
+ cursor->mc_snum = 0;
rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp);
if (rc != MDB_SUCCESS)
cursor->mc_eof = 0;
if (data) {
- if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
- return rc;
-
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc)
return rc;
+ } else {
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+ return rc;
}
}
return mdb_set_key(leaf, key);
MDB_node *leaf;
MDB_val lkey;
- while (CURSOR_TOP(cursor) != NULL)
- cursor_pop_page(cursor);
+ cursor->mc_snum = 0;
lkey.mv_size = MAXKEYSIZE+1;
lkey.mv_data = NULL;
top->mp_ki = NUMKEYS(top->mp_page) - 1;
if (data) {
- if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
- return rc;
-
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc)
return rc;
+ } else {
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+ return rc;
}
}
switch (op) {
case MDB_GET_BOTH:
case MDB_GET_BOTH_RANGE:
- if (data == NULL) {
+ if (data == NULL || cursor->mc_xcursor == NULL) {
rc = EINVAL;
break;
}
case MDB_SET_RANGE:
if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
rc = EINVAL;
- } else if (op != MDB_SET_RANGE)
+ } else if (op == MDB_SET_RANGE)
rc = mdb_cursor_set(cursor, key, data, op, NULL);
else
rc = mdb_cursor_set(cursor, key, data, op, &exact);
mx->mx_dbxs[dbn+1].md_dirty = 0;
mx->mx_txn.mt_numdbs = dbn+2;
- SLIST_INIT(&mx->mx_cursor.mc_stack);
+ mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_txn = &mx->mx_txn;
mx->mx_cursor.mc_dbi = dbn+1;
}
size += sizeof(MDB_xcursor);
if ((cursor = calloc(1, size)) != NULL) {
- SLIST_INIT(&cursor->mc_stack);
cursor->mc_dbi = dbi;
cursor->mc_txn = txn;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
int
mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
{
+ MDB_ppage *top;
+ MDB_node *leaf;
+
if (mc == NULL || countp == NULL)
return EINVAL;
if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
return EINVAL;
- if (!mc->mc_xcursor->mx_cursor.mc_initialized)
- return EINVAL;
+ top = CURSOR_TOP(mc);
+ leaf = NODEPTR(top->mp_page, top->mp_ki);
+ if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ *countp = 1;
+ } else {
+ if (!mc->mc_xcursor->mx_cursor.mc_initialized)
+ return EINVAL;
- *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+ *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+ }
return MDB_SUCCESS;
}
mdb_cursor_close(MDB_cursor *cursor)
{
if (cursor != NULL) {
- while(!CURSOR_EMPTY(cursor))
- cursor_pop_page(cursor);
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor))
- cursor_pop_page(&cursor->mc_xcursor->mx_cursor);
- }
-
free(cursor);
}
}
if (mpp->mp_parent == NULL) {
if (NUMKEYS(mpp->mp_page) == 0) {
- DPRINTF("tree is completely empty");
+ DPUTS("tree is completely empty");
txn->mt_dbs[dbi].md_root = P_INVALID;
txn->mt_dbs[dbi].md_depth--;
txn->mt_dbs[dbi].md_leaf_pages--;
} else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
- DPRINTF("collapsing root page!");
+ DPUTS("collapsing root page!");
txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL)
return MDB_PAGE_NOTFOUND;
txn->mt_dbs[dbi].md_depth--;
txn->mt_dbs[dbi].md_branch_pages--;
} else
- DPRINTF("root page doesn't need rebalancing");
+ DPUTS("root page doesn't need rebalancing");
return MDB_SUCCESS;
}
if (mpp->mp_pi == 0) {
/* We're the leftmost leaf in our parent.
*/
- DPRINTF("reading right neighbor");
+ DPUTS("reading right neighbor");
node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
return MDB_PAGE_NOTFOUND;
} else {
/* There is at least one neighbor to the left.
*/
- DPRINTF("reading left neighbor");
+ DPUTS("reading left neighbor");
node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
return MDB_PAGE_NOTFOUND;
return MDB_NOTFOUND;
}
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
MDB_xcursor mx;
MDB_pageparent mp2;
if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) {
memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
sizeof(MDB_db));
+ txn->mt_dbs[dbi].md_entries--;
return rc;
}
/* otherwise fall thru and delete the sub-DB */
unsigned int i;
cursor_pop_page(&mx.mx_cursor);
- top = CURSOR_TOP(&mx.mx_cursor);
- if (top != NULL) {
- parent = SLIST_NEXT(top, mp_entry);
- while (parent != NULL) {
+ if (mx.mx_cursor.mc_snum) {
+ top = CURSOR_TOP(&mx.mx_cursor);
+ while (mx.mx_cursor.mc_snum > 1) {
+ parent = CURSOR_PARENT(&mx.mx_cursor);
for (i=0; i<NUMKEYS(top->mp_page); i++) {
ni = NODEPTR(top->mp_page, i);
mdb_midl_insert(txn->mt_free_pgs, ni->mn_pgno);
}
- if (parent) {
- parent->mp_ki++;
- if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
- cursor_pop_page(&mx.mx_cursor);
- top = CURSOR_TOP(&mx.mx_cursor);
- parent = SLIST_NEXT(top, mp_entry);
- } else {
- ni = NODEPTR(parent->mp_page, parent->mp_ki);
- top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno);
- }
+ parent->mp_ki++;
+ if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
+ cursor_pop_page(&mx.mx_cursor);
+ top = parent;
+ } else {
+ ni = NODEPTR(parent->mp_page, parent->mp_ki);
+ top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno);
}
}
}
unsigned int ki;
MDB_node *leaf;
MDB_pageparent mpp;
- MDB_val xdata, *rdata;
+ MDB_val xdata, *rdata, dkey;
MDB_db dummy;
+ char dbuf[PAGESIZE];
+ int do_sub = 0;
DPRINTF("==> put key %.*s, size %zu, data size %zu",
(int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
+ dkey.mv_size = 0;
mpp.mp_parent = NULL;
mpp.mp_pi = 0;
rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
if (rc == MDB_SUCCESS) {
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
if (leaf && exact) {
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
- goto put_sub;
- }
if (flags == MDB_NOOVERWRITE) {
DPRINTF("duplicate key %.*s",
(int)key->mv_size, (char *)key->mv_data);
return MDB_KEYEXIST;
}
+ if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ /* Was a single item before, must convert now */
+ if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ dkey.mv_size = NODEDSZ(leaf);
+ memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
+ memset(&dummy, 0, sizeof(dummy));
+ dummy.md_root = P_INVALID;
+ if (dkey.mv_size == sizeof(MDB_db)) {
+ memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
+ goto put_sub;
+ }
+ mdb_del_node(mpp.mp_page, ki);
+ do_sub = 1;
+ rdata = &xdata;
+ xdata.mv_size = sizeof(MDB_db);
+ xdata.mv_data = &dummy;
+ goto new_sub;
+ }
+ goto put_sub;
+ }
/* same size, just replace it */
if (NODEDSZ(leaf) == data->mv_size) {
memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
} else if (rc == MDB_NOTFOUND) {
MDB_dpage *dp;
/* new file, just write a root leaf page */
- DPRINTF("allocating new root leaf page");
+ DPUTS("allocating new root leaf page");
if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) {
return ENOMEM;
}
mpp.mp_page = &dp->p;
txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno;
txn->mt_dbs[dbi].md_depth++;
+ txn->mt_dbxs[dbi].md_dirty = 1;
ki = 0;
}
else
DPRINTF("there are %u keys, should insert new key at index %i",
NUMKEYS(mpp.mp_page), ki);
- /* For sorted dups, the data item at this level is a DB record
- * for a child DB; the actual data elements are stored as keys
- * in the child DB.
- */
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
- rdata = &xdata;
- xdata.mv_size = sizeof(MDB_db);
- xdata.mv_data = &dummy;
- memset(&dummy, 0, sizeof(dummy));
- dummy.md_root = P_INVALID;
- } else {
- rdata = data;
- }
+ rdata = data;
+new_sub:
if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) {
rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
} else {
if (rc != MDB_SUCCESS)
txn->mt_flags |= MDB_TXN_ERROR;
else {
- txn->mt_dbs[dbi].md_entries++;
-
/* Remember if we just added a subdatabase */
if (flags & F_SUBDATA) {
leaf = NODEPTR(mpp.mp_page, ki);
* size limits on dupdata. The actual data fields of the child
* DB are all zero size.
*/
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ if (do_sub) {
MDB_xcursor mx;
leaf = NODEPTR(mpp.mp_page, ki);
xdata.mv_data = "";
if (flags == MDB_NODUPDATA)
flags = MDB_NOOVERWRITE;
+ /* converted, write the original data first */
+ if (dkey.mv_size) {
+ dkey.mv_data = dbuf;
+ rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
+ if (rc) return rc;
+ leaf->mn_flags |= F_DUPDATA;
+ }
rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
mdb_xcursor_fini(txn, dbi, &mx);
memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
sizeof(MDB_db));
}
+ txn->mt_dbs[dbi].md_entries++;
}
done:
txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty;
memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
*dbi = txn->mt_numdbs;
+ txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
+ txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
txn->mt_numdbs++;
}