* BerkeleyDB API, but much simplified.
*/
/*
- * Copyright 2011 Howard Chu, Symas Corp.
+ * Copyright 2011-2012 Howard Chu, Symas Corp.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
#include <errno.h>
#include <limits.h>
#include <stddef.h>
-#include <stdint.h>
+#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
+#if !(defined(BYTE_ORDER) || defined(__BYTE_ORDER))
+#include <resolv.h> /* defines BYTE_ORDER on HPUX and Solaris */
+#endif
+
+#if defined(__APPLE__) || defined (BSD)
+#define USE_POSIX_SEM
+#endif
+
#ifndef _WIN32
#include <pthread.h>
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
#include <semaphore.h>
#endif
#endif
+#ifdef USE_VALGRIND
+#include <valgrind/memcheck.h>
+#define VGMEMP_CREATE(h,r,z) VALGRIND_CREATE_MEMPOOL(h,r,z)
+#define VGMEMP_ALLOC(h,a,s) VALGRIND_MEMPOOL_ALLOC(h,a,s)
+#define VGMEMP_FREE(h,a) VALGRIND_MEMPOOL_FREE(h,a)
+#define VGMEMP_DESTROY(h) VALGRIND_DESTROY_MEMPOOL(h)
+#define VGMEMP_DEFINED(a,s) VALGRIND_MAKE_MEM_DEFINED(a,s)
+#else
+#define VGMEMP_CREATE(h,r,z)
+#define VGMEMP_ALLOC(h,a,s)
+#define VGMEMP_FREE(h,a)
+#define VGMEMP_DESTROY(h)
+#define VGMEMP_DEFINED(a,s)
+#endif
+
#ifndef BYTE_ORDER
-#define BYTE_ORDER __BYTE_ORDER
+# if (defined(_LITTLE_ENDIAN) || defined(_BIG_ENDIAN)) && !(defined(_LITTLE_ENDIAN) && defined(_BIG_ENDIAN))
+/* Solaris just defines one or the other */
+# define LITTLE_ENDIAN 1234
+# define BIG_ENDIAN 4321
+# ifdef _LITTLE_ENDIAN
+# define BYTE_ORDER LITTLE_ENDIAN
+# else
+# define BYTE_ORDER BIG_ENDIAN
+# endif
+# else
+# define BYTE_ORDER __BYTE_ORDER
+# endif
#endif
+
#ifndef LITTLE_ENDIAN
#define LITTLE_ENDIAN __LITTLE_ENDIAN
#endif
#define BIG_ENDIAN __BIG_ENDIAN
#endif
+#if defined(__i386) || defined(__x86_64)
+#define MISALIGNED_OK 1
+#endif
+
#include "mdb.h"
#include "midl.h"
#define LOCK_MUTEX_W(env) pthread_mutex_lock((env)->me_wmutex)
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock((env)->me_wmutex)
#define getpid() GetCurrentProcessId()
-#define fdatasync(fd) (!FlushFileBuffers(fd))
+#define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd))
#define ErrCode() GetLastError()
#define GET_PAGESIZE(x) {SYSTEM_INFO si; GetSystemInfo(&si); (x) = si.dwPageSize;}
#define close(fd) CloseHandle(fd)
#define munmap(ptr,len) UnmapViewOfFile(ptr)
#else
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
#define LOCK_MUTEX_R(env) sem_wait((env)->me_rmutex)
#define UNLOCK_MUTEX_R(env) sem_post((env)->me_rmutex)
#define LOCK_MUTEX_W(env) sem_wait((env)->me_wmutex)
#define UNLOCK_MUTEX_W(env) sem_post((env)->me_wmutex)
-#define fdatasync(fd) fsync(fd)
+#define MDB_FDATASYNC(fd) fsync(fd)
#else
+#ifdef ANDROID
+#define MDB_FDATASYNC(fd) fsync(fd)
+#endif
/** Lock the reader mutex.
*/
#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_txns->mti_mutex)
/** Unlock the writer mutex.
*/
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_txns->mti_wmutex)
-#endif /* __APPLE__ */
+#endif /* USE_POSIX_SEM */
/** Get the error code for the last failed system function.
*/
#define GET_PAGESIZE(x) ((x) = sysconf(_SC_PAGE_SIZE))
#endif
-#if defined(_WIN32) || defined(__APPLE__)
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
#define MNAME_LEN 32
+#else
+#define MNAME_LEN (sizeof(pthread_mutex_t))
#endif
/** @} */
#ifndef MDB_DSYNC
# define MDB_DSYNC O_DSYNC
#endif
+#endif
+
+/** Function for flushing the data of a file. Define this to fsync
+ * if fdatasync() is not supported.
+ */
+#ifndef MDB_FDATASYNC
+# define MDB_FDATASYNC fdatasync
#endif
/** A page number in the database.
* @note In the #MDB_node structure, we only store 48 bits of this value,
* which thus limits us to only 60 bits of addressable data.
*/
-typedef ID pgno_t;
+typedef MDB_ID pgno_t;
/** A transaction ID.
* See struct MDB_txn.mt_txnid for details.
*/
-typedef ID txnid_t;
+typedef MDB_ID txnid_t;
/** @defgroup debug Debug Macros
* @{
*/
-#ifndef DEBUG
+#ifndef MDB_DEBUG
/** Enable debug output.
* Set this to 1 for copious tracing. Set to 2 to add dumps of all IDLs
* read from and written to the database (used for free space management).
*/
-#define DEBUG 0
+#define MDB_DEBUG 0
#endif
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
# define DPRINTF (void) /* Vararg macros may be unsupported */
-#elif DEBUG
+#elif MDB_DEBUG
+static int mdb_debug;
+static txnid_t mdb_debug_start;
+
/** Print a debug message with printf formatting. */
# define DPRINTF(fmt, ...) /**< Requires 2 or more args */ \
- fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__)
+ ((void) ((mdb_debug) && \
+ fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__)))
#else
# define DPRINTF(fmt, ...) ((void) 0)
#endif
* pressure from other processes is high. So until OSs have
* actual paging support for Huge pages, they're not viable.
*/
-#define PAGESIZE 4096
+#define MDB_PAGESIZE 4096
/** The minimum number of keys required in a database page.
* Setting this to a larger value will place a smaller bound on the
/** The maximum size of a key in the database.
* While data items have essentially unbounded size, we require that
* keys all fit onto a regular page. This limit could be raised a bit
- * further if needed; to something just under #PAGESIZE / #MDB_MINKEYS.
+ * further if needed; to something just under #MDB_PAGESIZE / #MDB_MINKEYS.
*/
#define MAXKEYSIZE 511
-#if DEBUG
+#if MDB_DEBUG
/** A key buffer.
* @ingroup debug
* This is used for printing a hex dump of a key's contents.
#define DKEY(x) mdb_dkey(x, kbuf)
#else
#define DKBUF typedef int dummy_kbuf /* so we can put ';' after */
-#define DKEY(x)
-#endif
-
-/** @defgroup lazylock Lazy Locking
- * Macros for locks that are't actually needed.
- * The DB view is always consistent because all writes are wrapped in
- * the wmutex. Finer-grained locks aren't necessary.
- * @{
- */
-#ifndef LAZY_LOCKS
- /** Use lazy locking. I.e., don't lock these accesses at all. */
-#define LAZY_LOCKS 1
-#endif
-#if LAZY_LOCKS
- /** Grab the reader lock */
-#define LAZY_MUTEX_LOCK(x)
- /** Release the reader lock */
-#define LAZY_MUTEX_UNLOCK(x)
- /** Release the DB table reader/writer lock */
-#define LAZY_RWLOCK_UNLOCK(x)
- /** Grab the DB table write lock */
-#define LAZY_RWLOCK_WRLOCK(x)
- /** Grab the DB table read lock */
-#define LAZY_RWLOCK_RDLOCK(x)
- /** Declare the DB table rwlock. Should not be followed by ';'. */
-#define LAZY_RWLOCK_DEF(x)
- /** Initialize the DB table rwlock */
-#define LAZY_RWLOCK_INIT(x,y)
- /** Destroy the DB table rwlock */
-#define LAZY_RWLOCK_DESTROY(x)
-#else
-#define LAZY_MUTEX_LOCK(x) pthread_mutex_lock(x)
-#define LAZY_MUTEX_UNLOCK(x) pthread_mutex_unlock(x)
-#define LAZY_RWLOCK_UNLOCK(x) pthread_rwlock_unlock(x)
-#define LAZY_RWLOCK_WRLOCK(x) pthread_rwlock_wrlock(x)
-#define LAZY_RWLOCK_RDLOCK(x) pthread_rwlock_rdlock(x)
-#define LAZY_RWLOCK_DEF(x) pthread_rwlock_t x;
-#define LAZY_RWLOCK_INIT(x,y) pthread_rwlock_init(x,y)
-#define LAZY_RWLOCK_DESTROY(x) pthread_rwlock_destroy(x)
+#define DKEY(x) 0
#endif
-/** @} */
/** An invalid page number.
* Mainly used to denote an empty tree.
* unlikely. If a collision occurs, the results are unpredictable.
*/
typedef struct MDB_txbody {
- /** Stamp identifying this as an MDB lock file. It must be set
+ /** Stamp identifying this as an MDB file. It must be set
* to #MDB_MAGIC. */
uint32_t mtb_magic;
/** Version number of this lock file. Must be set to #MDB_VERSION. */
uint32_t mtb_version;
-#if defined(_WIN32) || defined(__APPLE__)
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
char mtb_rmname[MNAME_LEN];
#else
/** Mutex protecting access to this table.
* when readers release their slots.
*/
unsigned mtb_numreaders;
- /** The ID of the most recent meta page in the database.
- * This is recorded here only for convenience; the value can always
- * be determined by reading the main database meta pages.
- */
- uint32_t mtb_me_toggle;
} MDB_txbody;
/** The actual reader table definition. */
#define mti_rmname mt1.mtb.mtb_rmname
#define mti_txnid mt1.mtb.mtb_txnid
#define mti_numreaders mt1.mtb.mtb_numreaders
-#define mti_me_toggle mt1.mtb.mtb_me_toggle
char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
} mt1;
union {
-#if defined(_WIN32) || defined(__APPLE__)
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
char mt2_wmname[MNAME_LEN];
#define mti_wmname mt2.mt2_wmname
#else
pthread_mutex_t mt2_wmutex;
#define mti_wmutex mt2.mt2_wmutex
#endif
- char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
+ char pad[(MNAME_LEN+CACHELINE-1) & ~(CACHELINE-1)];
} mt2;
MDB_reader mti_readers[1];
} MDB_txninfo;
#define F_DUPDATA 0x04 /**< data has duplicates */
/** valid flags for #mdb_node_add() */
-#define NODE_ADD_FLAGS (F_DUPDATA|F_SUBDATA|MDB_RESERVE)
+#define NODE_ADD_FLAGS (F_DUPDATA|F_SUBDATA|MDB_RESERVE|MDB_APPEND)
/** @} */
unsigned short mn_flags; /**< @ref mdb_node */
/** The size of a key in a node */
#define NODEKSZ(node) ((node)->mn_ksize)
+ /** Copy a page number from src to dst */
+#ifdef MISALIGNED_OK
+#define COPY_PGNO(dst,src) dst = src
+#else
+#if SIZE_MAX > 4294967295UL
+#define COPY_PGNO(dst,src) do { \
+ unsigned short *s, *d; \
+ s = (unsigned short *)&(src); \
+ d = (unsigned short *)&(dst); \
+ *d++ = *s++; \
+ *d++ = *s++; \
+ *d++ = *s++; \
+ *d = *s; \
+} while (0)
+#else
+#define COPY_PGNO(dst,src) do { \
+ unsigned short *s, *d; \
+ s = (unsigned short *)&(src); \
+ d = (unsigned short *)&(dst); \
+ *d++ = *s++; \
+ *d = *s; \
+} while (0)
+#endif
+#endif
/** The address of a key in a LEAF2 page.
* LEAF2 pages are used for #MDB_DUPFIXED sorted-duplicate sub-DBs.
* There are no node headers, keys are stored contiguously.
/** Meta page content. */
typedef struct MDB_meta {
- /** Stamp identifying this as an MDB data file. It must be set
+ /** Stamp identifying this as an MDB file. It must be set
* to #MDB_MAGIC. */
uint32_t mm_magic;
/** Version number of this lock file. Must be set to #MDB_VERSION. */
txnid_t mm_txnid; /**< txnid that committed this page */
} MDB_meta;
+ /** Buffer for a stack-allocated dirty page.
+ * The members define size and alignment, and silence type
+ * aliasing warnings. They are not used directly; that could
+ * mean incorrectly using several union members in parallel.
+ */
+typedef union MDB_pagebuf {
+ char mb_raw[MDB_PAGESIZE];
+ MDB_page mb_page;
+ struct {
+ char mm_pad[PAGEHDRSZ];
+ MDB_meta mm_meta;
+ } mb_metabuf;
+} MDB_pagebuf;
+
/** Auxiliary DB info.
* The information here is mostly static/read-only. There is
* only a single copy of this record in the environment.
MDB_env *mt_env; /**< the DB environment */
/** The list of pages that became unused during this transaction.
*/
- IDL mt_free_pgs;
+ MDB_IDL mt_free_pgs;
union {
- ID2L dirty_list; /**< modified pages */
+ MDB_ID2L dirty_list; /**< modified pages */
MDB_reader *reader; /**< this thread's slot in the reader table */
} mt_u;
/** Array of records for each DB known in the environment. */
/** The @ref mt_dbflag for this database */
unsigned char *mc_dbflag;
unsigned short mc_snum; /**< number of pushed pages */
- unsigned short mc_top; /**< index of top page, mc_snum-1 */
+ unsigned short mc_top; /**< index of top page, normally mc_snum-1 */
/** @defgroup mdb_cursor Cursor Flags
* @ingroup internal
* Cursor state flags.
#define C_SUB 0x04 /**< Cursor is a sub-cursor */
#define C_SHADOW 0x08 /**< Cursor is a dup from a parent txn */
#define C_ALLOCD 0x10 /**< Cursor was malloc'd */
+#define C_SPLITTING 0x20 /**< Cursor is in page_split */
/** @} */
unsigned int mc_flags; /**< @ref mdb_cursor */
MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */
struct MDB_oldpages *mo_next;
/** The ID of the transaction in which these pages were freed. */
txnid_t mo_txnid;
- /** An #IDL of the pages */
+ /** An #MDB_IDL of the pages */
pgno_t mo_pages[1]; /* dynamic */
} MDB_oldpages;
/** Failed to update the meta page. Probably an I/O error. */
#define MDB_FATAL_ERROR 0x80000000U
uint32_t me_flags; /**< @ref mdb_env */
- uint32_t me_extrapad; /**< unused for now */
+ unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_maxreaders; /**< size of the reader table */
MDB_dbi me_numdbs; /**< number of DBs opened */
MDB_dbi me_maxdbs; /**< size of the DB table */
size_t me_mapsize; /**< size of the data memory map */
off_t me_size; /**< current file size */
pgno_t me_maxpg; /**< me_mapsize / me_psize */
- unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
- unsigned int me_db_toggle; /**< which DB table is current */
- txnid_t me_wtxnid; /**< ID of last txn we committed */
+ txnid_t me_pgfirst; /**< ID of first old page record we used */
+ txnid_t me_pglast; /**< ID of last old page record we used */
MDB_dbx *me_dbxs; /**< array of static DB info */
- MDB_db *me_dbs[2]; /**< two arrays of MDB_db info */
+ uint16_t *me_dbflags; /**< array of DB flags */
MDB_oldpages *me_pghead; /**< list of old page records */
+ MDB_oldpages *me_pgfree; /**< list of page records to free */
pthread_key_t me_txkey; /**< thread-key for readers */
MDB_page *me_dpages; /**< list of malloc'd blocks for re-use */
/** IDL of pages that became unused in a write txn */
- IDL me_free_pgs;
+ MDB_IDL me_free_pgs;
/** ID2L of pages that were written during a write txn */
- ID2 me_dirty_list[MDB_IDL_UM_SIZE];
- /** rwlock for the DB tables, if #LAZY_LOCKS is false */
- LAZY_RWLOCK_DEF(me_dblock)
+ MDB_ID2 me_dirty_list[MDB_IDL_UM_SIZE];
#ifdef _WIN32
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */
HANDLE me_wmutex;
#endif
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
sem_t *me_rmutex; /* Apple doesn't support shared mutexes */
sem_t *me_wmutex;
#endif
};
/** max number of pages to commit in one writev() call */
#define MDB_COMMIT_PAGES 64
+#if defined(IOV_MAX) && IOV_MAX < MDB_COMMIT_PAGES
+#undef MDB_COMMIT_PAGES
+#define MDB_COMMIT_PAGES IOV_MAX
+#endif
static MDB_page *mdb_page_alloc(MDB_cursor *mc, int num);
static MDB_page *mdb_page_new(MDB_cursor *mc, uint32_t flags, int num);
static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
static int mdb_page_search_root(MDB_cursor *mc,
MDB_val *key, int modify);
+#define MDB_PS_MODIFY 1
+#define MDB_PS_ROOTONLY 2
static int mdb_page_search(MDB_cursor *mc,
- MDB_val *key, int modify);
+ MDB_val *key, int flags);
static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
+
+#define MDB_SPLIT_REPLACE MDB_APPENDDUP /**< newkey is not new */
static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
pgno_t newpgno, unsigned int nflags);
static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
-static int mdb_env_read_meta(MDB_env *env, int *which);
+static int mdb_env_pick_meta(const MDB_env *env);
static int mdb_env_write_meta(MDB_txn *txn);
static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
return strerror(err);
}
-#if DEBUG
+#if MDB_DEBUG
/** Display a key in hexadecimal and return the address of the result.
* @param[in] key the key to display
* @param[in] buf the buffer to write into. Should always be #DKBUF.
* printable characters, print it as-is instead of converting to hex.
*/
#if 1
+ buf[0] = '\0';
for (i=0; i<key->mv_size; i++)
ptr += sprintf(ptr, "%02x", *c++);
#else
#endif
return buf;
}
+
+/** Display all the keys in the page. */
+static void
+mdb_page_keys(MDB_page *mp)
+{
+ MDB_node *node;
+ unsigned int i, nkeys;
+ MDB_val key;
+ DKBUF;
+
+ nkeys = NUMKEYS(mp);
+ fprintf(stderr, "numkeys %d\n", nkeys);
+ for (i=0; i<nkeys; i++) {
+ node = NODEPTR(mp, i);
+ key.mv_size = node->mn_ksize;
+ key.mv_data = node->mn_data;
+ fprintf(stderr, "key %d: %s\n", i, DKEY(&key));
+ }
+}
+
+void
+mdb_cursor_chk(MDB_cursor *mc)
+{
+ unsigned int i;
+ MDB_node *node;
+ MDB_page *mp;
+
+ if (!mc->mc_snum && !(mc->mc_flags & C_INITIALIZED)) return;
+ for (i=0; i<mc->mc_top; i++) {
+ mp = mc->mc_pg[i];
+ node = NODEPTR(mp, mc->mc_ki[i]);
+ if (NODEPGNO(node) != mc->mc_pg[i+1]->mp_pgno)
+ printf("oops!\n");
+ }
+ if (mc->mc_ki[i] >= NUMKEYS(mc->mc_pg[i]))
+ printf("ack!\n");
+}
+#endif
+
+#if MDB_DEBUG > 2
+/** Count all the pages in each DB and in the freelist
+ * and make sure it matches the actual number of pages
+ * being used.
+ */
+static void mdb_audit(MDB_txn *txn)
+{
+ MDB_cursor mc;
+ MDB_val key, data;
+ MDB_ID freecount, count;
+ MDB_dbi i;
+ int rc;
+
+ freecount = 0;
+ mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
+ while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
+ freecount += *(MDB_ID *)data.mv_data;
+
+ count = 0;
+ for (i = 0; i<txn->mt_numdbs; i++) {
+ MDB_xcursor mx, *mxp;
+ mxp = (txn->mt_dbs[i].md_flags & MDB_DUPSORT) ? &mx : NULL;
+ mdb_cursor_init(&mc, txn, i, mxp);
+ if (txn->mt_dbs[i].md_root == P_INVALID)
+ continue;
+ count += txn->mt_dbs[i].md_branch_pages +
+ txn->mt_dbs[i].md_leaf_pages +
+ txn->mt_dbs[i].md_overflow_pages;
+ if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) {
+ mdb_page_search(&mc, NULL, 0);
+ do {
+ unsigned j;
+ MDB_page *mp;
+ mp = mc.mc_pg[mc.mc_top];
+ for (j=0; j<NUMKEYS(mp); j++) {
+ MDB_node *leaf = NODEPTR(mp, j);
+ if (leaf->mn_flags & F_SUBDATA) {
+ MDB_db db;
+ memcpy(&db, NODEDATA(leaf), sizeof(db));
+ count += db.md_branch_pages + db.md_leaf_pages +
+ db.md_overflow_pages;
+ }
+ }
+ }
+ while (mdb_cursor_sibling(&mc, 1) == 0);
+ }
+ }
+ if (freecount + count + 2 /* metapages */ != txn->mt_next_pgno) {
+ fprintf(stderr, "audit: %lu freecount: %lu count: %lu total: %lu next_pgno: %lu\n",
+ txn->mt_txnid, freecount, count+2, freecount+count+2, txn->mt_next_pgno);
+ }
+}
#endif
int
static MDB_page *
mdb_page_malloc(MDB_cursor *mc) {
MDB_page *ret;
- if (mc->mc_txn->mt_env->me_dpages) {
- ret = mc->mc_txn->mt_env->me_dpages;
+ size_t sz = mc->mc_txn->mt_env->me_psize;
+ if ((ret = mc->mc_txn->mt_env->me_dpages) != NULL) {
+ VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
+ VGMEMP_DEFINED(ret, sizeof(ret->mp_next));
mc->mc_txn->mt_env->me_dpages = ret->mp_next;
- } else {
- ret = malloc(mc->mc_txn->mt_env->me_psize);
+ } else if ((ret = malloc(sz)) != NULL) {
+ VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
}
return ret;
}
MDB_txn *txn = mc->mc_txn;
MDB_page *np;
pgno_t pgno = P_INVALID;
- ID2 mid;
+ MDB_ID2 mid;
- if (txn->mt_txnid > 2) {
+ /* The free list won't have any content at all until txn 2 has
+ * committed. The pages freed by txn 2 will be unreferenced
+ * after txn 3 commits, and so will be safe to re-use in txn 4.
+ */
+ if (txn->mt_txnid > 3) {
- if (!txn->mt_env->me_pghead && mc->mc_dbi != FREE_DBI &&
+ if (!txn->mt_env->me_pghead &&
txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
/* See if there's anything in the free DB */
MDB_cursor m2;
MDB_node *leaf;
- txnid_t *kptr, oldest;
+ MDB_val data;
+ txnid_t *kptr, oldest, last;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
- mdb_page_search(&m2, NULL, 0);
- leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
- kptr = (txnid_t *)NODEKEY(leaf);
+ if (!txn->mt_env->me_pgfirst) {
+ mdb_page_search(&m2, NULL, 0);
+ leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
+ kptr = (txnid_t *)NODEKEY(leaf);
+ last = *kptr;
+ } else {
+ MDB_val key;
+ int rc, exact;
+again:
+ exact = 0;
+ last = txn->mt_env->me_pglast + 1;
+ leaf = NULL;
+ key.mv_data = &last;
+ key.mv_size = sizeof(last);
+ rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
+ if (rc)
+ goto none;
+ last = *(txnid_t *)key.mv_data;
+ }
{
unsigned int i;
}
}
- if (oldest > *kptr) {
+ if (oldest > last) {
/* It's usable, grab it.
*/
MDB_oldpages *mop;
- MDB_val data;
pgno_t *idl;
- mdb_node_read(txn, leaf, &data);
- idl = (ID *) data.mv_data;
+ if (!txn->mt_env->me_pgfirst) {
+ mdb_node_read(txn, leaf, &data);
+ }
+ txn->mt_env->me_pglast = last;
+ if (!txn->mt_env->me_pgfirst)
+ txn->mt_env->me_pgfirst = last;
+ idl = (MDB_ID *) data.mv_data;
+ /* We might have a zero-length IDL due to freelist growth
+ * during a prior commit
+ */
+ if (!idl[0]) goto again;
mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
mop->mo_next = txn->mt_env->me_pghead;
- mop->mo_txnid = *kptr;
+ mop->mo_txnid = last;
txn->mt_env->me_pghead = mop;
memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
-#if DEBUG > 1
+#if MDB_DEBUG > 1
{
unsigned int i;
DPRINTF("IDL read txn %zu root %zu num %zu",
}
}
#endif
- /* drop this IDL from the DB */
- m2.mc_ki[m2.mc_top] = 0;
- m2.mc_flags = C_INITIALIZED;
- mdb_cursor_del(&m2, 0);
}
}
+none:
if (txn->mt_env->me_pghead) {
MDB_oldpages *mop = txn->mt_env->me_pghead;
if (num > 1) {
}
if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
txn->mt_env->me_pghead = mop->mo_next;
- free(mop);
+ if (mc->mc_dbi == FREE_DBI) {
+ mop->mo_next = txn->mt_env->me_pgfree;
+ txn->mt_env->me_pgfree = mop;
+ } else {
+ free(mop);
+ }
}
}
}
if (pgno == P_INVALID) {
/* DB size is maxed out */
if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg) {
- DPRINTF("DB size maxed out");
+ DPUTS("DB size maxed out");
return NULL;
}
}
if (txn->mt_env->me_dpages && num == 1) {
np = txn->mt_env->me_dpages;
+ VGMEMP_ALLOC(txn->mt_env, np, txn->mt_env->me_psize);
+ VGMEMP_DEFINED(np, sizeof(np->mp_next));
txn->mt_env->me_dpages = np->mp_next;
} else {
- if ((np = malloc(txn->mt_env->me_psize * num )) == NULL)
+ size_t sz = txn->mt_env->me_psize * num;
+ if ((np = malloc(sz)) == NULL)
return NULL;
+ VGMEMP_ALLOC(txn->mt_env, np, sz);
}
if (pgno == P_INVALID) {
np->mp_pgno = txn->mt_next_pgno;
return np;
}
+/** Copy a page: avoid copying unused portions of the page.
+ * @param[in] dst page to copy into
+ * @param[in] src page to copy from
+ */
+static void
+mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
+{
+ dst->mp_flags = src->mp_flags | P_DIRTY;
+ dst->mp_pages = src->mp_pages;
+
+ if (IS_LEAF2(src)) {
+ memcpy(dst->mp_ptrs, src->mp_ptrs, psize - PAGEHDRSZ - SIZELEFT(src));
+ } else {
+ unsigned int i, nkeys = NUMKEYS(src);
+ for (i=0; i<nkeys; i++)
+ dst->mp_ptrs[i] = src->mp_ptrs[i];
+ memcpy((char *)dst+src->mp_upper, (char *)src+src->mp_upper,
+ psize - src->mp_upper);
+ }
+}
+
/** Touch a page: make it dirty and re-insert into tree with updated pgno.
* @param[in] mc cursor pointing to the page to be touched
* @return 0 on success, non-zero on failure.
DPRINTF("touched db %u page %zu -> %zu", mc->mc_dbi, mp->mp_pgno, np->mp_pgno);
assert(mp->mp_pgno != np->mp_pgno);
mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
- pgno = np->mp_pgno;
- memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
+ if (SIZELEFT(mp)) {
+ /* If page isn't full, just copy the used portion */
+ mdb_page_copy(np, mp, mc->mc_txn->mt_env->me_psize);
+ } else {
+ pgno = np->mp_pgno;
+ memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
+ np->mp_pgno = pgno;
+ np->mp_flags |= P_DIRTY;
+ }
mp = np;
- mp->mp_pgno = pgno;
- mp->mp_flags |= P_DIRTY;
finish:
/* Adjust other cursors pointing to mp */
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (m2 == mc) continue;
m3 = &m2->mc_xcursor->mx_cursor;
+ if (m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
m3->mc_pg[mc->mc_top] = mp;
}
MDB_cursor *m2;
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
- if (m2 == mc) continue;
+ if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
m2->mc_pg[mc->mc_top] = mp;
}
mc->mc_db->md_root = mp->mp_pgno;
} else if (mc->mc_txn->mt_parent) {
MDB_page *np;
- ID2 mid;
+ MDB_ID2 mid;
/* If txn has a parent, make sure the page is in our
* dirty list.
*/
{
int rc = 0;
if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
- if (fdatasync(env->me_fd))
+ if (MDB_FDATASYNC(env->me_fd))
rc = ErrCode();
}
return rc;
mdb_txn_renew0(MDB_txn *txn)
{
MDB_env *env = txn->mt_env;
- char mt_dbflag = 0;
+ unsigned int i;
+
+ /* Setup db info */
+ txn->mt_numdbs = env->me_numdbs;
+ txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
if (txn->mt_flags & MDB_TXN_RDONLY) {
MDB_reader *r = pthread_getspecific(env->me_txkey);
if (!r) {
- unsigned int i;
pid_t pid = getpid();
pthread_t tid = pthread_self();
r = &env->me_txns->mti_readers[i];
pthread_setspecific(env->me_txkey, r);
}
- txn->mt_toggle = env->me_txns->mti_me_toggle;
- txn->mt_txnid = env->me_txns->mti_txnid;
- /* This happens if a different process was the
- * last writer to the DB.
- */
- if (env->me_wtxnid < txn->mt_txnid)
- mt_dbflag = DB_STALE;
- r->mr_txnid = txn->mt_txnid;
+ txn->mt_txnid = r->mr_txnid = env->me_txns->mti_txnid;
+ txn->mt_toggle = txn->mt_txnid & 1;
+ txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
txn->mt_u.reader = r;
} else {
LOCK_MUTEX_W(env);
txn->mt_txnid = env->me_txns->mti_txnid;
- if (env->me_wtxnid < txn->mt_txnid)
- mt_dbflag = DB_STALE;
+ txn->mt_toggle = txn->mt_txnid & 1;
+ txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
txn->mt_txnid++;
- txn->mt_toggle = env->me_txns->mti_me_toggle;
+#if MDB_DEBUG
+ if (txn->mt_txnid == mdb_debug_start)
+ mdb_debug = 1;
+#endif
txn->mt_u.dirty_list = env->me_dirty_list;
txn->mt_u.dirty_list[0].mid = 0;
txn->mt_free_pgs = env->me_free_pgs;
txn->mt_free_pgs[0] = 0;
- txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
env->me_txn = txn;
}
- /* Copy the DB arrays */
- LAZY_RWLOCK_RDLOCK(&env->me_dblock);
- txn->mt_numdbs = env->me_numdbs;
- txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
+ /* Copy the DB info and flags */
memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
+ for (i=2; i<txn->mt_numdbs; i++)
+ txn->mt_dbs[i].md_flags = env->me_dbflags[i];
+ txn->mt_dbflags[0] = txn->mt_dbflags[1] = 0;
if (txn->mt_numdbs > 2)
- memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
- (txn->mt_numdbs - 2) * sizeof(MDB_db));
- LAZY_RWLOCK_UNLOCK(&env->me_dblock);
-
- memset(txn->mt_dbflags, mt_dbflag, env->me_numdbs);
+ memset(txn->mt_dbflags+2, DB_STALE, txn->mt_numdbs-2);
return MDB_SUCCESS;
}
free(txn);
return ENOMEM;
}
- txn->mt_u.dirty_list = malloc(sizeof(ID2)*MDB_IDL_UM_SIZE);
+ txn->mt_u.dirty_list = malloc(sizeof(MDB_ID2)*MDB_IDL_UM_SIZE);
if (!txn->mt_u.dirty_list) {
free(txn->mt_free_pgs);
free(txn);
dp = txn->mt_u.dirty_list[i].mptr;
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
dp->mp_next = txn->mt_env->me_dpages;
+ VGMEMP_FREE(txn->mt_env, dp);
txn->mt_env->me_dpages = dp;
} else {
/* large pages just get freed directly */
+ VGMEMP_FREE(txn->mt_env, dp);
free(dp);
}
}
txn->mt_env->me_pghead = mop->mo_next;
free(mop);
}
+ txn->mt_env->me_pgfirst = 0;
+ txn->mt_env->me_pglast = 0;
env->me_txn = NULL;
/* The writer mutex was locked in mdb_txn_begin. */
off_t size;
MDB_page *dp;
MDB_env *env;
- pgno_t next;
+ pgno_t next, freecnt;
MDB_cursor mc;
assert(txn != NULL);
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
if (txn->mt_numdbs > env->me_numdbs) {
- /* update the DB tables */
- int toggle = !env->me_db_toggle;
- MDB_db *ip, *jp;
+ /* update the DB flags */
MDB_dbi i;
-
- ip = &env->me_dbs[toggle][env->me_numdbs];
- jp = &txn->mt_dbs[env->me_numdbs];
- LAZY_RWLOCK_WRLOCK(&env->me_dblock);
- for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
- *ip++ = *jp++;
- }
-
- env->me_db_toggle = toggle;
- env->me_numdbs = txn->mt_numdbs;
- LAZY_RWLOCK_UNLOCK(&env->me_dblock);
+ for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
+ env->me_numdbs = i;
}
mdb_txn_abort(txn);
return MDB_SUCCESS;
MDB_db *ip, *jp;
MDB_dbi i;
unsigned x, y;
- ID2L dst, src;
+ MDB_ID2L dst, src;
/* Update parent's DB table */
ip = &txn->mt_parent->mt_dbs[2];
}
x = dst[0].mid;
for (; y<=src[0].mid; y++) {
- if (++x >= MDB_IDL_UM_MAX)
+ if (++x >= MDB_IDL_UM_MAX) {
+ mdb_txn_abort(txn);
return ENOMEM;
+ }
dst[x] = src[y];
}
dst[0].mid = x;
DPRINTF("committing txn %zu %p on mdbenv %p, root page %zu",
txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
+ /* Update DB root pointers. Their pages have already been
+ * touched so this is all in-place and cannot fail.
+ */
+ if (txn->mt_numdbs > 2) {
+ MDB_dbi i;
+ MDB_val data;
+ data.mv_size = sizeof(MDB_db);
+
+ mdb_cursor_init(&mc, txn, MAIN_DBI, NULL);
+ for (i = 2; i < txn->mt_numdbs; i++) {
+ if (txn->mt_dbflags[i] & DB_DIRTY) {
+ data.mv_data = &txn->mt_dbs[i];
+ mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+ }
+ }
+ }
+
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
/* should only be one record now */
if (env->me_pghead) {
/* make sure first page of freeDB is touched and on freelist */
- mdb_page_search(&mc, NULL, 1);
+ mdb_page_search(&mc, NULL, MDB_PS_MODIFY);
}
+
+ /* Delete IDLs we used from the free list */
+ if (env->me_pgfirst) {
+ txnid_t cur;
+ MDB_val key;
+ int exact = 0;
+
+ key.mv_size = sizeof(cur);
+ for (cur = env->me_pgfirst; cur <= env->me_pglast; cur++) {
+ key.mv_data = &cur;
+
+ mdb_cursor_set(&mc, &key, NULL, MDB_SET, &exact);
+ rc = mdb_cursor_del(&mc, 0);
+ if (rc) {
+ mdb_txn_abort(txn);
+ return rc;
+ }
+ }
+ env->me_pgfirst = 0;
+ env->me_pglast = 0;
+ }
+
/* save to free list */
+free2:
+ freecnt = txn->mt_free_pgs[0];
if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
MDB_val key, data;
- pgno_t i;
/* make sure last page of freeDB is touched and on freelist */
key.mv_size = MAXKEYSIZE+1;
key.mv_data = NULL;
- mdb_page_search(&mc, &key, 1);
+ mdb_page_search(&mc, &key, MDB_PS_MODIFY);
mdb_midl_sort(txn->mt_free_pgs);
-#if DEBUG > 1
+#if MDB_DEBUG > 1
{
unsigned int i;
- ID *idl = txn->mt_free_pgs;
+ MDB_IDL idl = txn->mt_free_pgs;
DPRINTF("IDL write txn %zu root %zu num %zu",
txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
for (i=0; i<idl[0]; i++) {
* and make sure the entire thing got written.
*/
do {
- i = txn->mt_free_pgs[0];
+ freecnt = txn->mt_free_pgs[0];
data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
rc = mdb_cursor_put(&mc, &key, &data, 0);
if (rc) {
mdb_txn_abort(txn);
return rc;
}
- } while (i != txn->mt_free_pgs[0]);
- if (mdb_midl_shrink(&txn->mt_free_pgs))
- env->me_free_pgs = txn->mt_free_pgs;
+ } while (freecnt != txn->mt_free_pgs[0]);
}
/* should only be one record now */
+again:
if (env->me_pghead) {
MDB_val key, data;
MDB_oldpages *mop;
+ pgno_t orig;
+ txnid_t id;
mop = env->me_pghead;
- key.mv_size = sizeof(pgno_t);
- key.mv_data = &mop->mo_txnid;
+ id = mop->mo_txnid;
+ key.mv_size = sizeof(id);
+ key.mv_data = &id;
data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
data.mv_data = mop->mo_pages;
+ orig = mop->mo_pages[0];
+ /* These steps may grow the freelist again
+ * due to freed overflow pages...
+ */
mdb_cursor_put(&mc, &key, &data, 0);
- free(env->me_pghead);
- env->me_pghead = NULL;
+ if (mop == env->me_pghead && env->me_pghead->mo_txnid == id) {
+ /* could have been used again here */
+ if (mop->mo_pages[0] != orig) {
+ data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
+ data.mv_data = mop->mo_pages;
+ id = mop->mo_txnid;
+ mdb_cursor_put(&mc, &key, &data, 0);
+ }
+ env->me_pghead = NULL;
+ free(mop);
+ } else {
+ /* was completely used up */
+ mdb_cursor_del(&mc, 0);
+ if (env->me_pghead)
+ goto again;
+ }
+ env->me_pgfirst = 0;
+ env->me_pglast = 0;
}
- /* Update DB root pointers. Their pages have already been
- * touched so this is all in-place and cannot fail.
- */
- {
- MDB_dbi i;
- MDB_val data;
- data.mv_size = sizeof(MDB_db);
+ while (env->me_pgfree) {
+ MDB_oldpages *mop = env->me_pgfree;
+ env->me_pgfree = mop->mo_next;
+ free(mop);;
+ }
- mdb_cursor_init(&mc, txn, MAIN_DBI, NULL);
- for (i = 2; i < txn->mt_numdbs; i++) {
- if (txn->mt_dbflags[i] & DB_DIRTY) {
- data.mv_data = &txn->mt_dbs[i];
- mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
- }
- }
+ /* Check for growth of freelist again */
+ if (freecnt != txn->mt_free_pgs[0])
+ goto free2;
+
+ if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
+ if (mdb_midl_shrink(&txn->mt_free_pgs))
+ env->me_free_pgs = txn->mt_free_pgs;
}
+#if MDB_DEBUG > 2
+ mdb_audit(txn);
+#endif
+
/* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
*/
next = 0;
dp = txn->mt_u.dirty_list[i].mptr;
if (dp->mp_pgno != next) {
if (n) {
- DPRINTF("committing %u dirty pages", n);
rc = writev(env->me_fd, iov, n);
if (rc != size) {
n = ErrCode();
DPRINTF("committing page %zu", dp->mp_pgno);
iov[n].iov_len = env->me_psize;
if (IS_OVERFLOW(dp)) iov[n].iov_len *= dp->mp_pages;
- iov[n].iov_base = dp;
+ iov[n].iov_base = (char *)dp;
size += iov[n].iov_len;
next = dp->mp_pgno + (IS_OVERFLOW(dp) ? dp->mp_pages : 1);
/* clear dirty flag */
if (n == 0)
break;
- DPRINTF("committing %u dirty pages", n);
rc = writev(env->me_fd, iov, n);
if (rc != size) {
n = ErrCode();
dp = txn->mt_u.dirty_list[i].mptr;
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
dp->mp_next = txn->mt_env->me_dpages;
+ VGMEMP_FREE(txn->mt_env, dp);
txn->mt_env->me_dpages = dp;
} else {
+ VGMEMP_FREE(txn->mt_env, dp);
free(dp);
}
txn->mt_u.dirty_list[i].mid = 0;
mdb_txn_abort(txn);
return n;
}
- env->me_wtxnid = txn->mt_txnid;
done:
env->me_txn = NULL;
- /* update the DB tables */
- {
- int toggle = !env->me_db_toggle;
- MDB_db *ip, *jp;
+ if (txn->mt_numdbs > env->me_numdbs) {
+ /* update the DB flags */
MDB_dbi i;
-
- ip = &env->me_dbs[toggle][2];
- jp = &txn->mt_dbs[2];
- LAZY_RWLOCK_WRLOCK(&env->me_dblock);
- for (i = 2; i < txn->mt_numdbs; i++) {
- if (ip->md_root != jp->md_root)
- *ip = *jp;
- ip++; jp++;
- }
-
- env->me_db_toggle = toggle;
- env->me_numdbs = txn->mt_numdbs;
- LAZY_RWLOCK_UNLOCK(&env->me_dblock);
+ for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
+ env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
+ env->me_numdbs = i;
}
UNLOCK_MUTEX_W(env);
static int
mdb_env_read_header(MDB_env *env, MDB_meta *meta)
{
- char page[PAGESIZE];
+ MDB_pagebuf pbuf;
MDB_page *p;
MDB_meta *m;
int rc, err;
*/
#ifdef _WIN32
- if (!ReadFile(env->me_fd, page, PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
+ if (!ReadFile(env->me_fd, &pbuf, MDB_PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
#else
- if ((rc = read(env->me_fd, page, PAGESIZE)) == 0)
+ if ((rc = read(env->me_fd, &pbuf, MDB_PAGESIZE)) == 0)
#endif
{
return ENOENT;
}
- else if (rc != PAGESIZE) {
+ else if (rc != MDB_PAGESIZE) {
err = ErrCode();
if (rc > 0)
err = EINVAL;
return err;
}
- p = (MDB_page *)page;
+ p = (MDB_page *)&pbuf;
if (!F_ISSET(p->mp_flags, P_META)) {
DPRINTF("page %zu not a meta page", p->mp_pgno);
* readers will get consistent data regardless of how fresh or
* how stale their view of these values is.
*/
- LAZY_MUTEX_LOCK(&env->me_txns->mti_mutex);
- txn->mt_env->me_txns->mti_me_toggle = toggle;
txn->mt_env->me_txns->mti_txnid = txn->mt_txnid;
- LAZY_MUTEX_UNLOCK(&env->me_txns->mti_mutex);
return MDB_SUCCESS;
}
/** Check both meta pages to see which one is newer.
* @param[in] env the environment handle
- * @param[out] which address of where to store the meta toggle ID
- * @return 0 on success, non-zero on failure.
+ * @return meta toggle (0 or 1).
*/
static int
-mdb_env_read_meta(MDB_env *env, int *which)
+mdb_env_pick_meta(const MDB_env *env)
{
- int toggle = 0;
-
- assert(env != NULL);
-
- if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
- toggle = 1;
-
- DPRINTF("Using meta page %d", toggle);
- *which = toggle;
-
- return MDB_SUCCESS;
+ return (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid);
}
int
e->me_fd = INVALID_HANDLE_VALUE;
e->me_lfd = INVALID_HANDLE_VALUE;
e->me_mfd = INVALID_HANDLE_VALUE;
+ VGMEMP_CREATE(e,0,0);
*env = e;
return MDB_SUCCESS;
}
static int
mdb_env_open2(MDB_env *env, unsigned int flags)
{
- int i, newenv = 0, toggle;
+ int i, newenv = 0;
MDB_meta meta;
MDB_page *p;
i |= MAP_FIXED;
env->me_map = mmap(meta.mm_address, env->me_mapsize, PROT_READ, i,
env->me_fd, 0);
- if (env->me_map == MAP_FAILED)
+ if (env->me_map == MAP_FAILED) {
+ env->me_map = NULL;
return ErrCode();
+ }
#endif
if (newenv) {
env->me_metas[0] = METADATA(p);
env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
- if ((i = mdb_env_read_meta(env, &toggle)) != 0)
- return i;
-
- DPRINTF("opened database version %u, pagesize %u",
- env->me_metas[toggle]->mm_version, env->me_psize);
- DPRINTF("depth: %u", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_depth);
- DPRINTF("entries: %zu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_entries);
- DPRINTF("branch pages: %zu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_branch_pages);
- DPRINTF("leaf pages: %zu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_leaf_pages);
- DPRINTF("overflow pages: %zu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_overflow_pages);
- DPRINTF("root: %zu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_root);
+#if MDB_DEBUG
+ {
+ int toggle = mdb_env_pick_meta(env);
+ MDB_db *db = &env->me_metas[toggle]->mm_dbs[MAIN_DBI];
+
+ DPRINTF("opened database version %u, pagesize %u",
+ env->me_metas[0]->mm_version, env->me_psize);
+ DPRINTF("using meta page %d", toggle);
+ DPRINTF("depth: %u", db->md_depth);
+ DPRINTF("entries: %zu", db->md_entries);
+ DPRINTF("branch pages: %zu", db->md_branch_pages);
+ DPRINTF("leaf pages: %zu", db->md_leaf_pages);
+ DPRINTF("overflow pages: %zu", db->md_overflow_pages);
+ DPRINTF("root: %zu", db->md_root);
+ }
+#endif
return MDB_SUCCESS;
}
-#ifndef _WIN32
+
/** Release a reader thread's slot in the reader lock table.
* This function is called automatically when a thread exits.
- * Windows doesn't support destructor callbacks for thread-specific storage,
- * so this function is not compiled there.
* @param[in] ptr This points to the slot in the reader lock table.
*/
static void
reader->mr_pid = 0;
reader->mr_tid = 0;
}
+
+#ifdef _WIN32
+/** Junk for arranging thread-specific callbacks on Windows. This is
+ * necessarily platform and compiler-specific. Windows supports up
+ * to 1088 keys. Let's assume nobody opens more than 64 environments
+ * in a single process, for now. They can override this if needed.
+ */
+#ifndef MAX_TLS_KEYS
+#define MAX_TLS_KEYS 64
+#endif
+static pthread_key_t mdb_tls_keys[MAX_TLS_KEYS];
+static int mdb_tls_nkeys;
+
+/** TLS callback invoked by the Windows loader on thread/process events.
+ * On thread exit it releases the exiting thread's reader slot in every
+ * open environment, emulating the pthread TLS destructor that Windows
+ * thread-specific storage lacks.
+ * NOTE(review): assumes mdb_env_reader_dest() tolerates a NULL slot for
+ * threads that never started a read transaction — confirm, since the
+ * visible destructor body dereferences its argument unconditionally.
+ */
+static void NTAPI mdb_tls_callback(PVOID module, DWORD reason, PVOID ptr)
+{
+	int i;
+	switch(reason) {
+	case DLL_PROCESS_ATTACH: break;
+	case DLL_THREAD_ATTACH: break;
+	case DLL_THREAD_DETACH:
+		/* Walk every registered environment key and free this
+		 * thread's reader-table slot.
+		 */
+		for (i=0; i<mdb_tls_nkeys; i++) {
+			MDB_reader *r = pthread_getspecific(mdb_tls_keys[i]);
+			mdb_env_reader_dest(r);
+		}
+		break;
+	case DLL_PROCESS_DETACH: break;
+	}
+}
+#ifdef __GNUC__
+#ifdef _WIN64
+const PIMAGE_TLS_CALLBACK mdb_tls_cbp __attribute__((section (".CRT$XLB"))) = mdb_tls_callback;
+#else
+PIMAGE_TLS_CALLBACK mdb_tls_cbp __attribute__((section (".CRT$XLB"))) = mdb_tls_callback;
+#endif
+#else
+#ifdef _WIN64
+/* Force some symbol references.
+ * _tls_used forces the linker to create the TLS directory if not already done
+ * mdb_tls_cbp prevents whole-program-optimizer from dropping the symbol.
+ */
+#pragma comment(linker, "/INCLUDE:_tls_used")
+#pragma comment(linker, "/INCLUDE:mdb_tls_cbp")
+#pragma const_seg(".CRT$XLB")
+extern const PIMAGE_TLS_CALLBACK mdb_tls_callback;
+const PIMAGE_TLS_CALLBACK mdb_tls_cbp = mdb_tls_callback;
+#pragma const_seg()
+#else /* WIN32 */
+#pragma comment(linker, "/INCLUDE:__tls_used")
+#pragma comment(linker, "/INCLUDE:_mdb_tls_cbp")
+#pragma data_seg(".CRT$XLB")
+PIMAGE_TLS_CALLBACK mdb_tls_cbp = mdb_tls_callback;
+#pragma data_seg()
+#endif /* WIN 32/64 */
+#endif /* !__GNUC__ */
#endif
/** Downgrade the exclusive lock on the region back to shared */
static void
mdb_env_share_locks(MDB_env *env)
{
- int toggle = 0;
+ int toggle = mdb_env_pick_meta(env);
- if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
- toggle = 1;
- env->me_txns->mti_me_toggle = toggle;
env->me_txns->mti_txnid = env->me_metas[toggle]->mm_txnid;
#ifdef _WIN32
}
#endif
}
-#if defined(_WIN32) || defined(__APPLE__)
+
+/** Try to obtain an exclusive lock on the lockfile, falling back to shared.
+ * Sets *excl to 1 only when the exclusive (write) lock is obtained; when
+ * only the shared lock could be taken, *excl is left untouched, so callers
+ * must initialize it before calling (as the open and close paths do).
+ * NOTE(review): assumes env->me_lfd is already open.
+ * @param[in] env environment whose lockfile handle (me_lfd) is locked
+ * @param[out] excl set to 1 if the exclusive lock was obtained
+ * @return 0 on success, or a system error code on failure.
+ */
+static int
+mdb_env_excl_lock(MDB_env *env, int *excl)
+{
+#ifdef _WIN32
+	if (LockFile(env->me_lfd, 0, 0, 1, 0)) {
+		*excl = 1;
+	} else {
+		OVERLAPPED ov;
+		/* Exclusive lock is busy: block waiting for a shared lock
+		 * (no LOCKFILE_* flags => shared, waits until granted).
+		 */
+		memset(&ov, 0, sizeof(ov));
+		if (!LockFileEx(env->me_lfd, 0, 0, 1, 0, &ov)) {
+			return ErrCode();
+		}
+	}
+#else
+	struct flock lock_info;
+	memset((void *)&lock_info, 0, sizeof(lock_info));
+	lock_info.l_type = F_WRLCK;
+	lock_info.l_whence = SEEK_SET;
+	lock_info.l_start = 0;
+	lock_info.l_len = 1;
+	if (!fcntl(env->me_lfd, F_SETLK, &lock_info)) {
+		*excl = 1;
+	} else {
+		/* Exclusive lock is busy: block waiting for a read lock */
+		lock_info.l_type = F_RDLCK;
+		if (fcntl(env->me_lfd, F_SETLKW, &lock_info)) {
+			return ErrCode();
+		}
+	}
+#endif
+	return 0;
+}
+
+#if defined(_WIN32) || defined(USE_POSIX_SEM)
/*
* hash_64 - 64 bit Fowler/Noll/Vo-0 FNV-1a hash code
*
* hval arg on the first call.
*/
static mdb_hash_t
-mdb_hash_str(char *str, mdb_hash_t hval)
+mdb_hash_val(MDB_val *val, mdb_hash_t hval)
{
- unsigned char *s = (unsigned char *)str; /* unsigned string */
+ unsigned char *s = (unsigned char *)val->mv_data; /* unsigned string */
+ unsigned char *end = s + val->mv_size;
/*
* FNV-1a hash each octet of the string
*/
- while (*s) {
+ while (s < end) {
/* xor the bottom with the current octet */
hval ^= (mdb_hash_t)*s++;
* @param[out] hexbuf an array of 17 chars to hold the hash
*/
static void
-mdb_hash_hex(char *str, char *hexbuf)
+mdb_hash_hex(MDB_val *val, char *hexbuf)
{
int i;
- mdb_hash_t h = mdb_hash_str(str, MDB_HASH_INIT);
+ mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT);
for (i=0; i<8; i++) {
hexbuf += sprintf(hexbuf, "%02x", (unsigned int)h & 0xff);
h >>= 8;
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it.
*/
- {
- if (LockFile(env->me_lfd, 0, 0, 1, 0)) {
- *excl = 1;
- } else {
- OVERLAPPED ov;
- memset(&ov, 0, sizeof(ov));
- if (!LockFileEx(env->me_lfd, 0, 0, 1, 0, &ov)) {
- rc = ErrCode();
- goto fail;
- }
- }
- }
+ if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
size = GetFileSize(env->me_lfd, NULL);
+
#else
- if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
- rc = ErrCode();
- return rc;
+#if !(O_CLOEXEC)
+ {
+ int fdflags;
+ if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1)
+ return ErrCode();
+		/* Set FD_CLOEXEC so the fd closes on exec*(), releasing its record locks */
+ if ((fdflags = fcntl(env->me_lfd, F_GETFD) | FD_CLOEXEC) >= 0)
+ fcntl(env->me_lfd, F_SETFD, fdflags);
}
+#else /* O_CLOEXEC on Linux: Open file and set FD_CLOEXEC atomically */
+ if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT|O_CLOEXEC, mode)) == -1)
+ return ErrCode();
+#endif
+
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it.
*/
- {
- struct flock lock_info;
- memset((void *)&lock_info, 0, sizeof(lock_info));
- lock_info.l_type = F_WRLCK;
- lock_info.l_whence = SEEK_SET;
- lock_info.l_start = 0;
- lock_info.l_len = 1;
- rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
- if (rc == 0) {
- *excl = 1;
- } else {
- lock_info.l_type = F_RDLCK;
- rc = fcntl(env->me_lfd, F_SETLKW, &lock_info);
- if (rc) {
- rc = ErrCode();
- goto fail;
- }
- }
- }
+ if ((rc = mdb_env_excl_lock(env, excl))) goto fail;
+
size = lseek(env->me_lfd, 0, SEEK_END);
#endif
rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
size = rsize - sizeof(MDB_txninfo);
env->me_maxreaders = size/sizeof(MDB_reader) + 1;
}
-#ifdef _WIN32
{
+#ifdef _WIN32
HANDLE mh;
mh = CreateFileMapping(env->me_lfd, NULL, PAGE_READWRITE,
0, 0, NULL);
rc = ErrCode();
goto fail;
}
- }
#else
- env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
- env->me_lfd, 0);
- if (env->me_txns == MAP_FAILED) {
- rc = ErrCode();
- goto fail;
- }
+ void *m = mmap(NULL, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
+ env->me_lfd, 0);
+ if (m == MAP_FAILED) {
+ env->me_txns = NULL;
+ rc = ErrCode();
+ goto fail;
+ }
+ env->me_txns = m;
#endif
+ }
if (*excl) {
#ifdef _WIN32
+ BY_HANDLE_FILE_INFORMATION stbuf;
+ struct {
+ DWORD volume;
+ DWORD nhigh;
+ DWORD nlow;
+ } idbuf;
+ MDB_val val;
char hexbuf[17];
+
if (!mdb_sec_inited) {
InitializeSecurityDescriptor(&mdb_null_sd,
SECURITY_DESCRIPTOR_REVISION);
mdb_all_sa.lpSecurityDescriptor = &mdb_null_sd;
mdb_sec_inited = 1;
}
- mdb_hash_hex(lpath, hexbuf);
+ GetFileInformationByHandle(env->me_lfd, &stbuf);
+ idbuf.volume = stbuf.dwVolumeSerialNumber;
+ idbuf.nhigh = stbuf.nFileIndexHigh;
+ idbuf.nlow = stbuf.nFileIndexLow;
+ val.mv_data = &idbuf;
+ val.mv_size = sizeof(idbuf);
+ mdb_hash_hex(&val, hexbuf);
sprintf(env->me_txns->mti_rmname, "Global\\MDBr%s", hexbuf);
env->me_rmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_rmname);
if (!env->me_rmutex) {
goto fail;
}
#else /* _WIN32 */
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
+ struct stat stbuf;
+ struct {
+ dev_t dev;
+ ino_t ino;
+ } idbuf;
+ MDB_val val;
char hexbuf[17];
- mdb_hash_hex(lpath, hexbuf);
- sprintf(env->me_txns->mti_rmname, "MDBr%s", hexbuf);
+
+ fstat(env->me_lfd, &stbuf);
+ idbuf.dev = stbuf.st_dev;
+ idbuf.ino = stbuf.st_ino;
+ val.mv_data = &idbuf;
+ val.mv_size = sizeof(idbuf);
+ mdb_hash_hex(&val, hexbuf);
+ sprintf(env->me_txns->mti_rmname, "/MDBr%s", hexbuf);
if (sem_unlink(env->me_txns->mti_rmname)) {
rc = ErrCode();
if (rc != ENOENT && rc != EINVAL)
rc = ErrCode();
goto fail;
}
- sprintf(env->me_txns->mti_wmname, "MDBw%s", hexbuf);
+ sprintf(env->me_txns->mti_wmname, "/MDBw%s", hexbuf);
if (sem_unlink(env->me_txns->mti_wmname)) {
rc = ErrCode();
if (rc != ENOENT && rc != EINVAL)
rc = ErrCode();
goto fail;
}
-#else /* __APPLE__ */
+#else /* USE_POSIX_SEM */
pthread_mutexattr_t mattr;
pthread_mutexattr_init(&mattr);
}
pthread_mutex_init(&env->me_txns->mti_mutex, &mattr);
pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr);
-#endif /* __APPLE__ */
+#endif /* USE_POSIX_SEM */
#endif /* _WIN32 */
env->me_txns->mti_version = MDB_VERSION;
env->me_txns->mti_magic = MDB_MAGIC;
env->me_txns->mti_txnid = 0;
env->me_txns->mti_numreaders = 0;
- env->me_txns->mti_me_toggle = 0;
} else {
if (env->me_txns->mti_magic != MDB_MAGIC) {
goto fail;
}
#endif
-#ifdef __APPLE__
+#ifdef USE_POSIX_SEM
env->me_rmutex = sem_open(env->me_txns->mti_rmname, 0);
if (!env->me_rmutex) {
rc = ErrCode();
len = OPEN_ALWAYS;
}
mode = FILE_ATTRIBUTE_NORMAL;
- if ((env->me_fd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE,
- NULL, len, mode, NULL)) == INVALID_HANDLE_VALUE) {
- rc = ErrCode();
- goto leave;
- }
+ env->me_fd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE,
+ NULL, len, mode, NULL);
#else
if (F_ISSET(flags, MDB_RDONLY))
oflags = O_RDONLY;
else
oflags = O_RDWR | O_CREAT;
- if ((env->me_fd = open(dpath, oflags, mode)) == -1) {
+ env->me_fd = open(dpath, oflags, mode);
+#endif
+ if (env->me_fd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
goto leave;
}
-#endif
if ((rc = mdb_env_open2(env, flags)) == MDB_SUCCESS) {
- /* synchronous fd for meta writes */
+ if (flags & (MDB_RDONLY|MDB_NOSYNC|MDB_NOMETASYNC)) {
+ env->me_mfd = env->me_fd;
+ } else {
+ /* synchronous fd for meta writes */
#ifdef _WIN32
- if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
- mode |= FILE_FLAG_WRITE_THROUGH;
- if ((env->me_mfd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE,
- NULL, len, mode, NULL)) == INVALID_HANDLE_VALUE) {
- rc = ErrCode();
- goto leave;
- }
+ env->me_mfd = CreateFile(dpath, oflags,
+ FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, len,
+ mode | FILE_FLAG_WRITE_THROUGH, NULL);
#else
- if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
- oflags |= MDB_DSYNC;
- if ((env->me_mfd = open(dpath, oflags, mode)) == -1) {
- rc = ErrCode();
- goto leave;
- }
+ env->me_mfd = open(dpath, oflags | MDB_DSYNC, mode);
#endif
+ if (env->me_mfd == INVALID_HANDLE_VALUE) {
+ rc = ErrCode();
+ goto leave;
+ }
+ }
env->me_path = strdup(path);
DPRINTF("opened dbenv %p", (void *) env);
pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
- LAZY_RWLOCK_INIT(&env->me_dblock, NULL);
+#ifdef _WIN32
+ /* Windows TLS callbacks need help finding their TLS info. */
+ if (mdb_tls_nkeys < MAX_TLS_KEYS)
+ mdb_tls_keys[mdb_tls_nkeys++] = env->me_txkey;
+ else {
+ rc = ENOMEM;
+ goto leave;
+ }
+#endif
if (excl)
mdb_env_share_locks(env);
- env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
- env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
- env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
env->me_numdbs = 2;
+ env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
+ env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
+ if (!env->me_dbxs || !env->me_dbflags)
+ rc = ENOMEM;
}
leave:
if (env == NULL)
return;
+ VGMEMP_DESTROY(env);
while (env->me_dpages) {
dp = env->me_dpages;
+ VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next));
env->me_dpages = dp->mp_next;
free(dp);
}
- free(env->me_dbs[1]);
- free(env->me_dbs[0]);
+ free(env->me_dbflags);
free(env->me_dbxs);
free(env->me_path);
- LAZY_RWLOCK_DESTROY(&env->me_dblock);
pthread_key_delete(env->me_txkey);
+#ifdef _WIN32
+ /* Delete our key from the global list */
+ { int i;
+ for (i=0; i<mdb_tls_nkeys; i++)
+ if (mdb_tls_keys[i] == env->me_txkey) {
+ mdb_tls_keys[i] = mdb_tls_keys[mdb_tls_nkeys-1];
+ mdb_tls_nkeys--;
+ break;
+ }
+ }
+#endif
if (env->me_map) {
munmap(env->me_map, env->me_mapsize);
}
- close(env->me_mfd);
+ if (env->me_mfd != env->me_fd)
+ close(env->me_mfd);
close(env->me_fd);
if (env->me_txns) {
pid_t pid = getpid();
for (i=0; i<env->me_txns->mti_numreaders; i++)
if (env->me_txns->mti_readers[i].mr_pid == pid)
env->me_txns->mti_readers[i].mr_pid = 0;
- munmap(env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
+#ifdef _WIN32
+ CloseHandle(env->me_rmutex);
+ CloseHandle(env->me_wmutex);
+ /* Windows automatically destroys the mutexes when
+ * the last handle closes.
+ */
+#else
+#ifdef USE_POSIX_SEM
+ sem_close(env->me_rmutex);
+ sem_close(env->me_wmutex);
+ { int excl = 0;
+ if (!mdb_env_excl_lock(env, &excl) && excl) {
+			/* We are the only remaining user of the environment;
+			 * clean up the semaphores. */
+ sem_unlink(env->me_txns->mti_rmname);
+ sem_unlink(env->me_txns->mti_wmname);
+ }
+ }
+#endif
+#endif
+ munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
}
close(env->me_lfd);
mdb_midl_free(env->me_free_pgs);
nkeys = NUMKEYS(mp);
+#if MDB_DEBUG
+ {
+ pgno_t pgno;
+ COPY_PGNO(pgno, mp->mp_pgno);
DPRINTF("searching %u keys in %s %spage %zu",
nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
- mp->mp_pgno);
+ pgno);
+ }
+#endif
assert(nkeys > 0);
nodekey.mv_data = NODEKEY(node);
rc = cmp(key, &nodekey);
-#if DEBUG
+#if MDB_DEBUG
if (IS_LEAF(mp))
DPRINTF("found leaf index %u [%s], rc = %i",
i, DKEY(&nodekey), rc);
static void
mdb_cursor_pop(MDB_cursor *mc)
{
- MDB_page *top;
-
if (mc->mc_snum) {
- top = mc->mc_pg[mc->mc_top];
+#if MDB_DEBUG
+ MDB_page *top = mc->mc_pg[mc->mc_top];
+#endif
mc->mc_snum--;
if (mc->mc_snum)
mc->mc_top--;
}
}
if (!p) {
- if (pgno <= txn->mt_env->me_metas[txn->mt_toggle]->mm_last_pg)
+ if (pgno < txn->mt_next_pgno)
p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
}
*ret = p;
* @param[in,out] mc the cursor for this operation.
* @param[in] key the key to search for. If NULL, search for the lowest
* page. (This is used by #mdb_cursor_first().)
- * @param[in] modify If true, visited pages are updated with new page numbers.
+ * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers.
+ * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups.
* @return 0 on success, non-zero on failure.
*/
static int
* @return 0 on success, non-zero on failure.
*/
static int
-mdb_page_search(MDB_cursor *mc, MDB_val *key, int modify)
+mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
{
int rc;
pgno_t root;
/* Make sure we're using an up-to-date root */
if (mc->mc_dbi > MAIN_DBI) {
if ((*mc->mc_dbflag & DB_STALE) ||
- (modify && !(*mc->mc_dbflag & DB_DIRTY))) {
+ ((flags & MDB_PS_MODIFY) && !(*mc->mc_dbflag & DB_DIRTY))) {
MDB_cursor mc2;
unsigned char dbflag = 0;
mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL);
- rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, modify);
+ rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, flags & MDB_PS_MODIFY);
if (rc)
return rc;
if (*mc->mc_dbflag & DB_STALE) {
mdb_node_read(mc->mc_txn, leaf, &data);
memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db));
}
- if (modify)
+ if (flags & MDB_PS_MODIFY)
dbflag = DB_DIRTY;
*mc->mc_dbflag = dbflag;
}
}
}
- if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0])))
- return rc;
+ assert(root > 1);
+ if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root)
+ if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0])))
+ return rc;
mc->mc_snum = 1;
mc->mc_top = 0;
DPRINTF("db %u root page %zu has flags 0x%X",
mc->mc_dbi, root, mc->mc_pg[0]->mp_flags);
- if (modify) {
+ if (flags & MDB_PS_MODIFY) {
if ((rc = mdb_page_touch(mc)))
return rc;
}
- return mdb_page_search_root(mc, key, modify);
+ if (flags & MDB_PS_ROOTONLY)
+ return MDB_SUCCESS;
+
+ return mdb_page_search_root(mc, key, flags);
}
/** Return the data associated with a given node.
return rc;
}
} else {
- mc->mc_xcursor->mx_cursor.mc_flags = 0;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
if (op == MDB_NEXT_DUP)
return MDB_NOTFOUND;
}
if (op != MDB_PREV || rc == MDB_SUCCESS)
return rc;
} else {
- mc->mc_xcursor->mx_cursor.mc_flags = 0;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
if (op == MDB_PREV_DUP)
return MDB_NOTFOUND;
}
{
int rc;
MDB_page *mp;
- MDB_node *leaf;
+ MDB_node *leaf = NULL;
DKBUF;
assert(mc);
* was the one we wanted.
*/
mc->mc_ki[mc->mc_top] = 0;
- leaf = NODEPTR(mp, 0);
if (exactp)
*exactp = 1;
goto set1;
if (rc == 0) {
/* last node was the one we wanted */
mc->mc_ki[mc->mc_top] = nkeys-1;
- leaf = NODEPTR(mp, nkeys-1);
if (exactp)
*exactp = 1;
goto set1;
}
if (rc < 0) {
- /* This is definitely the right page, skip search_page */
+ if (mc->mc_ki[mc->mc_top] < NUMKEYS(mp)) {
+ /* This is definitely the right page, skip search_page */
+ if (mp->mp_flags & P_LEAF2) {
+ nodekey.mv_data = LEAF2KEY(mp,
+ mc->mc_ki[mc->mc_top], nodekey.mv_size);
+ } else {
+ leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
+ MDB_SET_KEY(leaf, &nodekey);
+ }
+ rc = mc->mc_dbx->md_cmp(key, &nodekey);
+ if (rc == 0) {
+ /* current node was the one we wanted */
+ if (exactp)
+ *exactp = 1;
+ goto set1;
+ }
+ }
rc = 0;
goto set2;
}
} else {
if (mc->mc_xcursor)
- mc->mc_xcursor->mx_cursor.mc_flags = 0;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
}
return rc;
} else {
if (mc->mc_xcursor)
- mc->mc_xcursor->mx_cursor.mc_flags = 0;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
}
{
int rc;
MDB_node *leaf;
- MDB_val lkey;
- lkey.mv_size = MAXKEYSIZE+1;
- lkey.mv_data = NULL;
+ if (!(mc->mc_flags & C_EOF)) {
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
+ MDB_val lkey;
+
+ lkey.mv_size = MAXKEYSIZE+1;
+ lkey.mv_data = NULL;
rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS)
return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
- leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
- mc->mc_flags |= C_INITIALIZED;
- mc->mc_flags &= ~C_EOF;
-
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
+ mc->mc_flags |= C_INITIALIZED|C_EOF;
+ }
+ leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
key->mv_size = mc->mc_db->md_pad;
return rc;
} else {
if (mc->mc_xcursor)
- mc->mc_xcursor->mx_cursor.mc_flags = 0;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
}
case MDB_PREV:
case MDB_PREV_DUP:
case MDB_PREV_NODUP:
- if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF))
+ if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF)) {
rc = mdb_cursor_last(mc, key, data);
- else
+ mc->mc_flags &= ~C_EOF;
+ } else
rc = mdb_cursor_prev(mc, key, data, op);
break;
case MDB_FIRST:
if (mc->mc_dbi > MAIN_DBI && !(*mc->mc_dbflag & DB_DIRTY)) {
MDB_cursor mc2;
mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL);
- rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, 1);
+ rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, MDB_PS_MODIFY);
if (rc)
return rc;
*mc->mc_dbflag = DB_DIRTY;
MDB_val xdata, *rdata, dkey;
MDB_page *fp;
MDB_db dummy;
- int do_sub = 0;
+ int do_sub = 0, insert = 0;
+ unsigned int mcount = 0;
size_t nsize;
int rc, rc2;
- char pbuf[PAGESIZE];
+ MDB_pagebuf pbuf;
char dbuf[MAXKEYSIZE+1];
unsigned int nflags;
DKBUF;
} else {
int exact = 0;
MDB_val d2;
+ if (flags & MDB_APPEND) {
+ MDB_val k2;
+ rc = mdb_cursor_last(mc, &k2, &d2);
+ if (rc == 0) {
+ rc = mc->mc_dbx->md_cmp(key, &k2);
+ if (rc > 0) {
+ rc = MDB_NOTFOUND;
+ mc->mc_ki[mc->mc_top]++;
+ } else {
+ rc = 0;
+ }
+ }
+ } else {
rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
- if (flags == MDB_NOOVERWRITE && rc == 0) {
+ }
+ if ((flags & MDB_NOOVERWRITE) && rc == 0) {
DPRINTF("duplicate key [%s]", DKEY(key));
*data = d2;
return MDB_KEYEXIST;
/* DB has dups? */
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
/* Was a single item before, must convert now */
+more:
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
/* Just overwrite the current item */
if (flags == MDB_CURRENT)
dkey.mv_size = NODEDSZ(leaf);
dkey.mv_data = NODEDATA(leaf);
- /* data matches, ignore it */
+#if UINT_MAX < SIZE_MAX
+ if (mc->mc_dbx->md_dcmp == mdb_cmp_int && dkey.mv_size == sizeof(size_t))
+#ifdef MISALIGNED_OK
+ mc->mc_dbx->md_dcmp = mdb_cmp_long;
+#else
+ mc->mc_dbx->md_dcmp = mdb_cmp_cint;
+#endif
+#endif
+ /* if data matches, ignore it */
if (!mc->mc_dbx->md_dcmp(data, &dkey))
return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
/* create a fake page for the dup items */
memcpy(dbuf, dkey.mv_data, dkey.mv_size);
dkey.mv_data = dbuf;
- fp = (MDB_page *)pbuf;
+ fp = (MDB_page *)&pbuf;
fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
fp->mp_lower = PAGEHDRSZ;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
+ fp->mp_upper += 2 * data->mv_size; /* leave space for 2 more */
} else {
fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE +
(dkey.mv_size & 1) + (data->mv_size & 1);
do_sub = 1;
rdata = &xdata;
xdata.mv_size = fp->mp_upper;
- xdata.mv_data = pbuf;
+ xdata.mv_data = fp;
flags |= F_DUPDATA;
goto new_sub;
}
fp = NODEDATA(leaf);
if (flags == MDB_CURRENT) {
+reuse:
fp->mp_flags |= P_DIRTY;
- fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
+ COPY_PGNO(fp->mp_pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
goto put_sub;
}
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
offset = fp->mp_pad;
+ if (SIZELEFT(fp) >= offset)
+ goto reuse;
+ offset *= 4; /* space for 4 more */
} else {
offset = NODESIZE + sizeof(indx_t) + data->mv_size;
}
/* no, just grow it */
rdata = &xdata;
xdata.mv_size = NODEDSZ(leaf) + offset;
- xdata.mv_data = pbuf;
- mp = (MDB_page *)pbuf;
+ xdata.mv_data = &pbuf;
+ mp = (MDB_page *)&pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
flags |= F_DUPDATA;
}
goto put_sub;
}
current:
- /* same size, just replace it */
- if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
- NODEDSZ(leaf) == data->mv_size) {
+ /* overflow page overwrites need special handling */
+ if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
+ MDB_page *omp;
+ pgno_t pg;
+ int ovpages, dpages;
+
+ ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
+ dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
+ memcpy(&pg, NODEDATA(leaf), sizeof(pg));
+ mdb_page_get(mc->mc_txn, pg, &omp);
+ /* Is the ov page writable and large enough? */
+ if ((omp->mp_flags & P_DIRTY) && ovpages >= dpages) {
+ /* yes, overwrite it. Note in this case we don't
+ * bother to try shrinking the node if the new data
+ * is smaller than the overflow threshold.
+ */
+ if (F_ISSET(flags, MDB_RESERVE))
+ data->mv_data = METADATA(omp);
+ else
+ memcpy(METADATA(omp), data->mv_data, data->mv_size);
+ goto done;
+ } else {
+ /* no, free ovpages */
+ int i;
+ mc->mc_db->md_overflow_pages -= ovpages;
+ for (i=0; i<ovpages; i++) {
+ DPRINTF("freed ov page %zu", pg);
+ mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
+ pg++;
+ }
+ }
+ } else if (NODEDSZ(leaf) == data->mv_size) {
+ /* same size, just replace it. Note that we could
+ * also reuse this node if the new data is smaller,
+ * but instead we opt to shrink the node in that case.
+ */
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = NODEDATA(leaf);
else
goto done;
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+ mc->mc_db->md_entries--;
} else {
DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]);
+ insert = 1;
}
rdata = data;
nflags = flags & NODE_ADD_FLAGS;
nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata);
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
+ if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
+ nflags &= ~MDB_APPEND;
+ if (!insert)
+ nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
/* There is room already in this leaf page. */
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
- if (rc == 0 && !do_sub) {
+ if (rc == 0 && !do_sub && insert) {
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
dbi--;
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
- if (m2 == mc) continue;
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
+ if (m3 == mc || m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) {
m3->mc_ki[i]++;
}
* DB are all zero size.
*/
if (do_sub) {
- MDB_db *db;
int xflags;
put_sub:
xdata.mv_size = 0;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
- if (m2 == mc) continue;
+ if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init1(m2, leaf);
}
}
}
}
+ if (flags & MDB_APPENDDUP)
+ xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
- db = NODEDATA(leaf);
+ void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
}
}
*/
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++;
+ if (flags & MDB_MULTIPLE) {
+ mcount++;
+ if (mcount < data[1].mv_size) {
+ data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
+ leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+ goto more;
+ }
+ }
}
done:
return rc;
if (mc->mc_xcursor->mx_db.md_entries) {
if (leaf->mn_flags & F_SUBDATA) {
/* update subDB info */
- MDB_db *db = NODEDATA(leaf);
+ void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
} else {
/* shrink fake page */
size_t sz;
sz = LEAFSIZE(key, data);
- if (data->mv_size >= env->me_psize / MDB_MINKEYS) {
+ if (sz >= env->me_psize / MDB_MINKEYS) {
/* put on overflow page */
sz -= data->mv_size - sizeof(pgno_t);
}
if (F_ISSET(flags, F_BIGDATA)) {
/* Data already on overflow page. */
node_size += sizeof(pgno_t);
- } else if (data->mv_size >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
+ } else if (node_size + data->mv_size >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
/* Put data on overflow page. */
- DPRINTF("data size is %zu, put on overflow page",
- data->mv_size);
+ DPRINTF("data size is %zu, node would be %zu, put data on overflow page",
+ data->mv_size, node_size+data->mv_size);
node_size += sizeof(pgno_t);
if ((ofp = mdb_page_new(mc, P_OVERFLOW, ovpages)) == NULL)
return ENOMEM;
MDB_node *node;
char *base;
+#if MDB_DEBUG
+ {
+ pgno_t pgno;
+ COPY_PGNO(pgno, mp->mp_pgno);
DPRINTF("delete node %u on %s page %zu", indx,
- IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
+ IS_LEAF(mp) ? "leaf" : "branch", pgno);
+ }
+#endif
assert(indx < NUMKEYS(mp));
if (IS_LEAF2(mp)) {
xp->mp_lower = sp->mp_lower;
xp->mp_flags = sp->mp_flags;
xp->mp_pad = sp->mp_pad;
- xp->mp_pgno = mp->mp_pgno;
+ COPY_PGNO(xp->mp_pgno, mp->mp_pgno);
/* shift lower nodes upward */
ptr = mp->mp_ptrs[indx];
mx->mx_cursor.mc_dbx = &mx->mx_dbx;
mx->mx_cursor.mc_dbi = mc->mc_dbi+1;
mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
+ mx->mx_cursor.mc_snum = 0;
+ mx->mx_cursor.mc_top = 0;
+ mx->mx_cursor.mc_flags = C_SUB;
mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp;
mx->mx_dbx.md_dcmp = NULL;
mx->mx_dbx.md_rel = mc->mc_dbx->md_rel;
MDB_xcursor *mx = mc->mc_xcursor;
if (node->mn_flags & F_SUBDATA) {
- MDB_db *db = NODEDATA(node);
- mx->mx_db = *db;
+ memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDB_db));
+ mx->mx_cursor.mc_pg[0] = 0;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = C_SUB;
} else {
mx->mx_db.md_leaf_pages = 1;
mx->mx_db.md_overflow_pages = 0;
mx->mx_db.md_entries = NUMKEYS(fp);
- mx->mx_db.md_root = fp->mp_pgno;
+ COPY_PGNO(mx->mx_db.md_root, fp->mp_pgno);
mx->mx_cursor.mc_snum = 1;
mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB;
mx->mx_cursor.mc_top = 0;
DB_DIRTY : 0;
mx->mx_dbx.md_name.mv_data = NODEKEY(node);
mx->mx_dbx.md_name.mv_size = node->mn_ksize;
+#if UINT_MAX < SIZE_MAX
if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t))
+#ifdef MISALIGNED_OK
mx->mx_dbx.md_cmp = mdb_cmp_long;
+#else
+ mx->mx_dbx.md_cmp = mdb_cmp_cint;
+#endif
+#endif
}
/** Initialize a cursor for a given transaction and database. */
mc->mc_dbx = &txn->mt_dbxs[dbi];
mc->mc_dbflag = &txn->mt_dbflags[dbi];
mc->mc_snum = 0;
+ mc->mc_top = 0;
+ mc->mc_pg[0] = 0;
mc->mc_flags = 0;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
assert(mx != NULL);
} else {
mc->mc_xcursor = NULL;
}
+ if (*mc->mc_dbflag & DB_STALE) {
+ mdb_page_search(mc, NULL, MDB_PS_ROOTONLY);
+ }
}
int
MDB_xcursor *mx = NULL;
size_t size = sizeof(MDB_cursor);
- if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
+ if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs)
+ return EINVAL;
+
+ /* Allow read access to the freelist */
+ if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
return EINVAL;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
static int
mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
{
- indx_t ptr, i, numkeys;
- int delta;
- size_t len;
MDB_node *node;
char *base;
+ size_t len;
+ int delta, delta0;
+ indx_t ptr, i, numkeys;
DKBUF;
node = NODEPTR(mp, indx);
ptr = mp->mp_ptrs[indx];
- DPRINTF("update key %u (ofs %u) [%.*s] to [%s] on page %zu",
- indx, ptr,
- (int)node->mn_ksize, (char *)NODEKEY(node),
- DKEY(key),
- mp->mp_pgno);
+#if MDB_DEBUG
+ {
+ MDB_val k2;
+ char kbuf2[(MAXKEYSIZE*2+1)];
+ k2.mv_data = NODEKEY(node);
+ k2.mv_size = node->mn_ksize;
+ DPRINTF("update key %u (ofs %u) [%s] to [%s] on page %zu",
+ indx, ptr,
+ mdb_dkey(&k2, kbuf2),
+ DKEY(key),
+ mp->mp_pgno);
+ }
+#endif
- delta = key->mv_size - node->mn_ksize;
+ delta0 = delta = key->mv_size - node->mn_ksize;
+
+	/* Node offsets must stay 2-byte aligned, so round the size
+	 * change up to an even delta. If the new key is shorter by
+	 * exactly one byte, the rounded delta is zero and the data
+	 * shift below is skipped.
+	 */
+ delta += (delta & 1);
if (delta) {
if (delta > 0 && SIZELEFT(mp) < delta) {
DPRINTF("OUCH! Not enough room, delta = %d", delta);
mp->mp_upper -= delta;
node = NODEPTR(mp, indx);
- node->mn_ksize = key->mv_size;
}
- memcpy(NODEKEY(node), key->mv_data, key->mv_size);
+	/* Even when the aligned delta was zero (no shift performed),
+	 * the actual key size may still differ by one byte; record it.
+	 */
+ if (delta0)
+ node->mn_ksize = key->mv_size;
+
+ if (key->mv_size)
+ memcpy(NODEKEY(node), key->mv_data, key->mv_size);
return MDB_SUCCESS;
}
int rc;
MDB_node *srcnode;
MDB_val key, data;
+ pgno_t srcpg;
+ unsigned short flags;
+
DKBUF;
/* Mark src and dst as dirty. */
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
data.mv_size = 0;
data.mv_data = NULL;
+ srcpg = 0;
+ flags = 0;
} else {
srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
+ assert(!((long)srcnode&1));
+ srcpg = NODEPGNO(srcnode);
+ flags = srcnode->mn_flags;
if (csrc->mc_ki[csrc->mc_top] == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
unsigned int snum = csrc->mc_snum;
MDB_node *s2;
/* must find the lowest key below src */
mdb_page_search_root(csrc, NULL, 0);
- s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
- key.mv_size = NODEKSZ(s2);
- key.mv_data = NODEKEY(s2);
+ if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
+ key.mv_size = csrc->mc_db->md_pad;
+ key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
+ } else {
+ s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
+ key.mv_size = NODEKSZ(s2);
+ key.mv_data = NODEKEY(s2);
+ }
csrc->mc_snum = snum--;
csrc->mc_top = snum;
} else {
data.mv_size = NODEDSZ(srcnode);
data.mv_data = NODEDATA(srcnode);
}
+ if (IS_BRANCH(cdst->mc_pg[cdst->mc_top]) && cdst->mc_ki[cdst->mc_top] == 0) {
+ unsigned int snum = cdst->mc_snum;
+ MDB_node *s2;
+ MDB_val bkey;
+ /* must find the lowest key below dst */
+ mdb_page_search_root(cdst, NULL, 0);
+ if (IS_LEAF2(cdst->mc_pg[cdst->mc_top])) {
+ bkey.mv_size = cdst->mc_db->md_pad;
+ bkey.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, bkey.mv_size);
+ } else {
+ s2 = NODEPTR(cdst->mc_pg[cdst->mc_top], 0);
+ bkey.mv_size = NODEKSZ(s2);
+ bkey.mv_data = NODEKEY(s2);
+ }
+ cdst->mc_snum = snum--;
+ cdst->mc_top = snum;
+ rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey);
+ }
+
DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu",
IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch",
csrc->mc_ki[csrc->mc_top],
/* Add the node to the destination page.
*/
- rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, NODEPGNO(srcnode),
- srcnode->mn_flags);
+ rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg, flags);
if (rc != MDB_SUCCESS)
return rc;
} else {
for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], i);
+ if (i == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
+ unsigned int snum = csrc->mc_snum;
+ MDB_node *s2;
+ /* must find the lowest key below src */
+ mdb_page_search_root(csrc, NULL, 0);
+ if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
+ key.mv_size = csrc->mc_db->md_pad;
+ key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
+ } else {
+ s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
+ key.mv_size = NODEKSZ(s2);
+ key.mv_data = NODEKEY(s2);
+ }
+ csrc->mc_snum = snum--;
+ csrc->mc_top = snum;
+ } else {
+ key.mv_size = srcnode->mn_ksize;
+ key.mv_data = NODEKEY(srcnode);
+ }
- key.mv_size = srcnode->mn_ksize;
- key.mv_data = NODEKEY(srcnode);
data.mv_size = NODEDSZ(srcnode);
data.mv_data = NODEDATA(srcnode);
rc = mdb_node_add(cdst, j, &key, &data, NODEPGNO(srcnode), srcnode->mn_flags);
dbi--;
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
- if (m2 == csrc) continue;
if (csrc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
+ if (m3 == csrc) continue;
+ if (m3->mc_snum < csrc->mc_snum) continue;
if (m3->mc_pg[csrc->mc_top] == csrc->mc_pg[csrc->mc_top]) {
m3->mc_pg[csrc->mc_top] = mp;
m3->mc_ki[csrc->mc_top] += nkeys;
unsigned int ptop;
MDB_cursor mn;
+#if MDB_DEBUG
+ {
+ pgno_t pgno;
+ COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
DPRINTF("rebalancing %s page %zu (has %u keys, %.1f%% full)",
IS_LEAF(mc->mc_pg[mc->mc_top]) ? "leaf" : "branch",
- mc->mc_pg[mc->mc_top]->mp_pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10);
+ pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10);
+ }
+#endif
if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD) {
+#if MDB_DEBUG
+ pgno_t pgno;
+ COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
DPRINTF("no need to rebalance page %zu, above fill threshold",
- mc->mc_pg[mc->mc_top]->mp_pgno);
+ pgno);
+#endif
return MDB_SUCCESS;
}
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
+ if (m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[0] == mp) {
m3->mc_snum = 0;
m3->mc_top = 0;
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
+ if (m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[0] == mp) {
m3->mc_pg[0] = mc->mc_pg[0];
}
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
+ mc->mc_db->md_overflow_pages -= ovpages;
for (i=0; i<ovpages; i++) {
DPRINTF("freed ov page %zu", pg);
mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
* @param[in] newkey The key for the newly inserted node.
* @param[in] newdata The data for the newly inserted node.
* @param[in] newpgno The page number, if the new node is a branch node.
+ * @param[in] nflags The #NODE_ADD_FLAGS for the new node.
* @return 0 on success, non-zero on failure.
*/
static int
unsigned int nflags)
{
unsigned int flags;
- int rc = MDB_SUCCESS, ins_new = 0, new_root = 0;
+ int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1, did_split = 0;
indx_t newindx;
pgno_t pgno = 0;
unsigned int i, j, split_indx, nkeys, pmax;
IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno,
DKEY(newkey), mc->mc_ki[mc->mc_top]);
+ /* Create a right sibling. */
+ if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
+ return ENOMEM;
+ DPRINTF("new right sibling: page %zu", rp->mp_pgno);
+
if (mc->mc_snum < 2) {
if ((pp = mdb_page_new(mc, P_BRANCH, 1)) == NULL)
return ENOMEM;
DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
}
- /* Create a right sibling. */
- if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
- return ENOMEM;
+ mc->mc_flags |= C_SPLITTING;
mdb_cursor_copy(mc, &mn);
mn.mc_pg[mn.mc_top] = rp;
mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
- DPRINTF("new right sibling: page %zu", rp->mp_pgno);
+
+ if (nflags & MDB_APPEND) {
+ mn.mc_ki[mn.mc_top] = 0;
+ sepkey = *newkey;
+ split_indx = newindx;
+ nkeys = 0;
+ goto newsep;
+ }
nkeys = NUMKEYS(mp);
- split_indx = nkeys / 2 + 1;
+ split_indx = (nkeys + 1) / 2;
+ if (newindx < split_indx)
+ newpos = 0;
if (IS_LEAF2(rp)) {
char *split, *ins;
}
/* For leaf pages, check the split point based on what
- * fits where, since otherwise add_node can fail.
+ * fits where, since otherwise mdb_node_add can fail.
+ *
+ * This check is only needed when the data items are
+ * relatively large, such that being off by one will
+ * make the difference between success or failure.
+ * When the size of the data items is much smaller than
+ * one-half of a page, this check is irrelevant.
*/
if (IS_LEAF(mp)) {
unsigned int psize, nsize;
/* Maximum free space in an empty page */
pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ;
nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata);
- if (newindx < split_indx) {
- psize = nsize;
- for (i=0; i<split_indx; i++) {
- node = NODEPTR(mp, i);
- psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
- if (F_ISSET(node->mn_flags, F_BIGDATA))
- psize += sizeof(pgno_t);
- else
- psize += NODEDSZ(node);
- psize += psize & 1;
- if (psize > pmax) {
- split_indx = i;
- break;
+ if ((nkeys < 20) || (nsize > pmax/4)) {
+ if (newindx <= split_indx) {
+ psize = nsize;
+ newpos = 0;
+ for (i=0; i<split_indx; i++) {
+ node = NODEPTR(mp, i);
+ psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
+ if (F_ISSET(node->mn_flags, F_BIGDATA))
+ psize += sizeof(pgno_t);
+ else
+ psize += NODEDSZ(node);
+ psize += psize & 1;
+ if (psize > pmax) {
+ if (i == split_indx - 1 && newindx == split_indx)
+ newpos = 1;
+ else
+ split_indx = i;
+ break;
+ }
}
- }
- } else {
- psize = nsize;
- for (i=nkeys-1; i>=split_indx; i--) {
- node = NODEPTR(mp, i);
- psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
- if (F_ISSET(node->mn_flags, F_BIGDATA))
- psize += sizeof(pgno_t);
- else
- psize += NODEDSZ(node);
- psize += psize & 1;
- if (psize > pmax) {
- split_indx = i+1;
- break;
+ } else {
+ psize = nsize;
+ for (i=nkeys-1; i>=split_indx; i--) {
+ node = NODEPTR(mp, i);
+ psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
+ if (F_ISSET(node->mn_flags, F_BIGDATA))
+ psize += sizeof(pgno_t);
+ else
+ psize += NODEDSZ(node);
+ psize += psize & 1;
+ if (psize > pmax) {
+ split_indx = i+1;
+ break;
+ }
}
}
}
}
/* First find the separating key between the split pages.
+ * The case where newindx == split_indx is ambiguous; the
+ * new item could go to the new page or stay on the original
+ * page. If newpos == 1 it goes to the new page.
*/
- if (newindx == split_indx) {
+ if (newindx == split_indx && newpos) {
sepkey.mv_size = newkey->mv_size;
sepkey.mv_data = newkey->mv_data;
} else {
if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) {
mn.mc_snum--;
mn.mc_top--;
+ did_split = 1;
rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
+ /* root split? */
+ if (mn.mc_snum == mc->mc_snum) {
+ mc->mc_pg[mc->mc_snum] = mc->mc_pg[mc->mc_top];
+ mc->mc_ki[mc->mc_snum] = mc->mc_ki[mc->mc_top];
+ mc->mc_pg[mc->mc_top] = mc->mc_pg[ptop];
+ mc->mc_ki[mc->mc_top] = mc->mc_ki[ptop];
+ mc->mc_snum++;
+ mc->mc_top++;
+ ptop++;
+ }
/* Right page might now have changed parent.
* Check if left page also changed parent.
*/
if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
+ for (i=0; i<ptop; i++) {
+ mc->mc_pg[i] = mn.mc_pg[i];
+ mc->mc_ki[i] = mn.mc_ki[i];
+ }
mc->mc_pg[ptop] = mn.mc_pg[ptop];
mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
}
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++;
}
+ mc->mc_flags ^= C_SPLITTING;
if (rc != MDB_SUCCESS) {
return rc;
}
+ if (nflags & MDB_APPEND) {
+ mc->mc_pg[mc->mc_top] = rp;
+ mc->mc_ki[mc->mc_top] = 0;
+ rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
+ if (rc)
+ return rc;
+ for (i=0; i<mc->mc_top; i++)
+ mc->mc_ki[i] = mn.mc_ki[i];
+ goto done;
+ }
if (IS_LEAF2(rp)) {
goto done;
}
if (i == split_indx) {
/* Insert in right sibling. */
/* Reset insert index for right sibling. */
- j = (i == newindx && ins_new);
- mc->mc_pg[mc->mc_top] = rp;
+ if (i != newindx || (newpos ^ ins_new)) {
+ j = 0;
+ mc->mc_pg[mc->mc_top] = rp;
+ }
}
if (i == newindx && !ins_new) {
ins_new = 1;
- /* Update page and index for the new key. */
+ /* Update index for the new key. */
mc->mc_ki[mc->mc_top] = j;
} else if (i == nkeys) {
break;
}
rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
+ if (rc) break;
}
- /* reset back to original page */
- if (newindx < split_indx)
- mc->mc_pg[mc->mc_top] = mp;
-
nkeys = NUMKEYS(copy);
for (i=0; i<nkeys; i++)
mp->mp_ptrs[i] = copy->mp_ptrs[i];
memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1),
mc->mc_txn->mt_env->me_psize - copy->mp_upper);
+ /* reset back to original page */
+ if (newindx < split_indx || (!newpos && newindx == split_indx)) {
+ mc->mc_pg[mc->mc_top] = mp;
+ if (nflags & MDB_RESERVE) {
+ node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
+ if (!(node->mn_flags & F_BIGDATA))
+ newdata->mv_data = NODEDATA(node);
+ }
+ } else {
+ mc->mc_ki[ptop]++;
+ }
+
/* return tmp page to freelist */
copy->mp_next = mc->mc_txn->mt_env->me_dpages;
+ VGMEMP_FREE(mc->mc_txn->mt_env, copy);
mc->mc_txn->mt_env->me_dpages = copy;
done:
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
+ int fixup = NUMKEYS(mp);
if (mc->mc_flags & C_SUB)
dbi--;
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
+ if (!(m3->mc_flags & C_INITIALIZED))
+ continue;
+ if (m3->mc_flags & C_SPLITTING)
+ continue;
if (new_root) {
+ int k;
/* root split */
- for (i=m3->mc_top; i>0; i--) {
- m3->mc_ki[i+1] = m3->mc_ki[i];
- m3->mc_pg[i+1] = m3->mc_pg[i];
+ for (k=m3->mc_top; k>=0; k--) {
+ m3->mc_ki[k+1] = m3->mc_ki[k];
+ m3->mc_pg[k+1] = m3->mc_pg[k];
+ }
+ if (m3->mc_ki[0] >= split_indx) {
+ m3->mc_ki[0] = 1;
+ } else {
+ m3->mc_ki[0] = 0;
}
- m3->mc_ki[0] = mc->mc_ki[0];
m3->mc_pg[0] = mc->mc_pg[0];
m3->mc_snum++;
m3->mc_top++;
}
if (m3->mc_pg[mc->mc_top] == mp) {
- if (m3->mc_ki[m3->mc_top] >= split_indx) {
- m3->mc_pg[m3->mc_top] = rp;
- m3->mc_ki[m3->mc_top] -= split_indx;
+ if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
+ m3->mc_ki[mc->mc_top]++;
+ if (m3->mc_ki[mc->mc_top] >= fixup) {
+ m3->mc_pg[mc->mc_top] = rp;
+ m3->mc_ki[mc->mc_top] -= fixup;
+ m3->mc_ki[ptop] = mn.mc_ki[ptop];
}
+ } else if (!did_split && m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
+ m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {
+ m3->mc_ki[ptop]++;
}
}
}
return EINVAL;
}
- if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE)) != flags)
+ if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND)) != flags)
return EINVAL;
mdb_cursor_init(&mc, txn, dbi, &mx);
* at runtime. Changing other flags requires closing the environment
* and re-opening it with the new flags.
*/
-#define CHANGEABLE (MDB_NOSYNC)
+#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC)
int
mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
{
if (env == NULL || arg == NULL)
return EINVAL;
- mdb_env_read_meta(env, &toggle);
+ toggle = mdb_env_pick_meta(env);
return mdb_stat0(env, &env->me_metas[toggle]->mm_dbs[MAIN_DBI], arg);
}
static void
mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi)
{
- if (txn->mt_dbs[dbi].md_flags & MDB_REVERSEKEY)
- txn->mt_dbxs[dbi].md_cmp = mdb_cmp_memnr;
- else if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERKEY)
- txn->mt_dbxs[dbi].md_cmp = mdb_cmp_cint;
- else
- txn->mt_dbxs[dbi].md_cmp = mdb_cmp_memn;
+ uint16_t f = txn->mt_dbs[dbi].md_flags;
- if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
- if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERDUP) {
- if (txn->mt_dbs[dbi].md_flags & MDB_DUPFIXED)
- txn->mt_dbxs[dbi].md_dcmp = mdb_cmp_int;
- else
- txn->mt_dbxs[dbi].md_dcmp = mdb_cmp_cint;
- } else if (txn->mt_dbs[dbi].md_flags & MDB_REVERSEDUP) {
- txn->mt_dbxs[dbi].md_dcmp = mdb_cmp_memnr;
- } else {
- txn->mt_dbxs[dbi].md_dcmp = mdb_cmp_memn;
- }
- } else {
- txn->mt_dbxs[dbi].md_dcmp = NULL;
- }
+ txn->mt_dbxs[dbi].md_cmp =
+ (f & MDB_REVERSEKEY) ? mdb_cmp_memnr :
+ (f & MDB_INTEGERKEY) ? mdb_cmp_cint : mdb_cmp_memn;
+
+ txn->mt_dbxs[dbi].md_dcmp =
+ !(f & MDB_DUPSORT) ? 0 :
+ ((f & MDB_INTEGERDUP)
+ ? ((f & MDB_DUPFIXED) ? mdb_cmp_int : mdb_cmp_cint)
+ : ((f & MDB_REVERSEDUP) ? mdb_cmp_memnr : mdb_cmp_memn));
}
int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
MDB_dbi i;
MDB_cursor mc;
int rc, dbflag, exact;
+ unsigned int unused = 0;
size_t len;
if (txn->mt_dbxs[FREE_DBI].md_cmp == NULL) {
/* Is the DB already open? */
len = strlen(name);
for (i=2; i<txn->mt_numdbs; i++) {
+ if (!txn->mt_dbxs[i].md_name.mv_size) {
+ /* Remember this free slot */
+ if (!unused) unused = i;
+ continue;
+ }
if (len == txn->mt_dbxs[i].md_name.mv_size &&
!strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) {
*dbi = i;
}
}
- if (txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1)
+ /* If no free slot and max hit, fail */
+ if (!unused && txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1)
return ENFILE;
/* Find the DB info */
/* OK, got info, add to table */
if (rc == MDB_SUCCESS) {
- txn->mt_dbxs[txn->mt_numdbs].md_name.mv_data = strdup(name);
- txn->mt_dbxs[txn->mt_numdbs].md_name.mv_size = len;
- txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
- txn->mt_dbflags[txn->mt_numdbs] = dbflag;
- memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
- *dbi = txn->mt_numdbs;
- txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
- txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
- mdb_default_cmp(txn, txn->mt_numdbs);
- txn->mt_numdbs++;
+ unsigned int slot = unused ? unused : txn->mt_numdbs;
+ txn->mt_dbxs[slot].md_name.mv_data = strdup(name);
+ txn->mt_dbxs[slot].md_name.mv_size = len;
+ txn->mt_dbxs[slot].md_rel = NULL;
+ txn->mt_dbflags[slot] = dbflag;
+ memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db));
+ *dbi = slot;
+ txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags;
+ mdb_default_cmp(txn, slot);
+ if (!unused) {
+ txn->mt_numdbs++;
+ txn->mt_env->me_numdbs++;
+ }
}
return rc;
if (!txn || !dbi || dbi >= txn->mt_numdbs)
return EINVAL;
+ if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
+ return EACCES;
+
rc = mdb_cursor_open(txn, dbi, &mc);
if (rc)
return rc;
rc = mdb_drop0(mc, mc->mc_db->md_flags & MDB_DUPSORT);
if (rc)
- mdb_cursor_close(mc);
- return rc;
+ goto leave;
/* Can't delete the main DB */
if (del && dbi > MAIN_DBI) {
txn->mt_dbs[dbi].md_entries = 0;
txn->mt_dbs[dbi].md_root = P_INVALID;
}
+leave:
mdb_cursor_close(mc);
return rc;
}