# define DPRINTF (void) /* Vararg macros may be unsupported */
#elif DEBUG
# define DPRINTF(fmt, ...) /* Requires 2 or more args */ \
- fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, __VA_ARGS__)
+ fprintf(stderr, "%s:%d:(%p) " fmt "\n", __func__, __LINE__, pthread_self(), __VA_ARGS__)
#else
# define DPRINTF(fmt, ...) ((void) 0)
#endif
#define DKEY(x)
#endif
+/* The DB view is always consistent because all writes are wrapped in
+ * the wmutex. Finer-grained locks aren't necessary.
+ *
+ * LAZY_LOCKS selects how the LAZY_* wrappers below are compiled:
+ *   nonzero (the default) - every lock/unlock/init/destroy wrapper is
+ *                           a no-op and LAZY_RWLOCK_DEF() declares
+ *                           nothing;
+ *   zero                  - each wrapper forwards to the pthread call
+ *                           of the same name and LAZY_RWLOCK_DEF()
+ *                           declares a pthread_rwlock_t.
+ *
+ * The statement-like no-ops expand to ((void)0), the same form the
+ * disabled DPRINTF uses, so "if (c) LAZY_MUTEX_LOCK(m);" never ends up
+ * with an empty body.
+ *
+ * NOTE(review): in the lazy build "LAZY_RWLOCK_DEF(name);" inside a
+ * struct leaves a lone ';' behind - confirm every target compiler
+ * accepts that.
+ */
+#ifndef LAZY_LOCKS
+#define LAZY_LOCKS 1
+#endif
+#if LAZY_LOCKS
+#define LAZY_MUTEX_LOCK(x)	((void)0)
+#define LAZY_MUTEX_UNLOCK(x)	((void)0)
+#define LAZY_RWLOCK_UNLOCK(x)	((void)0)
+#define LAZY_RWLOCK_WRLOCK(x)	((void)0)
+#define LAZY_RWLOCK_RDLOCK(x)	((void)0)
+#define LAZY_RWLOCK_DEF(x)
+#define LAZY_RWLOCK_INIT(x,y)	((void)0)
+#define LAZY_RWLOCK_DESTROY(x)	((void)0)
+#else
+#define LAZY_MUTEX_LOCK(x)	pthread_mutex_lock(x)
+#define LAZY_MUTEX_UNLOCK(x)	pthread_mutex_unlock(x)
+#define LAZY_RWLOCK_UNLOCK(x)	pthread_rwlock_unlock(x)
+#define LAZY_RWLOCK_WRLOCK(x)	pthread_rwlock_wrlock(x)
+#define LAZY_RWLOCK_RDLOCK(x)	pthread_rwlock_rdlock(x)
+#define LAZY_RWLOCK_DEF(x)	pthread_rwlock_t x
+#define LAZY_RWLOCK_INIT(x,y)	pthread_rwlock_init(x,y)
+#define LAZY_RWLOCK_DESTROY(x)	pthread_rwlock_destroy(x)
+#endif
+
+
#define P_INVALID (~0UL)
#define F_ISSET(w, f) (((w) & (f)) == (f))
#define CURSOR_TOP(c) (&(c)->mc_stack[(c)->mc_snum-1])
#define CURSOR_PARENT(c) (&(c)->mc_stack[(c)->mc_snum-2])
+#define CURSOR_STACK 32
struct MDB_xcursor;
struct MDB_cursor {
MDB_txn *mc_txn;
- MDB_ppage mc_stack[32]; /* stack of parent pages */
+ MDB_ppage mc_stack[CURSOR_STACK]; /* stack of parent pages */
unsigned int mc_snum; /* number of pushed pages */
MDB_dbi mc_dbi;
short mc_initialized; /* 1 if initialized */
#define MDB_TXN_RDONLY 0x01 /* read-only transaction */
#define MDB_TXN_ERROR 0x02 /* an error has occurred */
-#define MDB_TXN_METOGGLE 0x04 /* used meta page 1 */
unsigned int mt_flags;
+ unsigned int mt_toggle;
};
/* Context for sorted-dup records */
char *me_map;
MDB_txninfo *me_txns;
MDB_meta *me_metas[2];
- MDB_meta *me_meta;
MDB_txn *me_txn; /* current write transaction */
size_t me_mapsize;
off_t me_size; /* current file size */
MDB_dpage *me_dpages;
pgno_t me_free_pgs[MDB_IDL_UM_SIZE];
MIDL2 me_dirty_list[MDB_IDL_DB_SIZE];
+ LAZY_RWLOCK_DEF(me_dblock);
};
#define NODESIZE offsetof(MDB_node, mn_data)
static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
static int mdb_env_read_meta(MDB_env *env, int *which);
static int mdb_env_write_meta(MDB_txn *txn);
-static MDB_page *mdb_get_page(MDB_txn *txn, pgno_t pgno);
+static int mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
MDB_val *key, int *exactp, unsigned int *kip);
MDB_val *key, MDB_val *data);
static void mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
-static void mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node);
+static void mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx,
+ MDB_page *mp, MDB_node *node);
static void mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
static size_t mdb_leaf_size(MDB_env *env, MDB_val *key,
if (txn->mt_txnid > 2) {
- oldest = txn->mt_txnid - 2;
+ oldest = txn->mt_txnid - 1;
if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
/* See if there's anything in the free DB */
MDB_pageparent mpp;
if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
return ENOMEM;
DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno);
+ assert(mp->mp_pgno != dp->p.mp_pgno);
mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno);
pgno = dp->p.mp_pgno;
memcpy(&dp->p, mp, txn->mt_env->me_psize);
{
MDB_env *env = txn->mt_env;
- int rc, toggle;
-
if (env->me_flags & MDB_FATAL_ERROR) {
DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!");
return MDB_PANIC;
}
- if (!(txn->mt_flags & MDB_TXN_RDONLY)) {
- txn->mt_u.dirty_list = env->me_dirty_list;
- txn->mt_u.dirty_list[0].mid = 0;
- txn->mt_free_pgs = env->me_free_pgs;
- txn->mt_free_pgs[0] = 0;
-
- pthread_mutex_lock(&env->me_txns->mti_wmutex);
- env->me_txns->mti_txnid++;
- }
-
- txn->mt_txnid = env->me_txns->mti_txnid;
-
if (txn->mt_flags & MDB_TXN_RDONLY) {
MDB_reader *r = pthread_getspecific(env->me_txkey);
if (!r) {
unsigned int i;
+ pid_t pid = getpid();
+ pthread_t tid = pthread_self();
+
pthread_mutex_lock(&env->me_txns->mti_mutex);
for (i=0; i<env->me_txns->mti_numreaders; i++)
if (env->me_txns->mti_readers[i].mr_pid == 0)
pthread_mutex_unlock(&env->me_txns->mti_mutex);
return ENOSPC;
}
- env->me_txns->mti_readers[i].mr_pid = getpid();
- env->me_txns->mti_readers[i].mr_tid = pthread_self();
- r = &env->me_txns->mti_readers[i];
- pthread_setspecific(env->me_txkey, r);
+ env->me_txns->mti_readers[i].mr_pid = pid;
+ env->me_txns->mti_readers[i].mr_tid = tid;
if (i >= env->me_txns->mti_numreaders)
env->me_txns->mti_numreaders = i+1;
pthread_mutex_unlock(&env->me_txns->mti_mutex);
+ r = &env->me_txns->mti_readers[i];
+ pthread_setspecific(env->me_txkey, r);
}
+ txn->mt_txnid = env->me_txns->mti_txnid;
+ txn->mt_toggle = env->me_txns->mti_me_toggle;
r->mr_txnid = txn->mt_txnid;
txn->mt_u.reader = r;
} else {
- env->me_txn = txn;
- }
+ pthread_mutex_lock(&env->me_txns->mti_wmutex);
- toggle = env->me_txns->mti_me_toggle;
- if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) {
- mdb_txn_reset0(txn);
- return rc;
+ txn->mt_txnid = env->me_txns->mti_txnid+1;
+ txn->mt_toggle = env->me_txns->mti_me_toggle;
+ txn->mt_u.dirty_list = env->me_dirty_list;
+ txn->mt_u.dirty_list[0].mid = 0;
+ txn->mt_free_pgs = env->me_free_pgs;
+ txn->mt_free_pgs[0] = 0;
+ txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
+ env->me_txn = txn;
}
/* Copy the DB arrays */
+ LAZY_RWLOCK_RDLOCK(&env->me_dblock);
txn->mt_numdbs = env->me_numdbs;
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
- memcpy(txn->mt_dbs, env->me_meta->mm_dbs, 2 * sizeof(MDB_db));
+ memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
if (txn->mt_numdbs > 2)
memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
(txn->mt_numdbs - 2) * sizeof(MDB_db));
+ LAZY_RWLOCK_UNLOCK(&env->me_dblock);
- if (!(txn->mt_flags & MDB_TXN_RDONLY)) {
- if (toggle)
- txn->mt_flags |= MDB_TXN_METOGGLE;
- txn->mt_next_pgno = env->me_meta->mm_last_pg+1;
- }
-
- DPRINTF("begin transaction %lu on mdbenv %p, root page %lu",
- txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
+	/* %p takes a void * argument, so txn is cast the same way env is. */
+	DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", (void *) txn,
+		txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
+		(void *) env, txn->mt_dbs[MAIN_DBI].md_root);
return MDB_SUCCESS;
}
}
env->me_txn = NULL;
- env->me_txns->mti_txnid--;
for (i=2; i<env->me_numdbs; i++)
env->me_dbxs[i].md_dirty = 0;
pthread_mutex_unlock(&env->me_txns->mti_wmutex);
if (!txn->mt_u.dirty_list[0].mid)
goto done;
- DPRINTF("committing transaction %lu on mdbenv %p, root page %lu",
+	/* %p takes a void * argument, so txn is cast the same way env is. */
+	DPRINTF("committing txn %p %lu on mdbenv %p, root page %lu", (void *) txn,
txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
/* should only be one record now */
dp->p.mp_flags &= ~P_DIRTY;
if (++n >= MDB_COMMIT_PAGES) {
done = 0;
+ i++;
break;
}
}
}
txn->mt_u.dirty_list[i].mid = 0;
}
+ txn->mt_u.dirty_list[0].mid = 0;
if ((n = mdb_env_sync(env, 0)) != 0 ||
(n = mdb_env_write_meta(txn)) != MDB_SUCCESS) {
/* update the DB tables */
{
int toggle = !env->me_db_toggle;
+ MDB_db *ip, *jp;
- for (i = 2; i < env->me_numdbs; i++) {
- if (txn->mt_dbxs[i].md_dirty) {
- env->me_dbs[toggle][i] = txn->mt_dbs[i];
- txn->mt_dbxs[i].md_dirty = 0;
- }
+ ip = &env->me_dbs[toggle][2];
+ jp = &txn->mt_dbs[2];
+ LAZY_RWLOCK_WRLOCK(&env->me_dblock);
+ for (i = 2; i < txn->mt_numdbs; i++) {
+ if (ip->md_root != jp->md_root)
+ *ip = *jp;
+ ip++; jp++;
}
- for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
- txn->mt_dbxs[i].md_dirty = 0;
- env->me_dbxs[i] = txn->mt_dbxs[i];
- env->me_dbs[toggle][i] = txn->mt_dbs[i];
+
+ for (i = 2; i < txn->mt_numdbs; i++) {
+ if (txn->mt_dbxs[i].md_dirty)
+ txn->mt_dbxs[i].md_dirty = 0;
}
env->me_db_toggle = toggle;
env->me_numdbs = txn->mt_numdbs;
+ LAZY_RWLOCK_UNLOCK(&env->me_dblock);
}
pthread_mutex_unlock(&env->me_txns->mti_wmutex);
assert(txn != NULL);
assert(txn->mt_env != NULL);
- toggle = !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE);
+ toggle = !txn->mt_toggle;
DPRINTF("writing meta page %d for root page %lu",
toggle, txn->mt_dbs[MAIN_DBI].md_root);
env->me_flags |= MDB_FATAL_ERROR;
return rc;
}
+ /* Memory ordering issues are irrelevant; since the entire writer
+ * is wrapped by wmutex, all of these changes will become visible
+ * after the wmutex is unlocked. Since the DB is multi-version,
+ * readers will get consistent data regardless of how fresh or
+ * how stale their view of these values is.
+ */
+ LAZY_MUTEX_LOCK(&env->me_txns->mti_mutex);
txn->mt_env->me_txns->mti_me_toggle = toggle;
+ txn->mt_env->me_txns->mti_txnid = txn->mt_txnid;
+ LAZY_MUTEX_UNLOCK(&env->me_txns->mti_mutex);
return MDB_SUCCESS;
}
assert(env != NULL);
- if (which)
- toggle = *which;
- else if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
+ if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
toggle = 1;
- if (env->me_meta != env->me_metas[toggle])
- env->me_meta = env->me_metas[toggle];
-
DPRINTF("Using meta page %d", toggle);
+ *which = toggle;
return MDB_SUCCESS;
}
static int
mdb_env_open2(MDB_env *env, unsigned int flags)
{
- int i, newenv = 0;
+ int i, newenv = 0, toggle;
MDB_meta meta;
MDB_page *p;
env->me_metas[0] = METADATA(p);
env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
- if ((i = mdb_env_read_meta(env, NULL)) != 0)
+ if ((i = mdb_env_read_meta(env, &toggle)) != 0)
return i;
DPRINTF("opened database version %u, pagesize %u",
- env->me_meta->mm_version, env->me_psize);
- DPRINTF("depth: %u", env->me_meta->mm_dbs[MAIN_DBI].md_depth);
- DPRINTF("entries: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_entries);
- DPRINTF("branch pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_branch_pages);
- DPRINTF("leaf pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_leaf_pages);
- DPRINTF("overflow pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_overflow_pages);
- DPRINTF("root: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_root);
+ env->me_metas[toggle]->mm_version, env->me_psize);
+ DPRINTF("depth: %u", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_depth);
+ DPRINTF("entries: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_entries);
+ DPRINTF("branch pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_branch_pages);
+ DPRINTF("leaf pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_leaf_pages);
+ DPRINTF("overflow pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_overflow_pages);
+ DPRINTF("root: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_root);
return MDB_SUCCESS;
}
mdb_env_share_locks(MDB_env *env)
{
struct flock lock_info;
+ int toggle = 0;
- env->me_txns->mti_txnid = env->me_meta->mm_txnid;
if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
- env->me_txns->mti_me_toggle = 1;
+ toggle = 1;
+ env->me_txns->mti_me_toggle = toggle;
+ env->me_txns->mti_txnid = env->me_metas[toggle]->mm_txnid;
memset((void *)&lock_info, 0, sizeof(lock_info));
lock_info.l_type = F_RDLCK;
env->me_path = strdup(path);
DPRINTF("opened dbenv %p", (void *) env);
pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
+ LAZY_RWLOCK_INIT(&env->me_dblock, NULL);
if (excl)
mdb_env_share_locks(env);
env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
free(env->me_dbxs);
free(env->me_path);
+ LAZY_RWLOCK_DESTROY(&env->me_dblock);
pthread_key_delete(env->me_txkey);
if (env->me_map) {
if (env->me_txns) {
pid_t pid = getpid();
size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
- int i;
+ unsigned int i;
for (i=0; i<env->me_txns->mti_numreaders; i++)
if (env->me_txns->mti_readers[i].mr_pid == pid)
env->me_txns->mti_readers[i].mr_pid = 0;
DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno,
cursor->mc_dbi, (void *) cursor);
+ assert(cursor->mc_snum < CURSOR_STACK);
+
ppage = &cursor->mc_stack[cursor->mc_snum++];
ppage->mp_page = mp;
ppage->mp_ki = 0;
return ppage;
}
-static MDB_page *
-mdb_get_page(MDB_txn *txn, pgno_t pgno)
+static int
+mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
{
MDB_page *p = NULL;
- int found = 0;
if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
MDB_dpage *dp;
if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
dp = txn->mt_u.dirty_list[x].mptr;
p = &dp->p;
- found = 1;
}
}
- if (!found) {
- if (pgno > txn->mt_env->me_meta->mm_last_pg)
- return NULL;
- p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ if (!p) {
+ if (pgno <= txn->mt_env->me_metas[txn->mt_toggle]->mm_last_pg)
+ p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ }
+ *ret = p;
+ if (!p) {
+ DPRINTF("page %lu not found", pgno);
+ assert(p != NULL);
}
- return p;
+ return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
}
static int
CURSOR_TOP(cursor)->mp_ki = i;
mpp->mp_parent = mp;
- if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL)
- return MDB_PAGE_NOTFOUND;
+ if ((rc = mdb_get_page(txn, NODEPGNO(node), &mp)))
+ return rc;
mpp->mp_pi = i;
mpp->mp_page = mp;
return MDB_NOTFOUND;
}
- if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL)
- return MDB_PAGE_NOTFOUND;
+ if (rc = mdb_get_page(txn, root, &mpp->mp_page))
+ return rc;
- DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
+ DPRINTF("db %u root page %lu has flags 0x%X",
+ dbi, root, mpp->mp_page->mp_flags);
if (modify) {
/* For sub-databases, update main root first */
{
MDB_page *omp; /* overflow mpage */
pgno_t pgno;
+ int rc;
if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
data->mv_size = leaf->mn_dsize;
*/
data->mv_size = leaf->mn_dsize;
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
- if ((omp = mdb_get_page(txn, pgno)) == NULL) {
+ if (rc = mdb_get_page(txn, pgno, &omp)) {
DPRINTF("read overflow page %lu failed", pgno);
- return MDB_PAGE_NOTFOUND;
+ return rc;
}
data->mv_data = METADATA(omp);
MDB_xcursor mx;
mdb_xcursor_init0(txn, dbi, &mx);
- mdb_xcursor_init1(txn, dbi, &mx, leaf);
+ mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp);
if (rc != MDB_SUCCESS)
return rc;
assert(IS_BRANCH(parent->mp_page));
indx = NODEPTR(parent->mp_page, parent->mp_ki);
- if ((mp = mdb_get_page(cursor->mc_txn, NODEPGNO(indx))) == NULL)
- return MDB_PAGE_NOTFOUND;
+	/* Hand the status from mdb_get_page() straight back on failure. */
+	if ((rc = mdb_get_page(cursor->mc_txn, NODEPGNO(indx), &mp)))
+		return rc;
#if 0
mp->parent = parent->mp_page;
mp->parent_index = parent->mp_ki;
leaf = NODEPTR(mp, top->mp_ki);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mp, leaf);
}
if (data) {
-		if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
+		/* rc must hold the status returned by mdb_read_data(), not the
+		 * 0/1 result of comparing it against MDB_SUCCESS.
+		 */
+		if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
leaf = NODEPTR(mp, top->mp_ki);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mp, leaf);
}
if (data) {
-		if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
+		/* rc must hold the status returned by mdb_read_data(), not the
+		 * 0/1 result of comparing it against MDB_SUCCESS.
+		 */
+		if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
}
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mpp.mp_page, leaf);
}
if (data) {
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (data) {
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mpp.mp_page, leaf);
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc)
return rc;
if (data) {
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mpp.mp_page, leaf);
rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc)
return rc;
assert(mp->mp_upper >= mp->mp_lower);
- DPRINTF("add node [%s] to %s page %lu at index %i, key size %zu",
- key ? DKEY(key) : NULL,
+ DPRINTF("add to %s page %lu index %i, data size %zu key size %zu [%s]",
IS_LEAF(mp) ? "leaf" : "branch",
- mp->mp_pgno, indx, key ? key->mv_size : 0);
+ mp->mp_pgno, indx, data ? data->mv_size : 0,
+ key ? key->mv_size : 0, key ? DKEY(key) : NULL);
if (IS_LEAF2(mp)) {
/* Move higher keys up one slot. */
}
static void
-mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node)
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_page *mp, MDB_node *node)
{
MDB_db *db = NODEDATA(node);
MDB_dbi dbn;
} else {
dbn = 2;
}
+ DPRINTF("Sub-db %u for db %u root page %lu", dbn, dbi, db->md_root);
mx->mx_dbs[dbn] = *db;
+ if (F_ISSET(mp->mp_flags, P_DIRTY))
+ mx->mx_dbxs[dbn].md_dirty = 1;
mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
mx->mx_txn.mt_next_pgno = txn->mt_next_pgno;
MDB_page *root;
MDB_pageparent npp;
indx_t si = 0, di = 0;
+ int rc;
assert(txn != NULL);
assert(mpp != NULL);
} else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
DPUTS("collapsing root page!");
txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
- if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL)
- return MDB_PAGE_NOTFOUND;
+ if (rc = mdb_get_page(txn, txn->mt_dbs[dbi].md_root, &root))
+ return rc;
txn->mt_dbs[dbi].md_depth--;
txn->mt_dbs[dbi].md_branch_pages--;
} else
*/
DPUTS("reading right neighbor");
node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
- if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
- return MDB_PAGE_NOTFOUND;
+ if (rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page))
+ return rc;
npp.mp_pi = mpp->mp_pi + 1;
si = 0;
di = NUMKEYS(mpp->mp_page);
*/
DPUTS("reading left neighbor");
node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
- if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
- return MDB_PAGE_NOTFOUND;
+ if (rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page))
+ return rc;
npp.mp_pi = mpp->mp_pi - 1;
si = NUMKEYS(npp.mp_page) - 1;
di = 0;
MDB_pageparent mp2;
mdb_xcursor_init0(txn, dbi, &mx);
- mdb_xcursor_init1(txn, dbi, &mx, leaf);
+ mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
if (flags == MDB_DEL_DUP) {
rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL, 0);
mdb_xcursor_fini(txn, dbi, &mx);
- if (rc != MDB_SUCCESS)
- return rc;
/* If sub-DB still has entries, we're done */
if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) {
memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
top = parent;
} else {
ni = NODEPTR(parent->mp_page, parent->mp_ki);
- top->mp_page = mdb_get_page(&mx.mx_txn, NODEPGNO(ni));
+ rc = mdb_get_page(&mx.mx_txn, NODEPGNO(ni), &top->mp_page);
}
}
}
int rc = MDB_SUCCESS, ins_new = 0;
indx_t newindx;
pgno_t pgno = 0;
- unsigned int i, j, split_indx;
+ unsigned int i, j, split_indx, nkeys, pmax;
MDB_node *node;
MDB_val sepkey, rkey, rdata;
- MDB_page *copy;
+ MDB_page *copy, *cptr;
MDB_dpage *mdp, *rdp, *pdp;
MDB_dhead *dh;
DKBUF;
rdp->h.md_pi = mdp->h.md_pi + 1;
DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
- split_indx = NUMKEYS(&mdp->p) / 2 + 1;
+ nkeys = NUMKEYS(&mdp->p);
+ split_indx = nkeys / 2 + 1;
if (IS_LEAF2(&rdp->p)) {
char *split, *ins;
int x;
- unsigned int nkeys = NUMKEYS(&mdp->p), lsize, rsize, ksize;
+ unsigned int lsize, rsize, ksize;
/* Move half of the keys to the right sibling */
copy = NULL;
x = *newindxp - split_indx;
goto newsep;
}
- /* Move half of the keys to the right sibling. */
- if ((copy = malloc(txn->mt_env->me_psize)) == NULL)
- return ENOMEM;
- memcpy(copy, &mdp->p, txn->mt_env->me_psize);
- memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ);
- mdp->p.mp_lower = PAGEHDRSZ;
- mdp->p.mp_upper = txn->mt_env->me_psize;
+ /* For leaf pages, check the split point based on what
+ * fits where, since otherwise add_node can fail.
+ */
+ if (IS_LEAF(&mdp->p)) {
+ unsigned int psize, nsize;
+ /* Maximum free space in an empty page */
+ pmax = txn->mt_env->me_psize - PAGEHDRSZ;
+ nsize = mdb_leaf_size(txn->mt_env, newkey, newdata);
+ if (newindx <= split_indx) {
+split1:
+ psize = nsize;
+ for (i=0; i<split_indx; i++) {
+ node = NODEPTR(&mdp->p, i);
+ psize += NODESIZE + NODEKSZ(node);
+ if (F_ISSET(node->mn_flags, F_BIGDATA))
+ psize += sizeof(pgno_t);
+ else
+ psize += NODEDSZ(node);
+ if (psize > pmax) {
+ split_indx--;
+ goto split1;
+ }
+ }
+ } else {
+split2:
+ psize = nsize;
+ for (i=split_indx; i<nkeys; i++) {
+ node = NODEPTR(&mdp->p, i);
+ psize += NODESIZE + NODEKSZ(node);
+ if (F_ISSET(node->mn_flags, F_BIGDATA))
+ psize += sizeof(pgno_t);
+ else
+ psize += NODEDSZ(node);
+ if (psize > pmax) {
+ split_indx++;
+ goto split2;
+ }
+ }
+ }
+ }
/* First find the separating key between the split pages.
*/
sepkey.mv_size = newkey->mv_size;
sepkey.mv_data = newkey->mv_data;
} else {
- node = NODEPTR(copy, split_indx);
+ node = NODEPTR(&mdp->p, split_indx);
sepkey.mv_size = node->mn_ksize;
sepkey.mv_data = NODEKEY(node);
}
return rc;
}
if (rc != MDB_SUCCESS) {
- free(copy);
return rc;
}
- for (i = j = 0; i <= NUMKEYS(copy); j++) {
- if (i < split_indx) {
- /* Re-insert in left sibling. */
- pdp = mdp;
- } else {
- /* Insert in right sibling. */
- if (i == split_indx)
- /* Reset insert index for right sibling. */
- j = (i == newindx && ins_new);
- pdp = rdp;
+ /* Move half of the keys to the right sibling. */
+ if ((copy = malloc(txn->mt_env->me_psize)) == NULL)
+ return ENOMEM;
+
+ copy->mp_pgno = mdp->p.mp_pgno;
+ copy->mp_flags = mdp->p.mp_flags;
+ copy->mp_lower = PAGEHDRSZ;
+ copy->mp_upper = txn->mt_env->me_psize;
+ cptr = copy;
+ for (i = j = 0; i <= nkeys; j++) {
+ if (i == split_indx) {
+ /* Insert in right sibling. */
+ /* Reset insert index for right sibling. */
+ j = (i == newindx && ins_new);
+ cptr = &rdp->p;
}
if (i == newindx && !ins_new) {
/* Update page and index for the new key. */
*newindxp = j;
- *mpp = &pdp->p;
- } else if (i == NUMKEYS(copy)) {
+ if (cptr == &rdp->p)
+ *mpp = cptr;
+ } else if (i == nkeys) {
break;
} else {
- node = NODEPTR(copy, i);
+ node = NODEPTR(&mdp->p, i);
rkey.mv_data = NODEKEY(node);
rkey.mv_size = node->mn_ksize;
if (IS_LEAF(&mdp->p)) {
rkey.mv_size = 0;
}
- rc = mdb_add_node(txn, dbi, &pdp->p, j, &rkey, &rdata, pgno,flags);
+ rc = mdb_add_node(txn, dbi, cptr, j, &rkey, &rdata, pgno, flags);
}
+ nkeys = NUMKEYS(copy);
+ for (i=0; i<nkeys; i++)
+ mdp->p.mp_ptrs[i] = copy->mp_ptrs[i];
+ mdp->p.mp_lower = copy->mp_lower;
+ mdp->p.mp_upper = copy->mp_upper;
+ memcpy(NODEPTR(&mdp->p, nkeys-1), NODEPTR(copy, nkeys-1),
+ txn->mt_env->me_psize - copy->mp_upper);
free(copy);
return rc;
goto put_sub;
}
/* same size, just replace it */
- if (NODEDSZ(leaf) == data->mv_size) {
+ if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
+ NODEDSZ(leaf) == data->mv_size) {
memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
goto done;
}
leaf = NODEPTR(mpp.mp_page, ki);
put_sub:
mdb_xcursor_init0(txn, dbi, &mx);
- mdb_xcursor_init1(txn, dbi, &mx, leaf);
+ mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
xdata.mv_size = 0;
xdata.mv_data = "";
if (flags == MDB_NODUPDATA)
+/* Report statistics for the main database of an environment.
+ *
+ * env: environment to inspect; arg: receives the statistics.
+ * Returns EINVAL when either pointer is NULL, the status of
+ * mdb_env_read_meta() if choosing the meta page fails, and otherwise
+ * whatever mdb_stat0() returns for the MAIN_DBI record of that page.
+ */
int
mdb_env_stat(MDB_env *env, MDB_stat *arg)
{
+	int toggle, rc;
+
	if (env == NULL || arg == NULL)
		return EINVAL;
-	return mdb_stat0(env, &env->me_meta->mm_dbs[MAIN_DBI], arg);
+	/* Never index me_metas[] with a toggle the lookup did not set. */
+	if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS)
+		return rc;
+
+	return mdb_stat0(env, &env->me_metas[toggle]->mm_dbs[MAIN_DBI], arg);
}
int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)