* BerkeleyDB API, but much simplified.
*/
/*
- * Copyright 2011-2012 Howard Chu, Symas Corp.
+ * Copyright 2011-2013 Howard Chu, Symas Corp.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
/** The version number for a database's file format. */
#define MDB_VERSION 1
- /** The maximum size of a key in the database.
- * While data items have essentially unbounded size, we require that
- * keys all fit onto a regular page. This limit could be raised a bit
- * further if needed; to something just under #MDB_PAGESIZE / #MDB_MINKEYS.
+ /** @brief The maximum size of a key in the database.
+ *
+ * We require that keys all fit onto a regular page. This limit
+ * could be raised a bit further if needed; to something just
+ * under #MDB_PAGESIZE / #MDB_MINKEYS.
+ *
+ * Note that data items in an #MDB_DUPSORT database are actually keys
+ * of a subDB, so they're also limited to this size.
*/
-#define MAXKEYSIZE 511
+#ifndef MDB_MAXKEYSIZE
+#define MDB_MAXKEYSIZE 511
+#endif
+
+ /** @brief The maximum size of a data item.
+ *
+ * We only store a 32 bit value for node sizes.
+ */
+#define MAXDATASIZE 0xffffffffUL
#if MDB_DEBUG
/** A key buffer.
* @ingroup debug
* This is used for printing a hex dump of a key's contents.
*/
-#define DKBUF char kbuf[(MAXKEYSIZE*2+1)]
+#define DKBUF char kbuf[(MDB_MAXKEYSIZE*2+1)]
/** Display a key in hex.
* @ingroup debug
* Invoke a function to display a key in hex.
*/
#define P_INVALID (~(pgno_t)0)
- /** Test if a flag \b f is set in a flag word \b w. */
+ /** Test if the flags \b f are set in a flag word \b w. */
#define F_ISSET(w, f) (((w) & (f)) == (f))
/** Used for offsets within a single page.
* slot's address is saved in thread-specific data so that subsequent read
* transactions started by the same thread need no further locking to proceed.
*
+ * No reader table is used if the database is on a read-only filesystem.
+ *
* Since the database uses multi-version concurrency control, readers don't
* actually need any locking. This table is used to keep track of which
* readers are using data from which old transactions, so that we'll know
*/
MDB_IDL mt_free_pgs;
union {
- MDB_ID2L dirty_list; /**< modified pages */
- MDB_reader *reader; /**< this thread's slot in the reader table */
+ MDB_ID2L dirty_list; /**< for write txns: modified pages */
+ MDB_reader *reader; /**< this thread's reader table slot or NULL */
} mt_u;
/** Array of records for each DB known in the environment. */
MDB_dbx *mt_dbxs;
#define DB_DIRTY 0x01 /**< DB was written in this txn */
#define DB_STALE 0x02 /**< DB record is older than txnID */
/** @} */
- /** Array of cursors for each DB */
+ /** In write txns, array of cursors for each DB */
MDB_cursor **mt_cursors;
/** Array of flags for each DB */
unsigned char *mt_dbflags;
unsigned char mx_dbflag;
} MDB_xcursor;
- /** A set of pages freed by an earlier transaction. */
-typedef struct MDB_oldpages {
- /** Usually we only read one record from the FREEDB at a time, but
- * in case we read more, this will chain them together.
- */
- struct MDB_oldpages *mo_next;
- /** The ID of the transaction in which these pages were freed. */
- txnid_t mo_txnid;
- /** An #MDB_IDL of the pages */
- pgno_t mo_pages[1]; /* dynamic */
-} MDB_oldpages;
-
/** The database environment. */
struct MDB_env {
HANDLE me_fd; /**< The main data file */
pid_t me_pid; /**< process ID of this env */
char *me_path; /**< path to the DB files */
char *me_map; /**< the memory map of the data file */
- MDB_txninfo *me_txns; /**< the memory map of the lock file */
+ MDB_txninfo *me_txns; /**< the memory map of the lock file or NULL */
MDB_meta *me_metas[2]; /**< pointers to the two meta pages */
MDB_txn *me_txn; /**< current write transaction */
size_t me_mapsize; /**< size of the data memory map */
off_t me_size; /**< current file size */
pgno_t me_maxpg; /**< me_mapsize / me_psize */
- txnid_t me_pgfirst; /**< ID of first old page record we used */
txnid_t me_pglast; /**< ID of last old page record we used */
MDB_dbx *me_dbxs; /**< array of static DB info */
- uint16_t *me_dbflags; /**< array of DB flags */
- MDB_oldpages *me_pghead; /**< list of old page records */
- MDB_oldpages *me_pgfree; /**< list of page records to free */
+ uint16_t *me_dbflags; /**< array of flags from MDB_db.md_flags */
+ pgno_t *me_pghead; /**< old pages reclaimed from freelist */
+ pgno_t *me_pgfree; /**< memory to free when dropping me_pghead */
pthread_key_t me_txkey; /**< thread-key for readers */
MDB_page *me_dpages; /**< list of malloc'd blocks for re-use */
/** IDL of pages that became unused in a write txn */
MDB_IDL me_free_pgs;
/** ID2L of pages that were written during a write txn */
MDB_ID2 me_dirty_list[MDB_IDL_UM_SIZE];
+ /** Max number of freelist items that can fit in a single overflow page */
+ unsigned int me_maxfree_1pg;
#ifdef _WIN32
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */
HANDLE me_wmutex;
static size_t mdb_branch_size(MDB_env *env, MDB_val *key);
static int mdb_rebalance(MDB_cursor *mc);
-static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
+static int mdb_update_key(MDB_cursor *mc, MDB_val *key);
static void mdb_cursor_pop(MDB_cursor *mc);
static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp);
"MDB_DBS_FULL: Environment maxdbs limit reached",
"MDB_READERS_FULL: Environment maxreaders limit reached",
"MDB_TLS_FULL: Thread-local storage keys full - too many environments open",
- "MDB_TXN_FULL: Nested transaction has too many dirty pages - transaction too big",
+ "MDB_TXN_FULL: Transaction has too many dirty pages - transaction too big",
"MDB_CURSOR_FULL: Internal error - cursor stack limit reached",
- "MDB_PAGE_FULL: Internal error - page has no more space"
+ "MDB_PAGE_FULL: Internal error - page has no more space",
+ "MDB_MAP_RESIZED: Database contents grew beyond environment mapsize",
};
char *
char *ptr = buf;
unsigned char *c = key->mv_data;
unsigned int i;
- if (key->mv_size > MAXKEYSIZE)
- return "MAXKEYSIZE";
+
+ if (!key)
+ return "";
+
+ if (key->mv_size > MDB_MAXKEYSIZE)
+ return "MDB_MAXKEYSIZE";
/* may want to make this a dynamic check: if the key is mostly
* printable characters, print it as-is instead of converting to hex.
*/
DKBUF;
nkeys = NUMKEYS(mp);
- fprintf(stderr, "numkeys %d\n", nkeys);
+ fprintf(stderr, "Page %zu numkeys %d\n", mp->mp_pgno, nkeys);
for (i=0; i<nkeys; i++) {
node = NODEPTR(mp, i);
key.mv_size = node->mn_ksize;
key.mv_data = node->mn_data;
nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t);
- if (F_ISSET(node->mn_flags, F_BIGDATA))
- nsize += sizeof(pgno_t);
- else
- nsize += NODEDSZ(node);
- fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
+ if (IS_BRANCH(mp)) {
+ fprintf(stderr, "key %d: page %zu, %s\n", i, NODEPGNO(node),
+ DKEY(&key));
+ } else {
+ if (F_ISSET(node->mn_flags, F_BIGDATA))
+ nsize += sizeof(pgno_t);
+ else
+ nsize += NODEDSZ(node);
+ fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
+ }
}
}
return ret;
}
+static void
+mdb_page_free(MDB_env *env, MDB_page *mp)
+{
+ mp->mp_next = env->me_dpages;
+ VGMEMP_FREE(env, mp);
+ env->me_dpages = mp;
+}
+
/** Allocate pages for writing.
* If there are free pages available from older transactions, they
* will be re-used first. Otherwise a new page will be allocated.
int rc;
*mp = NULL;
+
+ /* If our dirty list is already full, we can't do anything */
+ if (txn->mt_u.dirty_list[0].mid >= MDB_IDL_UM_MAX)
+ return MDB_TXN_FULL;
+
/* The free list won't have any content at all until txn 2 has
* committed. The pages freed by txn 2 will be unreferenced
* after txn 3 commits, and so will be safe to re-use in txn 4.
*/
if (txn->mt_txnid > 3) {
-
if (!txn->mt_env->me_pghead &&
txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
/* See if there's anything in the free DB */
txnid_t *kptr;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
- if (!txn->mt_env->me_pgfirst) {
+ if (!txn->mt_env->me_pglast) {
mdb_page_search(&m2, NULL, 0);
leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
kptr = (txnid_t *)NODEKEY(leaf);
last = *kptr;
} else {
MDB_val key;
- int exact;
again:
- exact = 0;
last = txn->mt_env->me_pglast + 1;
leaf = NULL;
key.mv_data = &last;
key.mv_size = sizeof(last);
- rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
+ rc = mdb_cursor_set(&m2, &key, &data, MDB_SET_RANGE, NULL);
if (rc)
goto none;
last = *(txnid_t *)key.mv_data;
if (oldest > last) {
/* It's usable, grab it.
*/
- MDB_oldpages *mop;
- pgno_t *idl;
+ pgno_t *idl, *mop;
- if (!txn->mt_env->me_pgfirst) {
+ if (!txn->mt_env->me_pglast) {
mdb_node_read(txn, leaf, &data);
}
- txn->mt_env->me_pglast = last;
- if (!txn->mt_env->me_pgfirst)
- txn->mt_env->me_pgfirst = last;
idl = (MDB_ID *) data.mv_data;
/* We might have a zero-length IDL due to freelist growth
* during a prior commit
*/
- if (!idl[0]) goto again;
- mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
+ if (!idl[0]) {
+ txn->mt_env->me_pglast = last;
+ goto again;
+ }
+ mop = malloc(MDB_IDL_SIZEOF(idl));
if (!mop)
return ENOMEM;
- mop->mo_next = txn->mt_env->me_pghead;
- mop->mo_txnid = last;
- txn->mt_env->me_pghead = mop;
- memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
+ txn->mt_env->me_pglast = last;
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = mop;
+ memcpy(mop, idl, MDB_IDL_SIZEOF(idl));
#if MDB_DEBUG > 1
{
unsigned int i;
DPRINTF("IDL read txn %zu root %zu num %zu",
- mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+ last, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
for (i=0; i<idl[0]; i++) {
DPRINTF("IDL %zu", idl[i+1]);
}
}
none:
if (txn->mt_env->me_pghead) {
- MDB_oldpages *mop = txn->mt_env->me_pghead;
+ pgno_t *mop = txn->mt_env->me_pghead;
if (num > 1) {
MDB_cursor m2;
- int retry = 60, readit = 0, n2 = num-1;
+ int retry = 1, readit = 0, n2 = num-1;
unsigned int i, j, k;
/* If current list is too short, must fetch more and coalesce */
- if (mop->mo_pages[0] < (unsigned)num)
+ if (mop[0] < (unsigned)num)
readit = 1;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
do {
- /* bail out if we're operating on the freelist.
+ /* If on freelist, don't try to read more. If what we have
+ * right now isn't enough just use new pages.
* TODO: get all of this working. Many circular dependencies...
*/
- if (mc->mc_dbi == FREE_DBI)
- break;
+ if (mc->mc_dbi == FREE_DBI) {
+ retry = 0;
+ readit = 0;
+ }
if (readit) {
MDB_val key, data;
- MDB_oldpages *mop2;
- pgno_t *idl;
- int exact;
+ pgno_t *idl, *mop2;
- last = mop->mo_txnid + 1;
+ last = txn->mt_env->me_pglast + 1;
/* We haven't hit the readers list yet? */
if (!oldest) {
if (oldest - last < 1)
break;
- exact = 0;
key.mv_data = &last;
key.mv_size = sizeof(last);
- rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
- if (rc)
+ rc = mdb_cursor_set(&m2,&key,&data,MDB_SET_RANGE,NULL);
+ if (rc) {
+ if (rc == MDB_NOTFOUND)
+ break;
return rc;
+ }
+ last = *(txnid_t*)key.mv_data;
+ if (oldest <= last)
+ break;
idl = (MDB_ID *) data.mv_data;
- mop2 = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - 2*sizeof(pgno_t) + MDB_IDL_SIZEOF(mop->mo_pages));
+ mop2 = malloc(MDB_IDL_SIZEOF(idl) + MDB_IDL_SIZEOF(mop));
if (!mop2)
return ENOMEM;
/* merge in sorted order */
- i = idl[0]; j = mop->mo_pages[0]; mop2->mo_pages[0] = k = i+j;
- mop->mo_pages[0] = P_INVALID;
+ i = idl[0]; j = mop[0]; mop2[0] = k = i+j;
+ mop[0] = P_INVALID;
while (i>0 || j>0) {
- if (i && idl[i] < mop->mo_pages[j])
- mop2->mo_pages[k--] = idl[i--];
+ if (i && idl[i] < mop[j])
+ mop2[k--] = idl[i--];
else
- mop2->mo_pages[k--] = mop->mo_pages[j--];
+ mop2[k--] = mop[j--];
}
txn->mt_env->me_pglast = last;
- mop2->mo_txnid = last;
- mop2->mo_next = mop->mo_next;
- txn->mt_env->me_pghead = mop2;
- free(mop);
+ free(txn->mt_env->me_pgfree);
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = mop2;
mop = mop2;
/* Keep trying to read until we have enough */
- if (mop->mo_pages[0] < (unsigned)num) {
+ if (mop[0] < (unsigned)num) {
continue;
}
}
/* current list has enough pages, but are they contiguous? */
- for (i=mop->mo_pages[0]; i>=(unsigned)num; i--) {
- if (mop->mo_pages[i-n2] == mop->mo_pages[i] + n2) {
- pgno = mop->mo_pages[i];
+ for (i=mop[0]; i>=(unsigned)num; i--) {
+ if (mop[i-n2] == mop[i] + n2) {
+ pgno = mop[i];
i -= n2;
/* move any stragglers down */
- for (j=i+num; j<=mop->mo_pages[0]; j++)
- mop->mo_pages[i++] = mop->mo_pages[j];
- mop->mo_pages[0] -= num;
+ for (j=i+num; j<=mop[0]; j++)
+ mop[i++] = mop[j];
+ mop[0] -= num;
break;
}
}
- /* Stop if we succeeded, or no more retries */
+ /* Stop if we succeeded, or no retries */
if (!retry || pgno != P_INVALID)
break;
readit = 1;
- retry--;
} while (1);
} else {
/* peel pages off tail, so we only have to truncate the list */
- pgno = MDB_IDL_LAST(mop->mo_pages);
- mop->mo_pages[0]--;
+ pgno = MDB_IDL_LAST(mop);
+ mop[0]--;
}
- if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
- txn->mt_env->me_pghead = mop->mo_next;
- if (mc->mc_dbi == FREE_DBI) {
- mop->mo_next = txn->mt_env->me_pgfree;
- txn->mt_env->me_pgfree = mop;
- } else {
- free(mop);
- }
+ if (MDB_IDL_IS_ZERO(mop)) {
+ free(txn->mt_env->me_pgfree);
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = NULL;
}
}
}
return 0;
}
}
+ if (mc->mc_txn->mt_u.dirty_list[0].mid >= MDB_IDL_UM_MAX)
+ return MDB_TXN_FULL;
/* No - copy it */
np = mdb_page_malloc(mc);
if (!np)
if (txn->mt_numdbs > 2)
memset(txn->mt_dbflags+2, DB_STALE, txn->mt_numdbs-2);
+ if (env->me_maxpg < txn->mt_next_pgno) {
+ mdb_txn_reset0(txn);
+ return MDB_MAP_RESIZED;
+ }
+
return MDB_SUCCESS;
}
if (!(env->me_flags & MDB_ROFS))
txn->mt_u.reader->mr_txnid = (txnid_t)-1;
} else {
- MDB_oldpages *mop;
MDB_page *dp;
unsigned int i;
for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
dp = txn->mt_u.dirty_list[i].mptr;
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
- dp->mp_next = txn->mt_env->me_dpages;
- VGMEMP_FREE(txn->mt_env, dp);
- txn->mt_env->me_dpages = dp;
+ mdb_page_free(txn->mt_env, dp);
} else {
/* large pages just get freed directly */
VGMEMP_FREE(txn->mt_env, dp);
env->me_free_pgs = txn->mt_free_pgs;
}
- while ((mop = txn->mt_env->me_pghead)) {
- txn->mt_env->me_pghead = mop->mo_next;
- free(mop);
- }
- txn->mt_env->me_pgfirst = 0;
+ free(txn->mt_env->me_pgfree);
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = NULL;
txn->mt_env->me_pglast = 0;
env->me_txn = NULL;
MDB_page *dp;
MDB_env *env;
pgno_t next, freecnt;
+ txnid_t oldpg_txnid, id;
MDB_cursor mc;
assert(txn != NULL);
}
if (txn->mt_parent) {
- MDB_db *ip, *jp;
- MDB_dbi i;
+ MDB_txn *parent = txn->mt_parent;
unsigned x, y;
MDB_ID2L dst, src;
+ parent->mt_next_pgno = txn->mt_next_pgno;
+ parent->mt_flags = txn->mt_flags;
+
/* Merge (and close) our cursors with parent's */
mdb_cursor_merge(txn);
- /* Update parent's DB table */
- ip = &txn->mt_parent->mt_dbs[2];
- jp = &txn->mt_dbs[2];
- for (i = 2; i < txn->mt_numdbs; i++) {
- if (ip->md_root != jp->md_root)
- *ip = *jp;
- ip++; jp++;
- }
+ /* Update parent's DB table. */
+ memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
+ memcpy(parent->mt_dbflags, txn->mt_dbflags, txn->mt_numdbs);
txn->mt_parent->mt_numdbs = txn->mt_numdbs;
/* Append our free list to parent's */
DPRINTF("committing txn %zu %p on mdbenv %p, root page %zu",
txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
- /* Update DB root pointers. Their pages have already been
- * touched so this is all in-place and cannot fail.
- */
+ /* Update DB root pointers */
if (txn->mt_numdbs > 2) {
MDB_dbi i;
MDB_val data;
for (i = 2; i < txn->mt_numdbs; i++) {
if (txn->mt_dbflags[i] & DB_DIRTY) {
data.mv_data = &txn->mt_dbs[i];
- mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+ rc = mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+ if (rc)
+ goto fail;
}
}
}
+ /* Save the freelist as of this transaction to the freeDB. This
+ * can change the freelist, so keep trying until it stabilizes.
+ *
+ * env->me_pglast and the length of txn->mt_free_pgs cannot decrease,
+ * except the code below can decrease env->me_pglast to split pghead.
+ * Page numbers cannot disappear from txn->mt_free_pgs. New pages
+ * can only appear in env->me_pghead when env->me_pglast increases.
+ * Until then, the me_pghead pointer won't move but can become NULL.
+ */
+
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
+ oldpg_txnid = id = 0;
+ freecnt = 0;
/* should only be one record now */
- if (env->me_pghead) {
+ if (env->me_pghead || env->me_pglast) {
/* make sure first page of freeDB is touched and on freelist */
rc = mdb_page_search(&mc, NULL, MDB_PS_MODIFY);
if (rc && rc != MDB_NOTFOUND) {
}
/* Delete IDLs we used from the free list */
- if (env->me_pgfirst) {
- txnid_t cur;
+ if (env->me_pglast) {
MDB_val key;
- int exact = 0;
-
- key.mv_size = sizeof(cur);
- for (cur = env->me_pgfirst; cur <= env->me_pglast; cur++) {
- key.mv_data = &cur;
- mdb_cursor_set(&mc, &key, NULL, MDB_SET, &exact);
+ do {
+free_pgfirst:
+ rc = mdb_cursor_first(&mc, &key, NULL);
+ if (rc)
+ goto fail;
+ oldpg_txnid = *(txnid_t *)key.mv_data;
+again:
+ assert(oldpg_txnid <= env->me_pglast);
+ id = 0;
rc = mdb_cursor_del(&mc, 0);
if (rc)
goto fail;
- }
- env->me_pgfirst = 0;
- env->me_pglast = 0;
+ } while (oldpg_txnid < env->me_pglast);
}
- /* save to free list */
+ /* Save IDL of pages freed by this txn, to freeDB */
free2:
- freecnt = txn->mt_free_pgs[0];
- if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
+ if (freecnt != txn->mt_free_pgs[0]) {
MDB_val key, data;
/* make sure last page of freeDB is touched and on freelist */
- key.mv_size = MAXKEYSIZE+1;
+ key.mv_size = MDB_MAXKEYSIZE+1;
key.mv_data = NULL;
rc = mdb_page_search(&mc, &key, MDB_PS_MODIFY);
if (rc && rc != MDB_NOTFOUND)
/* write to last page of freeDB */
key.mv_size = sizeof(pgno_t);
key.mv_data = &txn->mt_txnid;
- data.mv_data = txn->mt_free_pgs;
/* The free list can still grow during this call,
- * despite the pre-emptive touches above. So check
- * and make sure the entire thing got written.
+ * despite the pre-emptive touches above. So retry
+ * until the reserved space remains big enough.
*/
do {
+ assert(freecnt < txn->mt_free_pgs[0]);
freecnt = txn->mt_free_pgs[0];
data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
- mdb_midl_sort(txn->mt_free_pgs);
- rc = mdb_cursor_put(&mc, &key, &data, 0);
+ rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
goto fail;
} while (freecnt != txn->mt_free_pgs[0]);
+ mdb_midl_sort(txn->mt_free_pgs);
+ memcpy(data.mv_data, txn->mt_free_pgs, data.mv_size);
+ if (oldpg_txnid < env->me_pglast || (!env->me_pghead && id))
+ goto free_pgfirst; /* used up freeDB[oldpg_txnid] */
}
- /* should only be one record now */
-again:
+
+ /* Put back page numbers we took from freeDB but did not use */
if (env->me_pghead) {
+ for (;;) {
MDB_val key, data;
- MDB_oldpages *mop;
- pgno_t orig;
- txnid_t id;
+ pgno_t orig, *mop;
mop = env->me_pghead;
- id = mop->mo_txnid;
+ id = env->me_pglast;
key.mv_size = sizeof(id);
key.mv_data = &id;
- data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
- data.mv_data = mop->mo_pages;
- orig = mop->mo_pages[0];
/* These steps may grow the freelist again
* due to freed overflow pages...
*/
- rc = mdb_cursor_put(&mc, &key, &data, 0);
- if (rc)
- goto fail;
- if (mop == env->me_pghead && env->me_pghead->mo_txnid == id) {
- /* could have been used again here */
- if (mop->mo_pages[0] != orig) {
- data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
- data.mv_data = mop->mo_pages;
- id = mop->mo_txnid;
- rc = mdb_cursor_put(&mc, &key, &data, 0);
- if (rc)
- goto fail;
- }
- } else {
- /* was completely used up */
- rc = mdb_cursor_del(&mc, 0);
+ i = 2;
+ do {
+ orig = mop[0];
+ if (orig > env->me_maxfree_1pg && id > 4)
+ orig = env->me_maxfree_1pg; /* Do not use more than 1 page */
+ data.mv_size = (orig + 1) * sizeof(pgno_t);
+ rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
goto fail;
- if (env->me_pghead)
- goto again;
- }
- env->me_pgfirst = 0;
- env->me_pglast = 0;
+ assert(!env->me_pghead || env->me_pglast);
+ /* mop could have been used again here */
+ if (id != env->me_pglast || env->me_pghead == NULL)
+ goto again; /* was completely used up */
+ assert(mop == env->me_pghead);
+ } while (mop[0] < orig && --i);
+ memcpy(data.mv_data, mop, data.mv_size);
+ if (mop[0] <= orig)
+ break;
+ *(pgno_t *)data.mv_data = orig;
+ mop[orig] = mop[0] - orig;
+ env->me_pghead = mop += orig;
+ /* Save more oldpages at the previous txnid. */
+ assert(env->me_pglast == id && id == oldpg_txnid);
+ env->me_pglast = --oldpg_txnid;
+ }
}
/* Check for growth of freelist again */
if (freecnt != txn->mt_free_pgs[0])
goto free2;
- if (env->me_pghead) {
- free(env->me_pghead);
- env->me_pghead = NULL;
- }
-
- while (env->me_pgfree) {
- MDB_oldpages *mop = env->me_pgfree;
- env->me_pgfree = mop->mo_next;
- free(mop);
- }
+ free(env->me_pgfree);
+ env->me_pghead = env->me_pgfree = NULL;
if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
if (mdb_midl_shrink(&txn->mt_free_pgs))
for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
dp = txn->mt_u.dirty_list[i].mptr;
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
- dp->mp_next = txn->mt_env->me_dpages;
- VGMEMP_FREE(txn->mt_env, dp);
- txn->mt_env->me_dpages = dp;
+ mdb_page_free(txn->mt_env, dp);
} else {
VGMEMP_FREE(txn->mt_env, dp);
free(dp);
}
done:
+ env->me_pglast = 0;
env->me_txn = NULL;
if (txn->mt_numdbs > env->me_numdbs) {
/* update the DB flags */
MDB_pagebuf pbuf;
MDB_page *p;
MDB_meta *m;
- int rc, err;
+ int i, rc, err;
/* We don't know the page size yet, so use a minimum value.
+ * Read both meta pages so we can use the latest one.
*/
+ for (i=0; i<2; i++) {
#ifdef _WIN32
- if (!ReadFile(env->me_fd, &pbuf, MDB_PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
+ if (!ReadFile(env->me_fd, &pbuf, MDB_PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
#else
- if ((rc = read(env->me_fd, &pbuf, MDB_PAGESIZE)) == 0)
+ if ((rc = read(env->me_fd, &pbuf, MDB_PAGESIZE)) == 0)
#endif
- {
- return ENOENT;
- }
- else if (rc != MDB_PAGESIZE) {
- err = ErrCode();
- if (rc > 0)
- err = MDB_INVALID;
- DPRINTF("read: %s", strerror(err));
- return err;
- }
+ {
+ return ENOENT;
+ }
+ else if (rc != MDB_PAGESIZE) {
+ err = ErrCode();
+ if (rc > 0)
+ err = MDB_INVALID;
+ DPRINTF("read: %s", strerror(err));
+ return err;
+ }
- p = (MDB_page *)&pbuf;
+ p = (MDB_page *)&pbuf;
- if (!F_ISSET(p->mp_flags, P_META)) {
- DPRINTF("page %zu not a meta page", p->mp_pgno);
- return MDB_INVALID;
- }
+ if (!F_ISSET(p->mp_flags, P_META)) {
+ DPRINTF("page %zu not a meta page", p->mp_pgno);
+ return MDB_INVALID;
+ }
- m = METADATA(p);
- if (m->mm_magic != MDB_MAGIC) {
- DPUTS("meta has invalid magic");
- return MDB_INVALID;
- }
+ m = METADATA(p);
+ if (m->mm_magic != MDB_MAGIC) {
+ DPUTS("meta has invalid magic");
+ return MDB_INVALID;
+ }
- if (m->mm_version != MDB_VERSION) {
- DPRINTF("database is version %u, expected version %u",
- m->mm_version, MDB_VERSION);
- return MDB_VERSION_MISMATCH;
- }
+ if (m->mm_version != MDB_VERSION) {
+ DPRINTF("database is version %u, expected version %u",
+ m->mm_version, MDB_VERSION);
+ return MDB_VERSION_MISMATCH;
+ }
- memcpy(meta, m, sizeof(*m));
+ if (i) {
+ if (m->mm_txnid > meta->mm_txnid)
+ memcpy(meta, m, sizeof(*m));
+ } else {
+ memcpy(meta, m, sizeof(*m));
+#ifdef _WIN32
+ if (SetFilePointer(env->me_fd, meta->mm_psize, NULL, FILE_BEGIN) != meta->mm_psize)
+#else
+ if (lseek(env->me_fd, meta->mm_psize, SEEK_SET) != meta->mm_psize)
+#endif
+ return ErrCode();
+ }
+ }
return 0;
}
meta->mm_magic = MDB_MAGIC;
meta->mm_version = MDB_VERSION;
+ meta->mm_mapsize = env->me_mapsize;
meta->mm_psize = psize;
meta->mm_last_pg = 1;
meta->mm_flags = env->me_flags & 0xffff;
#ifdef _WIN32
{
DWORD len;
+ SetFilePointer(env->me_fd, 0, NULL, FILE_BEGIN);
rc = WriteFile(env->me_fd, p, psize * 2, &len, NULL);
rc = (len == psize * 2) ? MDB_SUCCESS : ErrCode();
}
#else
+ lseek(env->me_fd, 0, SEEK_SET);
rc = write(env->me_fd, p, psize * 2);
rc = (rc == (int)psize * 2) ? MDB_SUCCESS : ErrCode();
#endif
return EBUSY; /* TODO: Make a new MDB_* error code? */
}
env->me_psize = meta.mm_psize;
+ env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
env->me_maxpg = env->me_mapsize / env->me_psize;
{
MDB_page *p = NULL;
- if (txn->mt_env->me_flags & MDB_WRITEMAP) {
- if (pgno < txn->mt_next_pgno)
- p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
- goto done;
- }
- if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
- unsigned x;
- x = mdb_mid2l_search(txn->mt_u.dirty_list, pgno);
- if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
- p = txn->mt_u.dirty_list[x].mptr;
- }
- }
- if (!p) {
- if (pgno < txn->mt_next_pgno)
- p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ if (!((txn->mt_flags & MDB_TXN_RDONLY) |
+ (txn->mt_env->me_flags & MDB_WRITEMAP)))
+ {
+ MDB_txn *tx2 = txn;
+ do {
+ MDB_ID2L dl = tx2->mt_u.dirty_list;
+ if (dl[0].mid) {
+ unsigned x = mdb_mid2l_search(dl, pgno);
+ if (x <= dl[0].mid && dl[x].mid == pgno) {
+ p = dl[x].mptr;
+ goto done;
+ }
+ }
+ } while ((tx2 = tx2->mt_parent) != NULL);
}
-done:
- *ret = p;
- if (!p) {
+
+ if (pgno < txn->mt_next_pgno) {
+ p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ } else {
DPRINTF("page %zu not found", pgno);
assert(p != NULL);
}
+
+done:
+ *ret = p;
return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
}
if (key == NULL) /* Initialize cursor to first page. */
i = 0;
- else if (key->mv_size > MAXKEYSIZE && key->mv_data == NULL) {
+ else if (key->mv_size > MDB_MAXKEYSIZE && key->mv_data == NULL) {
/* cursor to last page */
i = NUMKEYS(mp)-1;
} else {
if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
return EINVAL;
- if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
+ if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) {
return EINVAL;
}
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
MDB_val lkey;
- lkey.mv_size = MAXKEYSIZE+1;
+ lkey.mv_size = MDB_MAXKEYSIZE+1;
lkey.mv_data = NULL;
rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS)
switch (op) {
case MDB_GET_CURRENT:
- if (!mc->mc_flags & C_INITIALIZED) {
+ if (!(mc->mc_flags & C_INITIALIZED)) {
rc = EINVAL;
} else {
MDB_page *mp = mc->mc_pg[mc->mc_top];
case MDB_SET:
case MDB_SET_KEY:
case MDB_SET_RANGE:
- if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
+ if (key == NULL || key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) {
rc = EINVAL;
} else if (op == MDB_SET_RANGE)
rc = mdb_cursor_set(mc, key, data, op, NULL);
size_t nsize;
int rc, rc2;
MDB_pagebuf pbuf;
- char dbuf[MAXKEYSIZE+1];
+ char dbuf[MDB_MAXKEYSIZE+1];
unsigned int nflags;
DKBUF;
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
+ if (flags != MDB_CURRENT && (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE))
+ return EINVAL;
+
+ if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT) && data->mv_size > MDB_MAXKEYSIZE)
+ return EINVAL;
+
+#if SIZE_MAX > MAXDATASIZE
+ if (data->mv_size > MAXDATASIZE)
+ return EINVAL;
+#endif
+
DPRINTF("==> put db %u key [%s], size %zu, data size %zu",
mc->mc_dbi, DKEY(key), key ? key->mv_size:0, data->mv_size);
*/
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = NODEDATA(leaf);
- else
+ else if (data->mv_size)
memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
+ else
+ memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
goto done;
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
}
}
done:
+ /* If we succeeded and the key didn't exist before, make sure
+ * the cursor is marked valid.
+ */
+ if (!rc && insert)
+ mc->mc_flags |= C_INITIALIZED;
return rc;
}
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
- if (!mc->mc_flags & C_INITIALIZED)
+ if (!(mc->mc_flags & C_INITIALIZED))
return EINVAL;
rc = mdb_cursor_touch(mc);
* @return 0 on success, non-zero on failure.
*/
static int
-mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
+mdb_update_key(MDB_cursor *mc, MDB_val *key)
{
+ MDB_page *mp;
MDB_node *node;
char *base;
size_t len;
int delta, delta0;
- indx_t ptr, i, numkeys;
+ indx_t ptr, i, numkeys, indx;
DKBUF;
+ indx = mc->mc_ki[mc->mc_top];
+ mp = mc->mc_pg[mc->mc_top];
node = NODEPTR(mp, indx);
ptr = mp->mp_ptrs[indx];
#if MDB_DEBUG
{
MDB_val k2;
- char kbuf2[(MAXKEYSIZE*2+1)];
+ char kbuf2[(MDB_MAXKEYSIZE*2+1)];
k2.mv_data = NODEKEY(node);
k2.mv_size = node->mn_ksize;
DPRINTF("update key %u (ofs %u) [%s] to [%s] on page %zu",
delta += (delta & 1);
if (delta) {
if (delta > 0 && SIZELEFT(mp) < delta) {
- DPRINTF("OUCH! Not enough room, delta = %d", delta);
- return MDB_PAGE_FULL;
+ pgno_t pgno;
+ /* not enough space left, do a delete and split */
+ DPRINTF("Not enough room, delta = %d, splitting...", delta);
+ pgno = NODEPGNO(node);
+ mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+ return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE);
}
numkeys = NUMKEYS(mp);
return MDB_SUCCESS;
}
+static void
+mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst);
+
/** Move a node from csrc to cdst.
*/
static int
mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
{
- int rc;
MDB_node *srcnode;
MDB_val key, data;
pgno_t srcpg;
+ MDB_cursor mn;
+ int rc;
unsigned short flags;
DKBUF;
}
cdst->mc_snum = snum--;
cdst->mc_top = snum;
- rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey);
+ mdb_cursor_copy(cdst, &mn);
+ mn.mc_ki[snum] = 0;
+ rc = mdb_update_key(&mn, &bkey);
+ if (rc)
+ return rc;
}
DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu",
}
DPRINTF("update separator for source page %zu to [%s]",
csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key));
- if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1],
- &key)) != MDB_SUCCESS)
+ mdb_cursor_copy(csrc, &mn);
+ mn.mc_snum--;
+ mn.mc_top--;
+ if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
return rc;
}
if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
MDB_val nullkey;
+ indx_t ix = csrc->mc_ki[csrc->mc_top];
nullkey.mv_size = 0;
- rc = mdb_update_key(csrc->mc_pg[csrc->mc_top], 0, &nullkey);
+ csrc->mc_ki[csrc->mc_top] = 0;
+ rc = mdb_update_key(csrc, &nullkey);
+ csrc->mc_ki[csrc->mc_top] = ix;
assert(rc == MDB_SUCCESS);
}
}
}
DPRINTF("update separator for destination page %zu to [%s]",
cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key));
- if ((rc = mdb_update_key(cdst->mc_pg[cdst->mc_top-1], cdst->mc_ki[cdst->mc_top-1],
- &key)) != MDB_SUCCESS)
+ mdb_cursor_copy(cdst, &mn);
+ mn.mc_snum--;
+ mn.mc_top--;
+ if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
return rc;
}
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
MDB_val nullkey;
+ indx_t ix = cdst->mc_ki[cdst->mc_top];
nullkey.mv_size = 0;
- rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &nullkey);
+ cdst->mc_ki[cdst->mc_top] = 0;
+ rc = mdb_update_key(cdst, &nullkey);
+ cdst->mc_ki[cdst->mc_top] = ix;
assert(rc == MDB_SUCCESS);
}
}
mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0);
if (csrc->mc_ki[csrc->mc_top-1] == 0) {
key.mv_size = 0;
- if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], 0, &key)) != MDB_SUCCESS)
+ csrc->mc_top--;
+ rc = mdb_update_key(csrc, &key);
+ csrc->mc_top++;
+ if (rc)
return rc;
}
*/
if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
return mdb_node_move(&mn, mc);
- else { /* FIXME: if (has_enough_room()) */
- mc->mc_flags &= ~C_INITIALIZED;
+ else {
if (mc->mc_ki[ptop] == 0)
- return mdb_page_merge(&mn, mc);
+ rc = mdb_page_merge(&mn, mc);
else
- return mdb_page_merge(mc, &mn);
+ rc = mdb_page_merge(mc, &mn);
+ mc->mc_flags &= ~C_INITIALIZED;
}
+ return rc;
}
/** Complete a delete operation started by #mdb_cursor_del(). */
return EACCES;
}
- if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
+ if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) {
return EINVAL;
}
xdata = NULL;
}
rc = mdb_cursor_set(&mc, key, xdata, op, &exact);
- if (rc == 0)
+ if (rc == 0) {
+ /* let mdb_page_split know about this cursor if needed:
+ * delete will trigger a rebalance; if it needs to move
+ * a node from one page to another, it will have to
+ * update the parent's separator key(s). If the new sepkey
+ * is larger than the current one, the parent page may
+ * run out of space, triggering a split. We need this
+ * cursor to be consistent until the end of the rebalance.
+ */
+ mc.mc_next = txn->mt_cursors[dbi];
+ txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA);
+ txn->mt_cursors[dbi] = mc.mc_next;
+ }
return rc;
}
}
/* return tmp page to freelist */
- copy->mp_next = mc->mc_txn->mt_env->me_dpages;
- VGMEMP_FREE(mc->mc_txn->mt_env, copy);
- mc->mc_txn->mt_env->me_dpages = copy;
+ mdb_page_free(mc->mc_txn->mt_env, copy);
done:
{
/* Adjust other cursors pointing to mp */
return EACCES;
}
- if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
+ if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) {
return EINVAL;
}
/* Can't delete the main DB */
if (del && dbi > MAIN_DBI) {
rc = mdb_del(txn, MAIN_DBI, &mc->mc_dbx->md_name, NULL);
- if (!rc)
+ if (!rc) {
+ txn->mt_dbflags[dbi] = DB_STALE;
mdb_dbi_close(txn->mt_env, dbi);
+ }
} else {
/* reset the DB record, mark it dirty */
txn->mt_dbflags[dbi] |= DB_DIRTY;