#endif
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
+# undef MDB_DEBUG
+# define MDB_DEBUG 0
# define DPRINTF (void) /* Vararg macros may be unsupported */
#elif MDB_DEBUG
static int mdb_debug;
*/
#define P_INVALID (~(pgno_t)0)
- /** Test if a flag \b f is set in a flag word \b w. */
+ /** Test if the flags \b f are set in a flag word \b w. */
#define F_ISSET(w, f) (((w) & (f)) == (f))
/** Used for offsets within a single page.
* slot's address is saved in thread-specific data so that subsequent read
* transactions started by the same thread need no further locking to proceed.
*
+ * If #MDB_NOTLS is set, the slot address is not saved in thread-specific data.
+ *
+ * No reader table is used if the database is on a read-only filesystem.
+ *
* Since the database uses multi-version concurrency control, readers don't
* actually need any locking. This table is used to keep track of which
* readers are using data from which old transactions, so that we'll know
* the longer we delay reclaiming old pages, the more likely it is that a
* string of contiguous pages can be found after coalescing old pages from
* many old transactions together.
- *
- * @todo We don't actually do such coalescing yet, we grab pages from one
- * old transaction at a time.
* @{
*/
/** Number of slots in the reader table.
pgno_t md_root; /**< the root page of this tree */
} MDB_db;
+ /** mdb_dbi_open flags.
+  * PERSISTENT_FLAGS masks an me_dbflags entry down to the bits that are
+  * kept in MDB_db.md_flags; it leaves out the 0x8000 MDB_VALID bit.
+  * NOTE(review): VALID_FLAGS is presumably the set of flags accepted from
+  * callers of mdb_dbi_open - its use is not visible here, please confirm.
+  */
+#define PERSISTENT_FLAGS 0x7fff
+#define VALID_FLAGS (MDB_REVERSEKEY|MDB_DUPSORT|MDB_INTEGERKEY|MDB_DUPFIXED|\
+ MDB_INTEGERDUP|MDB_REVERSEDUP|MDB_CREATE)
+
/** Handle for the DB used to track free pages. */
#define FREE_DBI 0
/** Handle for the default DB. */
*/
MDB_IDL mt_free_pgs;
union {
- MDB_ID2L dirty_list; /**< modified pages */
- MDB_reader *reader; /**< this thread's slot in the reader table */
+ MDB_ID2L dirty_list; /**< for write txns: modified pages */
+ MDB_reader *reader; /**< this thread's reader table slot or NULL */
} mt_u;
/** Array of records for each DB known in the environment. */
MDB_dbx *mt_dbxs;
*/
#define DB_DIRTY 0x01 /**< DB was written in this txn */
#define DB_STALE 0x02 /**< DB record is older than txnID */
+#define DB_NEW 0x04 /**< DB handle opened in this txn */
+#define DB_VALID 0x08 /**< DB handle is valid */
+#define MDB_VALID 0x8000 /**< DB handle is valid, for me_dbflags */
/** @} */
- /** Array of cursors for each DB */
+ /** In write txns, array of cursors for each DB */
MDB_cursor **mt_cursors;
/** Array of flags for each DB */
unsigned char *mt_dbflags;
#define MDB_TXN_DIRTY 0x04 /**< must write, even if dirty list is empty */
/** @} */
unsigned int mt_flags; /**< @ref mdb_txn */
+ /** Room left in dirty_list: its max size minus the number of pages allocated, including in parent txns */
+ unsigned int mt_dirty_room;
/** Tracks which of the two meta pages was used at the start
* of this transaction.
*/
unsigned char mx_dbflag;
} MDB_xcursor;
- /** A set of pages freed by an earlier transaction. */
-typedef struct MDB_oldpages {
- /** Usually we only read one record from the FREEDB at a time, but
- * in case we read more, this will chain them together.
- */
- struct MDB_oldpages *mo_next;
- /** The ID of the transaction in which these pages were freed. */
- txnid_t mo_txnid;
- /** An #MDB_IDL of the pages */
- pgno_t mo_pages[1]; /* dynamic */
-} MDB_oldpages;
+ /** State of FreeDB old pages, stored in the MDB_env.
+  * A nested write txn keeps a copy of its parent's state in
+  * MDB_ntxn.mnt_pgstate and puts it back into the env when it is reset.
+  */
+typedef struct MDB_pgstate {
+ txnid_t mf_pglast; /**< ID of last old page record we used, 0 if none yet */
+ pgno_t *mf_pghead; /**< old pages reclaimed from freelist: a malloc'd IDL copy (slot 0 is the count) or NULL */
+ pgno_t *mf_pgfree; /**< memory to free when dropping me_pghead; me_pghead may have been advanced past this address */
+} MDB_pgstate;
/** The database environment. */
struct MDB_env {
HANDLE me_mfd; /**< just for writing the meta pages */
/** Failed to update the meta page. Probably an I/O error. */
#define MDB_FATAL_ERROR 0x80000000U
- /** Read-only Filesystem. Allow read access, no locking. */
-#define MDB_ROFS 0x40000000U
/** Some fields are initialized. */
#define MDB_ENV_ACTIVE 0x20000000U
+ /** me_txkey is set */
+#define MDB_ENV_TXKEY 0x10000000U
uint32_t me_flags; /**< @ref mdb_env */
unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_maxreaders; /**< size of the reader table */
pid_t me_pid; /**< process ID of this env */
char *me_path; /**< path to the DB files */
char *me_map; /**< the memory map of the data file */
- MDB_txninfo *me_txns; /**< the memory map of the lock file */
+ MDB_txninfo *me_txns; /**< the memory map of the lock file or NULL */
MDB_meta *me_metas[2]; /**< pointers to the two meta pages */
MDB_txn *me_txn; /**< current write transaction */
size_t me_mapsize; /**< size of the data memory map */
off_t me_size; /**< current file size */
pgno_t me_maxpg; /**< me_mapsize / me_psize */
- txnid_t me_pgfirst; /**< ID of first old page record we used */
- txnid_t me_pglast; /**< ID of last old page record we used */
MDB_dbx *me_dbxs; /**< array of static DB info */
- uint16_t *me_dbflags; /**< array of DB flags */
- MDB_oldpages *me_pghead; /**< list of old page records */
- MDB_oldpages *me_pgfree; /**< list of page records to free */
+ uint16_t *me_dbflags; /**< array of flags from MDB_db.md_flags */
pthread_key_t me_txkey; /**< thread-key for readers */
+ MDB_pgstate me_pgstate; /**< state of old pages from freeDB */
+# define me_pglast me_pgstate.mf_pglast
+# define me_pghead me_pgstate.mf_pghead
+# define me_pgfree me_pgstate.mf_pgfree
MDB_page *me_dpages; /**< list of malloc'd blocks for re-use */
/** IDL of pages that became unused in a write txn */
MDB_IDL me_free_pgs;
- /** ID2L of pages that were written during a write txn */
- MDB_ID2 me_dirty_list[MDB_IDL_UM_SIZE];
+ /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
+ MDB_ID2L me_dirty_list;
+ /** Max number of freelist items that can fit in a single overflow page */
+ unsigned int me_maxfree_1pg;
+ /** Max size of a node on a page */
+ unsigned int me_nodemax;
#ifdef _WIN32
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */
HANDLE me_wmutex;
sem_t *me_wmutex;
#endif
};
+
+ /** Nested transaction.
+  * Code handling a txn that has a parent casts its MDB_txn pointer to
+  * MDB_ntxn, so mnt_txn must stay the first member.
+  */
+typedef struct MDB_ntxn {
+ MDB_txn mnt_txn; /**< the transaction */
+ MDB_pgstate mnt_pgstate; /**< parent transaction's saved freestate */
+} MDB_ntxn;
+
/** max number of pages to commit in one writev() call */
#define MDB_COMMIT_PAGES 64
#if defined(IOV_MAX) && IOV_MAX < MDB_COMMIT_PAGES
static size_t mdb_branch_size(MDB_env *env, MDB_val *key);
static int mdb_rebalance(MDB_cursor *mc);
-static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
+static int mdb_update_key(MDB_cursor *mc, MDB_val *key);
static void mdb_cursor_pop(MDB_cursor *mc);
static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp);
"MDB_TLS_FULL: Thread-local storage keys full - too many environments open",
"MDB_TXN_FULL: Transaction has too many dirty pages - transaction too big",
"MDB_CURSOR_FULL: Internal error - cursor stack limit reached",
- "MDB_PAGE_FULL: Internal error - page has no more space"
+ "MDB_PAGE_FULL: Internal error - page has no more space",
+ "MDB_MAP_RESIZED: Database contents grew beyond environment mapsize",
+ "MDB_INCOMPATIBLE: Database flags changed or would change",
+ "MDB_BAD_RSLOT: Invalid reuse of reader locktable slot",
};
char *
char *ptr = buf;
unsigned char *c = key->mv_data;
unsigned int i;
+
+ if (!key)
+ return "";
+
if (key->mv_size > MDB_MAXKEYSIZE)
return "MDB_MAXKEYSIZE";
/* may want to make this a dynamic check: if the key is mostly
DKBUF;
nkeys = NUMKEYS(mp);
- fprintf(stderr, "numkeys %d\n", nkeys);
+ fprintf(stderr, "Page %zu numkeys %d\n", mp->mp_pgno, nkeys);
for (i=0; i<nkeys; i++) {
node = NODEPTR(mp, i);
key.mv_size = node->mn_ksize;
key.mv_data = node->mn_data;
nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t);
- if (F_ISSET(node->mn_flags, F_BIGDATA))
- nsize += sizeof(pgno_t);
- else
- nsize += NODEDSZ(node);
- fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
+ if (IS_BRANCH(mp)) {
+ fprintf(stderr, "key %d: page %zu, %s\n", i, NODEPGNO(node),
+ DKEY(&key));
+ } else {
+ if (F_ISSET(node->mn_flags, F_BIGDATA))
+ nsize += sizeof(pgno_t);
+ else
+ nsize += NODEDSZ(node);
+ fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
+ }
}
}
*mp = NULL;
/* If our dirty list is already full, we can't do anything */
- if (txn->mt_u.dirty_list[0].mid >= MDB_IDL_UM_MAX)
+ if (txn->mt_dirty_room == 0)
return MDB_TXN_FULL;
/* The free list won't have any content at all until txn 2 has
* after txn 3 commits, and so will be safe to re-use in txn 4.
*/
if (txn->mt_txnid > 3) {
-
if (!txn->mt_env->me_pghead &&
txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
/* See if there's anything in the free DB */
txnid_t *kptr;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
- if (!txn->mt_env->me_pgfirst) {
+ if (!txn->mt_env->me_pglast) {
mdb_page_search(&m2, NULL, 0);
leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
kptr = (txnid_t *)NODEKEY(leaf);
last = *kptr;
} else {
MDB_val key;
- int exact;
again:
- exact = 0;
last = txn->mt_env->me_pglast + 1;
leaf = NULL;
key.mv_data = &last;
key.mv_size = sizeof(last);
- rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
+ rc = mdb_cursor_set(&m2, &key, &data, MDB_SET_RANGE, NULL);
if (rc)
goto none;
last = *(txnid_t *)key.mv_data;
if (oldest > last) {
/* It's usable, grab it.
*/
- MDB_oldpages *mop;
- pgno_t *idl;
+ pgno_t *idl, *mop;
- if (!txn->mt_env->me_pgfirst) {
+ if (!txn->mt_env->me_pglast) {
mdb_node_read(txn, leaf, &data);
}
- txn->mt_env->me_pglast = last;
- if (!txn->mt_env->me_pgfirst)
- txn->mt_env->me_pgfirst = last;
idl = (MDB_ID *) data.mv_data;
/* We might have a zero-length IDL due to freelist growth
* during a prior commit
*/
- if (!idl[0]) goto again;
- mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
+ if (!idl[0]) {
+ txn->mt_env->me_pglast = last;
+ goto again;
+ }
+ mop = malloc(MDB_IDL_SIZEOF(idl));
if (!mop)
return ENOMEM;
- mop->mo_next = txn->mt_env->me_pghead;
- mop->mo_txnid = last;
- txn->mt_env->me_pghead = mop;
- memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
+ txn->mt_env->me_pglast = last;
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = mop;
+ memcpy(mop, idl, MDB_IDL_SIZEOF(idl));
#if MDB_DEBUG > 1
{
unsigned int i;
DPRINTF("IDL read txn %zu root %zu num %zu",
- mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+ last, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
for (i=0; i<idl[0]; i++) {
DPRINTF("IDL %zu", idl[i+1]);
}
}
none:
if (txn->mt_env->me_pghead) {
- MDB_oldpages *mop = txn->mt_env->me_pghead;
+ pgno_t *mop = txn->mt_env->me_pghead;
if (num > 1) {
MDB_cursor m2;
- int retry = 500, readit = 0, n2 = num-1;
+ int retry = 1, readit = 0, n2 = num-1;
unsigned int i, j, k;
/* If current list is too short, must fetch more and coalesce */
- if (mop->mo_pages[0] < (unsigned)num)
+ if (mop[0] < (unsigned)num)
readit = 1;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
}
if (readit) {
MDB_val key, data;
- MDB_oldpages *mop2;
- pgno_t *idl;
- int exact;
+ pgno_t *idl, *mop2;
- last = mop->mo_txnid + 1;
+ last = txn->mt_env->me_pglast + 1;
/* We haven't hit the readers list yet? */
if (!oldest) {
if (oldest - last < 1)
break;
- exact = 0;
key.mv_data = &last;
key.mv_size = sizeof(last);
- rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
- if (rc)
+ rc = mdb_cursor_set(&m2,&key,&data,MDB_SET_RANGE,NULL);
+ if (rc) {
+ if (rc == MDB_NOTFOUND)
+ break;
return rc;
+ }
+ last = *(txnid_t*)key.mv_data;
+ if (oldest <= last)
+ break;
idl = (MDB_ID *) data.mv_data;
- mop2 = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - 2*sizeof(pgno_t) + MDB_IDL_SIZEOF(mop->mo_pages));
+ mop2 = malloc(MDB_IDL_SIZEOF(idl) + MDB_IDL_SIZEOF(mop));
if (!mop2)
return ENOMEM;
/* merge in sorted order */
- i = idl[0]; j = mop->mo_pages[0]; mop2->mo_pages[0] = k = i+j;
- mop->mo_pages[0] = P_INVALID;
+ i = idl[0]; j = mop[0]; mop2[0] = k = i+j;
+ mop[0] = P_INVALID;
while (i>0 || j>0) {
- if (i && idl[i] < mop->mo_pages[j])
- mop2->mo_pages[k--] = idl[i--];
+ if (i && idl[i] < mop[j])
+ mop2[k--] = idl[i--];
else
- mop2->mo_pages[k--] = mop->mo_pages[j--];
+ mop2[k--] = mop[j--];
}
txn->mt_env->me_pglast = last;
- mop2->mo_txnid = last;
- mop2->mo_next = mop->mo_next;
- txn->mt_env->me_pghead = mop2;
- free(mop);
+ free(txn->mt_env->me_pgfree);
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = mop2;
mop = mop2;
/* Keep trying to read until we have enough */
- if (mop->mo_pages[0] < (unsigned)num) {
+ if (mop[0] < (unsigned)num) {
continue;
}
}
/* current list has enough pages, but are they contiguous? */
- for (i=mop->mo_pages[0]; i>=(unsigned)num; i--) {
- if (mop->mo_pages[i-n2] == mop->mo_pages[i] + n2) {
- pgno = mop->mo_pages[i];
+ for (i=mop[0]; i>=(unsigned)num; i--) {
+ if (mop[i-n2] == mop[i] + n2) {
+ pgno = mop[i];
i -= n2;
/* move any stragglers down */
- for (j=i+num; j<=mop->mo_pages[0]; j++)
- mop->mo_pages[i++] = mop->mo_pages[j];
- mop->mo_pages[0] -= num;
+ for (j=i+num; j<=mop[0]; j++)
+ mop[i++] = mop[j];
+ mop[0] -= num;
break;
}
}
- /* Stop if we succeeded, or no more retries */
+ /* Stop if we succeeded, or no retries */
if (!retry || pgno != P_INVALID)
break;
readit = 1;
- retry--;
} while (1);
} else {
/* peel pages off tail, so we only have to truncate the list */
- pgno = MDB_IDL_LAST(mop->mo_pages);
- mop->mo_pages[0]--;
+ pgno = MDB_IDL_LAST(mop);
+ mop[0]--;
}
- if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
- txn->mt_env->me_pghead = mop->mo_next;
- if (mc->mc_dbi == FREE_DBI) {
- mop->mo_next = txn->mt_env->me_pgfree;
- txn->mt_env->me_pgfree = mop;
- } else {
- free(mop);
- }
+ if (MDB_IDL_IS_ZERO(mop)) {
+ free(txn->mt_env->me_pgfree);
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = NULL;
}
}
}
} else {
mdb_mid2l_insert(txn->mt_u.dirty_list, &mid);
}
+ txn->mt_dirty_room--;
*mp = np;
return MDB_SUCCESS;
return 0;
}
}
- if (mc->mc_txn->mt_u.dirty_list[0].mid >= MDB_IDL_UM_MAX)
- return MDB_TXN_FULL;
+ assert(mc->mc_txn->mt_u.dirty_list[0].mid < MDB_IDL_UM_MAX);
/* No - copy it */
np = mdb_page_malloc(mc);
if (!np)
{
MDB_env *env = txn->mt_env;
unsigned int i;
+ uint16_t x;
int rc;
/* Setup db info */
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
if (txn->mt_flags & MDB_TXN_RDONLY) {
- if (env->me_flags & MDB_ROFS) {
+ if (!env->me_txns) {
i = mdb_env_pick_meta(env);
txn->mt_txnid = env->me_metas[i]->mm_txnid;
txn->mt_u.reader = NULL;
} else {
- MDB_reader *r = pthread_getspecific(env->me_txkey);
- if (!r) {
+ MDB_reader *r = (env->me_flags & MDB_NOTLS) ? txn->mt_u.reader :
+ pthread_getspecific(env->me_txkey);
+ if (r) {
+ if (r->mr_pid != env->me_pid || r->mr_txnid != (txnid_t)-1)
+ return MDB_BAD_RSLOT;
+ } else {
pid_t pid = env->me_pid;
pthread_t tid = pthread_self();
env->me_numreaders = env->me_txns->mti_numreaders;
UNLOCK_MUTEX_R(env);
r = &env->me_txns->mti_readers[i];
- if ((rc = pthread_setspecific(env->me_txkey, r)) != 0) {
+ if (!(env->me_flags & MDB_NOTLS) &&
+ (rc = pthread_setspecific(env->me_txkey, r)) != 0) {
env->me_txns->mti_readers[i].mr_pid = 0;
return rc;
}
if (txn->mt_txnid == mdb_debug_start)
mdb_debug = 1;
#endif
+ txn->mt_dirty_room = MDB_IDL_UM_MAX;
txn->mt_u.dirty_list = env->me_dirty_list;
txn->mt_u.dirty_list[0].mid = 0;
txn->mt_free_pgs = env->me_free_pgs;
/* Copy the DB info and flags */
memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
- for (i=2; i<txn->mt_numdbs; i++)
- txn->mt_dbs[i].md_flags = env->me_dbflags[i];
- txn->mt_dbflags[0] = txn->mt_dbflags[1] = 0;
- if (txn->mt_numdbs > 2)
- memset(txn->mt_dbflags+2, DB_STALE, txn->mt_numdbs-2);
+ for (i=2; i<txn->mt_numdbs; i++) {
+ x = env->me_dbflags[i];
+ txn->mt_dbs[i].md_flags = x & PERSISTENT_FLAGS;
+ txn->mt_dbflags[i] = (x & MDB_VALID) ? DB_VALID|DB_STALE : 0;
+ }
+ txn->mt_dbflags[0] = txn->mt_dbflags[1] = DB_VALID;
+
+ if (env->me_maxpg < txn->mt_next_pgno) {
+ mdb_txn_reset0(txn);
+ return MDB_MAP_RESIZED;
+ }
return MDB_SUCCESS;
}
{
int rc;
- if (! (txn && txn->mt_flags & MDB_TXN_RDONLY))
+ if (!txn || txn->mt_numdbs || !(txn->mt_flags & MDB_TXN_RDONLY))
return EINVAL;
if (txn->mt_env->me_flags & MDB_FATAL_ERROR) {
mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
{
MDB_txn *txn;
- int rc, size;
+ MDB_ntxn *ntxn;
+ int rc, size, tsize = sizeof(MDB_txn);
if (env->me_flags & MDB_FATAL_ERROR) {
DPUTS("environment had fatal error, must shutdown!");
{
return EINVAL;
}
+ tsize = sizeof(MDB_ntxn);
}
- size = sizeof(MDB_txn) + env->me_maxdbs * (sizeof(MDB_db)+1);
+ size = tsize + env->me_maxdbs * (sizeof(MDB_db)+1);
if (!(flags & MDB_RDONLY))
size += env->me_maxdbs * sizeof(MDB_cursor *);
DPRINTF("calloc: %s", strerror(ErrCode()));
return ENOMEM;
}
- txn->mt_dbs = (MDB_db *)(txn+1);
+ txn->mt_dbs = (MDB_db *) ((char *)txn + tsize);
if (flags & MDB_RDONLY) {
txn->mt_flags |= MDB_TXN_RDONLY;
txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
txn->mt_env = env;
if (parent) {
+ unsigned int i;
txn->mt_free_pgs = mdb_midl_alloc();
if (!txn->mt_free_pgs) {
free(txn);
}
txn->mt_txnid = parent->mt_txnid;
txn->mt_toggle = parent->mt_toggle;
+ txn->mt_dirty_room = parent->mt_dirty_room;
txn->mt_u.dirty_list[0].mid = 0;
txn->mt_free_pgs[0] = 0;
txn->mt_next_pgno = parent->mt_next_pgno;
txn->mt_numdbs = parent->mt_numdbs;
txn->mt_dbxs = parent->mt_dbxs;
memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
- memcpy(txn->mt_dbflags, parent->mt_dbflags, txn->mt_numdbs);
- mdb_cursor_shadow(parent, txn);
+ /* Copy parent's mt_dbflags, but clear DB_NEW */
+ for (i=0; i<txn->mt_numdbs; i++)
+ txn->mt_dbflags[i] = parent->mt_dbflags[i] & ~DB_NEW;
rc = 0;
+ ntxn = (MDB_ntxn *)txn;
+ ntxn->mnt_pgstate = env->me_pgstate; /* save parent me_pghead & co */
+ if (env->me_pghead) {
+ size = MDB_IDL_SIZEOF(env->me_pghead);
+ env->me_pghead = malloc(size);
+ if (env->me_pghead)
+ memcpy(env->me_pghead, ntxn->mnt_pgstate.mf_pghead, size);
+ else
+ rc = ENOMEM;
+ }
+ env->me_pgfree = env->me_pghead;
+ if (!rc)
+ rc = mdb_cursor_shadow(parent, txn);
+ if (rc)
+ mdb_txn_reset0(txn);
} else {
rc = mdb_txn_renew0(txn);
}
}
/** Common code for #mdb_txn_reset() and #mdb_txn_abort().
+ * May be called twice for readonly txns: First reset it, then abort.
* @param[in] txn the transaction handle to reset
*/
static void
mdb_txn_reset0(MDB_txn *txn)
{
MDB_env *env = txn->mt_env;
+ unsigned int i;
+
+ /* Close any DBI handles opened in this txn */
+ for (i=2; i<txn->mt_numdbs; i++) {
+ if (txn->mt_dbflags[i] & DB_NEW) {
+ char *ptr = env->me_dbxs[i].md_name.mv_data;
+ env->me_dbxs[i].md_name.mv_data = NULL;
+ env->me_dbxs[i].md_name.mv_size = 0;
+ free(ptr);
+ }
+ }
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
- if (!(env->me_flags & MDB_ROFS))
+ if (txn->mt_u.reader) {
txn->mt_u.reader->mr_txnid = (txnid_t)-1;
+ if (!(env->me_flags & MDB_NOTLS))
+ txn->mt_u.reader = NULL; /* txn does not own reader */
+ }
+ txn->mt_numdbs = 0; /* mark txn as reset, do not close DBs again */
} else {
- MDB_oldpages *mop;
MDB_page *dp;
- unsigned int i;
/* close(free) all cursors */
for (i=0; i<txn->mt_numdbs; i++) {
}
}
+ free(env->me_pgfree);
+
if (txn->mt_parent) {
txn->mt_parent->mt_child = NULL;
+ env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
mdb_midl_free(txn->mt_free_pgs);
free(txn->mt_u.dirty_list);
return;
env->me_free_pgs = txn->mt_free_pgs;
}
- while ((mop = txn->mt_env->me_pghead)) {
- txn->mt_env->me_pghead = mop->mo_next;
- free(mop);
- }
- txn->mt_env->me_pgfirst = 0;
+ txn->mt_env->me_pghead = txn->mt_env->me_pgfree = NULL;
txn->mt_env->me_pglast = 0;
env->me_txn = NULL;
txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
(void *) txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
+ /* This call is only valid for read-only txns */
+ if (!(txn->mt_flags & MDB_TXN_RDONLY))
+ return;
+
mdb_txn_reset0(txn);
}
mdb_txn_abort(txn->mt_child);
mdb_txn_reset0(txn);
+ /* Free reader slot tied to this txn (if MDB_NOTLS && writable FS) */
+ if ((txn->mt_flags & MDB_TXN_RDONLY) && txn->mt_u.reader)
+ txn->mt_u.reader->mr_pid = 0;
+
free(txn);
}
MDB_page *dp;
MDB_env *env;
pgno_t next, freecnt;
+ txnid_t oldpg_txnid, id;
MDB_cursor mc;
assert(txn != NULL);
env = txn->mt_env;
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
- if (txn->mt_numdbs > env->me_numdbs) {
- /* update the DB flags */
- MDB_dbi i;
- for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
- env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
- env->me_numdbs = i;
+ /* update the DB flags */
+ for (i = 2; i<txn->mt_numdbs; i++) {
+ if (txn->mt_dbflags[i] & DB_NEW)
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID;
}
+ if (txn->mt_numdbs > env->me_numdbs)
+ env->me_numdbs = txn->mt_numdbs;
+ txn->mt_numdbs = 2; /* so txn_abort() doesn't close any new handles */
mdb_txn_abort(txn);
return MDB_SUCCESS;
}
}
if (txn->mt_parent) {
- MDB_db *ip, *jp;
- MDB_dbi i;
- unsigned x, y;
+ MDB_txn *parent = txn->mt_parent;
+ unsigned x, y, len;
MDB_ID2L dst, src;
+ /* Append our free list to parent's */
+ if (mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs)) {
+ mdb_txn_abort(txn);
+ return ENOMEM;
+ }
+ mdb_midl_free(txn->mt_free_pgs);
+
+ parent->mt_next_pgno = txn->mt_next_pgno;
+ parent->mt_flags = txn->mt_flags;
+
/* Merge (and close) our cursors with parent's */
mdb_cursor_merge(txn);
- /* Update parent's DB table */
- ip = &txn->mt_parent->mt_dbs[2];
- jp = &txn->mt_dbs[2];
- for (i = 2; i < txn->mt_numdbs; i++) {
- if (ip->md_root != jp->md_root)
- *ip = *jp;
- ip++; jp++;
- }
+ /* Update parent's DB table. */
+ memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
txn->mt_parent->mt_numdbs = txn->mt_numdbs;
+ txn->mt_parent->mt_dbflags[0] = txn->mt_dbflags[0];
+ txn->mt_parent->mt_dbflags[1] = txn->mt_dbflags[1];
+ for (i=2; i<txn->mt_numdbs; i++) {
+ /* preserve parent's DB_NEW status */
+ x = txn->mt_parent->mt_dbflags[i] & DB_NEW;
+ txn->mt_parent->mt_dbflags[i] = txn->mt_dbflags[i] | x;
+ }
- /* Append our free list to parent's */
- mdb_midl_append_list(&txn->mt_parent->mt_free_pgs,
- txn->mt_free_pgs);
- mdb_midl_free(txn->mt_free_pgs);
-
- /* Merge our dirty list with parent's */
dst = txn->mt_parent->mt_u.dirty_list;
src = txn->mt_u.dirty_list;
- x = mdb_mid2l_search(dst, src[1].mid);
- for (y=1; y<=src[0].mid; y++) {
- while (x <= dst[0].mid && dst[x].mid != src[y].mid) x++;
- if (x > dst[0].mid)
- break;
- free(dst[x].mptr);
- dst[x].mptr = src[y].mptr;
- }
+ /* Find len = length of merging our dirty list with parent's */
x = dst[0].mid;
- for (; y<=src[0].mid; y++) {
- if (++x >= MDB_IDL_UM_MAX) {
- mdb_txn_abort(txn);
- return MDB_TXN_FULL;
+ dst[0].mid = 0; /* simplify loops */
+ if (parent->mt_parent) {
+ len = x + src[0].mid;
+ y = mdb_mid2l_search(src, dst[x].mid + 1) - 1;
+ for (i = x; y && i; y--) {
+ pgno_t yp = src[y].mid;
+ while (yp < dst[i].mid)
+ i--;
+ if (yp == dst[i].mid) {
+ i--;
+ len--;
+ }
}
- dst[x] = src[y];
+ } else { /* Simplify the above for single-ancestor case */
+ len = MDB_IDL_UM_MAX - txn->mt_dirty_room;
}
- dst[0].mid = x;
+ /* Merge our dirty list with parent's */
+ y = src[0].mid;
+ for (i = len; y; dst[i--] = src[y--]) {
+ pgno_t yp = src[y].mid;
+ while (yp < dst[x].mid)
+ dst[i--] = dst[x--];
+ if (yp == dst[x].mid)
+ free(dst[x--].mptr);
+ }
+ assert(i == x);
+ dst[0].mid = len;
free(txn->mt_u.dirty_list);
+ parent->mt_dirty_room = txn->mt_dirty_room;
+
txn->mt_parent->mt_child = NULL;
+ free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pgfree);
free(txn);
return MDB_SUCCESS;
}
DPRINTF("committing txn %zu %p on mdbenv %p, root page %zu",
txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
- /* Update DB root pointers. Their pages have already been
- * touched so this is all in-place and cannot fail.
- */
+ /* Update DB root pointers */
if (txn->mt_numdbs > 2) {
MDB_dbi i;
MDB_val data;
for (i = 2; i < txn->mt_numdbs; i++) {
if (txn->mt_dbflags[i] & DB_DIRTY) {
data.mv_data = &txn->mt_dbs[i];
- mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+ rc = mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+ if (rc)
+ goto fail;
}
}
}
+ /* Save the freelist as of this transaction to the freeDB. This
+ * can change the freelist, so keep trying until it stabilizes.
+ *
+ * env->me_pglast and the length of txn->mt_free_pgs cannot decrease,
+ * except the code below can decrease env->me_pglast to split pghead.
+ * Page numbers cannot disappear from txn->mt_free_pgs. New pages
+ * can only appear in env->me_pghead when env->me_pglast increases.
+ * Until then, the me_pghead pointer won't move but can become NULL.
+ */
+
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
+ oldpg_txnid = id = 0;
+ freecnt = 0;
/* should only be one record now */
- if (env->me_pghead || env->me_pgfirst) {
+ if (env->me_pghead || env->me_pglast) {
/* make sure first page of freeDB is touched and on freelist */
rc = mdb_page_search(&mc, NULL, MDB_PS_MODIFY);
if (rc && rc != MDB_NOTFOUND) {
}
/* Delete IDLs we used from the free list */
- if (env->me_pgfirst) {
- txnid_t cur;
+ if (env->me_pglast) {
MDB_val key;
- int exact = 0;
-
- key.mv_size = sizeof(cur);
- for (cur = env->me_pgfirst; cur <= env->me_pglast; cur++) {
- key.mv_data = &cur;
- mdb_cursor_set(&mc, &key, NULL, MDB_SET, &exact);
+ do {
+free_pgfirst:
+ rc = mdb_cursor_first(&mc, &key, NULL);
+ if (rc)
+ goto fail;
+ oldpg_txnid = *(txnid_t *)key.mv_data;
+again:
+ assert(oldpg_txnid <= env->me_pglast);
+ id = 0;
rc = mdb_cursor_del(&mc, 0);
if (rc)
goto fail;
- }
- env->me_pgfirst = 0;
- env->me_pglast = 0;
+ } while (oldpg_txnid < env->me_pglast);
}
- /* save to free list */
+ /* Save IDL of pages freed by this txn, to freeDB */
free2:
- freecnt = txn->mt_free_pgs[0];
- if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
+ if (freecnt != txn->mt_free_pgs[0]) {
MDB_val key, data;
/* make sure last page of freeDB is touched and on freelist */
/* write to last page of freeDB */
key.mv_size = sizeof(pgno_t);
key.mv_data = &txn->mt_txnid;
- data.mv_data = txn->mt_free_pgs;
/* The free list can still grow during this call,
- * despite the pre-emptive touches above. So check
- * and make sure the entire thing got written.
+ * despite the pre-emptive touches above. So retry
+ * until the reserved space remains big enough.
*/
do {
+ assert(freecnt < txn->mt_free_pgs[0]);
freecnt = txn->mt_free_pgs[0];
data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
- mdb_midl_sort(txn->mt_free_pgs);
- rc = mdb_cursor_put(&mc, &key, &data, 0);
+ rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
goto fail;
} while (freecnt != txn->mt_free_pgs[0]);
+ mdb_midl_sort(txn->mt_free_pgs);
+ memcpy(data.mv_data, txn->mt_free_pgs, data.mv_size);
+ if (oldpg_txnid < env->me_pglast || (!env->me_pghead && id))
+ goto free_pgfirst; /* used up freeDB[oldpg_txnid] */
}
- /* should only be one record now */
-again:
+
+ /* Put back page numbers we took from freeDB but did not use */
if (env->me_pghead) {
+ for (;;) {
MDB_val key, data;
- MDB_oldpages *mop;
- pgno_t orig;
- txnid_t id;
+ pgno_t orig, *mop;
mop = env->me_pghead;
- id = mop->mo_txnid;
+ id = env->me_pglast;
key.mv_size = sizeof(id);
key.mv_data = &id;
- data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
- data.mv_data = mop->mo_pages;
- orig = mop->mo_pages[0];
/* These steps may grow the freelist again
* due to freed overflow pages...
*/
- rc = mdb_cursor_put(&mc, &key, &data, 0);
- if (rc)
- goto fail;
- if (mop == env->me_pghead && env->me_pghead->mo_txnid == id) {
- /* could have been used again here */
- if (mop->mo_pages[0] != orig) {
- data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
- data.mv_data = mop->mo_pages;
- id = mop->mo_txnid;
- rc = mdb_cursor_put(&mc, &key, &data, 0);
- if (rc)
- goto fail;
- }
- } else {
- /* was completely used up */
- rc = mdb_cursor_del(&mc, 0);
+ i = 2;
+ do {
+ orig = mop[0];
+ if (orig > env->me_maxfree_1pg && id > 4)
+ orig = env->me_maxfree_1pg; /* Do not use more than 1 page */
+ data.mv_size = (orig + 1) * sizeof(pgno_t);
+ rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
goto fail;
- if (env->me_pghead)
- goto again;
- }
- env->me_pgfirst = 0;
- env->me_pglast = 0;
+ assert(!env->me_pghead || env->me_pglast);
+ /* mop could have been used again here */
+ if (id != env->me_pglast || env->me_pghead == NULL)
+ goto again; /* was completely used up */
+ assert(mop == env->me_pghead);
+ } while (mop[0] < orig && --i);
+ memcpy(data.mv_data, mop, data.mv_size);
+ if (mop[0] <= orig)
+ break;
+ *(pgno_t *)data.mv_data = orig;
+ mop[orig] = mop[0] - orig;
+ env->me_pghead = mop += orig;
+ /* Save more oldpages at the previous txnid. */
+ assert(env->me_pglast == id && id == oldpg_txnid);
+ env->me_pglast = --oldpg_txnid;
+ }
}
/* Check for growth of freelist again */
if (freecnt != txn->mt_free_pgs[0])
goto free2;
- if (env->me_pghead) {
- free(env->me_pghead);
- env->me_pghead = NULL;
- }
-
- while (env->me_pgfree) {
- MDB_oldpages *mop = env->me_pgfree;
- env->me_pgfree = mop->mo_next;
- free(mop);
- }
+ free(env->me_pgfree);
+ env->me_pghead = env->me_pgfree = NULL;
if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
if (mdb_midl_shrink(&txn->mt_free_pgs))
}
done:
+ env->me_pglast = 0;
env->me_txn = NULL;
- if (txn->mt_numdbs > env->me_numdbs) {
- /* update the DB flags */
- MDB_dbi i;
- for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
- env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
- env->me_numdbs = i;
+ /* update the DB flags */
+ for (i = 2; i<txn->mt_numdbs; i++) {
+ if (txn->mt_dbflags[i] & DB_NEW)
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID;
}
+ if (txn->mt_numdbs > env->me_numdbs)
+ env->me_numdbs = txn->mt_numdbs;
UNLOCK_MUTEX_W(env);
free(txn);
if (!e)
return ENOMEM;
- e->me_free_pgs = mdb_midl_alloc();
- if (!e->me_free_pgs) {
- free(e);
- return ENOMEM;
- }
e->me_maxreaders = DEFAULT_READERS;
- e->me_maxdbs = 2;
+ e->me_maxdbs = e->me_numdbs = 2;
e->me_fd = INVALID_HANDLE_VALUE;
e->me_lfd = INVALID_HANDLE_VALUE;
e->me_mfd = INVALID_HANDLE_VALUE;
env->me_map = NULL;
return ErrCode();
}
-#endif
+ /* Turn off readahead. It's harmful when the DB is larger than RAM. */
+#ifdef MADV_RANDOM
+ madvise(env->me_map, env->me_mapsize, MADV_RANDOM);
+#else
+#ifdef POSIX_MADV_RANDOM
+ posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM);
+#endif /* POSIX_MADV_RANDOM */
+#endif /* MADV_RANDOM */
+#endif /* _WIN32 */
if (newenv) {
if (flags & MDB_FIXEDMAP)
return EBUSY; /* TODO: Make a new MDB_* error code? */
}
env->me_psize = meta.mm_psize;
+ env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
+ env->me_nodemax = (env->me_psize - PAGEHDRSZ) / MDB_MINKEYS;
env->me_maxpg = env->me_mapsize / env->me_psize;
* @param[in] lpath The pathname of the file used for the lock region.
* @param[in] mode The Unix permissions for the file, if we create it.
* @param[out] excl Resulting file lock type: -1 none, 0 shared, 1 exclusive
+ * @param[in,out] excl In -1, out lock type: -1 none, 0 shared, 1 exclusive
* @return 0 on success, non-zero on failure.
*/
static int
mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
{
+#ifdef _WIN32
+# define MDB_ERRCODE_ROFS ERROR_WRITE_PROTECT
+#else
+# define MDB_ERRCODE_ROFS EROFS
+#ifdef O_CLOEXEC /* Linux: Open file and set FD_CLOEXEC atomically */
+# define MDB_CLOEXEC O_CLOEXEC
+#else
+ int fdflags;
+# define MDB_CLOEXEC 0
+#endif
+#endif
int rc;
off_t size, rsize;
- *excl = -1;
-
#ifdef _WIN32
- if ((env->me_lfd = CreateFile(lpath, GENERIC_READ|GENERIC_WRITE,
+ env->me_lfd = CreateFile(lpath, GENERIC_READ|GENERIC_WRITE,
FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS,
- FILE_ATTRIBUTE_NORMAL, NULL)) == INVALID_HANDLE_VALUE) {
+ FILE_ATTRIBUTE_NORMAL, NULL);
+#else
+ env->me_lfd = open(lpath, O_RDWR|O_CREAT|MDB_CLOEXEC, mode);
+#endif
+ if (env->me_lfd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
- if (rc == ERROR_WRITE_PROTECT && (env->me_flags & MDB_RDONLY)) {
- env->me_flags |= MDB_ROFS;
+ if (rc == MDB_ERRCODE_ROFS && (env->me_flags & MDB_RDONLY)) {
return MDB_SUCCESS;
}
goto fail_errno;
}
- /* Try to get exclusive lock. If we succeed, then
- * nobody is using the lock region and we should initialize it.
- */
- if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
- size = GetFileSize(env->me_lfd, NULL);
-
-#else
-#if !(O_CLOEXEC)
- {
- int fdflags;
- if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
- rc = ErrCode();
- if (rc == EROFS && (env->me_flags & MDB_RDONLY)) {
- env->me_flags |= MDB_ROFS;
- return MDB_SUCCESS;
- }
- goto fail_errno;
- }
- /* Lose record locks when exec*() */
- if ((fdflags = fcntl(env->me_lfd, F_GETFD) | FD_CLOEXEC) >= 0)
+#if ! ((MDB_CLOEXEC) || defined(_WIN32))
+ /* Lose record locks when exec*() */
+ if ((fdflags = fcntl(env->me_lfd, F_GETFD) | FD_CLOEXEC) >= 0)
fcntl(env->me_lfd, F_SETFD, fdflags);
- }
-#else /* O_CLOEXEC on Linux: Open file and set FD_CLOEXEC atomically */
- if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT|O_CLOEXEC, mode)) == -1) {
- rc = ErrCode();
- if (rc == EROFS && (env->me_flags & MDB_RDONLY)) {
- env->me_flags |= MDB_ROFS;
- return MDB_SUCCESS;
+#endif
+
+ if (!(env->me_flags & MDB_NOTLS)) {
+ rc = pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
+ if (rc)
+ goto fail;
+ env->me_flags |= MDB_ENV_TXKEY;
+#ifdef _WIN32
+ /* Windows TLS callbacks need help finding their TLS info. */
+ if (mdb_tls_nkeys >= MAX_TLS_KEYS) {
+ rc = MDB_TLS_FULL;
+ goto fail;
}
- goto fail_errno;
- }
+ mdb_tls_keys[mdb_tls_nkeys++] = env->me_txkey;
#endif
+ }
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it.
*/
if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
+#ifdef _WIN32
+ size = GetFileSize(env->me_lfd, NULL);
+#else
size = lseek(env->me_lfd, 0, SEEK_END);
#endif
rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
* environment and re-opening it with the new flags.
*/
#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC)
-#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP)
+#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS)
int
-mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
+mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode)
{
- int oflags, rc, len, excl;
+ int oflags, rc, len, excl = -1;
char *lpath, *dpath;
if (env->me_fd!=INVALID_HANDLE_VALUE || (flags & ~(CHANGEABLE|CHANGELESS)))
sprintf(dpath, "%s" DATANAME, path);
}
+ rc = MDB_SUCCESS;
flags |= env->me_flags;
- /* silently ignore WRITEMAP if we're only getting read access */
- if (F_ISSET(flags, MDB_RDONLY|MDB_WRITEMAP))
- flags ^= MDB_WRITEMAP;
+ if (flags & MDB_RDONLY) {
+ /* silently ignore WRITEMAP when we're only getting read access */
+ flags &= ~MDB_WRITEMAP;
+ } else {
+ if (!((env->me_free_pgs = mdb_midl_alloc()) &&
+ (env->me_dirty_list = calloc(MDB_IDL_UM_SIZE, sizeof(MDB_ID2)))))
+ rc = ENOMEM;
+ }
env->me_flags = flags |= MDB_ENV_ACTIVE;
+ if (rc)
+ goto leave;
+
+ env->me_path = strdup(path);
+ env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
+ env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
+ if (!(env->me_dbxs && env->me_path && env->me_dbflags)) {
+ rc = ENOMEM;
+ goto leave;
+ }
rc = mdb_env_setup_locks(env, lpath, mode, &excl);
if (rc)
}
}
DPRINTF("opened dbenv %p", (void *) env);
- rc = pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
- if (rc)
- goto leave;
- env->me_numdbs = 2; /* this notes that me_txkey was set */
-#ifdef _WIN32
- /* Windows TLS callbacks need help finding their TLS info. */
- if (mdb_tls_nkeys < MAX_TLS_KEYS)
- mdb_tls_keys[mdb_tls_nkeys++] = env->me_txkey;
- else {
- rc = MDB_TLS_FULL;
- goto leave;
- }
-#endif
if (excl > 0) {
rc = mdb_env_share_locks(env, &excl);
- if (rc)
- goto leave;
}
- env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
- env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
- env->me_path = strdup(path);
- if (!env->me_dbxs || !env->me_dbflags || !env->me_path)
- rc = ENOMEM;
}
leave:
free(env->me_dbflags);
free(env->me_dbxs);
free(env->me_path);
+ free(env->me_dirty_list);
+ if (env->me_free_pgs)
+ mdb_midl_free(env->me_free_pgs);
- if (env->me_numdbs) {
+ if (env->me_flags & MDB_ENV_TXKEY) {
pthread_key_delete(env->me_txkey);
#ifdef _WIN32
/* Delete our key from the global list */
close(env->me_lfd);
}
- env->me_flags &= ~MDB_ENV_ACTIVE;
+ env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY);
}
int
if (rc)
goto leave;
- if (!(env->me_flags & MDB_ROFS)) {
+ if (env->me_txns) {
/* We must start the actual read txn after blocking writers */
mdb_txn_reset0(txn);
rc = write(newfd, env->me_map, wsize);
rc = (rc == (int)wsize) ? MDB_SUCCESS : ErrCode();
#endif
- if (! (env->me_flags & MDB_ROFS))
+ if (env->me_txns)
UNLOCK_MUTEX_W(env);
if (rc)
mdb_env_close(MDB_env *env)
{
MDB_page *dp;
+ int i;
if (env == NULL)
return;
+ for (i = env->me_numdbs; --i > MAIN_DBI; )
+ free(env->me_dbxs[i].md_name.mv_data);
+
VGMEMP_DESTROY(env);
while ((dp = env->me_dpages) != NULL) {
VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next));
}
mdb_env_close0(env, 0);
- mdb_midl_free(env->me_free_pgs);
free(env);
}
{
MDB_page *p = NULL;
- if (txn->mt_env->me_flags & MDB_WRITEMAP) {
- if (pgno < txn->mt_next_pgno)
- p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
- goto done;
- }
- if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
- unsigned x;
- x = mdb_mid2l_search(txn->mt_u.dirty_list, pgno);
- if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
- p = txn->mt_u.dirty_list[x].mptr;
- }
- }
- if (!p) {
- if (pgno < txn->mt_next_pgno)
- p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ if (!((txn->mt_flags & MDB_TXN_RDONLY) |
+ (txn->mt_env->me_flags & MDB_WRITEMAP)))
+ {
+ MDB_txn *tx2 = txn;
+ do {
+ MDB_ID2L dl = tx2->mt_u.dirty_list;
+ if (dl[0].mid) {
+ unsigned x = mdb_mid2l_search(dl, pgno);
+ if (x <= dl[0].mid && dl[x].mid == pgno) {
+ p = dl[x].mptr;
+ goto done;
+ }
+ }
+ } while ((tx2 = tx2->mt_parent) != NULL);
}
-done:
- *ret = p;
- if (!p) {
+
+ if (pgno < txn->mt_next_pgno) {
+ p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ } else {
DPRINTF("page %zu not found", pgno);
assert(p != NULL);
}
+
+done:
+ *ret = p;
return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
}
if (*mc->mc_dbflag & DB_STALE) {
MDB_val data;
int exact = 0;
+ uint16_t flags;
MDB_node *leaf = mdb_node_search(&mc2,
&mc->mc_dbx->md_name, &exact);
if (!exact)
return MDB_NOTFOUND;
mdb_node_read(mc->mc_txn, leaf, &data);
+ memcpy(&flags, ((char *) data.mv_data + offsetof(MDB_db, md_flags)),
+ sizeof(uint16_t));
+ /* The txn may not know this DBI, or another process may
+ * have dropped and recreated the DB with other flags.
+ */
+ if ((mc->mc_db->md_flags & PERSISTENT_FLAGS) != flags)
+ return MDB_INCOMPATIBLE;
memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db));
}
if (flags & MDB_PS_MODIFY)
dbflag = DB_DIRTY;
- *mc->mc_dbflag = dbflag;
+ *mc->mc_dbflag &= ~DB_STALE;
+ *mc->mc_dbflag |= dbflag;
}
}
root = mc->mc_db->md_root;
assert(data);
DPRINTF("===> get db %u key [%s]", dbi, DKEY(key));
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) {
return rc;
mdb_cursor_push(mc, mp);
+ if (!move_right)
+ mc->mc_ki[mc->mc_top] = NUMKEYS(mp)-1;
return MDB_SUCCESS;
}
mdb_xcursor_init1(mc, leaf);
}
if (data) {
- if ((rc = mdb_node_read(mc->mc_txn, leaf, data) != MDB_SUCCESS))
+ if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_xcursor_init1(mc, leaf);
}
if (data) {
- if ((rc = mdb_node_read(mc->mc_txn, leaf, data) != MDB_SUCCESS))
+ if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
- mc->mc_flags |= C_INITIALIZED|C_EOF;
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
}
+ mc->mc_flags |= C_INITIALIZED|C_EOF;
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
switch (op) {
case MDB_GET_CURRENT:
- if (!mc->mc_flags & C_INITIALIZED) {
+ if (!(mc->mc_flags & C_INITIALIZED)) {
rc = EINVAL;
} else {
MDB_page *mp = mc->mc_pg[mc->mc_top];
case MDB_PREV:
case MDB_PREV_DUP:
case MDB_PREV_NODUP:
- if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF)) {
+ if (!(mc->mc_flags & C_INITIALIZED)) {
rc = mdb_cursor_last(mc, key, data);
mc->mc_flags |= C_INITIALIZED;
mc->mc_ki[mc->mc_top]++;
rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, MDB_PS_MODIFY);
if (rc)
return rc;
- *mc->mc_dbflag = DB_DIRTY;
+ *mc->mc_dbflag |= DB_DIRTY;
}
for (mc->mc_top = 0; mc->mc_top < mc->mc_snum; mc->mc_top++) {
rc = mdb_page_touch(mc);
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
- if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE)
+ if (flags != MDB_CURRENT && (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE))
return EINVAL;
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT) && data->mv_size > MDB_MAXKEYSIZE)
mdb_cursor_push(mc, np);
mc->mc_db->md_root = np->mp_pgno;
mc->mc_db->md_depth++;
- *mc->mc_dbflag = DB_DIRTY;
+ *mc->mc_dbflag |= DB_DIRTY;
if ((mc->mc_db->md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
== MDB_DUPFIXED)
np->mp_flags |= P_LEAF2;
rc = MDB_NOTFOUND;
mc->mc_ki[mc->mc_top]++;
} else {
- rc = 0;
+ /* new key is <= last key */
+ rc = MDB_KEYEXIST;
}
}
} else {
}
offset += offset & 1;
if (NODESIZE + sizeof(indx_t) + NODEKSZ(leaf) + NODEDSZ(leaf) +
- offset >= (mc->mc_txn->mt_env->me_psize - PAGEHDRSZ) /
- MDB_MINKEYS) {
+ offset >= mc->mc_txn->mt_env->me_nodemax) {
/* yes, convert it */
dummy.md_flags = 0;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
*/
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = NODEDATA(leaf);
- else
+ else if (data->mv_size)
memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
+ else
+ memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
goto done;
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
}
}
}
+ /* we've done our job */
+ dkey.mv_size = 0;
}
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
}
}
done:
+ /* If we succeeded and the key didn't exist before, make sure
+ * the cursor is marked valid.
+ */
+ if (!rc && insert)
+ mc->mc_flags |= C_INITIALIZED;
return rc;
}
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
- if (!mc->mc_flags & C_INITIALIZED)
+ if (!(mc->mc_flags & C_INITIALIZED))
return EINVAL;
rc = mdb_cursor_touch(mc);
size_t sz;
sz = LEAFSIZE(key, data);
- if (sz >= env->me_psize / MDB_MINKEYS) {
+ if (sz >= env->me_nodemax) {
/* put on overflow page */
sz -= data->mv_size - sizeof(pgno_t);
}
size_t sz;
sz = INDXSIZE(key);
- if (sz >= env->me_psize / MDB_MINKEYS) {
+ if (sz >= env->me_nodemax) {
/* put on overflow page */
/* not implemented */
/* sz -= key->size - sizeof(pgno_t); */
if (F_ISSET(flags, F_BIGDATA)) {
/* Data already on overflow page. */
node_size += sizeof(pgno_t);
- } else if (node_size + data->mv_size >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
+ } else if (node_size + data->mv_size >= mc->mc_txn->mt_env->me_nodemax) {
int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
int rc;
/* Put data on overflow page. */
}
DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi,
mx->mx_db.md_root);
- mx->mx_dbflag = (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY)) ?
- DB_DIRTY : 0;
+ mx->mx_dbflag = DB_VALID | (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY) ?
+ DB_DIRTY : 0);
mx->mx_dbx.md_name.mv_data = NODEKEY(node);
mx->mx_dbx.md_name.mv_size = node->mn_ksize;
#if UINT_MAX < SIZE_MAX
MDB_xcursor *mx = NULL;
size_t size = sizeof(MDB_cursor);
- if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs)
+ if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
/* Allow read access to the freelist */
* @return 0 on success, non-zero on failure.
*/
static int
-mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
+mdb_update_key(MDB_cursor *mc, MDB_val *key)
{
+ MDB_page *mp;
MDB_node *node;
char *base;
size_t len;
int delta, delta0;
- indx_t ptr, i, numkeys;
+ indx_t ptr, i, numkeys, indx;
DKBUF;
+ indx = mc->mc_ki[mc->mc_top];
+ mp = mc->mc_pg[mc->mc_top];
node = NODEPTR(mp, indx);
ptr = mp->mp_ptrs[indx];
#if MDB_DEBUG
delta += (delta & 1);
if (delta) {
if (delta > 0 && SIZELEFT(mp) < delta) {
- DPRINTF("OUCH! Not enough room, delta = %d", delta);
- return MDB_PAGE_FULL;
+ pgno_t pgno;
+ /* not enough space left, do a delete and split */
+ DPRINTF("Not enough room, delta = %d, splitting...", delta);
+ pgno = NODEPGNO(node);
+ mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+ return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE);
}
numkeys = NUMKEYS(mp);
return MDB_SUCCESS;
}
+static void
+mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst);
+
/** Move a node from csrc to cdst.
*/
static int
mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
{
- int rc;
MDB_node *srcnode;
MDB_val key, data;
pgno_t srcpg;
+ MDB_cursor mn;
+ int rc;
unsigned short flags;
DKBUF;
}
cdst->mc_snum = snum--;
cdst->mc_top = snum;
- rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey);
+ mdb_cursor_copy(cdst, &mn);
+ mn.mc_ki[snum] = 0;
+ rc = mdb_update_key(&mn, &bkey);
+ if (rc)
+ return rc;
}
DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu",
}
DPRINTF("update separator for source page %zu to [%s]",
csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key));
- if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1],
- &key)) != MDB_SUCCESS)
+ mdb_cursor_copy(csrc, &mn);
+ mn.mc_snum--;
+ mn.mc_top--;
+ if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
return rc;
}
if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
MDB_val nullkey;
+ indx_t ix = csrc->mc_ki[csrc->mc_top];
nullkey.mv_size = 0;
- rc = mdb_update_key(csrc->mc_pg[csrc->mc_top], 0, &nullkey);
+ csrc->mc_ki[csrc->mc_top] = 0;
+ rc = mdb_update_key(csrc, &nullkey);
+ csrc->mc_ki[csrc->mc_top] = ix;
assert(rc == MDB_SUCCESS);
}
}
}
DPRINTF("update separator for destination page %zu to [%s]",
cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key));
- if ((rc = mdb_update_key(cdst->mc_pg[cdst->mc_top-1], cdst->mc_ki[cdst->mc_top-1],
- &key)) != MDB_SUCCESS)
+ mdb_cursor_copy(cdst, &mn);
+ mn.mc_snum--;
+ mn.mc_top--;
+ if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
return rc;
}
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
MDB_val nullkey;
+ indx_t ix = cdst->mc_ki[cdst->mc_top];
nullkey.mv_size = 0;
- rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &nullkey);
+ cdst->mc_ki[cdst->mc_top] = 0;
+ rc = mdb_update_key(cdst, &nullkey);
+ cdst->mc_ki[cdst->mc_top] = ix;
assert(rc == MDB_SUCCESS);
}
}
mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0);
if (csrc->mc_ki[csrc->mc_top-1] == 0) {
key.mv_size = 0;
- if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], 0, &key)) != MDB_SUCCESS)
+ csrc->mc_top--;
+ rc = mdb_update_key(csrc, &key);
+ csrc->mc_top++;
+ if (rc)
return rc;
}
{
MDB_node *node;
int rc;
- unsigned int ptop;
+ unsigned int ptop, minkeys;
MDB_cursor mn;
+ minkeys = 1 + (IS_BRANCH(mc->mc_pg[mc->mc_top]));
#if MDB_DEBUG
{
pgno_t pgno;
}
#endif
- if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD) {
+ if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD &&
+ NUMKEYS(mc->mc_pg[mc->mc_top]) >= minkeys) {
#if MDB_DEBUG
pgno_t pgno;
COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
if (mc->mc_snum < 2) {
MDB_page *mp = mc->mc_pg[0];
+ if (IS_SUBP(mp)) {
+ DPUTS("Can't rebalance a subpage, ignoring");
+ return MDB_SUCCESS;
+ }
if (NUMKEYS(mp) == 0) {
DPUTS("tree is completely empty");
mc->mc_db->md_root = P_INVALID;
if (m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[0] == mp) {
m3->mc_pg[0] = mc->mc_pg[0];
+ m3->mc_snum = 1;
+ m3->mc_top = 0;
}
}
}
DPRINTF("found neighbor page %zu (%u keys, %.1f%% full)",
mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10);
- /* If the neighbor page is above threshold and has at least two
- * keys, move one key from it.
- *
- * Otherwise we should try to merge them.
+ /* If the neighbor page is above threshold and has enough keys,
+ * move one key from it. Otherwise we should try to merge them.
+ * (A branch page must never have fewer than 2 keys.)
*/
- if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
+ minkeys = 1 + (IS_BRANCH(mn.mc_pg[mn.mc_top]));
+ if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) > minkeys)
return mdb_node_move(&mn, mc);
- else { /* FIXME: if (has_enough_room()) */
- mc->mc_flags &= ~C_INITIALIZED;
+ else {
if (mc->mc_ki[ptop] == 0)
- return mdb_page_merge(&mn, mc);
+ rc = mdb_page_merge(&mn, mc);
else
- return mdb_page_merge(mc, &mn);
+ rc = mdb_page_merge(mc, &mn);
+ mc->mc_flags &= ~C_INITIALIZED;
}
+ return rc;
}
/** Complete a delete operation started by #mdb_cursor_del(). */
rc = mdb_rebalance(mc);
if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
+ /* if mc points past last node in page, invalidate */
+ else if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top]))
+ mc->mc_flags &= ~C_INITIALIZED;
return rc;
}
DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key));
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
xdata = NULL;
}
rc = mdb_cursor_set(&mc, key, xdata, op, &exact);
- if (rc == 0)
+ if (rc == 0) {
+ /* let mdb_page_split know about this cursor if needed:
+ * delete will trigger a rebalance; if it needs to move
+ * a node from one page to another, it will have to
+ * update the parent's separator key(s). If the new sepkey
+ * is larger than the current one, the parent page may
+ * run out of space, triggering a split. We need this
+ * cursor to be consistent until the end of the rebalance.
+ */
+ mc.mc_next = txn->mt_cursors[dbi];
+ txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA);
+ txn->mt_cursors[dbi] = mc.mc_next;
+ }
return rc;
}
assert(key != NULL);
assert(data != NULL);
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
return EINVAL;
}
- if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND)) != flags)
+ if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags)
return EINVAL;
mdb_cursor_init(&mc, txn, dbi, &mx);
: ((f & MDB_REVERSEDUP) ? mdb_cmp_memnr : mdb_cmp_memn));
}
-#define PERSISTENT_FLAGS 0xffff
-#define VALID_FLAGS (MDB_REVERSEKEY|MDB_DUPSORT|MDB_INTEGERKEY|MDB_DUPFIXED|\
- MDB_INTEGERDUP|MDB_REVERSEDUP|MDB_CREATE)
int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
{
MDB_val key, data;
return MDB_DBS_FULL;
/* Find the DB info */
- dbflag = 0;
+ dbflag = DB_NEW|DB_VALID;
exact = 0;
key.mv_size = len;
key.mv_data = (void *)name;
dummy.md_root = P_INVALID;
dummy.md_flags = flags & PERSISTENT_FLAGS;
rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA);
- dbflag = DB_DIRTY;
+ dbflag |= DB_DIRTY;
}
/* OK, got info, add to table */
mdb_default_cmp(txn, slot);
if (!unused) {
txn->mt_numdbs++;
- txn->mt_env->me_numdbs++;
}
}
void mdb_dbi_close(MDB_env *env, MDB_dbi dbi)
{
char *ptr;
- if (dbi <= MAIN_DBI || dbi >= env->me_numdbs)
+ if (dbi <= MAIN_DBI || dbi >= env->me_maxdbs)
return;
ptr = env->me_dbxs[dbi].md_name.mv_data;
env->me_dbxs[dbi].md_name.mv_data = NULL;
unsigned int i;
/* LEAF2 pages have no nodes, cannot have sub-DBs */
- if (!subs || IS_LEAF2(mc->mc_pg[mc->mc_top]))
+ if (IS_LEAF2(mc->mc_pg[mc->mc_top]))
mdb_cursor_pop(mc);
mdb_cursor_copy(mc, &mx);
if (IS_LEAF(mc->mc_pg[mc->mc_top])) {
for (i=0; i<NUMKEYS(mc->mc_pg[mc->mc_top]); i++) {
ni = NODEPTR(mc->mc_pg[mc->mc_top], i);
- if (ni->mn_flags & F_SUBDATA) {
+ if (ni->mn_flags & F_BIGDATA) {
+ int j, ovpages = OVPAGES(NODEDSZ(ni), mc->mc_txn->mt_env->me_psize);
+ pgno_t pg;
+ memcpy(&pg, NODEDATA(ni), sizeof(pg));
+ for (j=0; j<ovpages; j++) {
+ mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
+ pg++;
+ }
+ } else if (subs && (ni->mn_flags & F_SUBDATA)) {
mdb_xcursor_init1(mc, ni);
rc = mdb_drop0(&mc->mc_xcursor->mx_cursor, 0);
if (rc)
}
if (!mc->mc_top)
break;
+ mc->mc_ki[mc->mc_top] = i;
rc = mdb_cursor_sibling(mc, 1);
if (rc) {
/* no more siblings, go back to beginning
* of previous level.
*/
mdb_cursor_pop(mc);
- for (i=1; i<mc->mc_top; i++)
+ mc->mc_ki[0] = 0;
+ for (i=1; i<mc->mc_snum; i++) {
+ mc->mc_ki[i] = 0;
mc->mc_pg[i] = mx.mc_pg[i];
+ }
}
}
/* free it */
MDB_cursor *mc;
int rc;
- if (!txn || !dbi || dbi >= txn->mt_numdbs || (unsigned)del > 1)
+ if (!txn || !dbi || dbi >= txn->mt_numdbs || (unsigned)del > 1 || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
/* Can't delete the main DB */
if (del && dbi > MAIN_DBI) {
rc = mdb_del(txn, MAIN_DBI, &mc->mc_dbx->md_name, NULL);
- if (!rc)
+ if (!rc) {
+ txn->mt_dbflags[dbi] = DB_STALE;
mdb_dbi_close(txn->mt_env, dbi);
+ }
} else {
/* reset the DB record, mark it dirty */
txn->mt_dbflags[dbi] |= DB_DIRTY;
txn->mt_dbs[dbi].md_entries = 0;
txn->mt_dbs[dbi].md_root = P_INVALID;
- if (!txn->mt_u.dirty_list[0].mid) {
- MDB_cursor m2;
- MDB_val key, data;
- /* make sure we have at least one dirty page in this txn
- * otherwise these changes will be ignored.
- */
- key.mv_size = sizeof(txnid_t);
- key.mv_data = &txn->mt_txnid;
- data.mv_size = sizeof(MDB_ID);
- data.mv_data = txn->mt_free_pgs;
- mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
- rc = mdb_cursor_put(&m2, &key, &data, 0);
- }
+ txn->mt_flags |= MDB_TXN_DIRTY;
}
leave:
mdb_cursor_close(mc);
int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
{
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
txn->mt_dbxs[dbi].md_cmp = cmp;
int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
{
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
txn->mt_dbxs[dbi].md_dcmp = cmp;
int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel)
{
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
txn->mt_dbxs[dbi].md_rel = rel;
int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx)
{
- if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
return EINVAL;
txn->mt_dbxs[dbi].md_relctx = ctx;