#include <unistd.h>
#if !(defined(BYTE_ORDER) || defined(__BYTE_ORDER))
+#include <netinet/in.h>
#include <resolv.h> /* defines BYTE_ORDER on HPUX and Solaris */
#endif
#define BIG_ENDIAN __BIG_ENDIAN
#endif
-#if defined(__i386) || defined(__x86_64)
+#if defined(__i386) || defined(__x86_64) || defined(_M_IX86)
#define MISALIGNED_OK 1
#endif
*/
#define ErrCode() errno
- /** An abstraction for a file handle.
- * On POSIX systems file handles are small integers. On Windows
- * they're opaque pointers.
- */
-#define HANDLE int
-
/** A value for an invalid file handle.
* Mainly used to initialize file variables and signify that they are
* unused.
* slot's address is saved in thread-specific data so that subsequent read
* transactions started by the same thread need no further locking to proceed.
*
+ * If #MDB_NOTLS is set, the slot address is not saved in thread-specific data.
+ *
* No reader table is used if the database is on a read-only filesystem.
*
* Since the database uses multi-version concurrency control, readers don't
} MDB_db;
/** mdb_dbi_open flags */
-#define PERSISTENT_FLAGS 0x7fff
+#define MDB_VALID 0x8000 /**< DB handle is valid, for me_dbflags */
+#define PERSISTENT_FLAGS (0xffff & ~(MDB_VALID))
#define VALID_FLAGS (MDB_REVERSEKEY|MDB_DUPSORT|MDB_INTEGERKEY|MDB_DUPFIXED|\
MDB_INTEGERDUP|MDB_REVERSEDUP|MDB_CREATE)
#define DB_DIRTY 0x01 /**< DB was written in this txn */
#define DB_STALE 0x02 /**< DB record is older than txnID */
#define DB_NEW 0x04 /**< DB handle opened in this txn */
-#define DB_VALID 0x08 /**< DB handle is valid */
-#define MDB_VALID 0x8000 /**< DB handle is valid, for me_dbflags */
+#define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */
/** @} */
/** In write txns, array of cursors for each DB */
MDB_cursor **mt_cursors;
#define MDB_TXN_DIRTY 0x04 /**< must write, even if dirty list is empty */
/** @} */
unsigned int mt_flags; /**< @ref mdb_txn */
- /** dirty_list maxsize - #allocated pages including in parent txns */
+ /** dirty_list maxsize - # of allocated pages allowed, including in parent txns */
unsigned int mt_dirty_room;
/** Tracks which of the two meta pages was used at the start
* of this transaction.
#define C_SHADOW 0x08 /**< Cursor is a dup from a parent txn */
#define C_ALLOCD 0x10 /**< Cursor was malloc'd */
#define C_SPLITTING 0x20 /**< Cursor is in page_split */
+#define C_UNTRACK 0x40 /**< Un-track cursor when closing */
/** @} */
unsigned int mc_flags; /**< @ref mdb_cursor */
MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */
HANDLE me_mfd; /**< just for writing the meta pages */
/** Failed to update the meta page. Probably an I/O error. */
#define MDB_FATAL_ERROR 0x80000000U
- /** Read-only Filesystem. Allow read access, no locking. */
-#define MDB_ROFS 0x40000000U
/** Some fields are initialized. */
#define MDB_ENV_ACTIVE 0x20000000U
+ /** me_txkey is set */
+#define MDB_ENV_TXKEY 0x10000000U
uint32_t me_flags; /**< @ref mdb_env */
unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_maxreaders; /**< size of the reader table */
MDB_page *me_dpages; /**< list of malloc'd blocks for re-use */
/** IDL of pages that became unused in a write txn */
MDB_IDL me_free_pgs;
- /** ID2L of pages that were written during a write txn */
- MDB_ID2 me_dirty_list[MDB_IDL_UM_SIZE];
+ /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
+ MDB_ID2L me_dirty_list;
/** Max number of freelist items that can fit in a single overflow page */
unsigned int me_maxfree_1pg;
/** Max size of a node on a page */
static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp);
static int mdb_page_touch(MDB_cursor *mc);
-static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
+static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp, int *lvl);
static int mdb_page_search_root(MDB_cursor *mc,
MDB_val *key, int modify);
#define MDB_PS_MODIFY 1
static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
static int mdb_env_pick_meta(const MDB_env *env);
static int mdb_env_write_meta(MDB_txn *txn);
+#if !(defined(_WIN32) || defined(MDB_USE_POSIX_SEM)) /* Drop unused excl arg */
+# define mdb_env_close0(env, excl) mdb_env_close1(env)
+#endif
static void mdb_env_close0(MDB_env *env, int excl);
static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
"MDB_PAGE_FULL: Internal error - page has no more space",
"MDB_MAP_RESIZED: Database contents grew beyond environment mapsize",
"MDB_INCOMPATIBLE: Database flags changed or would change",
+ "MDB_BAD_RSLOT: Invalid reuse of reader locktable slot",
};
char *
}
/** Display all the keys in the page. */
-static void
+void
mdb_page_list(MDB_page *mp)
{
MDB_node *node;
count = 0;
for (i = 0; i<txn->mt_numdbs; i++) {
- MDB_xcursor mx, *mxp;
- mxp = (txn->mt_dbs[i].md_flags & MDB_DUPSORT) ? &mx : NULL;
- mdb_cursor_init(&mc, txn, i, mxp);
+ MDB_xcursor mx;
+ mdb_cursor_init(&mc, txn, i, &mx);
if (txn->mt_dbs[i].md_root == P_INVALID)
continue;
count += txn->mt_dbs[i].md_branch_pages +
int
mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
{
- if (txn->mt_dbxs[dbi].md_dcmp)
- return txn->mt_dbxs[dbi].md_dcmp(a, b);
- else
- return EINVAL; /* too bad you can't distinguish this from a valid result */
+ return txn->mt_dbxs[dbi].md_dcmp(a, b);
}
-/** Allocate a single page.
- * Re-use old malloc'd pages first, otherwise just malloc.
+/** Allocate a page.
+ * Re-use old malloc'd pages first for singletons, otherwise just malloc.
*/
static MDB_page *
-mdb_page_malloc(MDB_cursor *mc) {
- MDB_page *ret;
- size_t sz = mc->mc_txn->mt_env->me_psize;
- if ((ret = mc->mc_txn->mt_env->me_dpages) != NULL) {
- VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
- VGMEMP_DEFINED(ret, sizeof(ret->mp_next));
- mc->mc_txn->mt_env->me_dpages = ret->mp_next;
- } else if ((ret = malloc(sz)) != NULL) {
- VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
+mdb_page_malloc(MDB_cursor *mc, unsigned num)
+{
+ MDB_env *env = mc->mc_txn->mt_env;
+ MDB_page *ret = env->me_dpages;
+ size_t sz = env->me_psize;
+ if (num == 1) {
+ if (ret) {
+ VGMEMP_ALLOC(env, ret, sz);
+ VGMEMP_DEFINED(ret, sizeof(ret->mp_next));
+ env->me_dpages = ret->mp_next;
+ return ret;
+ }
+ } else {
+ sz *= num;
+ }
+ if ((ret = malloc(sz)) != NULL) {
+ VGMEMP_ALLOC(env, ret, sz);
}
return ret;
}
+/** Free a single page.
+ * Saves single pages to a list, for future reuse.
+ * (This is not used for multi-page overflow pages.)
+ */
static void
mdb_page_free(MDB_env *env, MDB_page *mp)
{
env->me_dpages = mp;
}
+/** Return all dirty pages to dpage list.
+ * Walks the txn's dirty list, entries 1..dl[0].mid (element 0 holds the
+ * entry count). A page that is not an overflow page, or an overflow page
+ * spanning exactly one page, is handed to #mdb_page_free(); larger
+ * overflow pages are released with free(). Finally the list is marked
+ * empty by zeroing the count.
+ * @param[in] txn the transaction whose dirty list is released.
+ */
+static void
+mdb_dlist_free(MDB_txn *txn)
+{
+ MDB_env *env = txn->mt_env;
+ MDB_ID2L dl = txn->mt_u.dirty_list;
+ unsigned i, n = dl[0].mid;
+
+ for (i = 1; i <= n; i++) {
+ MDB_page *dp = dl[i].mptr;
+ if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
+ mdb_page_free(env, dp);
+ } else {
+ /* large pages just get freed directly */
+ VGMEMP_FREE(env, dp);
+ free(dp);
+ }
+ }
+ dl[0].mid = 0;
+}
+
+/** Find oldest txnid still referenced. Expects txn->mt_txnid > 0.
+ * The search starts from txn->mt_txnid - 1 and lowers that value to the
+ * smallest mr_txnid found among the reader-table slots whose mr_pid is
+ * non-zero. Slots with a zero mr_pid are ignored.
+ * @param[in] txn the transaction doing the lookup.
+ * @return the oldest txnid found; never greater than txn->mt_txnid - 1.
+ */
+static txnid_t
+mdb_find_oldest(MDB_txn *txn)
+{
+ int i;
+ txnid_t mr, oldest = txn->mt_txnid - 1;
+ MDB_reader *r = txn->mt_env->me_txns->mti_readers;
+ for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) {
+ if (r[i].mr_pid) {
+ mr = r[i].mr_txnid;
+ if (oldest > mr)
+ oldest = mr;
+ }
+ }
+ return oldest;
+}
+
/** Allocate pages for writing.
* If there are free pages available from older transactions, they
* will be re-used first. Otherwise a new page will be allocated.
if (!txn->mt_env->me_pghead &&
txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
/* See if there's anything in the free DB */
- MDB_reader *r;
MDB_cursor m2;
MDB_node *leaf;
MDB_val data;
last = *(txnid_t *)key.mv_data;
}
- {
- unsigned int i, nr;
- txnid_t mr;
- oldest = txn->mt_txnid - 1;
- nr = txn->mt_env->me_txns->mti_numreaders;
- r = txn->mt_env->me_txns->mti_readers;
- for (i=0; i<nr; i++) {
- if (!r[i].mr_pid) continue;
- mr = r[i].mr_txnid;
- if (mr < oldest)
- oldest = mr;
- }
- }
+ if (!oldest)
+ oldest = mdb_find_oldest(txn);
if (oldest > last) {
/* It's usable, grab it.
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
do {
+#ifdef MDB_PARANOID /* Seems like we can ignore this now */
/* If on freelist, don't try to read more. If what we have
* right now isn't enough just use new pages.
* TODO: get all of this working. Many circular dependencies...
retry = 0;
readit = 0;
}
+#endif
if (readit) {
MDB_val key, data;
pgno_t *idl, *mop2;
/* We haven't hit the readers list yet? */
if (!oldest) {
- MDB_reader *r;
- unsigned int nr;
- txnid_t mr;
-
- oldest = txn->mt_txnid - 1;
- nr = txn->mt_env->me_txns->mti_numreaders;
- r = txn->mt_env->me_txns->mti_readers;
- for (i=0; i<nr; i++) {
- if (!r[i].mr_pid) continue;
- mr = r[i].mr_txnid;
- if (mr < oldest)
- oldest = mr;
- }
+ oldest = mdb_find_oldest(txn);
}
/* There's nothing we can use on the freelist */
np = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
np->mp_pgno = pgno;
} else {
- if (txn->mt_env->me_dpages && num == 1) {
- np = txn->mt_env->me_dpages;
- VGMEMP_ALLOC(txn->mt_env, np, txn->mt_env->me_psize);
- VGMEMP_DEFINED(np, sizeof(np->mp_next));
- txn->mt_env->me_dpages = np->mp_next;
- } else {
- size_t sz = txn->mt_env->me_psize * num;
- if ((np = malloc(sz)) == NULL)
- return ENOMEM;
- VGMEMP_ALLOC(txn->mt_env, np, sz);
- }
+ if (!(np = mdb_page_malloc(mc, num)))
+ return ENOMEM;
if (pgno == P_INVALID) {
np->mp_pgno = txn->mt_next_pgno;
txn->mt_next_pgno += num;
/** Copy a page: avoid copying unused portions of the page.
* @param[in] dst page to copy into
* @param[in] src page to copy from
+ * @param[in] psize size of a page
*/
static void
mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
m2->mc_pg[mc->mc_top] = mp;
+ if (mc->mc_db->md_flags & MDB_DUPSORT)
+ m2->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
}
}
}
SETPGNO(NODEPTR(mc->mc_pg[mc->mc_top-1], mc->mc_ki[mc->mc_top-1]), mp->mp_pgno);
else
mc->mc_db->md_root = mp->mp_pgno;
- } else if (mc->mc_txn->mt_parent) {
+ } else if (mc->mc_txn->mt_parent && !(mp->mp_flags & P_SUBP)) {
MDB_page *np;
- MDB_ID2 mid;
+ MDB_ID2 mid, *dl = mc->mc_txn->mt_u.dirty_list;
/* If txn has a parent, make sure the page is in our
* dirty list.
*/
- if (mc->mc_txn->mt_u.dirty_list[0].mid) {
- unsigned x = mdb_mid2l_search(mc->mc_txn->mt_u.dirty_list, mp->mp_pgno);
- if (x <= mc->mc_txn->mt_u.dirty_list[0].mid &&
- mc->mc_txn->mt_u.dirty_list[x].mid == mp->mp_pgno) {
- if (mc->mc_txn->mt_u.dirty_list[x].mptr != mp) {
- mp = mc->mc_txn->mt_u.dirty_list[x].mptr;
- mc->mc_pg[mc->mc_top] = mp;
- }
+ if (dl[0].mid) {
+ unsigned x = mdb_mid2l_search(dl, mp->mp_pgno);
+ if (x <= dl[0].mid && dl[x].mid == mp->mp_pgno) {
+ np = dl[x].mptr;
+ if (mp != np)
+ mc->mc_pg[mc->mc_top] = np;
return 0;
}
}
- assert(mc->mc_txn->mt_u.dirty_list[0].mid < MDB_IDL_UM_MAX);
+ assert(dl[0].mid < MDB_IDL_UM_MAX);
/* No - copy it */
- np = mdb_page_malloc(mc);
+ np = mdb_page_malloc(mc, 1);
if (!np)
return ENOMEM;
memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
mid.mid = np->mp_pgno;
mid.mptr = np;
- mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &mid);
+ mdb_mid2l_insert(dl, &mid);
mp = np;
goto finish;
}
mc->mc_dbflag = &dst->mt_dbflags[i];
mc->mc_snum = m2->mc_snum;
mc->mc_top = m2->mc_top;
- mc->mc_flags = m2->mc_flags | C_SHADOW;
+ mc->mc_flags = m2->mc_flags | (C_SHADOW|C_ALLOCD);
for (j=0; j<mc->mc_snum; j++) {
mc->mc_pg[j] = m2->mc_pg[j];
mc->mc_ki[j] = m2->mc_ki[j];
return MDB_SUCCESS;
}
-/** Merge shadow cursors back into parent's */
+/** Close this write txn's cursors, after optionally merging its shadow
+ * cursors back into parent's.
+ * @param[in] txn the transaction handle.
+ * @param[in] merge 0 to not merge cursors, C_SHADOW to merge.
+ * This function does not return a value.
+ */
static void
-mdb_cursor_merge(MDB_txn *txn)
+mdb_cursors_close(MDB_txn *txn, unsigned merge)
{
- MDB_dbi i;
- for (i=0; i<txn->mt_numdbs; i++) {
- if (txn->mt_cursors[i]) {
- MDB_cursor *mc;
- while ((mc = txn->mt_cursors[i])) {
- txn->mt_cursors[i] = mc->mc_next;
- if (mc->mc_flags & C_SHADOW) {
+ MDB_cursor **cursors = txn->mt_cursors, *mc, *next;
+ int i, j;
+
+ for (i = txn->mt_numdbs; --i >= 0; ) {
+ for (mc = cursors[i]; mc; mc = next) {
+ next = mc->mc_next;
+ if (mc->mc_flags & merge) {
MDB_cursor *m2 = mc->mc_orig;
- unsigned int j;
m2->mc_snum = mc->mc_snum;
m2->mc_top = mc->mc_top;
- for (j=0; j<mc->mc_snum; j++) {
+ for (j = mc->mc_snum; --j >= 0; ) {
m2->mc_pg[j] = mc->mc_pg[j];
m2->mc_ki[j] = mc->mc_ki[j];
}
}
if (mc->mc_flags & C_ALLOCD)
free(mc);
- }
}
+ cursors[i] = NULL;
}
}
/** Common code for #mdb_txn_begin() and #mdb_txn_renew().
* @param[in] txn the transaction handle to initialize
- * @return 0 on success, non-zero on failure. This can only
- * fail for read-only transactions, and then only if the
- * reader table is full.
+ * @return 0 on success, non-zero on failure.
*/
static int
mdb_txn_renew0(MDB_txn *txn)
MDB_env *env = txn->mt_env;
unsigned int i;
uint16_t x;
- int rc;
+ int rc, new_notls = 0;
/* Setup db info */
txn->mt_numdbs = env->me_numdbs;
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
if (txn->mt_flags & MDB_TXN_RDONLY) {
- if (env->me_flags & MDB_ROFS) {
+ if (!env->me_txns) {
i = mdb_env_pick_meta(env);
txn->mt_txnid = env->me_metas[i]->mm_txnid;
txn->mt_u.reader = NULL;
} else {
- MDB_reader *r = pthread_getspecific(env->me_txkey);
- if (!r) {
+ MDB_reader *r = (env->me_flags & MDB_NOTLS) ? txn->mt_u.reader :
+ pthread_getspecific(env->me_txkey);
+ if (r) {
+ if (r->mr_pid != env->me_pid || r->mr_txnid != (txnid_t)-1)
+ return MDB_BAD_RSLOT;
+ } else {
pid_t pid = env->me_pid;
pthread_t tid = pthread_self();
env->me_numreaders = env->me_txns->mti_numreaders;
UNLOCK_MUTEX_R(env);
r = &env->me_txns->mti_readers[i];
- if ((rc = pthread_setspecific(env->me_txkey, r)) != 0) {
- env->me_txns->mti_readers[i].mr_pid = 0;
+ new_notls = (env->me_flags & MDB_NOTLS);
+ if (!new_notls && (rc=pthread_setspecific(env->me_txkey, r))) {
+ r->mr_pid = 0;
return rc;
}
}
/* Copy the DB info and flags */
memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
for (i=2; i<txn->mt_numdbs; i++) {
- txn->mt_dbs[i].md_flags = x = env->me_dbflags[i];
+ x = env->me_dbflags[i];
+ txn->mt_dbs[i].md_flags = x & PERSISTENT_FLAGS;
txn->mt_dbflags[i] = (x & MDB_VALID) ? DB_VALID|DB_STALE : 0;
}
txn->mt_dbflags[0] = txn->mt_dbflags[1] = DB_VALID;
if (env->me_maxpg < txn->mt_next_pgno) {
mdb_txn_reset0(txn);
+ if (new_notls) {
+ txn->mt_u.reader->mr_pid = 0;
+ txn->mt_u.reader = NULL;
+ }
return MDB_MAP_RESIZED;
}
{
int rc;
- if (! (txn && (txn->mt_flags & MDB_TXN_RDONLY)))
+ if (!txn || txn->mt_dbxs) /* A reset txn has mt_dbxs==NULL */
return EINVAL;
if (txn->mt_env->me_flags & MDB_FATAL_ERROR) {
if (parent) {
unsigned int i;
- txn->mt_free_pgs = mdb_midl_alloc();
- if (!txn->mt_free_pgs) {
- free(txn);
- return ENOMEM;
- }
txn->mt_u.dirty_list = malloc(sizeof(MDB_ID2)*MDB_IDL_UM_SIZE);
- if (!txn->mt_u.dirty_list) {
- free(txn->mt_free_pgs);
+ if (!txn->mt_u.dirty_list ||
+ !(txn->mt_free_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX)))
+ {
+ free(txn->mt_u.dirty_list);
free(txn);
return ENOMEM;
}
return rc;
}
+/** Export or close DBI handles opened in this txn.
+ * Visits every DBI slot from txn->mt_numdbs - 1 down to 2 and acts only on
+ * those flagged #DB_NEW in the txn's mt_dbflags.
+ * @param[in] txn the transaction handle.
+ * @param[in] keep non-zero to export the new handles to the environment:
+ * me_dbflags[i] receives the txn's md_flags plus #MDB_VALID, and me_numdbs
+ * is raised to txn->mt_numdbs if it was smaller. Zero to close them: the
+ * slot's name in me_dbxs is cleared and freed and me_dbflags[i] is zeroed.
+ */
+static void
+mdb_dbis_update(MDB_txn *txn, int keep)
+{
+ int i;
+ MDB_dbi n = txn->mt_numdbs;
+ MDB_env *env = txn->mt_env;
+ unsigned char *tdbflags = txn->mt_dbflags;
+
+ for (i = n; --i >= 2;) {
+ if (tdbflags[i] & DB_NEW) {
+ if (keep) {
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID;
+ } else {
+ char *ptr = env->me_dbxs[i].md_name.mv_data;
+ env->me_dbxs[i].md_name.mv_data = NULL;
+ env->me_dbxs[i].md_name.mv_size = 0;
+ env->me_dbflags[i] = 0;
+ free(ptr);
+ }
+ }
+ }
+ if (keep && env->me_numdbs < n)
+ env->me_numdbs = n;
+}
+
/** Common code for #mdb_txn_reset() and #mdb_txn_abort().
+ * May be called twice for readonly txns: First reset it, then abort.
* @param[in] txn the transaction handle to reset
*/
static void
mdb_txn_reset0(MDB_txn *txn)
{
MDB_env *env = txn->mt_env;
- unsigned int i;
/* Close any DBI handles opened in this txn */
- for (i=2; i<txn->mt_numdbs; i++) {
- if (txn->mt_dbflags[i] & DB_NEW) {
- char *ptr = env->me_dbxs[i].md_name.mv_data;
- env->me_dbxs[i].md_name.mv_data = NULL;
- env->me_dbxs[i].md_name.mv_size = 0;
- free(ptr);
- }
- }
+ mdb_dbis_update(txn, 0);
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
- if (!(env->me_flags & MDB_ROFS))
+ if (txn->mt_u.reader) {
txn->mt_u.reader->mr_txnid = (txnid_t)-1;
- } else {
- MDB_page *dp;
-
- /* close(free) all cursors */
- for (i=0; i<txn->mt_numdbs; i++) {
- if (txn->mt_cursors[i]) {
- MDB_cursor *mc;
- while ((mc = txn->mt_cursors[i])) {
- txn->mt_cursors[i] = mc->mc_next;
- if (mc->mc_flags & C_ALLOCD)
- free(mc);
- }
- }
+ if (!(env->me_flags & MDB_NOTLS))
+ txn->mt_u.reader = NULL; /* txn does not own reader */
}
+ txn->mt_numdbs = 0; /* close nothing if called again */
+ txn->mt_dbxs = NULL; /* mark txn as reset */
+ } else {
+ mdb_cursors_close(txn, 0);
if (!(env->me_flags & MDB_WRITEMAP)) {
- /* return all dirty pages to dpage list */
- for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
- dp = txn->mt_u.dirty_list[i].mptr;
- if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
- mdb_page_free(txn->mt_env, dp);
- } else {
- /* large pages just get freed directly */
- VGMEMP_FREE(txn->mt_env, dp);
- free(dp);
- }
- }
+ mdb_dlist_free(txn);
}
-
free(env->me_pgfree);
if (txn->mt_parent) {
txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
(void *) txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
+ /* This call is only valid for read-only txns */
+ if (!(txn->mt_flags & MDB_TXN_RDONLY))
+ return;
+
mdb_txn_reset0(txn);
}
mdb_txn_abort(txn->mt_child);
mdb_txn_reset0(txn);
+ /* Free reader slot tied to this txn (if MDB_NOTLS && writable FS) */
+ if ((txn->mt_flags & MDB_TXN_RDONLY) && txn->mt_u.reader)
+ txn->mt_u.reader->mr_pid = 0;
+
free(txn);
}
assert(txn->mt_env != NULL);
if (txn->mt_child) {
- mdb_txn_commit(txn->mt_child);
+ rc = mdb_txn_commit(txn->mt_child);
txn->mt_child = NULL;
+ if (rc) {
+ mdb_txn_abort(txn);
+ return rc;
+ }
}
env = txn->mt_env;
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
- /* update the DB flags */
- for (i = 2; i<txn->mt_numdbs; i++) {
- if (txn->mt_dbflags[i] & DB_NEW)
- env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID;
- }
- if (txn->mt_numdbs > env->me_numdbs)
- env->me_numdbs = txn->mt_numdbs;
+ mdb_dbis_update(txn, 1);
txn->mt_numdbs = 2; /* so txn_abort() doesn't close any new handles */
mdb_txn_abort(txn);
return MDB_SUCCESS;
parent->mt_next_pgno = txn->mt_next_pgno;
parent->mt_flags = txn->mt_flags;
- /* Merge (and close) our cursors with parent's */
- mdb_cursor_merge(txn);
+ /* Merge our cursors into parent's and close them */
+ mdb_cursors_close(txn, C_SHADOW);
/* Update parent's DB table. */
memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
return EINVAL;
}
+ mdb_cursors_close(txn, 0);
+
if (!txn->mt_u.dirty_list[0].mid && !(txn->mt_flags & MDB_TXN_DIRTY))
goto done;
dp = txn->mt_u.dirty_list[i].mptr;
/* clear dirty flag */
dp->mp_flags &= ~P_DIRTY;
- txn->mt_u.dirty_list[i].mid = 0;
}
txn->mt_u.dirty_list[0].mid = 0;
goto sync;
#endif
} while (!done);
- /* Drop the dirty pages.
- */
- for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
- dp = txn->mt_u.dirty_list[i].mptr;
- if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
- mdb_page_free(txn->mt_env, dp);
- } else {
- VGMEMP_FREE(txn->mt_env, dp);
- free(dp);
- }
- txn->mt_u.dirty_list[i].mid = 0;
- }
- txn->mt_u.dirty_list[0].mid = 0;
+ mdb_dlist_free(txn);
sync:
if ((n = mdb_env_sync(env, 0)) != 0 ||
done:
env->me_pglast = 0;
env->me_txn = NULL;
- /* update the DB flags */
- for (i = 2; i<txn->mt_numdbs; i++) {
- if (txn->mt_dbflags[i] & DB_NEW)
- env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID;
- }
- if (txn->mt_numdbs > env->me_numdbs)
- env->me_numdbs = txn->mt_numdbs;
+ mdb_dbis_update(txn, 1);
UNLOCK_MUTEX_W(env);
free(txn);
if (!e)
return ENOMEM;
- e->me_free_pgs = mdb_midl_alloc();
- if (!e->me_free_pgs) {
- free(e);
- return ENOMEM;
- }
e->me_maxreaders = DEFAULT_READERS;
- e->me_maxdbs = 2;
+ e->me_maxdbs = e->me_numdbs = 2;
e->me_fd = INVALID_HANDLE_VALUE;
e->me_lfd = INVALID_HANDLE_VALUE;
e->me_mfd = INVALID_HANDLE_VALUE;
* @param[in] lpath The pathname of the file used for the lock region.
* @param[in] mode The Unix permissions for the file, if we create it.
* @param[out] excl Resulting file lock type: -1 none, 0 shared, 1 exclusive
+ * @param[in,out] excl In -1, out lock type: -1 none, 0 shared, 1 exclusive
* @return 0 on success, non-zero on failure.
*/
static int
mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
{
+#ifdef _WIN32
+# define MDB_ERRCODE_ROFS ERROR_WRITE_PROTECT
+#else
+# define MDB_ERRCODE_ROFS EROFS
+#ifdef O_CLOEXEC /* Linux: Open file and set FD_CLOEXEC atomically */
+# define MDB_CLOEXEC O_CLOEXEC
+#else
+ int fdflags;
+# define MDB_CLOEXEC 0
+#endif
+#endif
int rc;
off_t size, rsize;
- *excl = -1;
-
#ifdef _WIN32
- if ((env->me_lfd = CreateFile(lpath, GENERIC_READ|GENERIC_WRITE,
+ env->me_lfd = CreateFile(lpath, GENERIC_READ|GENERIC_WRITE,
FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS,
- FILE_ATTRIBUTE_NORMAL, NULL)) == INVALID_HANDLE_VALUE) {
+ FILE_ATTRIBUTE_NORMAL, NULL);
+#else
+ env->me_lfd = open(lpath, O_RDWR|O_CREAT|MDB_CLOEXEC, mode);
+#endif
+ if (env->me_lfd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
- if (rc == ERROR_WRITE_PROTECT && (env->me_flags & MDB_RDONLY)) {
- env->me_flags |= MDB_ROFS;
+ if (rc == MDB_ERRCODE_ROFS && (env->me_flags & MDB_RDONLY)) {
return MDB_SUCCESS;
}
goto fail_errno;
}
- /* Try to get exclusive lock. If we succeed, then
- * nobody is using the lock region and we should initialize it.
- */
- if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
- size = GetFileSize(env->me_lfd, NULL);
-
-#else
-#if !(O_CLOEXEC)
- {
- int fdflags;
- if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
- rc = ErrCode();
- if (rc == EROFS && (env->me_flags & MDB_RDONLY)) {
- env->me_flags |= MDB_ROFS;
- return MDB_SUCCESS;
- }
- goto fail_errno;
- }
- /* Lose record locks when exec*() */
- if ((fdflags = fcntl(env->me_lfd, F_GETFD) | FD_CLOEXEC) >= 0)
+#if ! ((MDB_CLOEXEC) || defined(_WIN32))
+ /* Lose record locks when exec*() */
+ if ((fdflags = fcntl(env->me_lfd, F_GETFD) | FD_CLOEXEC) >= 0)
fcntl(env->me_lfd, F_SETFD, fdflags);
- }
-#else /* O_CLOEXEC on Linux: Open file and set FD_CLOEXEC atomically */
- if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT|O_CLOEXEC, mode)) == -1) {
- rc = ErrCode();
- if (rc == EROFS && (env->me_flags & MDB_RDONLY)) {
- env->me_flags |= MDB_ROFS;
- return MDB_SUCCESS;
+#endif
+
+ if (!(env->me_flags & MDB_NOTLS)) {
+ rc = pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
+ if (rc)
+ goto fail;
+ env->me_flags |= MDB_ENV_TXKEY;
+#ifdef _WIN32
+ /* Windows TLS callbacks need help finding their TLS info. */
+ if (mdb_tls_nkeys >= MAX_TLS_KEYS) {
+ rc = MDB_TLS_FULL;
+ goto fail;
}
- goto fail_errno;
- }
+ mdb_tls_keys[mdb_tls_nkeys++] = env->me_txkey;
#endif
+ }
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it.
*/
if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
+#ifdef _WIN32
+ size = GetFileSize(env->me_lfd, NULL);
+#else
size = lseek(env->me_lfd, 0, SEEK_END);
#endif
rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
* environment and re-opening it with the new flags.
*/
#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC)
-#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP)
+#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS)
int
mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode)
{
- int oflags, rc, len, excl;
+ int oflags, rc, len, excl = -1;
char *lpath, *dpath;
if (env->me_fd!=INVALID_HANDLE_VALUE || (flags & ~(CHANGEABLE|CHANGELESS)))
sprintf(dpath, "%s" DATANAME, path);
}
+ rc = MDB_SUCCESS;
flags |= env->me_flags;
- /* silently ignore WRITEMAP if we're only getting read access */
- if (F_ISSET(flags, MDB_RDONLY|MDB_WRITEMAP))
- flags ^= MDB_WRITEMAP;
+ if (flags & MDB_RDONLY) {
+ /* silently ignore WRITEMAP when we're only getting read access */
+ flags &= ~MDB_WRITEMAP;
+ } else {
+ if (!((env->me_free_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX)) &&
+ (env->me_dirty_list = calloc(MDB_IDL_UM_SIZE, sizeof(MDB_ID2)))))
+ rc = ENOMEM;
+ }
env->me_flags = flags |= MDB_ENV_ACTIVE;
+ if (rc)
+ goto leave;
+
+ env->me_path = strdup(path);
+ env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
+ env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
+ if (!(env->me_dbxs && env->me_path && env->me_dbflags)) {
+ rc = ENOMEM;
+ goto leave;
+ }
rc = mdb_env_setup_locks(env, lpath, mode, &excl);
if (rc)
}
}
DPRINTF("opened dbenv %p", (void *) env);
- rc = pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
- if (rc)
- goto leave;
- env->me_numdbs = 2; /* this notes that me_txkey was set */
-#ifdef _WIN32
- /* Windows TLS callbacks need help finding their TLS info. */
- if (mdb_tls_nkeys < MAX_TLS_KEYS)
- mdb_tls_keys[mdb_tls_nkeys++] = env->me_txkey;
- else {
- rc = MDB_TLS_FULL;
- goto leave;
- }
-#endif
if (excl > 0) {
rc = mdb_env_share_locks(env, &excl);
- if (rc)
- goto leave;
}
- env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
- env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
- env->me_path = strdup(path);
- if (!env->me_dbxs || !env->me_dbflags || !env->me_path)
- rc = ENOMEM;
}
leave:
return rc;
}
-/** Destroy resources from mdb_env_open() and clear our readers */
+/** Destroy resources from mdb_env_open(), clear our readers & DBIs */
static void
mdb_env_close0(MDB_env *env, int excl)
{
if (!(env->me_flags & MDB_ENV_ACTIVE))
return;
+ /* Doing this here since me_dbxs may not exist during mdb_env_close */
+ for (i = env->me_maxdbs; --i > MAIN_DBI; )
+ free(env->me_dbxs[i].md_name.mv_data);
+
free(env->me_dbflags);
free(env->me_dbxs);
free(env->me_path);
+ free(env->me_dirty_list);
+ if (env->me_free_pgs)
+ mdb_midl_free(env->me_free_pgs);
- if (env->me_numdbs) {
+ if (env->me_flags & MDB_ENV_TXKEY) {
pthread_key_delete(env->me_txkey);
#ifdef _WIN32
/* Delete our key from the global list */
close(env->me_lfd);
}
- env->me_flags &= ~MDB_ENV_ACTIVE;
+ env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY);
}
int
-mdb_env_copy(MDB_env *env, const char *path)
+mdb_env_copyfd(MDB_env *env, int fd)
{
MDB_txn *txn = NULL;
- int rc, len;
+ int rc;
size_t wsize;
- char *lpath, *ptr;
- HANDLE newfd = INVALID_HANDLE_VALUE;
-
- if (env->me_flags & MDB_NOSUBDIR) {
- lpath = (char *)path;
- } else {
- len = strlen(path);
- len += sizeof(DATANAME);
- lpath = malloc(len);
- if (!lpath)
- return ENOMEM;
- sprintf(lpath, "%s" DATANAME, path);
- }
-
- /* The destination path must exist, but the destination file must not.
- * We don't want the OS to cache the writes, since the source data is
- * already in the OS cache.
- */
-#ifdef _WIN32
- newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW,
- FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL);
-#else
- newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL
-#ifdef O_DIRECT
- |O_DIRECT
-#endif
- , 0666);
-#endif
- if (!(env->me_flags & MDB_NOSUBDIR))
- free(lpath);
- if (newfd == INVALID_HANDLE_VALUE) {
- rc = ErrCode();
- goto leave;
- }
-
-#ifdef F_NOCACHE /* __APPLE__ */
- rc = fcntl(newfd, F_NOCACHE, 1);
- if (rc) {
- rc = ErrCode();
- goto leave;
- }
-#endif
+ char *ptr;
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers.
*/
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
if (rc)
- goto leave;
+ return rc;
- if (!(env->me_flags & MDB_ROFS)) {
+ if (env->me_txns) {
/* We must start the actual read txn after blocking writers */
mdb_txn_reset0(txn);
#ifdef _WIN32
{
DWORD len;
- rc = WriteFile(newfd, env->me_map, wsize, &len, NULL);
+ rc = WriteFile(fd, env->me_map, wsize, &len, NULL);
rc = (len == wsize) ? MDB_SUCCESS : ErrCode();
}
#else
- rc = write(newfd, env->me_map, wsize);
+ rc = write(fd, env->me_map, wsize);
rc = (rc == (int)wsize) ? MDB_SUCCESS : ErrCode();
#endif
- if (! (env->me_flags & MDB_ROFS))
+ if (env->me_txns)
UNLOCK_MUTEX_W(env);
if (rc)
w2 = MAX_WRITE;
else
w2 = wsize;
- rc = WriteFile(newfd, ptr, w2, &len, NULL);
+ rc = WriteFile(fd, ptr, w2, &len, NULL);
rc = (len == w2) ? MDB_SUCCESS : ErrCode();
if (rc) break;
wsize -= w2;
w2 = MAX_WRITE;
else
w2 = wsize;
- wres = write(newfd, ptr, w2);
+ wres = write(fd, ptr, w2);
rc = (wres > 0) ? MDB_SUCCESS : ErrCode();
if (rc) break;
wsize -= wres;
ptr += wres;
}
#endif
+
+leave:
mdb_txn_abort(txn);
+ return rc;
+}
+
+int
+mdb_env_copy(MDB_env *env, const char *path)
+{
+ int rc, len;
+ char *lpath;
+ HANDLE newfd = INVALID_HANDLE_VALUE;
+
+ if (env->me_flags & MDB_NOSUBDIR) {
+ lpath = (char *)path;
+ } else {
+ len = strlen(path);
+ len += sizeof(DATANAME);
+ lpath = malloc(len);
+ if (!lpath)
+ return ENOMEM;
+ sprintf(lpath, "%s" DATANAME, path);
+ }
+
+ /* The destination path must exist, but the destination file must not.
+ * We don't want the OS to cache the writes, since the source data is
+ * already in the OS cache.
+ */
+#ifdef _WIN32
+ newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW,
+ FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL);
+#else
+ newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL
+#ifdef O_DIRECT
+ |O_DIRECT
+#endif
+ , 0666);
+#endif
+ if (!(env->me_flags & MDB_NOSUBDIR))
+ free(lpath);
+ if (newfd == INVALID_HANDLE_VALUE) {
+ rc = ErrCode();
+ goto leave;
+ }
+
+#ifdef F_NOCACHE /* __APPLE__ */
+ rc = fcntl(newfd, F_NOCACHE, 1);
+ if (rc) {
+ rc = ErrCode();
+ goto leave;
+ }
+#endif
+
+ rc = mdb_env_copyfd(env, newfd);
leave:
if (newfd != INVALID_HANDLE_VALUE)
mdb_env_close(MDB_env *env)
{
MDB_page *dp;
- int i;
if (env == NULL)
return;
- for (i = env->me_numdbs; --i > MAIN_DBI; )
- free(env->me_dbxs[i].md_name.mv_data);
-
VGMEMP_DESTROY(env);
while ((dp = env->me_dpages) != NULL) {
VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next));
}
mdb_env_close0(env, 0);
- mdb_midl_free(env->me_free_pgs);
free(env);
}
* @param[in] txn the transaction for this access.
* @param[in] pgno the page number for the page to retrieve.
* @param[out] ret address of a pointer where the page's address will be stored.
+ * @param[out] lvl dirty_list inheritance level of found page. 1=current txn,
+ * 2=its parent and so on up the chain, 0=mapped page. May be NULL if the
+ * caller does not need the level.
* @return 0 on success, non-zero on failure.
*/
static int
-mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
+mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl)
{
MDB_page *p = NULL;
+ int level;
if (!((txn->mt_flags & MDB_TXN_RDONLY) |
(txn->mt_env->me_flags & MDB_WRITEMAP)))
{
MDB_txn *tx2 = txn;
+ level = 1;
do {
MDB_ID2L dl = tx2->mt_u.dirty_list;
if (dl[0].mid) {
goto done;
}
}
+ level++;
} while ((tx2 = tx2->mt_parent) != NULL);
}
if (pgno < txn->mt_next_pgno) {
+ level = 0;
p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
} else {
DPRINTF("page %zu not found", pgno);
assert(p != NULL);
+ return MDB_PAGE_NOTFOUND;
}
done:
*ret = p;
- return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
+ if (lvl)
+ *lvl = level;
+ return MDB_SUCCESS;
}
/** Search for the page a given key should be in.
* @param[in,out] mc the cursor for this operation.
* @param[in] key the key to search for. If NULL, search for the lowest
* page. (This is used by #mdb_cursor_first().)
- * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers.
- * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups.
+ * @param[in] modify If true, visited pages are updated with new page numbers.
* @return 0 on success, non-zero on failure.
*/
static int
assert(i < NUMKEYS(mp));
node = NODEPTR(mp, i);
- if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp)))
+ if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)
return rc;
mc->mc_ki[mc->mc_top] = i;
return MDB_SUCCESS;
}
+/** Find the lowest key below the branch page at the top of the cursor.
+ * The NUMKEYS check normally made on the current page is skipped here:
+ * the first child is fetched and pushed directly, and only then is
+ * mdb_page_search_root() called. Callers use this on pages that are
+ * known to be underfilled.
+ * @param[in,out] mc the cursor whose top page is the branch to descend from.
+ * @return 0 on success, non-zero on failure.
+ */
+static int
+mdb_page_search_lowest(MDB_cursor *mc)
+{
+	MDB_page *child;
+	MDB_node *first = NODEPTR(mc->mc_pg[mc->mc_top], 0);
+	int rc;
+
+	/* Fetch the page referenced by the first node of the current page */
+	rc = mdb_page_get(mc->mc_txn, NODEPGNO(first), &child, NULL);
+	if (rc != 0)
+		return rc;
+
+	/* Point the current level at index 0, then step down into the child */
+	mc->mc_ki[mc->mc_top] = 0;
+	rc = mdb_cursor_push(mc, child);
+	if (rc != 0)
+		return rc;
+
+	/* A NULL key asks for the lowest page */
+	return mdb_page_search_root(mc, NULL, 0);
+}
+
/** Search for the page a given key should be in.
* Pushes parent pages on the cursor stack. This function just sets up
* the search; it finds the root page for \b mc's database and sets this
* @param[in,out] mc the cursor for this operation.
* @param[in] key the key to search for. If NULL, search for the lowest
* page. (This is used by #mdb_cursor_first().)
- * @param[in] modify If true, visited pages are updated with new page numbers.
+ * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers.
+ * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups.
* @return 0 on success, non-zero on failure.
*/
static int
assert(root > 1);
if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root)
- if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0])))
+ if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0], NULL)) != 0)
return rc;
mc->mc_snum = 1;
*/
data->mv_size = NODEDSZ(leaf);
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
- if ((rc = mdb_page_get(txn, pgno, &omp))) {
+ if ((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0) {
DPRINTF("read overflow page %zu failed", pgno);
return rc;
}
assert(IS_BRANCH(mc->mc_pg[mc->mc_top]));
indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
- if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp)))
+ if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL)) != 0)
return rc;
mdb_cursor_push(mc, mp);
if (!(mc->mc_flags & C_EOF)) {
- if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
- MDB_val lkey;
+ if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+ MDB_val lkey;
- lkey.mv_size = MDB_MAXKEYSIZE+1;
- lkey.mv_data = NULL;
- rc = mdb_page_search(mc, &lkey, 0);
- if (rc != MDB_SUCCESS)
- return rc;
- }
- assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
+ lkey.mv_size = MDB_MAXKEYSIZE+1;
+ lkey.mv_data = NULL;
+ rc = mdb_page_search(mc, &lkey, 0);
+ if (rc != MDB_SUCCESS)
+ return rc;
+ }
+ assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
- mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
}
+ mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
mc->mc_flags |= C_INITIALIZED|C_EOF;
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (mc->mc_dbi > MAIN_DBI && !(*mc->mc_dbflag & DB_DIRTY)) {
MDB_cursor mc2;
MDB_xcursor mcx;
- mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI,
- mc->mc_txn->mt_dbs[MAIN_DBI].md_flags & MDB_DUPSORT ? &mcx : NULL);
+ mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, &mcx);
rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, MDB_PS_MODIFY);
if (rc)
return rc;
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDB_page *omp;
pgno_t pg;
- int ovpages, dpages;
+ unsigned psize = mc->mc_txn->mt_env->me_psize;
+ int level, ovpages, dpages = OVPAGES(data->mv_size, psize);
- ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
- dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
- mdb_page_get(mc->mc_txn, pg, &omp);
+ if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0)
+ return rc2;
+ ovpages = omp->mp_pages;
+
/* Is the ov page writable and large enough? */
if ((omp->mp_flags & P_DIRTY) && ovpages >= dpages) {
/* yes, overwrite it. Note in this case we don't
- * bother to try shrinking the node if the new data
+ * bother to try shrinking the page if the new data
* is smaller than the overflow threshold.
*/
+ if (level > 1) {
+ /* It is writable only in a parent txn */
+ size_t sz = (size_t) psize * ovpages, off;
+ MDB_page *np = mdb_page_malloc(mc, ovpages);
+ MDB_ID2 id2;
+ if (!np)
+ return ENOMEM;
+ id2.mid = pg;
+ id2.mptr = np;
+ mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2);
+ if (!(flags & MDB_RESERVE)) {
+ /* Copy end of page, adjusting alignment so
+ * compiler may copy words instead of bytes.
+ */
+ off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
+ memcpy((size_t *)((char *)np + off),
+ (size_t *)((char *)omp + off), sz - off);
+ sz = PAGEHDRSZ;
+ }
+ memcpy(np, omp, sz); /* Copy beginning of page */
+ omp = np;
+ }
+ SETDSZ(leaf, data->mv_size);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = METADATA(omp);
else
mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
{
MDB_cursor *mc;
- MDB_xcursor *mx = NULL;
size_t size = sizeof(MDB_cursor);
if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
size += sizeof(MDB_xcursor);
if ((mc = malloc(size)) != NULL) {
- if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
- mx = (MDB_xcursor *)(mc + 1);
- }
- mdb_cursor_init(mc, txn, dbi, mx);
+ mdb_cursor_init(mc, txn, dbi, (MDB_xcursor *)(mc + 1));
if (txn->mt_cursors) {
mc->mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = mc;
+ mc->mc_flags |= C_UNTRACK;
}
mc->mc_flags |= C_ALLOCD;
} else {
int
mdb_cursor_renew(MDB_txn *txn, MDB_cursor *mc)
{
+ unsigned flags; /* copy of mc->mc_flags taken before the cursor is re-initialized */
+ /* Re-initialize mc for use with txn; returns EINVAL on rejected input, MDB_SUCCESS otherwise. */
if (txn == NULL || mc == NULL || mc->mc_dbi >= txn->mt_numdbs)
return EINVAL;
- if (txn->mt_cursors)
+ if ((mc->mc_flags & C_UNTRACK) || txn->mt_cursors) /* refuse a cursor flagged C_UNTRACK, or a txn that has a cursor array */
return EINVAL;
+ flags = mc->mc_flags; /* saved so the C_ALLOCD bit can be OR-ed back in below */
+ /* NOTE(review): presumably mdb_cursor_init() overwrites mc_flags -- confirm against its definition */
mdb_cursor_init(mc, txn, mc->mc_dbi, mc->mc_xcursor);
+ /* Of the old flags, only C_ALLOCD is carried over. */
+ mc->mc_flags |= (flags & C_ALLOCD);
return MDB_SUCCESS;
}
{
if (mc != NULL) {
/* remove from txn, if tracked */
- if (mc->mc_txn->mt_cursors) {
+ if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
while (*prev && *prev != mc) prev = &(*prev)->mc_next;
if (*prev == mc)
}
/** Replace the key for a node with a new key.
- * @param[in] mp The page containing the node to operate on.
- * @param[in] indx The index of the node to operate on.
+ * @param[in] mc Cursor pointing to the node to operate on.
* @param[in] key The new key to use.
* @return 0 on success, non-zero on failure.
*/
unsigned int snum = csrc->mc_snum;
MDB_node *s2;
/* must find the lowest key below src */
- mdb_page_search_root(csrc, NULL, 0);
+ mdb_page_search_lowest(csrc);
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_size = csrc->mc_db->md_pad;
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
MDB_node *s2;
MDB_val bkey;
/* must find the lowest key below dst */
- mdb_page_search_root(cdst, NULL, 0);
+ mdb_page_search_lowest(cdst);
if (IS_LEAF2(cdst->mc_pg[cdst->mc_top])) {
bkey.mv_size = cdst->mc_db->md_pad;
bkey.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, bkey.mv_size);
unsigned int snum = csrc->mc_snum;
MDB_node *s2;
/* must find the lowest key below src */
- mdb_page_search_root(csrc, NULL, 0);
+ mdb_page_search_lowest(csrc);
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_size = csrc->mc_db->md_pad;
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
unsigned int ptop, minkeys;
MDB_cursor mn;
+ minkeys = 1 + (IS_BRANCH(mc->mc_pg[mc->mc_top]));
#if MDB_DEBUG
{
pgno_t pgno;
}
#endif
- if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD) {
+ if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD &&
+ NUMKEYS(mc->mc_pg[mc->mc_top]) >= minkeys) {
#if MDB_DEBUG
pgno_t pgno;
COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
if (mc->mc_snum < 2) {
MDB_page *mp = mc->mc_pg[0];
+ if (IS_SUBP(mp)) {
+ DPUTS("Can't rebalance a subpage, ignoring");
+ return MDB_SUCCESS;
+ }
if (NUMKEYS(mp) == 0) {
DPUTS("tree is completely empty");
mc->mc_db->md_root = P_INVALID;
DPUTS("collapsing root page!");
mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0));
- if ((rc = mdb_page_get(mc->mc_txn, mc->mc_db->md_root,
- &mc->mc_pg[0])))
+ rc = mdb_page_get(mc->mc_txn,mc->mc_db->md_root,&mc->mc_pg[0],NULL);
+ if (rc)
return rc;
mc->mc_db->md_depth--;
mc->mc_db->md_branch_pages--;
DPUTS("reading right neighbor");
mn.mc_ki[ptop]++;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
- if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mn.mc_pg[mn.mc_top])))
+ rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL);
+ if (rc)
return rc;
mn.mc_ki[mn.mc_top] = 0;
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]);
DPUTS("reading left neighbor");
mn.mc_ki[ptop]--;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
- if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mn.mc_pg[mn.mc_top])))
+ rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL);
+ if (rc)
return rc;
mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1;
mc->mc_ki[mc->mc_top] = 0;
/* add overflow pages to free list */
if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_BIGDATA)) {
int i, ovpages;
+ MDB_page *omp;
pgno_t pg;
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
- ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
+ if ((rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL)) != 0)
+ return rc;
+ assert(IS_OVERFLOW(omp));
+ ovpages = omp->mp_pages;
mc->mc_db->md_overflow_pages -= ovpages;
for (i=0; i<ovpages; i++) {
DPRINTF("freed ov page %zu", pg);
/* Move half of the keys to the right sibling. */
/* grab a page to hold a temporary copy */
- copy = mdb_page_malloc(mc);
+ copy = mdb_page_malloc(mc, 1);
if (copy == NULL)
return ENOMEM;
}
} else {
mc->mc_ki[ptop]++;
+ /* Make sure mc_ki is still valid.
+ */
+ if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
+ mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
+ for (i=0; i<ptop; i++) {
+ mc->mc_pg[i] = mn.mc_pg[i];
+ mc->mc_ki[i] = mn.mc_ki[i];
+ }
+ mc->mc_pg[ptop] = mn.mc_pg[ptop];
+ mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
+ }
}
/* return tmp page to freelist */
if (!unused && txn->mt_numdbs >= txn->mt_env->me_maxdbs)
return MDB_DBS_FULL;
+ /* Cannot mix named databases with some mainDB flags */
+ if (txn->mt_dbs[MAIN_DBI].md_flags & (MDB_DUPSORT|MDB_INTEGERKEY))
+ return (flags & MDB_CREATE) ? MDB_INCOMPATIBLE : MDB_NOTFOUND;
+
/* Find the DB info */
dbflag = DB_NEW|DB_VALID;
exact = 0;
ptr = env->me_dbxs[dbi].md_name.mv_data;
env->me_dbxs[dbi].md_name.mv_data = NULL;
env->me_dbxs[dbi].md_name.mv_size = 0;
+ env->me_dbflags[dbi] = 0;
free(ptr);
}
for (i=0; i<NUMKEYS(mc->mc_pg[mc->mc_top]); i++) {
ni = NODEPTR(mc->mc_pg[mc->mc_top], i);
if (ni->mn_flags & F_BIGDATA) {
- int j, ovpages = OVPAGES(NODEDSZ(ni), mc->mc_txn->mt_env->me_psize);
+ int j, ovpages;
+ MDB_page *omp;
pgno_t pg;
memcpy(&pg, NODEDATA(ni), sizeof(pg));
+ rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL);
+ if (rc != 0)
+ return rc;
+ assert(IS_OVERFLOW(omp));
+ ovpages = omp->mp_pages;
for (j=0; j<ovpages; j++) {
mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
pg++;