#include <resolv.h> /* defines BYTE_ORDER on HPUX and Solaris */
#endif
+#if defined(__APPLE__) || defined (BSD)
+#define USE_POSIX_SEM
+#endif
+
#ifndef _WIN32
#include <pthread.h>
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
#include <semaphore.h>
#endif
#endif
#define close(fd) CloseHandle(fd)
#define munmap(ptr,len) UnmapViewOfFile(ptr)
#else
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
#define LOCK_MUTEX_R(env) sem_wait((env)->me_rmutex)
#define UNLOCK_MUTEX_R(env) sem_post((env)->me_rmutex)
#define LOCK_MUTEX_W(env) sem_wait((env)->me_wmutex)
/** Unlock the writer mutex.
*/
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_txns->mti_wmutex)
-#endif /* __APPLE__ */
+#endif /* USE_POSIX_SEM */
/** Get the error code for the last failed system function.
*/
#define GET_PAGESIZE(x) ((x) = sysconf(_SC_PAGE_SIZE))
#endif
-#if defined(_WIN32) || defined(__APPLE__)
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
#define MNAME_LEN 32
#else
#define MNAME_LEN (sizeof(pthread_mutex_t))
#define DKEY(x) 0
#endif
-/** @defgroup lazylock Lazy Locking
- * Macros for locks that aren't actually needed.
- * The DB view is always consistent because all writes are wrapped in
- * the wmutex. Finer-grained locks aren't necessary.
- * @{
- */
-#ifndef LAZY_LOCKS
- /** Use lazy locking. I.e., don't lock these accesses at all. */
-#define LAZY_LOCKS 1
-#endif
-#if LAZY_LOCKS
- /** Grab the reader lock */
-#define LAZY_MUTEX_LOCK(x)
- /** Release the reader lock */
-#define LAZY_MUTEX_UNLOCK(x)
- /** Release the DB table reader/writer lock */
-#define LAZY_RWLOCK_UNLOCK(x)
- /** Grab the DB table write lock */
-#define LAZY_RWLOCK_WRLOCK(x)
- /** Grab the DB table read lock */
-#define LAZY_RWLOCK_RDLOCK(x)
- /** Declare the DB table rwlock. Should not be followed by ';'. */
-#define LAZY_RWLOCK_DEF(x)
- /** Initialize the DB table rwlock */
-#define LAZY_RWLOCK_INIT(x,y)
- /** Destroy the DB table rwlock */
-#define LAZY_RWLOCK_DESTROY(x)
-#else
-#define LAZY_MUTEX_LOCK(x) pthread_mutex_lock(x)
-#define LAZY_MUTEX_UNLOCK(x) pthread_mutex_unlock(x)
-#define LAZY_RWLOCK_UNLOCK(x) pthread_rwlock_unlock(x)
-#define LAZY_RWLOCK_WRLOCK(x) pthread_rwlock_wrlock(x)
-#define LAZY_RWLOCK_RDLOCK(x) pthread_rwlock_rdlock(x)
-#define LAZY_RWLOCK_DEF(x) pthread_rwlock_t x;
-#define LAZY_RWLOCK_INIT(x,y) pthread_rwlock_init(x,y)
-#define LAZY_RWLOCK_DESTROY(x) pthread_rwlock_destroy(x)
-#endif
-/** @} */
-
/** An invalid page number.
* Mainly used to denote an empty tree.
*/
uint32_t mtb_magic;
/** Version number of this lock file. Must be set to #MDB_VERSION. */
uint32_t mtb_version;
-#if defined(_WIN32) || defined(__APPLE__)
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
char mtb_rmname[MNAME_LEN];
#else
/** Mutex protecting access to this table.
*/
pthread_mutex_t mtb_mutex;
#endif
-#if MDB_VERSION == 1
/** The ID of the last transaction committed to the database.
* This is recorded here only for convenience; the value can always
* be determined by reading the main database meta pages.
- *
- * Value is unused, but maintained for backwards compatibility.
- * Drop this or mtb_me_toggle when changing MDB_VERSION.
- * (Reading both should have been done atomically.)
*/
txnid_t mtb_txnid;
-#endif
/** The number of slots that have been used in the reader table.
* This always records the maximum count, it is not decremented
* when readers release their slots.
*/
unsigned mtb_numreaders;
- /** The ID of the most recent meta page in the database.
- * This is recorded here only for convenience; the value can always
- * be determined by reading the main database meta pages.
- */
- uint32_t mtb_me_toggle;
} MDB_txbody;
/** The actual reader table definition. */
#define mti_version mt1.mtb.mtb_version
#define mti_mutex mt1.mtb.mtb_mutex
#define mti_rmname mt1.mtb.mtb_rmname
-#if MDB_VERSION == 1
#define mti_txnid mt1.mtb.mtb_txnid
-#endif
#define mti_numreaders mt1.mtb.mtb_numreaders
-#define mti_me_toggle mt1.mtb.mtb_me_toggle
char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
} mt1;
union {
-#if defined(_WIN32) || defined(__APPLE__)
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
char mt2_wmname[MNAME_LEN];
#define mti_wmname mt2.mt2_wmname
#else
#define C_SUB 0x04 /**< Cursor is a sub-cursor */
#define C_SHADOW 0x08 /**< Cursor is a dup from a parent txn */
#define C_ALLOCD 0x10 /**< Cursor was malloc'd */
+#define C_SPLITTING 0x20 /**< Cursor is in page_split */
/** @} */
unsigned int mc_flags; /**< @ref mdb_cursor */
MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */
/** Failed to update the meta page. Probably an I/O error. */
#define MDB_FATAL_ERROR 0x80000000U
uint32_t me_flags; /**< @ref mdb_env */
- uint32_t me_extrapad; /**< unused for now */
+ unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_maxreaders; /**< size of the reader table */
MDB_dbi me_numdbs; /**< number of DBs opened */
MDB_dbi me_maxdbs; /**< size of the DB table */
size_t me_mapsize; /**< size of the data memory map */
off_t me_size; /**< current file size */
pgno_t me_maxpg; /**< me_mapsize / me_psize */
- unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
- unsigned int me_db_toggle; /**< which DB table is current */
- txnid_t me_wtxnid; /**< ID of last txn we committed */
txnid_t me_pgfirst; /**< ID of first old page record we used */
txnid_t me_pglast; /**< ID of last old page record we used */
MDB_dbx *me_dbxs; /**< array of static DB info */
- MDB_db *me_dbs[2]; /**< two arrays of MDB_db info */
+ uint16_t *me_dbflags; /**< array of DB flags */
MDB_oldpages *me_pghead; /**< list of old page records */
MDB_oldpages *me_pgfree; /**< list of page records to free */
pthread_key_t me_txkey; /**< thread-key for readers */
MDB_IDL me_free_pgs;
/** ID2L of pages that were written during a write txn */
MDB_ID2 me_dirty_list[MDB_IDL_UM_SIZE];
- /** rwlock for the DB tables, if #LAZY_LOCKS is false */
- LAZY_RWLOCK_DEF(me_dblock)
#ifdef _WIN32
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */
HANDLE me_wmutex;
#endif
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
sem_t *me_rmutex; /* Apple doesn't support shared mutexes */
sem_t *me_wmutex;
#endif
static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
static int mdb_page_search_root(MDB_cursor *mc,
MDB_val *key, int modify);
+#define MDB_PS_MODIFY 1
+#define MDB_PS_ROOTONLY 2
static int mdb_page_search(MDB_cursor *mc,
- MDB_val *key, int modify);
+ MDB_val *key, int flags);
static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
+
+#define MDB_SPLIT_REPLACE MDB_APPENDDUP /**< newkey is not new */
static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
pgno_t newpgno, unsigned int nflags);
DKBUF;
nkeys = NUMKEYS(mp);
- DPRINTF("numkeys %d", nkeys);
+ fprintf(stderr, "numkeys %d\n", nkeys);
for (i=0; i<nkeys; i++) {
node = NODEPTR(mp, i);
key.mv_size = node->mn_ksize;
key.mv_data = node->mn_data;
- DPRINTF("key %d: %s", i, DKEY(&key));
+ fprintf(stderr, "key %d: %s\n", i, DKEY(&key));
}
}
+
+void
+mdb_cursor_chk(MDB_cursor *mc)
+{
+ unsigned int i;
+ MDB_node *node;
+ MDB_page *mp;
+
+ if (!mc->mc_snum && !(mc->mc_flags & C_INITIALIZED)) return;
+ for (i=0; i<mc->mc_top; i++) {
+ mp = mc->mc_pg[i];
+ node = NODEPTR(mp, mc->mc_ki[i]);
+ if (NODEPGNO(node) != mc->mc_pg[i+1]->mp_pgno)
+ printf("oops!\n");
+ }
+ if (mc->mc_ki[i] >= NUMKEYS(mc->mc_pg[i]))
+ printf("ack!\n");
+}
#endif
#if MDB_DEBUG > 2
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
freecount += *(MDB_ID *)data.mv_data;
- freecount += txn->mt_dbs[0].md_branch_pages + txn->mt_dbs[0].md_leaf_pages +
- txn->mt_dbs[0].md_overflow_pages;
count = 0;
for (i = 0; i<txn->mt_numdbs; i++) {
+ MDB_xcursor mx, *mxp;
+ mxp = (txn->mt_dbs[i].md_flags & MDB_DUPSORT) ? &mx : NULL;
+ mdb_cursor_init(&mc, txn, i, mxp);
+ if (txn->mt_dbs[i].md_root == P_INVALID)
+ continue;
count += txn->mt_dbs[i].md_branch_pages +
txn->mt_dbs[i].md_leaf_pages +
txn->mt_dbs[i].md_overflow_pages;
if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) {
- MDB_xcursor mx;
- mdb_cursor_init(&mc, txn, i, &mx);
mdb_page_search(&mc, NULL, 0);
do {
unsigned j;
while (mdb_cursor_sibling(&mc, 1) == 0);
}
}
- assert(freecount + count + 2 >= txn->mt_next_pgno - 1);
+ if (freecount + count + 2 /* metapages */ != txn->mt_next_pgno) {
+ fprintf(stderr, "audit: %lu freecount: %lu count: %lu total: %lu next_pgno: %lu\n",
+ txn->mt_txnid, freecount, count+2, freecount+count+2, txn->mt_next_pgno);
+ }
}
#endif
return np;
}
+/** Copy a page: avoid copying unused portions of the page.
+ * @param[in] dst page to copy into
+ * @param[in] src page to copy from
+ */
+static void
+mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
+{
+ dst->mp_flags = src->mp_flags | P_DIRTY;
+ dst->mp_pages = src->mp_pages;
+
+ if (IS_LEAF2(src)) {
+ memcpy(dst->mp_ptrs, src->mp_ptrs, psize - PAGEHDRSZ - SIZELEFT(src));
+ } else {
+ unsigned int i, nkeys = NUMKEYS(src);
+ for (i=0; i<nkeys; i++)
+ dst->mp_ptrs[i] = src->mp_ptrs[i];
+ memcpy((char *)dst+src->mp_upper, (char *)src+src->mp_upper,
+ psize - src->mp_upper);
+ }
+}
+
/** Touch a page: make it dirty and re-insert into tree with updated pgno.
* @param[in] mc cursor pointing to the page to be touched
* @return 0 on success, non-zero on failure.
DPRINTF("touched db %u page %zu -> %zu", mc->mc_dbi, mp->mp_pgno, np->mp_pgno);
assert(mp->mp_pgno != np->mp_pgno);
mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
- pgno = np->mp_pgno;
- memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
+ if (SIZELEFT(mp)) {
+ /* If page isn't full, just copy the used portion */
+ mdb_page_copy(np, mp, mc->mc_txn->mt_env->me_psize);
+ } else {
+ pgno = np->mp_pgno;
+ memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
+ np->mp_pgno = pgno;
+ np->mp_flags |= P_DIRTY;
+ }
mp = np;
- mp->mp_pgno = pgno;
- mp->mp_flags |= P_DIRTY;
finish:
/* Adjust other cursors pointing to mp */
mdb_txn_renew0(MDB_txn *txn)
{
MDB_env *env = txn->mt_env;
- char mt_dbflag = 0;
+ unsigned int i;
+
+ /* Setup db info */
+ txn->mt_numdbs = env->me_numdbs;
+ txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
if (txn->mt_flags & MDB_TXN_RDONLY) {
MDB_reader *r = pthread_getspecific(env->me_txkey);
if (!r) {
- unsigned int i;
pid_t pid = getpid();
pthread_t tid = pthread_self();
r = &env->me_txns->mti_readers[i];
pthread_setspecific(env->me_txkey, r);
}
- txn->mt_toggle = env->me_txns->mti_me_toggle;
- txn->mt_txnid = r->mr_txnid = env->me_metas[txn->mt_toggle]->mm_txnid;
-
- /* This happens if a different process was the
- * last writer to the DB.
- */
- if (env->me_wtxnid < txn->mt_txnid)
- mt_dbflag = DB_STALE;
+ txn->mt_txnid = r->mr_txnid = env->me_txns->mti_txnid;
+ txn->mt_toggle = txn->mt_txnid & 1;
+ txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
txn->mt_u.reader = r;
} else {
LOCK_MUTEX_W(env);
- txn->mt_toggle = env->me_txns->mti_me_toggle;
- txn->mt_txnid = env->me_metas[txn->mt_toggle]->mm_txnid;
- if (env->me_wtxnid < txn->mt_txnid)
- mt_dbflag = DB_STALE;
+ txn->mt_txnid = env->me_txns->mti_txnid;
+ txn->mt_toggle = txn->mt_txnid & 1;
+ txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
txn->mt_txnid++;
#if MDB_DEBUG
if (txn->mt_txnid == mdb_debug_start)
txn->mt_free_pgs[0] = 0;
env->me_txn = txn;
}
- txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
- /* Copy the DB arrays */
- LAZY_RWLOCK_RDLOCK(&env->me_dblock);
- txn->mt_numdbs = env->me_numdbs;
- txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
+ /* Copy the DB info and flags */
memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
+ for (i=2; i<txn->mt_numdbs; i++)
+ txn->mt_dbs[i].md_flags = env->me_dbflags[i];
+ txn->mt_dbflags[0] = txn->mt_dbflags[1] = 0;
if (txn->mt_numdbs > 2)
- memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
- (txn->mt_numdbs - 2) * sizeof(MDB_db));
- LAZY_RWLOCK_UNLOCK(&env->me_dblock);
-
- memset(txn->mt_dbflags, mt_dbflag, env->me_numdbs);
+ memset(txn->mt_dbflags+2, DB_STALE, txn->mt_numdbs-2);
return MDB_SUCCESS;
}
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
if (txn->mt_numdbs > env->me_numdbs) {
- /* update the DB tables */
- int toggle = !env->me_db_toggle;
- MDB_db *ip, *jp;
+ /* update the DB flags */
MDB_dbi i;
-
- ip = &env->me_dbs[toggle][env->me_numdbs];
- jp = &txn->mt_dbs[env->me_numdbs];
- LAZY_RWLOCK_WRLOCK(&env->me_dblock);
- for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
- *ip++ = *jp++;
- }
-
- env->me_db_toggle = toggle;
- env->me_numdbs = txn->mt_numdbs;
- LAZY_RWLOCK_UNLOCK(&env->me_dblock);
+ for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
+ env->me_numdbs = i;
}
mdb_txn_abort(txn);
return MDB_SUCCESS;
DPRINTF("committing txn %zu %p on mdbenv %p, root page %zu",
txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
+ /* Update DB root pointers. Their pages have already been
+ * touched so this is all in-place and cannot fail.
+ */
+ if (txn->mt_numdbs > 2) {
+ MDB_dbi i;
+ MDB_val data;
+ data.mv_size = sizeof(MDB_db);
+
+ mdb_cursor_init(&mc, txn, MAIN_DBI, NULL);
+ for (i = 2; i < txn->mt_numdbs; i++) {
+ if (txn->mt_dbflags[i] & DB_DIRTY) {
+ data.mv_data = &txn->mt_dbs[i];
+ mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+ }
+ }
+ }
+
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
/* should only be one record now */
if (env->me_pghead) {
/* make sure first page of freeDB is touched and on freelist */
- mdb_page_search(&mc, NULL, 1);
+ mdb_page_search(&mc, NULL, MDB_PS_MODIFY);
}
/* Delete IDLs we used from the free list */
key.mv_data = &cur;
mdb_cursor_set(&mc, &key, NULL, MDB_SET, &exact);
- mdb_cursor_del(&mc, 0);
+ rc = mdb_cursor_del(&mc, 0);
+ if (rc) {
+ mdb_txn_abort(txn);
+ return rc;
+ }
}
env->me_pgfirst = 0;
env->me_pglast = 0;
/* make sure last page of freeDB is touched and on freelist */
key.mv_size = MAXKEYSIZE+1;
key.mv_data = NULL;
- mdb_page_search(&mc, &key, 1);
+ mdb_page_search(&mc, &key, MDB_PS_MODIFY);
mdb_midl_sort(txn->mt_free_pgs);
#if MDB_DEBUG > 1
env->me_free_pgs = txn->mt_free_pgs;
}
- /* Update DB root pointers. Their pages have already been
- * touched so this is all in-place and cannot fail.
- */
- {
- MDB_dbi i;
- MDB_val data;
- data.mv_size = sizeof(MDB_db);
-
- mdb_cursor_init(&mc, txn, MAIN_DBI, NULL);
- for (i = 2; i < txn->mt_numdbs; i++) {
- if (txn->mt_dbflags[i] & DB_DIRTY) {
- data.mv_data = &txn->mt_dbs[i];
- mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
- }
- }
- }
#if MDB_DEBUG > 2
mdb_audit(txn);
#endif
mdb_txn_abort(txn);
return n;
}
- env->me_wtxnid = txn->mt_txnid;
done:
env->me_txn = NULL;
- /* update the DB tables */
- {
- int toggle = !env->me_db_toggle;
- MDB_db *ip, *jp;
+ if (txn->mt_numdbs > env->me_numdbs) {
+ /* update the DB flags */
MDB_dbi i;
-
- ip = &env->me_dbs[toggle][2];
- jp = &txn->mt_dbs[2];
- LAZY_RWLOCK_WRLOCK(&env->me_dblock);
- for (i = 2; i < txn->mt_numdbs; i++) {
- if (ip->md_root != jp->md_root)
- *ip = *jp;
- ip++; jp++;
- }
-
- env->me_db_toggle = toggle;
- env->me_numdbs = txn->mt_numdbs;
- LAZY_RWLOCK_UNLOCK(&env->me_dblock);
+ for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
+ env->me_numdbs = i;
}
UNLOCK_MUTEX_W(env);
* readers will get consistent data regardless of how fresh or
* how stale their view of these values is.
*/
- LAZY_MUTEX_LOCK(&env->me_txns->mti_mutex);
- txn->mt_env->me_txns->mti_me_toggle = toggle;
txn->mt_env->me_txns->mti_txnid = txn->mt_txnid;
- LAZY_MUTEX_UNLOCK(&env->me_txns->mti_mutex);
return MDB_SUCCESS;
}
}
-#ifndef _WIN32
/** Release a reader thread's slot in the reader lock table.
* This function is called automatically when a thread exits.
- * Windows doesn't support destructor callbacks for thread-specific storage,
- * so this function is not compiled there.
* @param[in] ptr This points to the slot in the reader lock table.
*/
static void
reader->mr_pid = 0;
reader->mr_tid = 0;
}
+
+#ifdef _WIN32
+/** Junk for arranging thread-specific callbacks on Windows. This is
+ * necessarily platform and compiler-specific. Windows supports up
+ * to 1088 keys. Let's assume nobody opens more than 64 environments
+ * in a single process, for now. They can override this if needed.
+ */
+#ifndef MAX_TLS_KEYS
+#define MAX_TLS_KEYS 64
+#endif
+static pthread_key_t mdb_tls_keys[MAX_TLS_KEYS];
+static int mdb_tls_nkeys;
+
+static void NTAPI mdb_tls_callback(PVOID module, DWORD reason, PVOID ptr)
+{
+ int i;
+ switch(reason) {
+ case DLL_PROCESS_ATTACH: break;
+ case DLL_THREAD_ATTACH: break;
+ case DLL_THREAD_DETACH:
+ for (i=0; i<mdb_tls_nkeys; i++) {
+ MDB_reader *r = pthread_getspecific(mdb_tls_keys[i]);
+ mdb_env_reader_dest(r);
+ }
+ break;
+ case DLL_PROCESS_DETACH: break;
+ }
+}
+#ifdef __GNUC__
+#ifdef _WIN64
+const PIMAGE_TLS_CALLBACK mdb_tls_cbp __attribute__((section (".CRT$XLB"))) = mdb_tls_callback;
+#else
+PIMAGE_TLS_CALLBACK mdb_tls_cbp __attribute__((section (".CRT$XLB"))) = mdb_tls_callback;
+#endif
+#else
+#ifdef _WIN64
+/* Force some symbol references.
+ * _tls_used forces the linker to create the TLS directory if not already done
+ * mdb_tls_cbp prevents whole-program-optimizer from dropping the symbol.
+ */
+#pragma comment(linker, "/INCLUDE:_tls_used")
+#pragma comment(linker, "/INCLUDE:mdb_tls_cbp")
+#pragma const_seg(".CRT$XLB")
+extern const PIMAGE_TLS_CALLBACK mdb_tls_callback;
+const PIMAGE_TLS_CALLBACK mdb_tls_cbp = mdb_tls_callback;
+#pragma const_seg()
+#else /* WIN32 */
+#pragma comment(linker, "/INCLUDE:__tls_used")
+#pragma comment(linker, "/INCLUDE:_mdb_tls_cbp")
+#pragma data_seg(".CRT$XLB")
+PIMAGE_TLS_CALLBACK mdb_tls_cbp = mdb_tls_callback;
+#pragma data_seg()
+#endif /* WIN 32/64 */
+#endif /* !__GNUC__ */
#endif
/** Downgrade the exclusive lock on the region back to shared */
{
int toggle = mdb_env_pick_meta(env);
- env->me_txns->mti_me_toggle = toggle;
env->me_txns->mti_txnid = env->me_metas[toggle]->mm_txnid;
#ifdef _WIN32
}
#endif
}
-#if defined(_WIN32) || defined(__APPLE__)
+
+static int
+mdb_env_excl_lock(MDB_env *env, int *excl)
+{
+#ifdef _WIN32
+ if (LockFile(env->me_lfd, 0, 0, 1, 0)) {
+ *excl = 1;
+ } else {
+ OVERLAPPED ov;
+ memset(&ov, 0, sizeof(ov));
+ if (!LockFileEx(env->me_lfd, 0, 0, 1, 0, &ov)) {
+ return ErrCode();
+ }
+ }
+#else
+ struct flock lock_info;
+ memset((void *)&lock_info, 0, sizeof(lock_info));
+ lock_info.l_type = F_WRLCK;
+ lock_info.l_whence = SEEK_SET;
+ lock_info.l_start = 0;
+ lock_info.l_len = 1;
+ if (!fcntl(env->me_lfd, F_SETLK, &lock_info)) {
+ *excl = 1;
+ } else {
+ lock_info.l_type = F_RDLCK;
+ if (fcntl(env->me_lfd, F_SETLKW, &lock_info)) {
+ return ErrCode();
+ }
+ }
+#endif
+ return 0;
+}
+
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
/*
* hash_64 - 64 bit Fowler/Noll/Vo-0 FNV-1a hash code
*
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it.
*/
- {
- if (LockFile(env->me_lfd, 0, 0, 1, 0)) {
- *excl = 1;
- } else {
- OVERLAPPED ov;
- memset(&ov, 0, sizeof(ov));
- if (!LockFileEx(env->me_lfd, 0, 0, 1, 0, &ov)) {
- rc = ErrCode();
- goto fail;
- }
- }
- }
+ if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
size = GetFileSize(env->me_lfd, NULL);
#else
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it.
*/
- {
- struct flock lock_info;
- memset((void *)&lock_info, 0, sizeof(lock_info));
- lock_info.l_type = F_WRLCK;
- lock_info.l_whence = SEEK_SET;
- lock_info.l_start = 0;
- lock_info.l_len = 1;
- rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
- if (rc == 0) {
- *excl = 1;
- } else {
- lock_info.l_type = F_RDLCK;
- rc = fcntl(env->me_lfd, F_SETLKW, &lock_info);
- if (rc) {
- rc = ErrCode();
- goto fail;
- }
- }
- }
+ if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
+
size = lseek(env->me_lfd, 0, SEEK_END);
#endif
rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
goto fail;
}
#else /* _WIN32 */
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
struct stat stbuf;
struct {
dev_t dev;
val.mv_data = &idbuf;
val.mv_size = sizeof(idbuf);
mdb_hash_hex(&val, hexbuf);
- sprintf(env->me_txns->mti_rmname, "MDBr%s", hexbuf);
+ sprintf(env->me_txns->mti_rmname, "/MDBr%s", hexbuf);
if (sem_unlink(env->me_txns->mti_rmname)) {
rc = ErrCode();
if (rc != ENOENT && rc != EINVAL)
rc = ErrCode();
goto fail;
}
- sprintf(env->me_txns->mti_wmname, "MDBw%s", hexbuf);
+ sprintf(env->me_txns->mti_wmname, "/MDBw%s", hexbuf);
if (sem_unlink(env->me_txns->mti_wmname)) {
rc = ErrCode();
if (rc != ENOENT && rc != EINVAL)
rc = ErrCode();
goto fail;
}
-#else /* __APPLE__ */
+#else /* USE_POSIX_SEM */
pthread_mutexattr_t mattr;
pthread_mutexattr_init(&mattr);
}
pthread_mutex_init(&env->me_txns->mti_mutex, &mattr);
pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr);
-#endif /* __APPLE__ */
+#endif /* USE_POSIX_SEM */
#endif /* _WIN32 */
env->me_txns->mti_version = MDB_VERSION;
env->me_txns->mti_magic = MDB_MAGIC;
env->me_txns->mti_txnid = 0;
env->me_txns->mti_numreaders = 0;
- env->me_txns->mti_me_toggle = 0;
} else {
if (env->me_txns->mti_magic != MDB_MAGIC) {
goto fail;
}
#endif
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
env->me_rmutex = sem_open(env->me_txns->mti_rmname, 0);
if (!env->me_rmutex) {
rc = ErrCode();
env->me_path = strdup(path);
DPRINTF("opened dbenv %p", (void *) env);
pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
- LAZY_RWLOCK_INIT(&env->me_dblock, NULL);
+#ifdef _WIN32
+ /* Windows TLS callbacks need help finding their TLS info. */
+ if (mdb_tls_nkeys < MAX_TLS_KEYS)
+ mdb_tls_keys[mdb_tls_nkeys++] = env->me_txkey;
+ else {
+ rc = ENOMEM;
+ goto leave;
+ }
+#endif
if (excl)
mdb_env_share_locks(env);
- env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
- env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
- env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
env->me_numdbs = 2;
+ env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
+ env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
+ if (!env->me_dbxs || !env->me_dbflags)
+ rc = ENOMEM;
}
leave:
free(dp);
}
- free(env->me_dbs[1]);
- free(env->me_dbs[0]);
+ free(env->me_dbflags);
free(env->me_dbxs);
free(env->me_path);
- LAZY_RWLOCK_DESTROY(&env->me_dblock);
pthread_key_delete(env->me_txkey);
+#ifdef _WIN32
+ /* Delete our key from the global list */
+ { int i;
+ for (i=0; i<mdb_tls_nkeys; i++)
+ if (mdb_tls_keys[i] == env->me_txkey) {
+ mdb_tls_keys[i] = mdb_tls_keys[mdb_tls_nkeys-1];
+ mdb_tls_nkeys--;
+ break;
+ }
+ }
+#endif
if (env->me_map) {
munmap(env->me_map, env->me_mapsize);
for (i=0; i<env->me_txns->mti_numreaders; i++)
if (env->me_txns->mti_readers[i].mr_pid == pid)
env->me_txns->mti_readers[i].mr_pid = 0;
+#ifdef _WIN32
+ CloseHandle(env->me_rmutex);
+ CloseHandle(env->me_wmutex);
+ /* Windows automatically destroys the mutexes when
+ * the last handle closes.
+ */
+#else
+#ifdef USE_POSIX_SEM
+ sem_close(env->me_rmutex);
+ sem_close(env->me_wmutex);
+ { int excl = 0;
+ if (!mdb_env_excl_lock(env, &excl) && excl) {
+ /* we are the only remaining user of the environment.
+ clean up semaphores. */
+ sem_unlink(env->me_txns->mti_rmname);
+ sem_unlink(env->me_txns->mti_wmname);
+ }
+ }
+#endif
+#endif
munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
}
close(env->me_lfd);
static void
mdb_cursor_pop(MDB_cursor *mc)
{
- MDB_page *top;
-
if (mc->mc_snum) {
- top = mc->mc_pg[mc->mc_top];
+#if MDB_DEBUG
+ MDB_page *top = mc->mc_pg[mc->mc_top];
+#endif
mc->mc_snum--;
if (mc->mc_snum)
mc->mc_top--;
* @param[in,out] mc the cursor for this operation.
* @param[in] key the key to search for. If NULL, search for the lowest
* page. (This is used by #mdb_cursor_first().)
- * @param[in] modify If true, visited pages are updated with new page numbers.
+ * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers.
+ * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups.
* @return 0 on success, non-zero on failure.
*/
static int
* @return 0 on success, non-zero on failure.
*/
static int
-mdb_page_search(MDB_cursor *mc, MDB_val *key, int modify)
+mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
{
int rc;
pgno_t root;
/* Make sure we're using an up-to-date root */
if (mc->mc_dbi > MAIN_DBI) {
if ((*mc->mc_dbflag & DB_STALE) ||
- (modify && !(*mc->mc_dbflag & DB_DIRTY))) {
+ ((flags & MDB_PS_MODIFY) && !(*mc->mc_dbflag & DB_DIRTY))) {
MDB_cursor mc2;
unsigned char dbflag = 0;
mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL);
- rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, modify);
+ rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, flags & MDB_PS_MODIFY);
if (rc)
return rc;
if (*mc->mc_dbflag & DB_STALE) {
mdb_node_read(mc->mc_txn, leaf, &data);
memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db));
}
- if (modify)
+ if (flags & MDB_PS_MODIFY)
dbflag = DB_DIRTY;
*mc->mc_dbflag = dbflag;
}
}
assert(root > 1);
- if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0])))
- return rc;
+ if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root)
+ if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0])))
+ return rc;
mc->mc_snum = 1;
mc->mc_top = 0;
DPRINTF("db %u root page %zu has flags 0x%X",
mc->mc_dbi, root, mc->mc_pg[0]->mp_flags);
- if (modify) {
+ if (flags & MDB_PS_MODIFY) {
if ((rc = mdb_page_touch(mc)))
return rc;
}
- return mdb_page_search_root(mc, key, modify);
+ if (flags & MDB_PS_ROOTONLY)
+ return MDB_SUCCESS;
+
+ return mdb_page_search_root(mc, key, flags);
}
/** Return the data associated with a given node.
{
int rc;
MDB_page *mp;
- MDB_node *leaf;
+ MDB_node *leaf = NULL;
DKBUF;
assert(mc);
* was the one we wanted.
*/
mc->mc_ki[mc->mc_top] = 0;
- leaf = NODEPTR(mp, 0);
if (exactp)
*exactp = 1;
goto set1;
if (rc == 0) {
/* last node was the one we wanted */
mc->mc_ki[mc->mc_top] = nkeys-1;
- leaf = NODEPTR(mp, nkeys-1);
if (exactp)
*exactp = 1;
goto set1;
}
if (rc < 0) {
- /* This is definitely the right page, skip search_page */
+ if (mc->mc_ki[mc->mc_top] < NUMKEYS(mp)) {
+ /* This is definitely the right page, skip search_page */
+ if (mp->mp_flags & P_LEAF2) {
+ nodekey.mv_data = LEAF2KEY(mp,
+ mc->mc_ki[mc->mc_top], nodekey.mv_size);
+ } else {
+ leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
+ MDB_SET_KEY(leaf, &nodekey);
+ }
+ rc = mc->mc_dbx->md_cmp(key, &nodekey);
+ if (rc == 0) {
+ /* current node was the one we wanted */
+ if (exactp)
+ *exactp = 1;
+ goto set1;
+ }
+ }
rc = 0;
goto set2;
}
{
int rc;
MDB_node *leaf;
- MDB_val lkey;
- lkey.mv_size = MAXKEYSIZE+1;
- lkey.mv_data = NULL;
+ if (!(mc->mc_flags & C_EOF)) {
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+ MDB_val lkey;
+
+ lkey.mv_size = MAXKEYSIZE+1;
+ lkey.mv_data = NULL;
rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS)
return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
- leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
- mc->mc_flags |= C_INITIALIZED;
- mc->mc_flags &= ~C_EOF;
-
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
+ mc->mc_flags |= C_INITIALIZED|C_EOF;
+ }
+ leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
key->mv_size = mc->mc_db->md_pad;
case MDB_PREV:
case MDB_PREV_DUP:
case MDB_PREV_NODUP:
- if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF))
+ if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF)) {
rc = mdb_cursor_last(mc, key, data);
- else
+ mc->mc_flags &= ~C_EOF;
+ } else
rc = mdb_cursor_prev(mc, key, data, op);
break;
case MDB_FIRST:
if (mc->mc_dbi > MAIN_DBI && !(*mc->mc_dbflag & DB_DIRTY)) {
MDB_cursor mc2;
mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL);
- rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, 1);
+ rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, MDB_PS_MODIFY);
if (rc)
return rc;
*mc->mc_dbflag = DB_DIRTY;
MDB_val xdata, *rdata, dkey;
MDB_page *fp;
MDB_db dummy;
- int do_sub = 0;
+ int do_sub = 0, insert = 0;
unsigned int mcount = 0;
size_t nsize;
int rc, rc2;
} else {
int exact = 0;
MDB_val d2;
+ if (flags & MDB_APPEND) {
+ MDB_val k2;
+ rc = mdb_cursor_last(mc, &k2, &d2);
+ if (rc == 0) {
+ rc = mc->mc_dbx->md_cmp(key, &k2);
+ if (rc > 0) {
+ rc = MDB_NOTFOUND;
+ mc->mc_ki[mc->mc_top]++;
+ } else {
+ rc = 0;
+ }
+ }
+ } else {
rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
+ }
if ((flags & MDB_NOOVERWRITE) && rc == 0) {
DPRINTF("duplicate key [%s]", DKEY(key));
*data = d2;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
+ fp->mp_upper += 2 * data->mv_size; /* leave space for 2 more */
} else {
fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE +
(dkey.mv_size & 1) + (data->mv_size & 1);
fp = NODEDATA(leaf);
if (flags == MDB_CURRENT) {
+reuse:
fp->mp_flags |= P_DIRTY;
COPY_PGNO(fp->mp_pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
}
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
offset = fp->mp_pad;
+ if (SIZELEFT(fp) >= offset)
+ goto reuse;
+ offset *= 4; /* space for 4 more */
} else {
offset = NODESIZE + sizeof(indx_t) + data->mv_size;
}
mc->mc_db->md_entries--;
} else {
DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]);
+ insert = 1;
}
rdata = data;
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
nflags &= ~MDB_APPEND;
+ if (!insert)
+ nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
/* There is room already in this leaf page. */
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
- if (rc == 0 && !do_sub) {
+ if (rc == 0 && !do_sub && insert) {
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
}
}
}
- xflags |= (flags & MDB_APPEND);
+ if (flags & MDB_APPENDDUP)
+ xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
if (node->mn_flags & F_SUBDATA) {
memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDB_db));
+ mx->mx_cursor.mc_pg[0] = 0;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = C_SUB;
} else {
mc->mc_dbflag = &txn->mt_dbflags[dbi];
mc->mc_snum = 0;
mc->mc_top = 0;
+ mc->mc_pg[0] = 0;
mc->mc_flags = 0;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
assert(mx != NULL);
} else {
mc->mc_xcursor = NULL;
}
+ if (*mc->mc_dbflag & DB_STALE) {
+ mdb_page_search(mc, NULL, MDB_PS_ROOTONLY);
+ }
}
int
unsigned int nflags)
{
unsigned int flags;
- int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1;
+ int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1, did_split = 0;
indx_t newindx;
pgno_t pgno = 0;
unsigned int i, j, split_indx, nkeys, pmax;
DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
}
+ mc->mc_flags |= C_SPLITTING;
mdb_cursor_copy(mc, &mn);
mn.mc_pg[mn.mc_top] = rp;
mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
if (nflags & MDB_APPEND) {
mn.mc_ki[mn.mc_top] = 0;
sepkey = *newkey;
+ split_indx = newindx;
nkeys = 0;
- split_indx = 0;
goto newsep;
}
nkeys = NUMKEYS(mp);
split_indx = (nkeys + 1) / 2;
+ if (newindx < split_indx)
+ newpos = 0;
if (IS_LEAF2(rp)) {
char *split, *ins;
if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) {
mn.mc_snum--;
mn.mc_top--;
+ did_split = 1;
rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
+ /* root split? */
+ if (mn.mc_snum == mc->mc_snum) {
+ mc->mc_pg[mc->mc_snum] = mc->mc_pg[mc->mc_top];
+ mc->mc_ki[mc->mc_snum] = mc->mc_ki[mc->mc_top];
+ mc->mc_pg[mc->mc_top] = mc->mc_pg[ptop];
+ mc->mc_ki[mc->mc_top] = mc->mc_ki[ptop];
+ mc->mc_snum++;
+ mc->mc_top++;
+ ptop++;
+ }
/* Right page might now have changed parent.
* Check if left page also changed parent.
*/
if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
+ for (i=0; i<ptop; i++) {
+ mc->mc_pg[i] = mn.mc_pg[i];
+ mc->mc_ki[i] = mn.mc_ki[i];
+ }
mc->mc_pg[ptop] = mn.mc_pg[ptop];
mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
}
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++;
}
+ mc->mc_flags ^= C_SPLITTING;
if (rc != MDB_SUCCESS) {
return rc;
}
rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
if (rc)
return rc;
+ for (i=0; i<mc->mc_top; i++)
+ mc->mc_ki[i] = mn.mc_ki[i];
goto done;
}
if (IS_LEAF2(rp)) {
if (!(node->mn_flags & F_BIGDATA))
newdata->mv_data = NODEDATA(node);
}
+ } else {
+ mc->mc_ki[ptop]++;
}
/* return tmp page to freelist */
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
+ int fixup = NUMKEYS(mp);
if (mc->mc_flags & C_SUB)
dbi--;
m3 = m2;
if (!(m3->mc_flags & C_INITIALIZED))
continue;
+ if (m3->mc_flags & C_SPLITTING)
+ continue;
if (new_root) {
int k;
/* root split */
m3->mc_ki[k+1] = m3->mc_ki[k];
m3->mc_pg[k+1] = m3->mc_pg[k];
}
- m3->mc_ki[0] = mc->mc_ki[0];
+ if (m3->mc_ki[0] >= split_indx) {
+ m3->mc_ki[0] = 1;
+ } else {
+ m3->mc_ki[0] = 0;
+ }
m3->mc_pg[0] = mc->mc_pg[0];
m3->mc_snum++;
m3->mc_top++;
}
if (m3->mc_pg[mc->mc_top] == mp) {
- if (m3->mc_ki[m3->mc_top] >= split_indx) {
- m3->mc_pg[m3->mc_top] = rp;
- m3->mc_ki[m3->mc_top] -= split_indx;
+ if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
+ m3->mc_ki[mc->mc_top]++;
+ if (m3->mc_ki[mc->mc_top] >= fixup) {
+ m3->mc_pg[mc->mc_top] = rp;
+ m3->mc_ki[mc->mc_top] -= fixup;
+ m3->mc_ki[ptop] = mn.mc_ki[ptop];
}
+ } else if (!did_split && m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
+ m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {
+ m3->mc_ki[ptop]++;
}
}
}
txn->mt_dbflags[slot] = dbflag;
memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db));
*dbi = slot;
- txn->mt_env->me_dbs[0][slot] = txn->mt_dbs[slot];
- txn->mt_env->me_dbs[1][slot] = txn->mt_dbs[slot];
+ txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags;
mdb_default_cmp(txn, slot);
if (!unused) {
txn->mt_numdbs++;