/** @file mdb.c
- * @brief memory-mapped database library
+ * @brief Lightning memory-mapped database library
*
* A Btree-based database management library modeled loosely on the
* BerkeleyDB API, but much simplified.
*/
/*
- * Copyright 2011-2013 Howard Chu, Symas Corp.
+ * Copyright 2011-2014 Howard Chu, Symas Corp.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
# error "Two's complement, reasonably sized integer types, please"
#endif
-/** @defgroup internal MDB Internals
+/** @defgroup internal LMDB Internals
* @{
*/
/** @defgroup compat Compatibility Macros
*/
#define MDB_MINKEYS 2
- /** A stamp that identifies a file as an MDB file.
+ /** A stamp that identifies a file as an LMDB file.
* There's nothing special about this value other than that it is easily
* recognizable, and it will reflect any byte order mismatches.
*/
* unlikely. If a collision occurs, the results are unpredictable.
*/
typedef struct MDB_txbody {
- /** Stamp identifying this as an MDB file. It must be set
+ /** Stamp identifying this as an LMDB file. It must be set
* to #MDB_MAGIC. */
uint32_t mtb_magic;
/** Format of this lock file. Must be set to #MDB_LOCK_FORMAT. */
#define P_DIRTY 0x10 /**< dirty page, also set for #P_SUBP pages */
#define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */
#define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */
+#define P_LOOSE 0x4000 /**< page was dirtied then freed, can be reused */
#define P_KEEP 0x8000 /**< leave this page alone during spill */
/** @} */
uint16_t mp_flags; /**< @ref mdb_page */
* Pages 0-1 are meta pages. Transaction N writes meta page #(N % 2).
*/
typedef struct MDB_meta {
- /** Stamp identifying this as an MDB file. It must be set
+ /** Stamp identifying this as an LMDB file. It must be set
* to #MDB_MAGIC. */
uint32_t mm_magic;
/** Version number of this lock file. Must be set to #MDB_DATA_VERSION. */
typedef struct MDB_pgstate {
pgno_t *mf_pghead; /**< Reclaimed freeDB pages, or NULL before use */
txnid_t mf_pglast; /**< ID of last used record, or 0 if !mf_pghead */
+ MDB_page *mf_pgloose; /**< Dirty pages that can be reused */
} MDB_pgstate;
/** The database environment. */
MDB_pgstate me_pgstate; /**< state of old pages from freeDB */
# define me_pglast me_pgstate.mf_pglast
# define me_pghead me_pgstate.mf_pghead
+# define me_pgloose me_pgstate.mf_pgloose
MDB_page *me_dpages; /**< list of malloc'd blocks for re-use */
/** IDL of pages that became unused in a write txn */
MDB_IDL me_free_pgs;
return MDB_VERSION_STRING;
}
-/** Table of descriptions for MDB @ref errors */
+/** Table of descriptions for LMDB @ref errors */
static char *const mdb_errstr[] = {
"MDB_KEYEXIST: Key/data pair already exists",
"MDB_NOTFOUND: No matching key/data pair found",
"MDB_CORRUPTED: Located page was wrong type",
"MDB_PANIC: Update of meta page failed",
"MDB_VERSION_MISMATCH: Database environment version mismatch",
- "MDB_INVALID: File is not an MDB file",
+ "MDB_INVALID: File is not an LMDB file",
"MDB_MAP_FULL: Environment mapsize limit reached",
"MDB_DBS_FULL: Environment maxdbs limit reached",
"MDB_READERS_FULL: Environment maxreaders limit reached",
}
return ret;
}
-
/** Free a single page.
* Saves single pages to a list, for future reuse.
* (This is not used for multi-page overflow pages.)
dl[0].mid = 0;
}
+/** Loosen a single page.
+ * Saves single pages to a list for future reuse
+ * in this same txn. It has been pulled from the freeDB
+ * and already resides on the dirty list, but has been
+ * deleted. Use these pages first before pulling again
+ * from the freeDB.
+ */
+static void
+mdb_page_loose(MDB_env *env, MDB_page *mp)
+{
+ pgno_t *pp = (pgno_t *)mp->mp_ptrs;
+ *pp = mp->mp_pgno;
+ mp->mp_next = env->me_pgloose;
+ env->me_pgloose = mp;
+ mp->mp_flags |= P_LOOSE;
+}
+
/** Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn.
* @param[in] mc A cursor handle for the current operation.
* @param[in] pflags Flags of the pages to update:
break;
}
+ /* Loose pages shouldn't be spilled */
+ for (dp = txn->mt_env->me_pgloose; dp; dp=dp->mp_next) {
+ if ((dp->mp_flags & Mask) == pflags)
+ dp->mp_flags ^= P_KEEP;
+ }
+
if (all) {
/* Mark dirty root pages */
for (i=0; i<txn->mt_numdbs; i++) {
#else
enum { Paranoid = 0, Max_retries = INT_MAX /*infinite*/ };
#endif
- int rc, retry = num;
+ int rc, retry = num * 20;
MDB_txn *txn = mc->mc_txn;
MDB_env *env = txn->mt_env;
pgno_t pgno, *mop = env->me_pghead;
MDB_cursor_op op;
MDB_cursor m2;
+ /* If there are any loose pages, just use them */
+ if (num == 1 && env->me_pgloose) {
+ pgno_t *pp;
+ np = env->me_pgloose;
+ env->me_pgloose = np->mp_next;
+ pp = (pgno_t *)np->mp_ptrs;
+ np->mp_pgno = *pp;
+ *mp = np;
+ return MDB_SUCCESS;
+ }
+
*mp = NULL;
/* If our dirty list is already full, we can't do anything */
return rc;
}
+ /* Dispose of loose pages. Usually they will have all
+ * been used up by the time we get here.
+ */
+ if (env->me_pgloose) {
+ MDB_page *mp = env->me_pgloose;
+ pgno_t *pp;
+ /* Just return them to freeDB */
+ if (env->me_pghead) {
+ int i, j;
+ mop = env->me_pghead;
+ while(mp) {
+ pgno_t pg;
+ pp = (pgno_t *)mp->mp_ptrs;
+ pg = *pp;
+ j = mop[0] + 1;
+ for (i = mop[0]; i && mop[i] < pg; i--)
+ mop[j--] = mop[i];
+ mop[j] = pg;
+ mop[0] += 1;
+ mp = mp->mp_next;
+ }
+ } else {
+ /* Oh well, they were wasted. Put on freelist */
+ while(mp) {
+ pp = (pgno_t *)mp->mp_ptrs;
+ mdb_midl_append(&txn->mt_free_pgs, *pp);
+ mp = mp->mp_next;
+ }
+ }
+ env->me_pgloose = NULL;
+ }
+
/* MDB_RESERVE cancels meminit in ovpage malloc (when no WRITEMAP) */
clean_limit = (env->me_flags & (MDB_NOMEMINIT|MDB_WRITEMAP))
? SSIZE_MAX : maxfree_1pg;
return MDB_SUCCESS;
}
-/** Further setup required for opening an MDB environment
+/** Further setup required for opening an LMDB environment
*/
static int
mdb_env_open2(MDB_env *env)
#endif
/** Open and/or initialize the lock region for the environment.
- * @param[in] env The MDB environment.
+ * @param[in] env The LMDB environment.
* @param[in] lpath The pathname of the file used for the lock region.
* @param[in] mode The Unix permissions for the file, if we create it.
* @param[out] excl Resulting file lock type: -1 none, 0 shared, 1 exclusive
} while(!x && u > (unsigned short *)a->mv_data);
return x;
#else
- return memcmp(a->mv_data, b->mv_data, a->mv_size);
+ unsigned short *u, *c, *end;
+ int x;
+
+ end = (unsigned short *) ((char *) a->mv_data + a->mv_size);
+ u = (unsigned short *)a->mv_data;
+ c = (unsigned short *)b->mv_data;
+ do {
+ x = *u++ - *c++;
+ } while(!x && u < end);
+ return x;
#endif
}
mc->mc_flags &= ~C_EOF;
if (IS_LEAF2(mp)) {
- key->mv_size = mc->mc_db->md_pad;
- key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size);
+ if (op == MDB_SET_RANGE || op == MDB_SET_KEY) {
+ key->mv_size = mc->mc_db->md_pad;
+ key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size);
+ }
return MDB_SUCCESS;
}
} else {
/* there's only a key anyway, so this is a no-op */
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
+ char *ptr;
unsigned int ksize = mc->mc_db->md_pad;
if (key->mv_size != ksize)
return MDB_BAD_VALSIZE;
- if (flags == MDB_CURRENT) {
- char *ptr = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], ksize);
- memcpy(ptr, key->mv_data, ksize);
- }
+ ptr = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], ksize);
+ memcpy(ptr, key->mv_data, ksize);
return MDB_SUCCESS;
}
if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t))
mc->mc_dbx->md_dcmp = mdb_cmp_clong;
#endif
- /* if data matches, skip it */
+ /* does data match? */
if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
if (flags & MDB_NODUPDATA)
return MDB_KEYEXIST;
- rc = MDB_SUCCESS;
- goto next_sub;
+ /* overwrite it */
+ goto current;
}
/* Back up original data item */
*/
mc->mc_flags |= C_INITIALIZED;
}
-next_sub:
if (flags & MDB_MULTIPLE) {
if (!rc) {
mcount++;
MDB_node *s2;
MDB_val bkey;
/* must find the lowest key below dst */
- rc = mdb_page_search_lowest(cdst);
+ mdb_cursor_copy(cdst, &mn);
+ rc = mdb_page_search_lowest(&mn);
if (rc)
return rc;
- if (IS_LEAF2(cdst->mc_pg[cdst->mc_top])) {
- bkey.mv_size = cdst->mc_db->md_pad;
- bkey.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, bkey.mv_size);
+ if (IS_LEAF2(mn.mc_pg[mn.mc_top])) {
+ bkey.mv_size = mn.mc_db->md_pad;
+ bkey.mv_data = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, bkey.mv_size);
} else {
- s2 = NODEPTR(cdst->mc_pg[cdst->mc_top], 0);
+ s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0);
bkey.mv_size = NODEKSZ(s2);
bkey.mv_data = NODEKEY(s2);
}
- cdst->mc_snum = snum--;
- cdst->mc_top = snum;
- mdb_cursor_copy(cdst, &mn);
+ mn.mc_snum = snum--;
+ mn.mc_top = snum;
mn.mc_ki[snum] = 0;
rc = mdb_update_key(&mn, &bkey);
if (rc)
static int
mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
{
- int rc;
- indx_t i, j;
- MDB_node *srcnode;
+ MDB_page *psrc, *pdst;
+ MDB_node *srcnode;
MDB_val key, data;
- unsigned nkeys;
+ unsigned nkeys;
+ int rc;
+ indx_t i, j;
+
+ psrc = csrc->mc_pg[csrc->mc_top];
+ pdst = cdst->mc_pg[cdst->mc_top];
- DPRINTF(("merging page %"Z"u into %"Z"u", csrc->mc_pg[csrc->mc_top]->mp_pgno,
- cdst->mc_pg[cdst->mc_top]->mp_pgno));
+ DPRINTF(("merging page %"Z"u into %"Z"u", psrc->mp_pgno, pdst->mp_pgno));
mdb_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */
mdb_cassert(csrc, cdst->mc_snum > 1);
/* Move all nodes from src to dst.
*/
- j = nkeys = NUMKEYS(cdst->mc_pg[cdst->mc_top]);
- if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
+ j = nkeys = NUMKEYS(pdst);
+ if (IS_LEAF2(psrc)) {
key.mv_size = csrc->mc_db->md_pad;
- key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]);
- for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
+ key.mv_data = METADATA(psrc);
+ for (i = 0; i < NUMKEYS(psrc); i++, j++) {
rc = mdb_node_add(cdst, j, &key, NULL, 0, 0);
if (rc != MDB_SUCCESS)
return rc;
key.mv_data = (char *)key.mv_data + key.mv_size;
}
} else {
- for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
- srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], i);
- if (i == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
- unsigned int snum = csrc->mc_snum;
+ for (i = 0; i < NUMKEYS(psrc); i++, j++) {
+ srcnode = NODEPTR(psrc, i);
+ if (i == 0 && IS_BRANCH(psrc)) {
+ MDB_cursor mn;
MDB_node *s2;
+ mdb_cursor_copy(csrc, &mn);
/* must find the lowest key below src */
- rc = mdb_page_search_lowest(csrc);
+ rc = mdb_page_search_lowest(&mn);
if (rc)
return rc;
- if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
- key.mv_size = csrc->mc_db->md_pad;
- key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
+ if (IS_LEAF2(mn.mc_pg[mn.mc_top])) {
+ key.mv_size = mn.mc_db->md_pad;
+ key.mv_data = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, key.mv_size);
} else {
- s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
+ s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0);
key.mv_size = NODEKSZ(s2);
key.mv_data = NODEKEY(s2);
}
- csrc->mc_snum = snum--;
- csrc->mc_top = snum;
} else {
key.mv_size = srcnode->mn_ksize;
key.mv_data = NODEKEY(srcnode);
}
DPRINTF(("dst page %"Z"u now has %u keys (%.1f%% filled)",
- cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]),
- (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10));
+ pdst->mp_pgno, NUMKEYS(pdst),
+ (float)PAGEFILL(cdst->mc_txn->mt_env, pdst) / 10));
/* Unlink the src page from parent and add to free list.
*/
}
csrc->mc_top++;
- rc = mdb_midl_append(&csrc->mc_txn->mt_free_pgs,
- csrc->mc_pg[csrc->mc_top]->mp_pgno);
- if (rc)
- return rc;
- if (IS_LEAF(csrc->mc_pg[csrc->mc_top]))
+ psrc = csrc->mc_pg[csrc->mc_top];
+ /* If not operating on FreeDB, allow this page to be reused
+ * in this txn.
+ */
+ if ((psrc->mp_flags & P_DIRTY) && csrc->mc_dbi != FREE_DBI) {
+ mdb_page_loose(csrc->mc_txn->mt_env, psrc);
+ } else {
+ rc = mdb_midl_append(&csrc->mc_txn->mt_free_pgs, psrc->mp_pgno);
+ if (rc)
+ return rc;
+ }
+ if (IS_LEAF(psrc))
csrc->mc_db->md_leaf_pages--;
else
csrc->mc_db->md_branch_pages--;
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = csrc->mc_dbi;
- MDB_page *mp = cdst->mc_pg[cdst->mc_top];
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (csrc->mc_flags & C_SUB)
m3 = m2;
if (m3 == csrc) continue;
if (m3->mc_snum < csrc->mc_snum) continue;
- if (m3->mc_pg[csrc->mc_top] == csrc->mc_pg[csrc->mc_top]) {
- m3->mc_pg[csrc->mc_top] = mp;
+ if (m3->mc_pg[csrc->mc_top] == psrc) {
+ m3->mc_pg[csrc->mc_top] = pdst;
m3->mc_ki[csrc->mc_top] += nkeys;
}
}