X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Fliblmdb%2Fmdb.c;h=46e4490c47b352fea77e00b1bde8ed2c0665068f;hb=c3d84bcd06d0017a7fad5f1d7932c590605e6369;hp=e0916a1bf0a2bb54720f2a747712ff78211e929b;hpb=86551828abbafc90f52fc17ba095f48e28edab3b;p=openldap diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index e0916a1bf0..46e4490c47 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -37,10 +37,26 @@ #endif #include #include -#include #ifdef _WIN32 #include +/** getpid() returns int; MinGW defines pid_t but MinGW64 typedefs it + * as int64 which is wrong. MSVC doesn't define it at all, so just + * don't use it. + */ +#define MDB_PID_T int +#ifdef __GNUC__ +# include +#else +# define LITTLE_ENDIAN 1234 +# define BIG_ENDIAN 4321 +# define BYTE_ORDER LITTLE_ENDIAN +# ifndef SSIZE_MAX +# define SSIZE_MAX INT_MAX +# endif +#endif #else +#define MDB_PID_T pid_t +#include #include #include #ifdef HAVE_SYS_FILE_H @@ -75,6 +91,7 @@ #ifndef _WIN32 #include #ifdef MDB_USE_POSIX_SEM +# define MDB_USE_HASH 1 #include #endif #endif @@ -140,6 +157,7 @@ * @{ */ #ifdef _WIN32 +#define MDB_USE_HASH 1 #define MDB_PIDLOCK 0 #define pthread_t DWORD #define pthread_mutex_t HANDLE @@ -171,7 +189,7 @@ #define Z "I" #else -#define Z "z" +#define Z "z" /**< printf format modifier for size_t */ /** For MDB_LOCK_FORMAT: True if readers take a pid lock in the lockfile */ #define MDB_PIDLOCK 1 @@ -293,39 +311,42 @@ typedef MDB_ID txnid_t; * @{ */ #ifndef MDB_DEBUG - /** Enable debug output. + /** Enable debug output. Needs variable argument macros (a C99 feature). * Set this to 1 for copious tracing. Set to 2 to add dumps of all IDLs * read from and written to the database (used for free space management). */ #define MDB_DEBUG 0 #endif -#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__)) -# undef MDB_DEBUG -# define MDB_DEBUG 0 -# define DPRINTF (void) /* Vararg macros may be unsupported */ -#elif MDB_DEBUG +#if MDB_DEBUG static int mdb_debug; static txnid_t mdb_debug_start; - /** Print a debug message with printf formatting. */ -# define DPRINTF(fmt, ...) /**< Requires 2 or more args */ \ - ((void) ((mdb_debug) && \ - fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__))) + /** Print a debug message with printf formatting. + * Requires double parenthesis around 2 or more args. + */ +# define DPRINTF(args) ((void) ((mdb_debug) && DPRINTF0 args)) +# define DPRINTF0(fmt, ...) \ + fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__) #else -# define DPRINTF(fmt, ...) ((void) 0) -# define MDB_DEBUG_SKIP +# define DPRINTF(args) ((void) 0) #endif /** Print a debug string. * The string is printed literally, with no format processing. */ -#define DPUTS(arg) DPRINTF("%s", arg) +#define DPUTS(arg) DPRINTF(("%s", arg)) + /** Debuging output value of a cursor DBI: Negative in a sub-cursor. */ +#define DDBI(mc) \ + (((mc)->mc_flags & C_SUB) ? -(int)(mc)->mc_dbi : (int)(mc)->mc_dbi) /** @} */ - /** A default memory page size. - * The actual size is platform-dependent, but we use this for - * boot-strapping. We probably should not be using this any more. - * The #GET_PAGESIZE() macro is used to get the actual size. + /** @brief The maximum size of a database page. + * + * This is 32k, since it must fit in #MDB_page.#mp_upper. + * + * LMDB will use database pages < OS pages if needed. + * That causes more I/O in write transactions: The OS must + * know (read) the whole page before writing a partial page. * * Note that we don't currently support Huge pages. On Linux, * regular data files cannot use Huge pages, and in general @@ -334,7 +355,7 @@ static txnid_t mdb_debug_start; * pressure from other processes is high. So until OSs have * actual paging support for Huge pages, they're not viable. */ -#define MDB_PAGESIZE 4096 +#define MAX_PAGESIZE 0x8000 /** The minimum number of keys required in a database page. * Setting this to a larger value will place a smaller bound on the @@ -368,7 +389,7 @@ static txnid_t mdb_debug_start; * * We require that keys all fit onto a regular page. This limit * could be raised a bit further if needed; to something just - * under #MDB_PAGESIZE / #MDB_MINKEYS. + * under (page size / #MDB_MINKEYS / 3). * * Note that data items in an #MDB_DUPSORT database are actually keys * of a subDB, so they're also limited to this size. @@ -395,7 +416,7 @@ static txnid_t mdb_debug_start; */ #define DKEY(x) mdb_dkey(x, kbuf) #else -#define DKBUF typedef int dummy_kbuf /* so we can put ';' after */ +#define DKBUF #define DKEY(x) 0 #endif @@ -428,7 +449,8 @@ typedef uint16_t indx_t; * * If #MDB_NOTLS is set, the slot address is not saved in thread-specific data. * - * No reader table is used if the database is on a read-only filesystem. + * No reader table is used if the database is on a read-only filesystem, or + * if #MDB_NOLOCK is set. * * Since the database uses multi-version concurrency control, readers don't * actually need any locking. This table is used to keep track of which @@ -491,7 +513,7 @@ typedef struct MDB_rxbody { */ txnid_t mrb_txnid; /** The process ID of the process owning this reader txn. */ - pid_t mrb_pid; + MDB_PID_T mrb_pid; /** The thread ID of the thread owning this txn. */ pthread_t mrb_tid; } MDB_rxbody; @@ -603,7 +625,7 @@ typedef struct MDB_page { #define P_LEAF 0x02 /**< leaf page */ #define P_OVERFLOW 0x04 /**< overflow page */ #define P_META 0x08 /**< meta page */ -#define P_DIRTY 0x10 /**< dirty page */ +#define P_DIRTY 0x10 /**< dirty page, also set for #P_SUBP pages */ #define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */ #define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */ #define P_KEEP 0x8000 /**< leave this page alone during spill */ @@ -759,9 +781,12 @@ typedef struct MDB_node { */ #define LEAF2KEY(p, i, ks) ((char *)(p) + PAGEHDRSZ + ((i)*(ks))) - /** Set the \b node's key into \b key, if requested. */ -#define MDB_GET_KEY(node, key) { if ((key) != NULL) { \ - (key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node); } } + /** Set the \b node's key into \b keyptr, if requested. */ +#define MDB_GET_KEY(node, keyptr) { if ((keyptr) != NULL) { \ + (keyptr)->mv_size = NODEKSZ(node); (keyptr)->mv_data = NODEKEY(node); } } + + /** Set the \b node's key into \b key. */ +#define MDB_GET_KEY2(node, key) { key.mv_size = NODEKSZ(node); key.mv_data = NODEKEY(node); } /** Information about a single database in the environment. */ typedef struct MDB_db { @@ -786,7 +811,10 @@ typedef struct MDB_db { /** Handle for the default DB. */ #define MAIN_DBI 1 - /** Meta page content. */ + /** Meta page content. + * A meta page is the start point for accessing a database snapshot. + * Pages 0-1 are meta pages. Transaction N writes meta page #(N % 2). + */ typedef struct MDB_meta { /** Stamp identifying this as an MDB file. It must be set * to #MDB_MAGIC. */ @@ -804,19 +832,18 @@ typedef struct MDB_meta { txnid_t mm_txnid; /**< txnid that committed this page */ } MDB_meta; - /** Buffer for a stack-allocated dirty page. + /** Buffer for a stack-allocated meta page. * The members define size and alignment, and silence type * aliasing warnings. They are not used directly; that could * mean incorrectly using several union members in parallel. */ -typedef union MDB_pagebuf { - char mb_raw[MDB_PAGESIZE]; +typedef union MDB_metabuf { MDB_page mb_page; struct { char mm_pad[PAGEHDRSZ]; MDB_meta mm_meta; } mb_metabuf; -} MDB_pagebuf; +} MDB_metabuf; /** Auxiliary DB info. * The information here is mostly static/read-only. There is @@ -847,7 +874,8 @@ struct MDB_txn { */ MDB_IDL mt_free_pgs; /** The sorted list of dirty pages we temporarily wrote to disk - * because the dirty list was full. + * because the dirty list was full. page numbers in here are + * shifted left by 1, deleted slots have the LSB set. */ MDB_IDL mt_spill_pgs; union { @@ -864,9 +892,9 @@ struct MDB_txn { * @ingroup internal * @{ */ -#define DB_DIRTY 0x01 /**< DB was written in this txn */ -#define DB_STALE 0x02 /**< DB record is older than txnID */ -#define DB_NEW 0x04 /**< DB handle opened in this txn */ +#define DB_DIRTY 0x01 /**< DB was modified or is DUPSORT data */ +#define DB_STALE 0x02 /**< Named-DB record is older than txnID */ +#define DB_NEW 0x04 /**< Named-DB handle opened in this txn */ #define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */ /** @} */ /** In write txns, array of cursors for each DB */ @@ -888,12 +916,12 @@ struct MDB_txn { #define MDB_TXN_SPILLS 0x08 /**< txn or a parent has spilled pages */ /** @} */ unsigned int mt_flags; /**< @ref mdb_txn */ - /** dirty_list maxsize - # of allocated pages allowed, including in parent txns */ - unsigned int mt_dirty_room; - /** Tracks which of the two meta pages was used at the start - * of this transaction. + /** dirty_list room: Array size - #dirty pages visible to this txn. + * Includes ancestor txns' dirty pages not hidden by other txns' + * dirty/spilled pages. Thus commit(nested txn) has room to merge + * dirty_list into mt_parent after freeing hidden mt_parent pages. */ - unsigned int mt_toggle; + unsigned int mt_dirty_room; }; /** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty. @@ -904,7 +932,14 @@ struct MDB_txn { struct MDB_xcursor; - /** Cursors are used for all DB operations */ + /** Cursors are used for all DB operations. + * A cursor holds a path of (page pointer, key index) from the DB + * root to a position in the DB, plus other state. #MDB_DUPSORT + * cursors include an xcursor to the current data item. Write txns + * track their cursors and keep them up to date when data moves. + * Exception: An xcursor's pointer to a #P_SUBP page can be stale. + * (A node with #F_DUPDATA but no #F_SUBDATA contains a subpage). + */ struct MDB_cursor { /** Next cursor on this DB in this txn */ MDB_cursor *mc_next; @@ -932,6 +967,7 @@ struct MDB_cursor { #define C_INITIALIZED 0x01 /**< cursor has been initialized and is valid */ #define C_EOF 0x02 /**< No more data */ #define C_SUB 0x04 /**< Cursor is a sub-cursor */ +#define C_DEL 0x08 /**< last op was a cursor_del */ #define C_SPLITTING 0x20 /**< Cursor is in page_split */ #define C_UNTRACK 0x40 /**< Un-track cursor when closing */ /** @} */ @@ -976,16 +1012,18 @@ struct MDB_env { /** Have liveness lock in reader table */ #define MDB_LIVE_READER 0x08000000U uint32_t me_flags; /**< @ref mdb_env */ - unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */ + unsigned int me_psize; /**< DB page size, inited from me_os_psize */ + unsigned int me_os_psize; /**< OS page size, from #GET_PAGESIZE */ unsigned int me_maxreaders; /**< size of the reader table */ unsigned int me_numreaders; /**< max numreaders set by this env */ MDB_dbi me_numdbs; /**< number of DBs opened */ MDB_dbi me_maxdbs; /**< size of the DB table */ - pid_t me_pid; /**< process ID of this env */ + MDB_PID_T me_pid; /**< process ID of this env */ char *me_path; /**< path to the DB files */ char *me_map; /**< the memory map of the data file */ MDB_txninfo *me_txns; /**< the memory map of the lock file or NULL */ MDB_meta *me_metas[2]; /**< pointers to the two meta pages */ + void *me_pbuf; /**< scratch area for DUPSORT put() */ MDB_txn *me_txn; /**< current write transaction */ size_t me_mapsize; /**< size of the data memory map */ off_t me_size; /**< current file size */ @@ -1017,8 +1055,8 @@ struct MDB_env { /** Nested transaction */ typedef struct MDB_ntxn { - MDB_txn mnt_txn; /* the transaction */ - MDB_pgstate mnt_pgstate; /* parent transaction's saved freestate */ + MDB_txn mnt_txn; /**< the transaction */ + MDB_pgstate mnt_pgstate; /**< parent transaction's saved freestate */ } MDB_ntxn; /** max number of pages to commit in one writev() call */ @@ -1040,6 +1078,8 @@ static int mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify); #define MDB_PS_MODIFY 1 #define MDB_PS_ROOTONLY 2 +#define MDB_PS_FIRST 4 +#define MDB_PS_LAST 8 static int mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags); static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst); @@ -1125,7 +1165,7 @@ static char *const mdb_errstr[] = { "MDB_CURSOR_FULL: Internal error - cursor stack limit reached", "MDB_PAGE_FULL: Internal error - page has no more space", "MDB_MAP_RESIZED: Database contents grew beyond environment mapsize", - "MDB_INCOMPATIBLE: Database flags changed or would change", + "MDB_INCOMPATIBLE: Operation and DB incompatible, or DB flags changed", "MDB_BAD_RSLOT: Invalid reuse of reader locktable slot", "MDB_BAD_TXN: Transaction cannot recover - it must be aborted", "MDB_BAD_VALSIZE: Too big key/data, key is empty, or wrong DUPFIXED size", @@ -1182,7 +1222,7 @@ void mdb_page_list(MDB_page *mp) { MDB_node *node; - unsigned int i, nkeys, nsize; + unsigned int i, nkeys, nsize, total = 0; MDB_val key; DKBUF; @@ -1192,18 +1232,23 @@ mdb_page_list(MDB_page *mp) node = NODEPTR(mp, i); key.mv_size = node->mn_ksize; key.mv_data = node->mn_data; - nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t); + nsize = NODESIZE + key.mv_size; if (IS_BRANCH(mp)) { fprintf(stderr, "key %d: page %"Z"u, %s\n", i, NODEPGNO(node), DKEY(&key)); + total += nsize; } else { if (F_ISSET(node->mn_flags, F_BIGDATA)) nsize += sizeof(pgno_t); else nsize += NODEDSZ(node); + total += nsize; + nsize += sizeof(indx_t); fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key)); } + total += (total & 1); } + fprintf(stderr, "Total: %d\n", total); } void @@ -1225,7 +1270,7 @@ mdb_cursor_chk(MDB_cursor *mc) } #endif -#if MDB_DEBUG > 2 +#if (MDB_DEBUG) > 2 /** Count all the pages in each DB and in the freelist * and make sure it matches the actual number of pages * being used. @@ -1253,7 +1298,7 @@ static void mdb_audit(MDB_txn *txn) txn->mt_dbs[i].md_leaf_pages + txn->mt_dbs[i].md_overflow_pages; if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) { - mdb_page_search(&mc, NULL, 0); + mdb_page_search(&mc, NULL, MDB_PS_FIRST); do { unsigned j; MDB_page *mp; @@ -1298,7 +1343,12 @@ mdb_page_malloc(MDB_txn *txn, unsigned num) { MDB_env *env = txn->mt_env; MDB_page *ret = env->me_dpages; - size_t sz = env->me_psize; + size_t psize = env->me_psize, sz = psize, off; + /* For ! #MDB_NOMEMINIT, psize counts how much to init. + * For a single page alloc, we init everything after the page header. + * For multi-page, we init the final page; if the caller needed that + * many pages they will be filling in at least up to the last page. + */ if (num == 1) { if (ret) { VGMEMP_ALLOC(env, ret, sz); @@ -1306,10 +1356,16 @@ mdb_page_malloc(MDB_txn *txn, unsigned num) env->me_dpages = ret->mp_next; return ret; } + psize -= off = PAGEHDRSZ; } else { sz *= num; + off = sz - psize; } if ((ret = malloc(sz)) != NULL) { + if (!(env->me_flags & MDB_NOMEMINIT)) { + memset((char *)ret + off, 0, psize); + ret->mp_pad = 0; + } VGMEMP_ALLOC(env, ret, sz); } return ret; @@ -1327,7 +1383,7 @@ mdb_page_free(MDB_env *env, MDB_page *mp) env->me_dpages = mp; } -/* Free a dirty page */ +/** Free a dirty page */ static void mdb_dpage_free(MDB_env *env, MDB_page *dp) { @@ -1354,52 +1410,83 @@ mdb_dlist_free(MDB_txn *txn) dl[0].mid = 0; } -/* Set or clear P_KEEP in non-overflow, non-sub pages in this txn's cursors. +/** Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn. * @param[in] mc A cursor handle for the current operation. * @param[in] pflags Flags of the pages to update: * P_DIRTY to set P_KEEP, P_DIRTY|P_KEEP to clear it. + * @param[in] all No shortcuts. Needed except after a full #mdb_page_flush(). + * @return 0 on success, non-zero on failure. */ -static void -mdb_cursorpages_mark(MDB_cursor *mc, unsigned pflags) +static int +mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) { + enum { Mask = P_SUBP|P_DIRTY|P_KEEP }; MDB_txn *txn = mc->mc_txn; MDB_cursor *m3; MDB_xcursor *mx; + MDB_page *dp, *mp; + MDB_node *leaf; unsigned i, j; + int rc = MDB_SUCCESS, level; + /* Mark pages seen by cursors */ if (mc->mc_flags & C_UNTRACK) mc = NULL; /* will find mc in mt_cursors */ for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) { for (; mc; mc=mc->mc_next) { - for (m3 = mc; m3->mc_flags & C_INITIALIZED; m3 = &mx->mx_cursor) { - for (j=0; jmc_snum; j++) - if ((m3->mc_pg[j]->mp_flags & (P_SUBP|P_DIRTY|P_KEEP)) - == pflags) - m3->mc_pg[j]->mp_flags ^= P_KEEP; - mx = m3->mc_xcursor; - if (mx == NULL) - break; + if (!(mc->mc_flags & C_INITIALIZED)) + continue; + for (m3 = mc;; m3 = &mx->mx_cursor) { + mp = NULL; + for (j=0; jmc_snum; j++) { + mp = m3->mc_pg[j]; + if ((mp->mp_flags & Mask) == pflags) + mp->mp_flags ^= P_KEEP; + } + mx = m3->mc_xcursor; + /* Proceed to mx if it is at a sub-database */ + if (! (mx && (mx->mx_cursor.mc_flags & C_INITIALIZED))) + break; + if (! (mp && (mp->mp_flags & P_LEAF))) + break; + leaf = NODEPTR(mp, m3->mc_ki[j-1]); + if (!(leaf->mn_flags & F_SUBDATA)) + break; } } if (i == 0) break; } + + if (all) { + /* Mark dirty root pages */ + for (i=0; imt_numdbs; i++) { + if (txn->mt_dbflags[i] & DB_DIRTY) { + pgno_t pgno = txn->mt_dbs[i].md_root; + if (pgno == P_INVALID) + continue; + if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) + break; + if ((dp->mp_flags & Mask) == pflags && level <= 1) + dp->mp_flags ^= P_KEEP; + } + } + } + + return rc; } -static int mdb_page_flush(MDB_txn *txn); +static int mdb_page_flush(MDB_txn *txn, int keep); /** Spill pages from the dirty list back to disk. * This is intended to prevent running into #MDB_TXN_FULL situations, * but note that they may still occur in a few cases: - * 1) pages in #MDB_DUPSORT sub-DBs are never spilled, so if there - * are too many of these dirtied in one txn, the txn may still get - * too full. + * 1) our estimate of the txn size could be too small. Currently this + * seems unlikely, except with a large number of #MDB_MULTIPLE items. * 2) child txns may run out of space if their parents dirtied a * lot of pages and never spilled them. TODO: we probably should do * a preemptive spill during #mdb_txn_begin() of a child txn, if * the parent's dirty_room is below a given threshold. - * 3) our estimate of the txn size could be too small. At the - * moment this seems unlikely. * * Otherwise, if not using nested txns, it is expected that apps will * not run into #MDB_TXN_FULL any more. The pages are flushed to disk @@ -1429,8 +1516,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) MDB_txn *txn = m0->mc_txn; MDB_page *dp; MDB_ID2L dl = txn->mt_u.dirty_list; - unsigned int i, j; - int rc, level; + unsigned int i, j, need; + int rc; if (m0->mc_flags & C_SUB) return MDB_SUCCESS; @@ -1444,6 +1531,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) if (key) i += (LEAFSIZE(key, data) + txn->mt_env->me_psize) / txn->mt_env->me_psize; i += i; /* double it for good measure */ + need = i; if (txn->mt_dirty_room > i) return MDB_SUCCESS; @@ -1452,26 +1540,36 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) txn->mt_spill_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX); if (!txn->mt_spill_pgs) return ENOMEM; - } - - /* Mark all the dirty root pages we want to preserve */ - for (i=0; imt_numdbs; i++) { - if (txn->mt_dbflags[i] & DB_DIRTY) { - pgno_t pgno = txn->mt_dbs[i].md_root; - if (pgno == P_INVALID) - continue; - if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) - goto done; - if ((dp->mp_flags & P_DIRTY) && level <= 1) - dp->mp_flags |= P_KEEP; + } else { + /* purge deleted slots */ + MDB_IDL sl = txn->mt_spill_pgs; + unsigned int num = sl[0]; + j=0; + for (i=1; i<=num; i++) { + if (!(sl[i] & 1)) + sl[++j] = sl[i]; } + sl[0] = j; } - /* Preserve pages used by cursors */ - mdb_cursorpages_mark(m0, P_DIRTY); + /* Preserve pages which may soon be dirtied again */ + if ((rc = mdb_pages_xkeep(m0, P_DIRTY, 1)) != MDB_SUCCESS) + goto done; + + /* Less aggressive spill - we originally spilled the entire dirty list, + * with a few exceptions for cursor pages and DB root pages. But this + * turns out to be a lot of wasted effort because in a large txn many + * of those pages will need to be used again. So now we spill only 1/8th + * of the dirty pages. Testing revealed this to be a good tradeoff, + * better than 1/2, 1/4, or 1/10. + */ + if (need < MDB_IDL_UM_MAX / 8) + need = MDB_IDL_UM_MAX / 8; /* Save the page IDs of all the pages we're flushing */ - for (i=1; i<=dl[0].mid; i++) { + /* flush from the tail forward, this saves a lot of shifting later on. */ + for (i=dl[0].mid; i && need; i--) { + MDB_ID pn = dl[i].mid << 1; dp = dl[i].mptr; if (dp->mp_flags & P_KEEP) continue; @@ -1482,8 +1580,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) MDB_txn *tx2; for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { if (tx2->mt_spill_pgs) { - j = mdb_midl_search(tx2->mt_spill_pgs, dl[i].mid); - if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == dl[i].mid) { + j = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == pn) { dp->mp_flags |= P_KEEP; break; } @@ -1492,41 +1590,21 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) if (tx2) continue; } - if ((rc = mdb_midl_append(&txn->mt_spill_pgs, dl[i].mid))) + if ((rc = mdb_midl_append(&txn->mt_spill_pgs, pn))) goto done; + need--; } mdb_midl_sort(txn->mt_spill_pgs); - rc = mdb_page_flush(txn); + /* Flush the spilled part of dirty list */ + if ((rc = mdb_page_flush(txn, i)) != MDB_SUCCESS) + goto done; - mdb_cursorpages_mark(m0, P_DIRTY|P_KEEP); + /* Reset any dirty pages we kept that page_flush didn't see */ + rc = mdb_pages_xkeep(m0, P_DIRTY|P_KEEP, i); done: - if (rc == 0) { - if (txn->mt_parent) { - MDB_txn *tx2; - pgno_t pgno = dl[i].mid; - txn->mt_dirty_room = txn->mt_parent->mt_dirty_room - dl[0].mid; - /* dirty pages that are dirty in an ancestor don't - * count against this txn's dirty_room. - */ - for (i=1; i<=dl[0].mid; i++) { - for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { - j = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); - if (j <= tx2->mt_u.dirty_list[0].mid && - tx2->mt_u.dirty_list[j].mid == pgno) { - txn->mt_dirty_room++; - break; - } - } - } - } else { - txn->mt_dirty_room = MDB_IDL_UM_MAX - dl[0].mid; - } - txn->mt_flags |= MDB_TXN_SPILLS; - } else { - txn->mt_flags |= MDB_TXN_ERROR; - } + txn->mt_flags |= rc ? MDB_TXN_ERROR : MDB_TXN_SPILLS; return rc; } @@ -1536,12 +1614,14 @@ mdb_find_oldest(MDB_txn *txn) { int i; txnid_t mr, oldest = txn->mt_txnid - 1; - MDB_reader *r = txn->mt_env->me_txns->mti_readers; - for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { - if (r[i].mr_pid) { - mr = r[i].mr_txnid; - if (oldest > mr) - oldest = mr; + if (txn->mt_env->me_txns) { + MDB_reader *r = txn->mt_env->me_txns->mti_readers; + for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { + if (r[i].mr_pid) { + mr = r[i].mr_txnid; + if (oldest > mr) + oldest = mr; + } } } return oldest; @@ -1674,11 +1754,11 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) mop = env->me_pghead; } env->me_pglast = last; -#if MDB_DEBUG > 1 - DPRINTF("IDL read txn %"Z"u root %"Z"u num %u", - last, txn->mt_dbs[FREE_DBI].md_root, i); +#if (MDB_DEBUG) > 1 + DPRINTF(("IDL read txn %"Z"u root %"Z"u num %u", + last, txn->mt_dbs[FREE_DBI].md_root, i)); for (k = i; k; k--) - DPRINTF("IDL %"Z"u", idl[k]); + DPRINTF(("IDL %"Z"u", idl[k])); #endif /* Merge in descending sorted order */ j = mop_len; @@ -1751,26 +1831,28 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize) /** Pull a page off the txn's spill list, if present. * If a page being referenced was spilled to disk in this txn, bring * it back and make it dirty/writable again. - * @param[in] tx0 the transaction handle. + * @param[in] txn the transaction handle. * @param[in] mp the page being referenced. * @param[out] ret the writable page, if any. ret is unchanged if * mp wasn't spilled. */ static int -mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) +mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret) { - MDB_env *env = tx0->mt_env; - MDB_txn *txn; + MDB_env *env = txn->mt_env; + const MDB_txn *tx2; unsigned x; - pgno_t pgno = mp->mp_pgno; + pgno_t pgno = mp->mp_pgno, pn = pgno << 1; - for (txn = tx0; txn; txn=txn->mt_parent) { - if (!txn->mt_spill_pgs) + for (tx2 = txn; tx2; tx2=tx2->mt_parent) { + if (!tx2->mt_spill_pgs) continue; - x = mdb_midl_search(txn->mt_spill_pgs, pgno); - if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pgno) { + x = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { MDB_page *np; int num; + if (txn->mt_dirty_room == 0) + return MDB_TXN_FULL; if (IS_OVERFLOW(mp)) num = mp->mp_pages; else @@ -1786,31 +1868,20 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) else mdb_page_copy(np, mp, env->me_psize); } - if (txn == tx0) { - /* If in current txn, this page is no longer spilled */ - for (; x < txn->mt_spill_pgs[0]; x++) - txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1]; - txn->mt_spill_pgs[0]--; + if (tx2 == txn) { + /* If in current txn, this page is no longer spilled. + * If it happens to be the last page, truncate the spill list. + * Otherwise mark it as deleted by setting the LSB. + */ + if (x == txn->mt_spill_pgs[0]) + txn->mt_spill_pgs[0]--; + else + txn->mt_spill_pgs[x] |= 1; } /* otherwise, if belonging to a parent txn, the * page remains spilled until child commits */ - if (txn->mt_parent) { - MDB_txn *tx2; - /* If this page is also in a parent's dirty list, then - * it's already accounted in dirty_room, and we need to - * cancel out the decrement that mdb_page_dirty does. - */ - for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { - x = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); - if (x <= tx2->mt_u.dirty_list[0].mid && - tx2->mt_u.dirty_list[x].mid == pgno) { - txn->mt_dirty_room++; - break; - } - } - } - mdb_page_dirty(tx0, np); + mdb_page_dirty(txn, np); np->mp_flags |= P_DIRTY; *ret = np; break; @@ -1829,7 +1900,6 @@ mdb_page_touch(MDB_cursor *mc) MDB_page *mp = mc->mc_pg[mc->mc_top], *np; MDB_txn *txn = mc->mc_txn; MDB_cursor *m2, *m3; - MDB_dbi dbi; pgno_t pgno; int rc; @@ -1846,7 +1916,8 @@ mdb_page_touch(MDB_cursor *mc) (rc = mdb_page_alloc(mc, 1, &np))) return rc; pgno = np->mp_pgno; - DPRINTF("touched db %u page %"Z"u -> %"Z"u", mc->mc_dbi,mp->mp_pgno,pgno); + DPRINTF(("touched db %d page %"Z"u -> %"Z"u", DDBI(mc), + mp->mp_pgno, pgno)); assert(mp->mp_pgno != pgno); mdb_midl_xappend(txn->mt_free_pgs, mp->mp_pgno); /* Update the parent page, if any, to point to the new page */ @@ -1892,17 +1963,16 @@ mdb_page_touch(MDB_cursor *mc) done: /* Adjust cursors pointing to mp */ mc->mc_pg[mc->mc_top] = np; - dbi = mc->mc_dbi; + m2 = txn->mt_cursors[mc->mc_dbi]; if (mc->mc_flags & C_SUB) { - dbi--; - for (m2 = txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + for (; m2; m2=m2->mc_next) { m3 = &m2->mc_xcursor->mx_cursor; if (m3->mc_snum < mc->mc_snum) continue; if (m3->mc_pg[mc->mc_top] == mp) m3->mc_pg[mc->mc_top] = np; } } else { - for (m2 = txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + for (; m2; m2=m2->mc_next) { if (m2->mc_snum < mc->mc_snum) continue; if (m2->mc_pg[mc->mc_top] == mp) { m2->mc_pg[mc->mc_top] = np; @@ -2019,7 +2089,7 @@ mdb_cursors_close(MDB_txn *txn, unsigned merge) } } -#ifdef MDB_DEBUG_SKIP +#if !(MDB_DEBUG) #define mdb_txn_reset0(txn, act) mdb_txn_reset0(txn) #endif static void @@ -2044,7 +2114,7 @@ enum Pidlock_op { * lock on the lockfile, set at an offset equal to the pid. */ static int -mdb_reader_pid(MDB_env *env, enum Pidlock_op op, pid_t pid) +mdb_reader_pid(MDB_env *env, enum Pidlock_op op, MDB_PID_T pid) { #if !(MDB_PIDLOCK) /* Currently the same as defined(_WIN32) */ int ret = 0; @@ -2087,7 +2157,9 @@ static int mdb_txn_renew0(MDB_txn *txn) { MDB_env *env = txn->mt_env; - unsigned int i; + MDB_txninfo *ti = env->me_txns; + MDB_meta *meta; + unsigned int i, nr; uint16_t x; int rc, new_notls = 0; @@ -2096,9 +2168,9 @@ mdb_txn_renew0(MDB_txn *txn) txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */ if (txn->mt_flags & MDB_TXN_RDONLY) { - if (!env->me_txns) { - i = mdb_env_pick_meta(env); - txn->mt_txnid = env->me_metas[i]->mm_txnid; + if (!ti) { + meta = env->me_metas[ mdb_env_pick_meta(env) ]; + txn->mt_txnid = meta->mm_txnid; txn->mt_u.reader = NULL; } else { MDB_reader *r = (env->me_flags & MDB_NOTLS) ? txn->mt_u.reader : @@ -2107,7 +2179,7 @@ mdb_txn_renew0(MDB_txn *txn) if (r->mr_pid != env->me_pid || r->mr_txnid != (txnid_t)-1) return MDB_BAD_RSLOT; } else { - pid_t pid = env->me_pid; + MDB_PID_T pid = env->me_pid; pthread_t tid = pthread_self(); if (!(env->me_flags & MDB_LIVE_READER)) { @@ -2120,36 +2192,43 @@ mdb_txn_renew0(MDB_txn *txn) } LOCK_MUTEX_R(env); - for (i=0; ime_txns->mti_numreaders; i++) - if (env->me_txns->mti_readers[i].mr_pid == 0) + nr = ti->mti_numreaders; + for (i=0; imti_readers[i].mr_pid == 0) break; if (i == env->me_maxreaders) { UNLOCK_MUTEX_R(env); return MDB_READERS_FULL; } - env->me_txns->mti_readers[i].mr_pid = pid; - env->me_txns->mti_readers[i].mr_tid = tid; - if (i >= env->me_txns->mti_numreaders) - env->me_txns->mti_numreaders = i+1; + ti->mti_readers[i].mr_pid = pid; + ti->mti_readers[i].mr_tid = tid; + if (i == nr) + ti->mti_numreaders = ++nr; /* Save numreaders for un-mutexed mdb_env_close() */ - env->me_numreaders = env->me_txns->mti_numreaders; + env->me_numreaders = nr; UNLOCK_MUTEX_R(env); - r = &env->me_txns->mti_readers[i]; + + r = &ti->mti_readers[i]; new_notls = (env->me_flags & MDB_NOTLS); if (!new_notls && (rc=pthread_setspecific(env->me_txkey, r))) { r->mr_pid = 0; return rc; } } - txn->mt_txnid = r->mr_txnid = env->me_txns->mti_txnid; + txn->mt_txnid = r->mr_txnid = ti->mti_txnid; txn->mt_u.reader = r; + meta = env->me_metas[txn->mt_txnid & 1]; } - txn->mt_toggle = txn->mt_txnid & 1; } else { - LOCK_MUTEX_W(env); + if (ti) { + LOCK_MUTEX_W(env); - txn->mt_txnid = env->me_txns->mti_txnid; - txn->mt_toggle = txn->mt_txnid & 1; + txn->mt_txnid = ti->mti_txnid; + meta = env->me_metas[txn->mt_txnid & 1]; + } else { + meta = env->me_metas[ mdb_env_pick_meta(env) ]; + txn->mt_txnid = meta->mm_txnid; + } txn->mt_txnid++; #if MDB_DEBUG if (txn->mt_txnid == mdb_debug_start) @@ -2165,10 +2244,10 @@ mdb_txn_renew0(MDB_txn *txn) } /* Copy the DB info and flags */ - memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db)); + memcpy(txn->mt_dbs, meta->mm_dbs, 2 * sizeof(MDB_db)); /* Moved to here to avoid a data race in read TXNs */ - txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1; + txn->mt_next_pgno = meta->mm_last_pg+1; for (i=2; imt_numdbs; i++) { x = env->me_dbflags[i]; @@ -2204,9 +2283,9 @@ mdb_txn_renew(MDB_txn *txn) rc = mdb_txn_renew0(txn); if (rc == MDB_SUCCESS) { - DPRINTF("renew txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", + DPRINTF(("renew txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *)txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root); + (void *)txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root)); } return rc; } @@ -2227,10 +2306,11 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) if (parent) { /* Nested transactions: Max 1 child, write txns only, no writemap */ if (parent->mt_child || - (flags & MDB_RDONLY) || (parent->mt_flags & MDB_TXN_RDONLY) || + (flags & MDB_RDONLY) || + (parent->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) || (env->me_flags & MDB_WRITEMAP)) { - return EINVAL; + return (parent->mt_flags & MDB_TXN_RDONLY) ? EINVAL : MDB_BAD_TXN; } tsize = sizeof(MDB_ntxn); } @@ -2239,7 +2319,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) size += env->me_maxdbs * sizeof(MDB_cursor *); if ((txn = calloc(1, size)) == NULL) { - DPRINTF("calloc: %s", strerror(ErrCode())); + DPRINTF(("calloc: %s", strerror(ErrCode()))); return ENOMEM; } txn->mt_dbs = (MDB_db *) ((char *)txn + tsize); @@ -2263,7 +2343,6 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) return ENOMEM; } txn->mt_txnid = parent->mt_txnid; - txn->mt_toggle = parent->mt_toggle; txn->mt_dirty_room = parent->mt_dirty_room; txn->mt_u.dirty_list[0].mid = 0; txn->mt_spill_pgs = NULL; @@ -2299,9 +2378,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) free(txn); else { *ret = txn; - DPRINTF("begin txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", + DPRINTF(("begin txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + (void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root)); } return rc; @@ -2353,9 +2432,9 @@ mdb_txn_reset0(MDB_txn *txn, const char *act) /* Close any DBI handles opened in this txn */ mdb_dbis_update(txn, 0); - DPRINTF("%s txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", + DPRINTF(("%s txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", act, txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root); + (void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root)); if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { if (txn->mt_u.reader) { @@ -2389,7 +2468,8 @@ mdb_txn_reset0(MDB_txn *txn, const char *act) env->me_txn = NULL; /* The writer mutex was locked in mdb_txn_begin. */ - UNLOCK_MUTEX_W(env); + if (env->me_txns) + UNLOCK_MUTEX_W(env); } } @@ -2438,20 +2518,26 @@ mdb_freelist_save(MDB_txn *txn) int rc, maxfree_1pg = env->me_maxfree_1pg, more = 1; txnid_t pglast = 0, head_id = 0; pgno_t freecnt = 0, *free_pgs, *mop; - ssize_t head_room = 0, total_room = 0, mop_len; + ssize_t head_room = 0, total_room = 0, mop_len, clean_limit; mdb_cursor_init(&mc, txn, FREE_DBI, NULL); if (env->me_pghead) { /* Make sure first page of freeDB is touched and on freelist */ - rc = mdb_page_search(&mc, NULL, MDB_PS_MODIFY); + rc = mdb_page_search(&mc, NULL, MDB_PS_FIRST|MDB_PS_MODIFY); if (rc && rc != MDB_NOTFOUND) return rc; } + /* MDB_RESERVE cancels meminit in ovpage malloc (when no WRITEMAP) */ + clean_limit = (env->me_flags & (MDB_NOMEMINIT|MDB_WRITEMAP)) + ? SSIZE_MAX : maxfree_1pg; + for (;;) { /* Come back here after each Put() in case freelist changed */ MDB_val key, data; + pgno_t *pgs; + ssize_t j; /* If using records from freeDB which we have not yet * deleted, delete them and any we reserved for me_pghead. @@ -2472,9 +2558,7 @@ mdb_freelist_save(MDB_txn *txn) if (freecnt < txn->mt_free_pgs[0]) { if (!freecnt) { /* Make sure last page of freeDB is touched and on freelist */ - key.mv_size = MDB_MAXKEYSIZE+1; - key.mv_data = NULL; - rc = mdb_page_search(&mc, &key, MDB_PS_MODIFY); + rc = mdb_page_search(&mc, NULL, MDB_PS_LAST|MDB_PS_MODIFY); if (rc && rc != MDB_NOTFOUND) return rc; } @@ -2493,13 +2577,13 @@ mdb_freelist_save(MDB_txn *txn) } while (freecnt < free_pgs[0]); mdb_midl_sort(free_pgs); memcpy(data.mv_data, free_pgs, data.mv_size); -#if MDB_DEBUG > 1 +#if (MDB_DEBUG) > 1 { unsigned int i = free_pgs[0]; - DPRINTF("IDL write txn %"Z"u root %"Z"u num %u", - txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, i); + DPRINTF(("IDL write txn %"Z"u root %"Z"u num %u", + txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, i)); for (; i; i--) - DPRINTF("IDL %"Z"u", free_pgs[i]); + DPRINTF(("IDL %"Z"u", free_pgs[i])); } #endif continue; @@ -2537,11 +2621,16 @@ mdb_freelist_save(MDB_txn *txn) rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE); if (rc) return rc; - *(MDB_ID *)data.mv_data = 0; /* IDL is initially empty */ + /* IDL is initially empty, zero out at least the length */ + pgs = (pgno_t *)data.mv_data; + j = head_room > clean_limit ? head_room : 0; + do { + pgs[j] = 0; + } while (--j >= 0); total_room += head_room; } - /* Fill in the reserved, touched me_pghead records */ + /* Fill in the reserved me_pghead records */ rc = MDB_SUCCESS; if (mop_len) { MDB_val key, data; @@ -2573,10 +2662,13 @@ mdb_freelist_save(MDB_txn *txn) return rc; } -/** Flush dirty pages to the map, after clearing their dirty flag. +/** Flush (some) dirty pages to the map, after clearing their dirty flag. + * @param[in] txn the transaction that's being committed + * @param[in] keep number of initial pages in dirty_list to keep dirty. + * @return 0 on success, non-zero on failure. */ static int -mdb_page_flush(MDB_txn *txn) +mdb_page_flush(MDB_txn *txn, int keep) { MDB_env *env = txn->mt_env; MDB_ID2L dl = txn->mt_u.dirty_list; @@ -2594,10 +2686,11 @@ mdb_page_flush(MDB_txn *txn) int n = 0; #endif - j = 0; + j = i = keep; + if (env->me_flags & MDB_WRITEMAP) { /* Clear dirty flags */ - for (i=1; i<=pagecount; i++) { + while (++i <= pagecount) { dp = dl[i].mptr; /* Don't flush this page yet */ if (dp->mp_flags & P_KEEP) { @@ -2607,13 +2700,12 @@ mdb_page_flush(MDB_txn *txn) } dp->mp_flags &= ~P_DIRTY; } - dl[0].mid = j; - return MDB_SUCCESS; + goto done; } /* Write the pages */ - for (i = 1;; i++) { - if (i <= pagecount) { + for (;;) { + if (++i <= pagecount) { dp = dl[i].mptr; /* Don't flush this page yet */ if (dp->mp_flags & P_KEEP) { @@ -2638,13 +2730,13 @@ mdb_page_flush(MDB_txn *txn) * the write offset, to at least save the overhead of a Seek * system call. */ - DPRINTF("committing page %"Z"u", pgno); + DPRINTF(("committing page %"Z"u", pgno)); memset(&ov, 0, sizeof(ov)); ov.Offset = pos & 0xffffffff; ov.OffsetHigh = pos >> 16 >> 16; if (!WriteFile(env->me_fd, dp, size, NULL, &ov)) { rc = ErrCode(); - DPRINTF("WriteFile: %d", rc); + DPRINTF(("WriteFile: %d", rc)); return rc; } #else @@ -2660,7 +2752,7 @@ mdb_page_flush(MDB_txn *txn) } else { if (lseek(env->me_fd, wpos, SEEK_SET) == -1) { rc = ErrCode(); - DPRINTF("lseek: %s", strerror(rc)); + DPRINTF(("lseek: %s", strerror(rc))); return rc; } wres = writev(env->me_fd, iov, n); @@ -2669,7 +2761,7 @@ mdb_page_flush(MDB_txn *txn) if (wres != wsize) { if (wres < 0) { rc = ErrCode(); - DPRINTF("Write error: %s", strerror(rc)); + DPRINTF(("Write error: %s", strerror(rc))); } else { rc = EIO; /* TODO: Use which error code? */ DPUTS("short write, filesystem full?"); @@ -2683,7 +2775,7 @@ mdb_page_flush(MDB_txn *txn) wpos = pos; wsize = 0; } - DPRINTF("committing page %"Z"u", pgno); + DPRINTF(("committing page %"Z"u", pgno)); next_pos = pos + size; iov[n].iov_len = size; iov[n].iov_base = (char *)dp; @@ -2692,8 +2784,7 @@ mdb_page_flush(MDB_txn *txn) #endif /* _WIN32 */ } - j = 0; - for (i=1; i<=pagecount; i++) { + for (i = keep; ++i <= pagecount; ) { dp = dl[i].mptr; /* This is a page we skipped above */ if (!dl[i].mid) { @@ -2703,8 +2794,11 @@ mdb_page_flush(MDB_txn *txn) } mdb_dpage_free(env, dp); } - dl[0].mid = j; +done: + i--; + txn->mt_dirty_room += i - j; + dl[0].mid = j; return MDB_SUCCESS; } @@ -2744,14 +2838,18 @@ mdb_txn_commit(MDB_txn *txn) if (txn->mt_parent) { MDB_txn *parent = txn->mt_parent; - unsigned x, y, len; MDB_ID2L dst, src; + MDB_IDL pspill; + unsigned x, y, len, ps_len; /* Append our free list to parent's */ rc = mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs); if (rc) goto fail; mdb_midl_free(txn->mt_free_pgs); + /* Failures after this must either undo the changes + * to the parent or set MDB_TXN_ERROR in the parent. + */ parent->mt_next_pgno = txn->mt_next_pgno; parent->mt_flags = txn->mt_flags; @@ -2773,36 +2871,26 @@ mdb_txn_commit(MDB_txn *txn) dst = parent->mt_u.dirty_list; src = txn->mt_u.dirty_list; /* Remove anything in our dirty list from parent's spill list */ - if (parent->mt_spill_pgs) { - x = parent->mt_spill_pgs[0]; - len = x; - /* zero out our dirty pages in parent spill list */ - for (i=1; i<=src[0].mid; i++) { - if (src[i].mid < parent->mt_spill_pgs[x]) - continue; - if (src[i].mid > parent->mt_spill_pgs[x]) { - if (x <= 1) - break; + if ((pspill = parent->mt_spill_pgs) && (ps_len = pspill[0])) { + x = y = ps_len; + pspill[0] = (pgno_t)-1; + /* Mark our dirty pages as deleted in parent spill list */ + for (i=0, len=src[0].mid; ++i <= len; ) { + MDB_ID pn = src[i].mid << 1; + while (pn > pspill[x]) x--; - continue; + if (pn == pspill[x]) { + pspill[x] = 1; + y = --x; } - parent->mt_spill_pgs[x] = 0; - len--; - } - /* OK, we had a few hits, squash zeros from the spill list */ - if (len < parent->mt_spill_pgs[0]) { - x=1; - for (y=1; y<=parent->mt_spill_pgs[0]; y++) { - if (parent->mt_spill_pgs[y]) { - if (y != x) { - parent->mt_spill_pgs[x] = parent->mt_spill_pgs[y]; - } - x++; - } - } - parent->mt_spill_pgs[0] = len; } + /* Squash deleted pagenums if we deleted any */ + for (x=y; ++x <= ps_len; ) + if (!(pspill[x] & 1)) + pspill[++y] = pspill[x]; + pspill[0] = y; } + /* Find len = length of merging our dirty list with parent's */ x = dst[0].mid; dst[0].mid = 0; /* simplify loops */ @@ -2836,7 +2924,10 @@ mdb_txn_commit(MDB_txn *txn) parent->mt_dirty_room = txn->mt_dirty_room; if (txn->mt_spill_pgs) { if (parent->mt_spill_pgs) { - mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); + /* TODO: Prevent failure here, so parent does not fail */ + rc = mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); + if (rc) + parent->mt_flags |= MDB_TXN_ERROR; mdb_midl_free(txn->mt_spill_pgs); mdb_midl_sort(parent->mt_spill_pgs); } else { @@ -2847,7 +2938,7 @@ mdb_txn_commit(MDB_txn *txn) parent->mt_child = NULL; mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead); free(txn); - return MDB_SUCCESS; + return rc; } if (txn != env->me_txn) { @@ -2862,8 +2953,8 @@ mdb_txn_commit(MDB_txn *txn) !(txn->mt_flags & (MDB_TXN_DIRTY|MDB_TXN_SPILLS))) goto done; - DPRINTF("committing txn %"Z"u %p on mdbenv %p, root page %"Z"u", - txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root); + DPRINTF(("committing txn %"Z"u %p on mdbenv %p, root page %"Z"u", + txn->mt_txnid, (void*)txn, (void*)env, txn->mt_dbs[MAIN_DBI].md_root)); /* Update DB root pointers */ if (txn->mt_numdbs > 2) { @@ -2892,11 +2983,11 @@ mdb_txn_commit(MDB_txn *txn) if (mdb_midl_shrink(&txn->mt_free_pgs)) env->me_free_pgs = txn->mt_free_pgs; -#if MDB_DEBUG > 2 +#if (MDB_DEBUG) > 2 mdb_audit(txn); #endif - if ((rc = mdb_page_flush(txn)) || + if ((rc = mdb_page_flush(txn, 0)) || (rc = mdb_env_sync(env, 0)) || (rc = mdb_env_write_meta(txn))) goto fail; @@ -2906,7 +2997,8 @@ done: env->me_txn = NULL; mdb_dbis_update(txn, 1); - UNLOCK_MUTEX_W(env); + if (env->me_txns) + UNLOCK_MUTEX_W(env); free(txn); return MDB_SUCCESS; @@ -2925,10 +3017,11 @@ fail: static int mdb_env_read_header(MDB_env *env, MDB_meta *meta) { - MDB_pagebuf pbuf; + MDB_metabuf pbuf; MDB_page *p; MDB_meta *m; int i, rc, off; + enum { Size = sizeof(pbuf) }; /* We don't know the page size yet, so use a minimum value. * Read both meta pages so we can use the latest one. @@ -2940,24 +3033,24 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta) OVERLAPPED ov; memset(&ov, 0, sizeof(ov)); ov.Offset = off; - rc = ReadFile(env->me_fd,&pbuf,MDB_PAGESIZE,&len,&ov) ? (int)len : -1; + rc = ReadFile(env->me_fd, &pbuf, Size, &len, &ov) ? (int)len : -1; if (rc == -1 && ErrCode() == ERROR_HANDLE_EOF) rc = 0; #else - rc = pread(env->me_fd, &pbuf, MDB_PAGESIZE, off); + rc = pread(env->me_fd, &pbuf, Size, off); #endif - if (rc != MDB_PAGESIZE) { + if (rc != Size) { if (rc == 0 && off == 0) return ENOENT; rc = rc < 0 ? (int) ErrCode() : MDB_INVALID; - DPRINTF("read: %s", mdb_strerror(rc)); + DPRINTF(("read: %s", mdb_strerror(rc))); return rc; } p = (MDB_page *)&pbuf; if (!F_ISSET(p->mp_flags, P_META)) { - DPRINTF("page %"Z"u not a meta page", p->mp_pgno); + DPRINTF(("page %"Z"u not a meta page", p->mp_pgno)); return MDB_INVALID; } @@ -2968,8 +3061,8 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta) } if (m->mm_version != MDB_DATA_VERSION) { - DPRINTF("database is version %u, expected version %u", - m->mm_version, MDB_DATA_VERSION); + DPRINTF(("database is version %u, expected version %u", + m->mm_version, MDB_DATA_VERSION)); return MDB_VERSION_MISMATCH; } @@ -3006,7 +3099,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta) DPUTS("writing new meta page"); - GET_PAGESIZE(psize); + psize = env->me_psize; meta->mm_magic = MDB_MAGIC; meta->mm_version = MDB_DATA_VERSION; @@ -3061,9 +3154,9 @@ mdb_env_write_meta(MDB_txn *txn) assert(txn != NULL); assert(txn->mt_env != NULL); - toggle = !txn->mt_toggle; - DPRINTF("writing meta page %d for root page %"Z"u", - toggle, txn->mt_dbs[MAIN_DBI].md_root); + toggle = txn->mt_txnid & 1; + DPRINTF(("writing meta page %d for root page %"Z"u", + toggle, txn->mt_dbs[MAIN_DBI].md_root)); env = txn->mt_env; mp = env->me_metas[toggle]; @@ -3077,11 +3170,18 @@ mdb_env_write_meta(MDB_txn *txn) mp->mm_last_pg = txn->mt_next_pgno - 1; mp->mm_txnid = txn->mt_txnid; if (!(env->me_flags & (MDB_NOMETASYNC|MDB_NOSYNC))) { + unsigned meta_size = env->me_psize; rc = (env->me_flags & MDB_MAPASYNC) ? MS_ASYNC : MS_SYNC; ptr = env->me_map; - if (toggle) - ptr += env->me_psize; - if (MDB_MSYNC(ptr, env->me_psize, rc)) { + if (toggle) { +#ifndef _WIN32 /* POSIX msync() requires ptr = start of OS page */ + if (meta_size < env->me_os_psize) + meta_size += meta_size; + else +#endif + ptr += meta_size; + } + if (MDB_MSYNC(ptr, meta_size, rc)) { rc = ErrCode(); goto fail; } @@ -3139,6 +3239,7 @@ mdb_env_write_meta(MDB_txn *txn) WriteFile(env->me_fd, ptr, len, NULL, &ov); #else r2 = pwrite(env->me_fd, ptr, len, off); + (void)r2; /* Silence warnings. We don't care about pwrite's return value */ #endif fail: env->me_flags |= MDB_FATAL_ERROR; @@ -3151,7 +3252,8 @@ done: * readers will get consistent data regardless of how fresh or * how stale their view of these values is. */ - env->me_txns->mti_txnid = txn->mt_txnid; + if (env->me_txns) + env->me_txns->mti_txnid = txn->mt_txnid; return MDB_SUCCESS; } @@ -3185,16 +3287,114 @@ mdb_env_create(MDB_env **env) e->me_wmutex = SEM_FAILED; #endif e->me_pid = getpid(); + GET_PAGESIZE(e->me_os_psize); VGMEMP_CREATE(e,0,0); *env = e; return MDB_SUCCESS; } +static int +mdb_env_map(MDB_env *env, void *addr, int newsize) +{ + MDB_page *p; + unsigned int flags = env->me_flags; +#ifdef _WIN32 + int rc; + HANDLE mh; + LONG sizelo, sizehi; + sizelo = env->me_mapsize & 0xffffffff; + sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */ + + /* Windows won't create mappings for zero length files. + * Just allocate the maxsize right now. + */ + if (newsize) { + if (SetFilePointer(env->me_fd, sizelo, &sizehi, 0) != (DWORD)sizelo + || !SetEndOfFile(env->me_fd) + || SetFilePointer(env->me_fd, 0, NULL, 0) != 0) + return ErrCode(); + } + mh = CreateFileMapping(env->me_fd, NULL, flags & MDB_WRITEMAP ? + PAGE_READWRITE : PAGE_READONLY, + sizehi, sizelo, NULL); + if (!mh) + return ErrCode(); + env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ? + FILE_MAP_WRITE : FILE_MAP_READ, + 0, 0, env->me_mapsize, addr); + rc = env->me_map ? 0 : ErrCode(); + CloseHandle(mh); + if (rc) + return rc; +#else + int prot = PROT_READ; + if (flags & MDB_WRITEMAP) { + prot |= PROT_WRITE; + if (ftruncate(env->me_fd, env->me_mapsize) < 0) + return ErrCode(); + } + env->me_map = mmap(addr, env->me_mapsize, prot, MAP_SHARED, + env->me_fd, 0); + if (env->me_map == MAP_FAILED) { + env->me_map = NULL; + return ErrCode(); + } + + if (flags & MDB_NORDAHEAD) { + /* Turn off readahead. It's harmful when the DB is larger than RAM. */ +#ifdef MADV_RANDOM + madvise(env->me_map, env->me_mapsize, MADV_RANDOM); +#else +#ifdef POSIX_MADV_RANDOM + posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM); +#endif /* POSIX_MADV_RANDOM */ +#endif /* MADV_RANDOM */ + } +#endif /* _WIN32 */ + + /* Can happen because the address argument to mmap() is just a + * hint. mmap() can pick another, e.g. if the range is in use. + * The MAP_FIXED flag would prevent that, but then mmap could + * instead unmap existing pages to make room for the new map. + */ + if (addr && env->me_map != addr) + return EBUSY; /* TODO: Make a new MDB_* error code? */ + + p = (MDB_page *)env->me_map; + env->me_metas[0] = METADATA(p); + env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + env->me_psize); + + return MDB_SUCCESS; +} + int mdb_env_set_mapsize(MDB_env *env, size_t size) { - if (env->me_map) - return EINVAL; + /* If env is already open, caller is responsible for making + * sure there are no active txns. + */ + if (env->me_map) { + int rc; + void *old; + if (env->me_txn) + return EINVAL; + if (!size) + size = env->me_metas[mdb_env_pick_meta(env)]->mm_mapsize; + else if (size < env->me_mapsize) { + /* If the configured size is smaller, make sure it's + * still big enough. Silently round up to minimum if not. + */ + size_t minsize = (env->me_metas[mdb_env_pick_meta(env)]->mm_last_pg + 1) * env->me_psize; + if (size < minsize) + size = minsize; + } + munmap(env->me_map, env->me_mapsize); + env->me_mapsize = size; + old = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : NULL; + rc = mdb_env_map(env, old, 1); + if (rc) + return rc; + } env->me_mapsize = size; if (env->me_psize) env->me_maxpg = env->me_mapsize / env->me_psize; @@ -3234,12 +3434,17 @@ static int mdb_env_open2(MDB_env *env) { unsigned int flags = env->me_flags; - int i, newenv = 0; + int i, newenv = 0, rc; MDB_meta meta; - MDB_page *p; -#ifndef _WIN32 - int prot; -#endif + +#ifdef _WIN32 + /* See if we should use QueryLimited */ + rc = GetVersion(); + if ((rc & 0xff) > 5) + env->me_pidquery = MDB_PROCESS_QUERY_LIMITED_INFORMATION; + else + env->me_pidquery = PROCESS_QUERY_INFORMATION; +#endif /* _WIN32 */ memset(&meta, 0, sizeof(meta)); @@ -3248,6 +3453,11 @@ mdb_env_open2(MDB_env *env) return i; DPUTS("new mdbenv"); newenv = 1; + env->me_psize = env->me_os_psize; + if (env->me_psize > MAX_PAGESIZE) + env->me_psize = MAX_PAGESIZE; + } else { + env->me_psize = meta.mm_psize; } /* Was a mapsize configured? */ @@ -3265,66 +3475,9 @@ mdb_env_open2(MDB_env *env) env->me_mapsize = minsize; } -#ifdef _WIN32 - { - int rc; - HANDLE mh; - LONG sizelo, sizehi; - sizelo = env->me_mapsize & 0xffffffff; - sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */ - - /* See if we should use QueryLimited */ - rc = GetVersion(); - if ((rc & 0xff) > 5) - env->me_pidquery = MDB_PROCESS_QUERY_LIMITED_INFORMATION; - else - env->me_pidquery = PROCESS_QUERY_INFORMATION; - - /* Windows won't create mappings for zero length files. - * Just allocate the maxsize right now. - */ - if (newenv) { - if (SetFilePointer(env->me_fd, sizelo, &sizehi, 0) != (DWORD)sizelo - || !SetEndOfFile(env->me_fd) - || SetFilePointer(env->me_fd, 0, NULL, 0) != 0) - return ErrCode(); - } - mh = CreateFileMapping(env->me_fd, NULL, flags & MDB_WRITEMAP ? - PAGE_READWRITE : PAGE_READONLY, - sizehi, sizelo, NULL); - if (!mh) - return ErrCode(); - env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ? - FILE_MAP_WRITE : FILE_MAP_READ, - 0, 0, env->me_mapsize, meta.mm_address); - rc = env->me_map ? 0 : ErrCode(); - CloseHandle(mh); - if (rc) - return rc; - } -#else - i = MAP_SHARED; - prot = PROT_READ; - if (flags & MDB_WRITEMAP) { - prot |= PROT_WRITE; - if (ftruncate(env->me_fd, env->me_mapsize) < 0) - return ErrCode(); - } - env->me_map = mmap(meta.mm_address, env->me_mapsize, prot, i, - env->me_fd, 0); - if (env->me_map == MAP_FAILED) { - env->me_map = NULL; - return ErrCode(); - } - /* Turn off readahead. It's harmful when the DB is larger than RAM. */ -#ifdef MADV_RANDOM - madvise(env->me_map, env->me_mapsize, MADV_RANDOM); -#else -#ifdef POSIX_MADV_RANDOM - posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM); -#endif /* POSIX_MADV_RANDOM */ -#endif /* MADV_RANDOM */ -#endif /* _WIN32 */ + rc = mdb_env_map(env, meta.mm_address, newenv); + if (rc) + return rc; if (newenv) { if (flags & MDB_FIXEDMAP) @@ -3333,38 +3486,25 @@ mdb_env_open2(MDB_env *env) if (i != MDB_SUCCESS) { return i; } - } else if (meta.mm_address && env->me_map != meta.mm_address) { - /* Can happen because the address argument to mmap() is just a - * hint. mmap() can pick another, e.g. if the range is in use. - * The MAP_FIXED flag would prevent that, but then mmap could - * instead unmap existing pages to make room for the new map. - */ - return EBUSY; /* TODO: Make a new MDB_* error code? */ } - env->me_psize = meta.mm_psize; env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1; env->me_nodemax = (env->me_psize - PAGEHDRSZ) / MDB_MINKEYS; env->me_maxpg = env->me_mapsize / env->me_psize; - - p = (MDB_page *)env->me_map; - env->me_metas[0] = METADATA(p); - env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize); - #if MDB_DEBUG { int toggle = mdb_env_pick_meta(env); MDB_db *db = &env->me_metas[toggle]->mm_dbs[MAIN_DBI]; - DPRINTF("opened database version %u, pagesize %u", - env->me_metas[0]->mm_version, env->me_psize); - DPRINTF("using meta page %d", toggle); - DPRINTF("depth: %u", db->md_depth); - DPRINTF("entries: %"Z"u", db->md_entries); - DPRINTF("branch pages: %"Z"u", db->md_branch_pages); - DPRINTF("leaf pages: %"Z"u", db->md_leaf_pages); - DPRINTF("overflow pages: %"Z"u", db->md_overflow_pages); - DPRINTF("root: %"Z"u", db->md_root); + DPRINTF(("opened database version %u, pagesize %u", + env->me_metas[0]->mm_version, env->me_psize)); + DPRINTF(("using meta page %d", toggle)); + DPRINTF(("depth: %u", db->md_depth)); + DPRINTF(("entries: %"Z"u", db->md_entries)); + DPRINTF(("branch pages: %"Z"u", db->md_branch_pages)); + DPRINTF(("leaf pages: %"Z"u", db->md_leaf_pages)); + DPRINTF(("overflow pages: %"Z"u", db->md_overflow_pages)); + DPRINTF(("root: %"Z"u", db->md_root)); } #endif @@ -3426,7 +3566,7 @@ PIMAGE_TLS_CALLBACK mdb_tls_cbp __attribute__((section (".CRT$XLB"))) = mdb_tls_ #pragma comment(linker, "/INCLUDE:_tls_used") #pragma comment(linker, "/INCLUDE:mdb_tls_cbp") #pragma const_seg(".CRT$XLB") -extern const PIMAGE_TLS_CALLBACK mdb_tls_callback; +extern const PIMAGE_TLS_CALLBACK mdb_tls_cbp; const PIMAGE_TLS_CALLBACK mdb_tls_cbp = mdb_tls_callback; #pragma const_seg() #else /* WIN32 */ @@ -3524,7 +3664,7 @@ mdb_env_excl_lock(MDB_env *env, int *excl) return rc; } -#if defined(_WIN32) || defined(MDB_USE_POSIX_SEM) +#ifdef MDB_USE_HASH /* * hash_64 - 64 bit Fowler/Noll/Vo-0 FNV-1a hash code * @@ -3609,10 +3749,9 @@ static void mdb_hash_enc(MDB_val *val, char *encbuf) { mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT); - unsigned long *l = (unsigned long *)&h; - mdb_pack85(l[0], encbuf); - mdb_pack85(l[1], encbuf+5); + mdb_pack85(h, encbuf); + mdb_pack85(h>>32, encbuf+5); encbuf[10] = '\0'; } #endif @@ -3691,7 +3830,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo); if (size < rsize && *excl > 0) { #ifdef _WIN32 - if (SetFilePointer(env->me_lfd, rsize, NULL, FILE_BEGIN) != rsize + if (SetFilePointer(env->me_lfd, rsize, NULL, FILE_BEGIN) != (DWORD)rsize || !SetEndOfFile(env->me_lfd)) goto fail_errno; #else @@ -3808,8 +3947,8 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) goto fail; } if (env->me_txns->mti_format != MDB_LOCK_FORMAT) { - DPRINTF("lock region has format+version 0x%x, expected 0x%x", - env->me_txns->mti_format, MDB_LOCK_FORMAT); + DPRINTF(("lock region has format+version 0x%x, expected 0x%x", + env->me_txns->mti_format, MDB_LOCK_FORMAT)); rc = MDB_VERSION_MISMATCH; goto fail; } @@ -3847,8 +3986,9 @@ fail: * at runtime. Changing other flags requires closing the * environment and re-opening it with the new flags. */ -#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC) -#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS) +#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC|MDB_NOMEMINIT) +#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP| \ + MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD) int mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode) @@ -3900,9 +4040,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } - rc = mdb_env_setup_locks(env, lpath, mode, &excl); - if (rc) - goto leave; + /* For RDONLY, get lockfile after we know datafile exists */ + if (!(flags & (MDB_RDONLY|MDB_NOLOCK))) { + rc = mdb_env_setup_locks(env, lpath, mode, &excl); + if (rc) + goto leave; + } #ifdef _WIN32 if (F_ISSET(flags, MDB_RDONLY)) { @@ -3928,6 +4071,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } + if ((flags & (MDB_RDONLY|MDB_NOLOCK)) == MDB_RDONLY) { + rc = mdb_env_setup_locks(env, lpath, mode, &excl); + if (rc) + goto leave; + } + if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) { if (flags & (MDB_RDONLY|MDB_WRITEMAP)) { env->me_mfd = env->me_fd; @@ -3936,10 +4085,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode * MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset. */ #ifdef _WIN32 + len = OPEN_EXISTING; env->me_mfd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, len, mode | FILE_FLAG_WRITE_THROUGH, NULL); #else + oflags &= ~O_CREAT; env->me_mfd = open(dpath, oflags | MDB_DSYNC, mode); #endif if (env->me_mfd == INVALID_HANDLE_VALUE) { @@ -3947,10 +4098,15 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } } - DPRINTF("opened dbenv %p", (void *) env); + DPRINTF(("opened dbenv %p", (void *) env)); if (excl > 0) { rc = mdb_env_share_locks(env, &excl); + if (rc) + goto leave; } + if (!((flags & MDB_RDONLY) || + (env->me_pbuf = calloc(1, env->me_psize)))) + rc = ENOMEM; } leave: @@ -3974,6 +4130,7 @@ mdb_env_close0(MDB_env *env, int excl) for (i = env->me_maxdbs; --i > MAIN_DBI; ) free(env->me_dbxs[i].md_name.mv_data); + free(env->me_pbuf); free(env->me_dbflags); free(env->me_dbxs); free(env->me_path); @@ -4001,7 +4158,7 @@ mdb_env_close0(MDB_env *env, int excl) if (env->me_fd != INVALID_HANDLE_VALUE) (void) close(env->me_fd); if (env->me_txns) { - pid_t pid = env->me_pid; + MDB_PID_T pid = env->me_pid; /* Clearing readers is done in this function because * me_txkey with its destructor must be disabled first. */ @@ -4163,17 +4320,18 @@ mdb_env_copy(MDB_env *env, const char *path) newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW, FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL); #else - newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL -#ifdef O_DIRECT - |O_DIRECT -#endif - , 0666); + newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666); #endif if (newfd == INVALID_HANDLE_VALUE) { rc = ErrCode(); goto leave; } +#ifdef O_DIRECT + /* Set O_DIRECT if the file system supports it */ + if ((rc = fcntl(newfd, F_GETFL)) != -1) + (void) fcntl(newfd, F_SETFL, rc | O_DIRECT); +#endif #ifdef F_NOCACHE /* __APPLE__ */ rc = fcntl(newfd, F_NOCACHE, 1); if (rc) { @@ -4221,7 +4379,7 @@ mdb_cmp_long(const MDB_val *a, const MDB_val *b) *(size_t *)a->mv_data > *(size_t *)b->mv_data; } -/** Compare two items pointing at aligned int's */ +/** Compare two items pointing at aligned unsigned int's */ static int mdb_cmp_int(const MDB_val *a, const MDB_val *b) { @@ -4229,7 +4387,7 @@ mdb_cmp_int(const MDB_val *a, const MDB_val *b) *(unsigned int *)a->mv_data > *(unsigned int *)b->mv_data; } -/** Compare two items pointing at ints of unknown alignment. +/** Compare two items pointing at unsigned ints of unknown alignment. * Nodes and keys are guaranteed to be 2-byte aligned. */ static int @@ -4320,9 +4478,9 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) { pgno_t pgno; COPY_PGNO(pgno, mp->mp_pgno); - DPRINTF("searching %u keys in %s %spage %"Z"u", + DPRINTF(("searching %u keys in %s %spage %"Z"u", nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", - pgno); + pgno)); } #endif @@ -4349,8 +4507,8 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) i = (low + high) >> 1; nodekey.mv_data = LEAF2KEY(mp, i, nodekey.mv_size); rc = cmp(key, &nodekey); - DPRINTF("found leaf index %u [%s], rc = %i", - i, DKEY(&nodekey), rc); + DPRINTF(("found leaf index %u [%s], rc = %i", + i, DKEY(&nodekey), rc)); if (rc == 0) break; if (rc > 0) @@ -4369,11 +4527,11 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) rc = cmp(key, &nodekey); #if MDB_DEBUG if (IS_LEAF(mp)) - DPRINTF("found leaf index %u [%s], rc = %i", - i, DKEY(&nodekey), rc); + DPRINTF(("found leaf index %u [%s], rc = %i", + i, DKEY(&nodekey), rc)); else - DPRINTF("found branch index %u [%s -> %"Z"u], rc = %i", - i, DKEY(&nodekey), NODEPGNO(node), rc); + DPRINTF(("found branch index %u [%s -> %"Z"u], rc = %i", + i, DKEY(&nodekey), NODEPGNO(node), rc)); #endif if (rc == 0) break; @@ -4420,15 +4578,15 @@ static void mdb_cursor_pop(MDB_cursor *mc) { if (mc->mc_snum) { -#ifndef MDB_DEBUG_SKIP +#if MDB_DEBUG MDB_page *top = mc->mc_pg[mc->mc_top]; #endif mc->mc_snum--; if (mc->mc_snum) mc->mc_top--; - DPRINTF("popped page %"Z"u off db %u cursor %p", top->mp_pgno, - mc->mc_dbi, (void *) mc); + DPRINTF(("popped page %"Z"u off db %d cursor %p", top->mp_pgno, + DDBI(mc), (void *) mc)); } } @@ -4436,8 +4594,8 @@ mdb_cursor_pop(MDB_cursor *mc) static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) { - DPRINTF("pushing page %"Z"u on db %u cursor %p", mp->mp_pgno, - mc->mc_dbi, (void *) mc); + DPRINTF(("pushing page %"Z"u on db %d cursor %p", mp->mp_pgno, + DDBI(mc), (void *) mc)); if (mc->mc_snum >= CURSOR_STACK) { assert(mc->mc_snum < CURSOR_STACK); @@ -4477,8 +4635,9 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) * leave that unless page_touch happens again). */ if (tx2->mt_spill_pgs) { - x = mdb_midl_search(tx2->mt_spill_pgs, pgno); - if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pgno) { + MDB_ID pn = pgno << 1; + x = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { p = (MDB_page *)(env->me_map + env->me_psize * pgno); goto done; } @@ -4498,7 +4657,7 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) level = 0; p = (MDB_page *)(env->me_map + env->me_psize * pgno); } else { - DPRINTF("page %"Z"u not found", pgno); + DPRINTF(("page %"Z"u not found", pgno)); assert(p != NULL); return MDB_PAGE_NOTFOUND; } @@ -4510,37 +4669,28 @@ done: return MDB_SUCCESS; } -/** Search for the page a given key should be in. - * Pushes parent pages on the cursor stack. This function continues a - * search on a cursor that has already been initialized. (Usually by - * #mdb_page_search() but also by #mdb_node_move().) - * @param[in,out] mc the cursor for this operation. - * @param[in] key the key to search for. If NULL, search for the lowest - * page. (This is used by #mdb_cursor_first().) - * @param[in] modify If true, visited pages are updated with new page numbers. - * @return 0 on success, non-zero on failure. +/** Finish #mdb_page_search() / #mdb_page_search_lowest(). + * The cursor is at the root page, set up the rest of it. */ static int -mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) +mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags) { MDB_page *mp = mc->mc_pg[mc->mc_top]; - DKBUF; int rc; - + DKBUF; while (IS_BRANCH(mp)) { MDB_node *node; indx_t i; - DPRINTF("branch page %"Z"u has %u keys", mp->mp_pgno, NUMKEYS(mp)); + DPRINTF(("branch page %"Z"u has %u keys", mp->mp_pgno, NUMKEYS(mp))); assert(NUMKEYS(mp) > 1); - DPRINTF("found index 0 to page %"Z"u", NODEPGNO(NODEPTR(mp, 0))); + DPRINTF(("found index 0 to page %"Z"u", NODEPGNO(NODEPTR(mp, 0)))); - if (key == NULL) /* Initialize cursor to first page. */ + if (flags & (MDB_PS_FIRST|MDB_PS_LAST)) { i = 0; - else if (key->mv_size > MDB_MAXKEYSIZE && key->mv_data == NULL) { - /* cursor to last page */ - i = NUMKEYS(mp)-1; + if (flags & MDB_PS_LAST) + i = NUMKEYS(mp) - 1; } else { int exact; node = mdb_node_search(mc, key, &exact); @@ -4553,11 +4703,9 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) i--; } } + DPRINTF(("following index %u for key [%s]", i, DKEY(key))); } - if (key) - DPRINTF("following index %u for key [%s]", - i, DKEY(key)); assert(i < NUMKEYS(mp)); node = NODEPTR(mp, i); @@ -4568,7 +4716,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) if ((rc = mdb_cursor_push(mc, mp))) return rc; - if (modify) { + if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc)) != 0) return rc; mp = mc->mc_pg[mc->mc_top]; @@ -4576,13 +4724,13 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) } if (!IS_LEAF(mp)) { - DPRINTF("internal error, index points to a %02X page!?", - mp->mp_flags); + DPRINTF(("internal error, index points to a %02X page!?", + mp->mp_flags)); return MDB_CORRUPTED; } - DPRINTF("found leaf page %"Z"u for key [%s]", mp->mp_pgno, - key ? DKEY(key) : NULL); + DPRINTF(("found leaf page %"Z"u for key [%s]", mp->mp_pgno, + key ? DKEY(key) : "null")); mc->mc_flags |= C_INITIALIZED; mc->mc_flags &= ~C_EOF; @@ -4608,18 +4756,17 @@ mdb_page_search_lowest(MDB_cursor *mc) mc->mc_ki[mc->mc_top] = 0; if ((rc = mdb_cursor_push(mc, mp))) return rc; - return mdb_page_search_root(mc, NULL, 0); + return mdb_page_search_root(mc, NULL, MDB_PS_FIRST); } /** Search for the page a given key should be in. - * Pushes parent pages on the cursor stack. This function just sets up - * the search; it finds the root page for \b mc's database and sets this - * as the root of the cursor's stack. Then #mdb_page_search_root() is - * called to complete the search. + * Push it and its parent pages on the cursor stack. * @param[in,out] mc the cursor for this operation. - * @param[in] key the key to search for. If NULL, search for the lowest - * page. (This is used by #mdb_cursor_first().) - * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers. + * @param[in] key the key to search for, or NULL for first/last page. + * @param[in] flags If MDB_PS_MODIFY is set, visited pages in the DB + * are touched (updated with new page numbers). + * If MDB_PS_FIRST or MDB_PS_LAST is set, find first or last leaf. + * This is used by #mdb_cursor_first() and #mdb_cursor_last(). * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups. * @return 0 on success, non-zero on failure. */ @@ -4630,23 +4777,20 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) pgno_t root; /* Make sure the txn is still viable, then find the root from - * the txn's db table. + * the txn's db table and set it as the root of the cursor's stack. */ if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_ERROR)) { DPUTS("transaction has failed, must abort"); return MDB_BAD_TXN; } else { /* Make sure we're using an up-to-date root */ - if (mc->mc_dbi > MAIN_DBI) { - if ((*mc->mc_dbflag & DB_STALE) || - ((flags & MDB_PS_MODIFY) && !(*mc->mc_dbflag & DB_DIRTY))) { + if (*mc->mc_dbflag & DB_STALE) { MDB_cursor mc2; - unsigned char dbflag = 0; mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL); - rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, flags & MDB_PS_MODIFY); + rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, 0); if (rc) return rc; - if (*mc->mc_dbflag & DB_STALE) { + { MDB_val data; int exact = 0; uint16_t flags; @@ -4666,11 +4810,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) return MDB_INCOMPATIBLE; memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db)); } - if (flags & MDB_PS_MODIFY) - dbflag = DB_DIRTY; *mc->mc_dbflag &= ~DB_STALE; - *mc->mc_dbflag |= dbflag; - } } root = mc->mc_db->md_root; @@ -4688,8 +4828,8 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) mc->mc_snum = 1; mc->mc_top = 0; - DPRINTF("db %u root page %"Z"u has flags 0x%X", - mc->mc_dbi, root, mc->mc_pg[0]->mp_flags); + DPRINTF(("db %d root page %"Z"u has flags 0x%X", + DDBI(mc), root, mc->mc_pg[0]->mp_flags)); if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc))) @@ -4710,9 +4850,10 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) unsigned x = 0, ovpages = mp->mp_pages; MDB_env *env = txn->mt_env; MDB_IDL sl = txn->mt_spill_pgs; + MDB_ID pn = pg << 1; int rc; - DPRINTF("free ov page %"Z"u (%d)", pg, ovpages); + DPRINTF(("free ov page %"Z"u (%d)", pg, ovpages)); /* If the page is dirty or on the spill list we just acquired it, * so we should give it back to our current free list, if any. * Otherwise put it onto the list of pages we freed in this txn. @@ -4724,7 +4865,7 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) if (env->me_pghead && !txn->mt_parent && ((mp->mp_flags & P_DIRTY) || - (sl && (x = mdb_midl_search(sl, pg)) <= sl[0] && sl[x] == pg))) + (sl && (x = mdb_midl_search(sl, pn)) <= sl[0] && sl[x] == pn))) { unsigned i, j; pgno_t *mop; @@ -4734,9 +4875,10 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) return rc; if (!(mp->mp_flags & P_DIRTY)) { /* This page is no longer spilled */ - for (; x < sl[0]; x++) - sl[x] = sl[x+1]; - sl[0]--; + if (x == sl[0]) + sl[0]--; + else + sl[x] |= 1; goto release; } /* Remove from dirty list */ @@ -4799,7 +4941,7 @@ mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data) data->mv_size = NODEDSZ(leaf); memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); if ((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0) { - DPRINTF("read overflow page %"Z"u failed", pgno); + DPRINTF(("read overflow page %"Z"u failed", pgno)); return rc; } data->mv_data = METADATA(omp); @@ -4818,12 +4960,15 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, assert(key); assert(data); - DPRINTF("===> get db %u key [%s]", dbi, DKEY(key)); + DPRINTF(("===> get db %u key [%s]", dbi, DKEY(key))); if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; + + if (key->mv_size > MDB_MAXKEYSIZE) { return MDB_BAD_VALSIZE; } @@ -4851,13 +4996,13 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) } mdb_cursor_pop(mc); - DPRINTF("parent page is page %"Z"u, index %u", - mc->mc_pg[mc->mc_top]->mp_pgno, mc->mc_ki[mc->mc_top]); + DPRINTF(("parent page is page %"Z"u, index %u", + mc->mc_pg[mc->mc_top]->mp_pgno, mc->mc_ki[mc->mc_top])); if (move_right ? (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mc->mc_pg[mc->mc_top])) : (mc->mc_ki[mc->mc_top] == 0)) { - DPRINTF("no more keys left, moving to %s sibling", - move_right ? "right" : "left"); + DPRINTF(("no more keys left, moving to %s sibling", + move_right ? "right" : "left")); if ((rc = mdb_cursor_sibling(mc, move_right)) != MDB_SUCCESS) { /* undo cursor_pop before returning */ mc->mc_top++; @@ -4869,14 +5014,17 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) mc->mc_ki[mc->mc_top]++; else mc->mc_ki[mc->mc_top]--; - DPRINTF("just moving to %s index key %u", - move_right ? "right" : "left", mc->mc_ki[mc->mc_top]); + DPRINTF(("just moving to %s index key %u", + move_right ? "right" : "left", mc->mc_ki[mc->mc_top])); } assert(IS_BRANCH(mc->mc_pg[mc->mc_top])); indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL) != 0)) + if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL)) != 0) { + /* mc will be inconsistent if caller does mc_snum++ as above */ + mc->mc_flags &= ~(C_INITIALIZED|C_EOF); return rc; + } mdb_cursor_push(mc, mp); if (!move_right) @@ -4906,8 +5054,11 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (op == MDB_NEXT || op == MDB_NEXT_DUP) { rc = mdb_cursor_next(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); - if (op != MDB_NEXT || rc != MDB_NOTFOUND) + if (op != MDB_NEXT || rc != MDB_NOTFOUND) { + if (rc == MDB_SUCCESS) + MDB_GET_KEY(leaf, key); return rc; + } } } else { mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); @@ -4916,7 +5067,9 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } } - DPRINTF("cursor_next: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc); + DPRINTF(("cursor_next: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc)); + if (mc->mc_flags & C_DEL) + goto skip; if (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mp)) { DPUTS("=====> move to next sibling page"); @@ -4925,12 +5078,13 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) return rc; } mp = mc->mc_pg[mc->mc_top]; - DPRINTF("next page is %"Z"u, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top]); + DPRINTF(("next page is %"Z"u, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top])); } else mc->mc_ki[mc->mc_top]++; - DPRINTF("==> cursor points to page %"Z"u with %u keys, key index %u", - mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]); +skip: + DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u", + mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top])); if (IS_LEAF2(mp)) { key->mv_size = mc->mc_db->md_pad; @@ -4973,11 +5127,14 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) if (mc->mc_db->md_flags & MDB_DUPSORT) { leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (op == MDB_PREV || op == MDB_PREV_DUP) { - if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (op == MDB_PREV || op == MDB_PREV_DUP) { rc = mdb_cursor_prev(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); - if (op != MDB_PREV || rc != MDB_NOTFOUND) + if (op != MDB_PREV || rc != MDB_NOTFOUND) { + if (rc == MDB_SUCCESS) + MDB_GET_KEY(leaf, key); return rc; + } } else { mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); if (op == MDB_PREV_DUP) @@ -4986,7 +5143,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } } - DPRINTF("cursor_prev: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc); + DPRINTF(("cursor_prev: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc)); if (mc->mc_ki[mc->mc_top] == 0) { DPUTS("=====> move to prev sibling page"); @@ -4995,14 +5152,14 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } mp = mc->mc_pg[mc->mc_top]; mc->mc_ki[mc->mc_top] = NUMKEYS(mp) - 1; - DPRINTF("prev page is %"Z"u, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top]); + DPRINTF(("prev page is %"Z"u, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top])); } else mc->mc_ki[mc->mc_top]--; mc->mc_flags &= ~C_EOF; - DPRINTF("==> cursor points to page %"Z"u with %u keys, key index %u", - mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]); + DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u", + mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top])); if (IS_LEAF2(mp)) { key->mv_size = mc->mc_db->md_pad; @@ -5043,7 +5200,8 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, assert(mc); assert(key); - assert(key->mv_size > 0); + if (key->mv_size == 0) + return MDB_BAD_VALSIZE; if (mc->mc_xcursor) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); @@ -5062,7 +5220,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size); } else { leaf = NODEPTR(mp, 0); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5083,7 +5241,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, nkeys-1, nodekey.mv_size); } else { leaf = NODEPTR(mp, nkeys-1); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5101,7 +5259,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, mc->mc_ki[mc->mc_top], nodekey.mv_size); } else { leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5131,7 +5289,11 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (!mc->mc_top) { /* There are no other pages */ mc->mc_ki[mc->mc_top] = 0; - return MDB_NOTFOUND; + if (op == MDB_SET_RANGE) { + rc = 0; + goto set1; + } else + return MDB_NOTFOUND; } } @@ -5195,6 +5357,7 @@ set1: if (rc) { if (op == MDB_GET_BOTH || rc > 0) return MDB_NOTFOUND; + rc = 0; } } else { @@ -5208,7 +5371,7 @@ set1: /* The key already matches in all other cases */ if (op == MDB_SET_RANGE || op == MDB_SET_KEY) MDB_GET_KEY(leaf, key); - DPRINTF("==> cursor placed on key [%s]", DKEY(key)); + DPRINTF(("==> cursor placed on key [%s]", DKEY(key))); return rc; } @@ -5224,7 +5387,7 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { - rc = mdb_page_search(mc, NULL, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_FIRST); if (rc != MDB_SUCCESS) return rc; } @@ -5270,11 +5433,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data) if (!(mc->mc_flags & C_EOF)) { if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { - MDB_val lkey; - - lkey.mv_size = MDB_MAXKEYSIZE+1; - lkey.mv_data = NULL; - rc = mdb_page_search(mc, &lkey, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_LAST); if (rc != MDB_SUCCESS) return rc; } @@ -5313,17 +5472,22 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, { int rc; int exact = 0; + int (*mfunc)(MDB_cursor *mc, MDB_val *key, MDB_val *data); assert(mc); + if (mc->mc_txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; + switch (op) { case MDB_GET_CURRENT: if (!(mc->mc_flags & C_INITIALIZED)) { rc = EINVAL; } else { MDB_page *mp = mc->mc_pg[mc->mc_top]; - if (!NUMKEYS(mp)) { - mc->mc_ki[mc->mc_top] = 0; + int nkeys = NUMKEYS(mp); + if (!nkeys || mc->mc_ki[mc->mc_top] >= nkeys) { + mc->mc_ki[mc->mc_top] = nkeys; rc = MDB_NOTFOUND; break; } @@ -5336,6 +5500,8 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_GET_KEY(leaf, key); if (data) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (mc->mc_flags & C_DEL) + mdb_xcursor_init1(mc, leaf); rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT); } else { rc = mdb_node_read(mc->mc_txn, leaf, data); @@ -5346,17 +5512,21 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, break; case MDB_GET_BOTH: case MDB_GET_BOTH_RANGE: - if (data == NULL || mc->mc_xcursor == NULL) { + if (data == NULL) { rc = EINVAL; break; } + if (mc->mc_xcursor == NULL) { + rc = MDB_INCOMPATIBLE; + break; + } /* FALLTHRU */ case MDB_SET: case MDB_SET_KEY: case MDB_SET_RANGE: if (key == NULL) { rc = EINVAL; - } else if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + } else if (key->mv_size > MDB_MAXKEYSIZE) { rc = MDB_BAD_VALSIZE; } else if (op == MDB_SET_RANGE) rc = mdb_cursor_set(mc, key, data, op, NULL); @@ -5364,23 +5534,28 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, rc = mdb_cursor_set(mc, key, data, op, &exact); break; case MDB_GET_MULTIPLE: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPFIXED) || - !(mc->mc_flags & C_INITIALIZED)) { + if (data == NULL || !(mc->mc_flags & C_INITIALIZED)) { rc = EINVAL; break; } + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + rc = MDB_INCOMPATIBLE; + break; + } rc = MDB_SUCCESS; if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED) || (mc->mc_xcursor->mx_cursor.mc_flags & C_EOF)) break; goto fetchm; case MDB_NEXT_MULTIPLE: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPFIXED)) { + if (data == NULL) { rc = EINVAL; break; } + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + rc = MDB_INCOMPATIBLE; + break; + } if (!(mc->mc_flags & C_INITIALIZED)) rc = mdb_cursor_first(mc, key, data); else @@ -5423,45 +5598,48 @@ fetchm: rc = mdb_cursor_first(mc, key, data); break; case MDB_FIRST_DUP: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPSORT) || - !(mc->mc_flags & C_INITIALIZED) || - !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) { + mfunc = mdb_cursor_first; + mmove: + if (data == NULL || !(mc->mc_flags & C_INITIALIZED)) { rc = EINVAL; break; } - rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL); + if (mc->mc_xcursor == NULL) { + rc = MDB_INCOMPATIBLE; + break; + } + if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) { + rc = EINVAL; + break; + } + rc = mfunc(&mc->mc_xcursor->mx_cursor, data, NULL); break; case MDB_LAST: rc = mdb_cursor_last(mc, key, data); break; case MDB_LAST_DUP: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPSORT) || - !(mc->mc_flags & C_INITIALIZED) || - !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) { - rc = EINVAL; - break; - } - rc = mdb_cursor_last(&mc->mc_xcursor->mx_cursor, data, NULL); - break; + mfunc = mdb_cursor_last; + goto mmove; default: - DPRINTF("unhandled/unimplemented cursor operation %u", op); + DPRINTF(("unhandled/unimplemented cursor operation %u", op)); rc = EINVAL; break; } + if (mc->mc_flags & C_DEL) + mc->mc_flags ^= C_DEL; + return rc; } -/** Touch all the pages in the cursor stack. +/** Touch all the pages in the cursor stack. Set mc_top. * Makes sure all the pages are writable, before attempting a write operation. * @param[in] mc The cursor to operate on. */ static int mdb_cursor_touch(MDB_cursor *mc) { - int rc; + int rc = MDB_SUCCESS; if (mc->mc_dbi > MAIN_DBI && !(*mc->mc_dbflag & DB_DIRTY)) { MDB_cursor mc2; @@ -5472,13 +5650,14 @@ mdb_cursor_touch(MDB_cursor *mc) return rc; *mc->mc_dbflag |= DB_DIRTY; } - for (mc->mc_top = 0; mc->mc_top < mc->mc_snum; mc->mc_top++) { - rc = mdb_page_touch(mc); - if (rc) - return rc; + mc->mc_top = 0; + if (mc->mc_snum) { + do { + rc = mdb_page_touch(mc); + } while (!rc && ++(mc->mc_top) < mc->mc_snum); + mc->mc_top = mc->mc_snum-1; } - mc->mc_top = mc->mc_snum-1; - return MDB_SUCCESS; + return rc; } /** Do not spill pages to disk if txn is getting full, may fail instead */ @@ -5489,15 +5668,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, unsigned int flags) { enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */ + MDB_env *env = mc->mc_txn->mt_env; MDB_node *leaf = NULL; MDB_val xdata, *rdata, dkey; - MDB_page *fp; MDB_db dummy; int do_sub = 0, insert = 0; unsigned int mcount = 0, dcount = 0, nospill; size_t nsize; int rc, rc2; - MDB_pagebuf pbuf; char dbuf[MDB_MAXKEYSIZE+1]; unsigned int nflags; DKBUF; @@ -5509,14 +5687,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, dcount = data[1].mv_size; data[1].mv_size = 0; if (!F_ISSET(mc->mc_db->md_flags, MDB_DUPFIXED)) - return EINVAL; + return MDB_INCOMPATIBLE; } nospill = flags & MDB_NOSPILL; flags &= ~MDB_NOSPILL; - if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) - return EACCES; + if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; if (flags != MDB_CURRENT && (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE)) return MDB_BAD_VALSIZE; @@ -5529,8 +5707,8 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return MDB_BAD_VALSIZE; #endif - DPRINTF("==> put db %u key [%s], size %"Z"u, data size %"Z"u", - mc->mc_dbi, DKEY(key), key ? key->mv_size:0, data->mv_size); + DPRINTF(("==> put db %d key [%s], size %"Z"u, data size %"Z"u", + DDBI(mc), DKEY(key), key ? key->mv_size : 0, data->mv_size)); dkey.mv_size = 0; @@ -5541,6 +5719,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, } else if (mc->mc_db->md_root == P_INVALID) { /* new database, cursor has nothing to point to */ mc->mc_snum = 0; + mc->mc_top = 0; mc->mc_flags &= ~C_INITIALIZED; rc = MDB_NO_ROOT; } else { @@ -5563,7 +5742,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact); } if ((flags & MDB_NOOVERWRITE) && rc == 0) { - DPRINTF("duplicate key [%s]", DKEY(key)); + DPRINTF(("duplicate key [%s]", DKEY(key))); *data = d2; return MDB_KEYEXIST; } @@ -5571,6 +5750,9 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return rc; } + if (mc->mc_flags & C_DEL) + mc->mc_flags ^= C_DEL; + /* Cursor is positioned, check for room in the dirty list */ if (!nospill) { if (flags & MDB_MULTIPLE) { @@ -5607,6 +5789,9 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, /* The key already exists */ if (rc == MDB_SUCCESS) { + MDB_page *fp, *mp; + MDB_val olddata; + /* there's only a key anyway, so this is a no-op */ if (IS_LEAF2(mc->mc_pg[mc->mc_top])) { unsigned int ksize = mc->mc_db->md_pad; @@ -5619,19 +5804,23 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return MDB_SUCCESS; } +more: leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + olddata.mv_size = NODEDSZ(leaf); + olddata.mv_data = NODEDATA(leaf); /* DB has dups? */ if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { + mp = fp = xdata.mv_data = env->me_pbuf; + mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + /* Was a single item before, must convert now */ -more: if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { /* Just overwrite the current item */ if (flags == MDB_CURRENT) goto current; - dkey.mv_size = NODEDSZ(leaf); - dkey.mv_data = NODEDATA(leaf); + dkey = olddata; #if UINT_MAX < SIZE_MAX if (mc->mc_dbx->md_dcmp == mdb_cmp_int && dkey.mv_size == sizeof(size_t)) #ifdef MISALIGNED_OK @@ -5640,92 +5829,90 @@ more: mc->mc_dbx->md_dcmp = mdb_cmp_cint; #endif #endif - /* if data matches, ignore it */ - if (!mc->mc_dbx->md_dcmp(data, &dkey)) - return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS; + /* if data matches, skip it */ + if (!mc->mc_dbx->md_dcmp(data, &dkey)) { + if (flags & MDB_NODUPDATA) + rc = MDB_KEYEXIST; + else if (flags & MDB_MULTIPLE) + goto next_mult; + else + rc = MDB_SUCCESS; + return rc; + } /* create a fake page for the dup items */ memcpy(dbuf, dkey.mv_data, dkey.mv_size); dkey.mv_data = dbuf; - fp = (MDB_page *)&pbuf; - fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP; fp->mp_lower = PAGEHDRSZ; - fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size; + xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size; if (mc->mc_db->md_flags & MDB_DUPFIXED) { fp->mp_flags |= P_LEAF2; fp->mp_pad = data->mv_size; - fp->mp_upper += 2 * data->mv_size; /* leave space for 2 more */ + xdata.mv_size += 2 * data->mv_size; /* leave space for 2 more */ } else { - fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE + + xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) + (dkey.mv_size & 1) + (data->mv_size & 1); } - mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); - do_sub = 1; - rdata = &xdata; - xdata.mv_size = fp->mp_upper; - xdata.mv_data = fp; - flags |= F_DUPDATA; - goto new_sub; - } - if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { + fp->mp_upper = xdata.mv_size; + } else if (leaf->mn_flags & F_SUBDATA) { + /* Data is on sub-DB, just store it */ + flags |= F_DUPDATA|F_SUBDATA; + goto put_sub; + } else { /* See if we need to convert from fake page to subDB */ - MDB_page *mp; unsigned int offset; unsigned int i; uint16_t fp_flags; - fp = NODEDATA(leaf); - if (flags == MDB_CURRENT) { -reuse: + fp = olddata.mv_data; + switch (flags) { + default: + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + offset = NODESIZE + sizeof(indx_t) + data->mv_size; + offset += offset & 1; + break; + } + offset = fp->mp_pad; + if (SIZELEFT(fp) < offset) { + offset *= 4; /* space for 4 more */ + break; + } + /* FALLTHRU: Big enough MDB_DUPFIXED sub-page */ + case MDB_CURRENT: fp->mp_flags |= P_DIRTY; - COPY_PGNO(fp->mp_pgno, mc->mc_pg[mc->mc_top]->mp_pgno); + COPY_PGNO(fp->mp_pgno, mp->mp_pgno); mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; flags |= F_DUPDATA; goto put_sub; } - if (mc->mc_db->md_flags & MDB_DUPFIXED) { - offset = fp->mp_pad; - if (SIZELEFT(fp) >= offset) - goto reuse; - offset *= 4; /* space for 4 more */ - } else { - offset = NODESIZE + sizeof(indx_t) + data->mv_size; - } - offset += offset & 1; fp_flags = fp->mp_flags; - if (NODESIZE + sizeof(indx_t) + NODEKSZ(leaf) + NODEDSZ(leaf) + - offset >= mc->mc_txn->mt_env->me_nodemax) { + xdata.mv_size = olddata.mv_size + offset; + if (NODESIZE + sizeof(indx_t) + NODEKSZ(leaf) + xdata.mv_size + >= env->me_nodemax) { /* yes, convert it */ - dummy.md_flags = 0; if (mc->mc_db->md_flags & MDB_DUPFIXED) { dummy.md_pad = fp->mp_pad; dummy.md_flags = MDB_DUPFIXED; if (mc->mc_db->md_flags & MDB_INTEGERDUP) dummy.md_flags |= MDB_INTEGERKEY; + } else { + dummy.md_pad = 0; + dummy.md_flags = 0; } dummy.md_depth = 1; dummy.md_branch_pages = 0; dummy.md_leaf_pages = 1; dummy.md_overflow_pages = 0; dummy.md_entries = NUMKEYS(fp); - rdata = &xdata; xdata.mv_size = sizeof(MDB_db); xdata.mv_data = &dummy; if ((rc = mdb_page_alloc(mc, 1, &mp))) return rc; - offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf); + offset = env->me_psize - olddata.mv_size; flags |= F_DUPDATA|F_SUBDATA; dummy.md_root = mp->mp_pgno; fp_flags &= ~P_SUBP; - } else { - /* no, just grow it */ - rdata = &xdata; - xdata.mv_size = NODEDSZ(leaf) + offset; - xdata.mv_data = &pbuf; - mp = (MDB_page *)&pbuf; - mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; - flags |= F_DUPDATA; } mp->mp_flags = fp_flags | P_DIRTY; mp->mp_pad = fp->mp_pad; @@ -5734,28 +5921,27 @@ reuse: if (IS_LEAF2(fp)) { memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); } else { - nsize = NODEDSZ(leaf) - fp->mp_upper; - memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize); + memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, + olddata.mv_size - fp->mp_upper); for (i=0; imp_ptrs[i] = fp->mp_ptrs[i] + offset; } - mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); - do_sub = 1; - goto new_sub; } - /* data is on sub-DB, just store it */ - flags |= F_DUPDATA|F_SUBDATA; - goto put_sub; + + rdata = &xdata; + flags |= F_DUPDATA; + do_sub = 1; + mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); + goto new_sub; } current: /* overflow page overwrites need special handling */ if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { MDB_page *omp; pgno_t pg; - unsigned psize = mc->mc_txn->mt_env->me_psize; - int level, ovpages, dpages = OVPAGES(data->mv_size, psize); + int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); - memcpy(&pg, NODEDATA(leaf), sizeof(pg)); + memcpy(&pg, olddata.mv_data, sizeof(pg)); if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0) return rc2; ovpages = omp->mp_pages; @@ -5763,7 +5949,7 @@ current: /* Is the ov page large enough? */ if (ovpages >= dpages) { if (!(omp->mp_flags & P_DIRTY) && - (level || (mc->mc_txn->mt_env->me_flags & MDB_WRITEMAP))) + (level || (env->me_flags & MDB_WRITEMAP))) { rc = mdb_page_unspill(mc->mc_txn, omp, &omp); if (rc) @@ -5778,7 +5964,7 @@ current: */ if (level > 1) { /* It is writable only in a parent txn */ - size_t sz = (size_t) psize * ovpages, off; + size_t sz = (size_t) env->me_psize * ovpages, off; MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages); MDB_ID2 id2; if (!np) @@ -5808,15 +5994,15 @@ current: } if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS) return rc2; - } else if (NODEDSZ(leaf) == data->mv_size) { + } else if (data->mv_size == olddata.mv_size) { /* same size, just replace it. Note that we could * also reuse this node if the new data is smaller, * but instead we opt to shrink the node in that case. */ if (F_ISSET(flags, MDB_RESERVE)) - data->mv_data = NODEDATA(leaf); + data->mv_data = olddata.mv_data; else if (data->mv_size) - memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); + memcpy(olddata.mv_data, data->mv_data, data->mv_size); else memcpy(NODEKEY(leaf), key->mv_data, key->mv_size); goto done; @@ -5824,7 +6010,7 @@ current: mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); mc->mc_db->md_entries--; } else { - DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]); + DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); insert = 1; } @@ -5832,7 +6018,7 @@ current: new_sub: nflags = flags & NODE_ADD_FLAGS; - nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata); + nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata); if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA ) nflags &= ~MDB_APPEND; @@ -5849,9 +6035,6 @@ new_sub: unsigned i = mc->mc_top; MDB_page *mp = mc->mc_pg[i]; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -5923,15 +6106,15 @@ put_sub: mc->mc_db->md_entries++; if (flags & MDB_MULTIPLE) { if (!rc) { +next_mult: mcount++; + /* let caller know how many succeeded, if any */ + data[1].mv_size = mcount; if (mcount < dcount) { data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); goto more; } } - /* let caller know how many succeeded, if any */ - data[1].mv_size = mcount; } } done: @@ -5947,26 +6130,30 @@ int mdb_cursor_del(MDB_cursor *mc, unsigned int flags) { MDB_node *leaf; + MDB_page *mp; int rc; - if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) - return EACCES; + if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; if (!(mc->mc_flags & C_INITIALIZED)) return EINVAL; + if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top])) + return MDB_NOTFOUND; + if (!(flags & MDB_NOSPILL) && (rc = mdb_page_spill(mc, NULL, NULL))) return rc; - flags &= ~MDB_NOSPILL; /* TODO: Or change (flags != MDB_NODUPDATA) to ~(flags & MDB_NODUPDATA), not looking at the logic of that code just now */ rc = mdb_cursor_touch(mc); if (rc) return rc; - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + mp = mc->mc_pg[mc->mc_top]; + leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { - if (flags != MDB_NODUPDATA) { + if (!IS_LEAF2(mp) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (!(flags & MDB_NODUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); } @@ -5980,18 +6167,19 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) } else { MDB_cursor *m2; /* shrink fake page */ - mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + mdb_node_shrink(mp, mc->mc_ki[mc->mc_top]); + leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); /* fix other sub-DB cursors pointed at this fake page */ for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) { if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; - if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top] && + if (m2->mc_pg[mc->mc_top] == mp && m2->mc_ki[mc->mc_top] == mc->mc_ki[mc->mc_top]) m2->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); } } mc->mc_db->md_entries--; + mc->mc_flags |= C_DEL; return rc; } /* otherwise fall thru and delete the sub-DB */ @@ -6026,8 +6214,8 @@ mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp) if ((rc = mdb_page_alloc(mc, num, &np))) return rc; - DPRINTF("allocated new mpage %"Z"u, page size %u", - np->mp_pgno, mc->mc_txn->mt_env->me_psize); + DPRINTF(("allocated new mpage %"Z"u, page size %u", + np->mp_pgno, mc->mc_txn->mt_env->me_psize)); np->mp_flags = flags | P_DIRTY; np->mp_lower = PAGEHDRSZ; np->mp_upper = mc->mc_txn->mt_env->me_psize; @@ -6117,6 +6305,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, { unsigned int i; size_t node_size = NODESIZE; + ssize_t room; indx_t ofs; MDB_node *node; MDB_page *mp = mc->mc_pg[mc->mc_top]; @@ -6125,11 +6314,11 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, assert(mp->mp_upper >= mp->mp_lower); - DPRINTF("add to %s %spage %"Z"u index %i, data size %"Z"u key size %"Z"u [%s]", + DPRINTF(("add to %s %spage %"Z"u index %i, data size %"Z"u key size %"Z"u [%s]", IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, data ? data->mv_size : 0, - key ? key->mv_size : 0, key ? DKEY(key) : NULL); + key ? key->mv_size : 0, key ? DKEY(key) : "null")); if (IS_LEAF2(mp)) { /* Move higher keys up one slot. */ @@ -6147,9 +6336,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, return MDB_SUCCESS; } + room = (ssize_t)SIZELEFT(mp) - (ssize_t)sizeof(indx_t); if (key != NULL) node_size += key->mv_size; - if (IS_LEAF(mp)) { assert(data); if (F_ISSET(flags, F_BIGDATA)) { @@ -6159,28 +6348,25 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize); int rc; /* Put data on overflow page. */ - DPRINTF("data size is %"Z"u, node would be %"Z"u, put data on overflow page", - data->mv_size, node_size+data->mv_size); - node_size += sizeof(pgno_t); + DPRINTF(("data size is %"Z"u, node would be %"Z"u, put data on overflow page", + data->mv_size, node_size+data->mv_size)); + node_size += sizeof(pgno_t) + (node_size & 1); + if ((ssize_t)node_size > room) + goto full; if ((rc = mdb_page_new(mc, P_OVERFLOW, ovpages, &ofp))) return rc; - DPRINTF("allocated overflow page %"Z"u", ofp->mp_pgno); + DPRINTF(("allocated overflow page %"Z"u", ofp->mp_pgno)); flags |= F_BIGDATA; + goto update; } else { node_size += data->mv_size; } } node_size += node_size & 1; + if ((ssize_t)node_size > room) + goto full; - if (node_size + sizeof(indx_t) > SIZELEFT(mp)) { - DPRINTF("not enough room in page %"Z"u, got %u ptrs", - mp->mp_pgno, NUMKEYS(mp)); - DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower, - mp->mp_upper - mp->mp_lower); - DPRINTF("node size = %"Z"u", node_size); - return MDB_PAGE_FULL; - } - +update: /* Move higher pointers up one slot. */ for (i = NUMKEYS(mp); i > indx; i--) mp->mp_ptrs[i] = mp->mp_ptrs[i - 1]; @@ -6226,6 +6412,13 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, } return MDB_SUCCESS; + +full: + DPRINTF(("not enough room in page %"Z"u, got %u ptrs", + mp->mp_pgno, NUMKEYS(mp))); + DPRINTF(("upper-lower = %u - %u = %"Z"d", mp->mp_upper,mp->mp_lower,room)); + DPRINTF(("node size = %"Z"u", node_size)); + return MDB_PAGE_FULL; } /** Delete the specified node from a page. @@ -6246,8 +6439,8 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize) { pgno_t pgno; COPY_PGNO(pgno, mp->mp_pgno); - DPRINTF("delete node %u on %s page %"Z"u", indx, - IS_LEAF(mp) ? "leaf" : "branch", pgno); + DPRINTF(("delete node %u on %s page %"Z"u", indx, + IS_LEAF(mp) ? "leaf" : "branch", pgno)); } #endif assert(indx < NUMKEYS(mp)); @@ -6360,11 +6553,13 @@ mdb_xcursor_init0(MDB_cursor *mc) mx->mx_cursor.mc_txn = mc->mc_txn; mx->mx_cursor.mc_db = &mx->mx_db; mx->mx_cursor.mc_dbx = &mx->mx_dbx; - mx->mx_cursor.mc_dbi = mc->mc_dbi+1; + mx->mx_cursor.mc_dbi = mc->mc_dbi; mx->mx_cursor.mc_dbflag = &mx->mx_dbflag; mx->mx_cursor.mc_snum = 0; mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags = C_SUB; + mx->mx_dbx.md_name.mv_size = 0; + mx->mx_dbx.md_name.mv_data = NULL; mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp; mx->mx_dbx.md_dcmp = NULL; mx->mx_dbx.md_rel = mc->mc_dbx->md_rel; @@ -6385,6 +6580,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDB_db)); mx->mx_cursor.mc_pg[0] = 0; mx->mx_cursor.mc_snum = 0; + mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags = C_SUB; } else { MDB_page *fp = NODEDATA(node); @@ -6397,8 +6593,8 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_entries = NUMKEYS(fp); COPY_PGNO(mx->mx_db.md_root, fp->mp_pgno); mx->mx_cursor.mc_snum = 1; - mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB; mx->mx_cursor.mc_top = 0; + mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB; mx->mx_cursor.mc_pg[0] = fp; mx->mx_cursor.mc_ki[0] = 0; if (mc->mc_db->md_flags & MDB_DUPFIXED) { @@ -6408,12 +6604,9 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_flags |= MDB_INTEGERKEY; } } - DPRINTF("Sub-db %u for db %u root page %"Z"u", mx->mx_cursor.mc_dbi, mc->mc_dbi, - mx->mx_db.md_root); - mx->mx_dbflag = DB_VALID | (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY) ? - DB_DIRTY : 0); - mx->mx_dbx.md_name.mv_data = NODEKEY(node); - mx->mx_dbx.md_name.mv_size = node->mn_ksize; + DPRINTF(("Sub-db -%u root page %"Z"u", mx->mx_cursor.mc_dbi, + mx->mx_db.md_root)); + mx->mx_dbflag = DB_VALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ #if UINT_MAX < SIZE_MAX if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) #ifdef MISALIGNED_OK @@ -6460,6 +6653,9 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; + if (txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; + /* Allow read access to the freelist */ if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) return EINVAL; @@ -6505,8 +6701,8 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp) if (mc == NULL || countp == NULL) return EINVAL; - if (!(mc->mc_db->md_flags & MDB_DUPSORT)) - return EINVAL; + if (mc->mc_xcursor == NULL) + return MDB_INCOMPATIBLE; leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { @@ -6561,7 +6757,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) MDB_node *node; char *base; size_t len; - int delta, delta0; + int delta, ksize, oksize; indx_t ptr, i, numkeys, indx; DKBUF; @@ -6575,25 +6771,28 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) char kbuf2[(MDB_MAXKEYSIZE*2+1)]; k2.mv_data = NODEKEY(node); k2.mv_size = node->mn_ksize; - DPRINTF("update key %u (ofs %u) [%s] to [%s] on page %"Z"u", + DPRINTF(("update key %u (ofs %u) [%s] to [%s] on page %"Z"u", indx, ptr, mdb_dkey(&k2, kbuf2), DKEY(key), - mp->mp_pgno); + mp->mp_pgno)); } #endif - delta0 = delta = key->mv_size - node->mn_ksize; + ksize = key->mv_size; + ksize += (ksize & 1); + oksize = node->mn_ksize; + oksize += (oksize & 1); + delta = ksize - oksize; /* Must be 2-byte aligned. If new key is * shorter by 1, the shift will be skipped. */ - delta += (delta & 1); if (delta) { if (delta > 0 && SIZELEFT(mp) < delta) { pgno_t pgno; /* not enough space left, do a delete and split */ - DPRINTF("Not enough room, delta = %d, splitting...", delta); + DPRINTF(("Not enough room, delta = %d, splitting...", delta)); pgno = NODEPGNO(node); mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE); @@ -6614,7 +6813,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) } /* But even if no shift was needed, update ksize */ - if (delta0) + if (node->mn_ksize != key->mv_size) node->mn_ksize = key->mv_size; if (key->mv_size) @@ -6655,7 +6854,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) flags = 0; } else { srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]); - assert(!((long)srcnode&1)); + assert(!((size_t)srcnode&1)); srcpg = NODEPGNO(srcnode); flags = srcnode->mn_flags; if (csrc->mc_ki[csrc->mc_top] == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { @@ -6703,12 +6902,12 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) return rc; } - DPRINTF("moving %s node %u [%s] on page %"Z"u to node %u on page %"Z"u", + DPRINTF(("moving %s node %u [%s] on page %"Z"u to node %u on page %"Z"u", IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch", csrc->mc_ki[csrc->mc_top], DKEY(&key), csrc->mc_pg[csrc->mc_top]->mp_pgno, - cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno); + cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno)); /* Add the node to the destination page. */ @@ -6726,9 +6925,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) MDB_dbi dbi = csrc->mc_dbi; MDB_page *mp = csrc->mc_pg[csrc->mc_top]; - if (csrc->mc_flags & C_SUB) - dbi--; - for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -6754,8 +6950,8 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) key.mv_size = NODEKSZ(srcnode); key.mv_data = NODEKEY(srcnode); } - DPRINTF("update separator for source page %"Z"u to [%s]", - csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key)); + DPRINTF(("update separator for source page %"Z"u to [%s]", + csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key))); mdb_cursor_copy(csrc, &mn); mn.mc_snum--; mn.mc_top--; @@ -6782,8 +6978,8 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) key.mv_size = NODEKSZ(srcnode); key.mv_data = NODEKEY(srcnode); } - DPRINTF("update separator for destination page %"Z"u to [%s]", - cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key)); + DPRINTF(("update separator for destination page %"Z"u to [%s]", + cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key))); mdb_cursor_copy(cdst, &mn); mn.mc_snum--; mn.mc_top--; @@ -6820,8 +7016,8 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) MDB_val key, data; unsigned nkeys; - DPRINTF("merging page %"Z"u into %"Z"u", csrc->mc_pg[csrc->mc_top]->mp_pgno, - cdst->mc_pg[cdst->mc_top]->mp_pgno); + DPRINTF(("merging page %"Z"u into %"Z"u", csrc->mc_pg[csrc->mc_top]->mp_pgno, + cdst->mc_pg[cdst->mc_top]->mp_pgno)); assert(csrc->mc_snum > 1); /* can't merge root page */ assert(cdst->mc_snum > 1); @@ -6873,8 +7069,9 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) } } - DPRINTF("dst page %"Z"u now has %u keys (%.1f%% filled)", - cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]), (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10); + DPRINTF(("dst page %"Z"u now has %u keys (%.1f%% filled)", + cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]), + (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10)); /* Unlink the src page from parent and add to free list. */ @@ -6902,9 +7099,6 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) MDB_dbi dbi = csrc->mc_dbi; MDB_page *mp = cdst->mc_pg[cdst->mc_top]; - if (csrc->mc_flags & C_SUB) - dbi--; - for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -6964,9 +7158,10 @@ mdb_rebalance(MDB_cursor *mc) { pgno_t pgno; COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno); - DPRINTF("rebalancing %s page %"Z"u (has %u keys, %.1f%% full)", + DPRINTF(("rebalancing %s page %"Z"u (has %u keys, %.1f%% full)", IS_LEAF(mc->mc_pg[mc->mc_top]) ? "leaf" : "branch", - pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10); + pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), + (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10)); } #endif @@ -6975,8 +7170,8 @@ mdb_rebalance(MDB_cursor *mc) #if MDB_DEBUG pgno_t pgno; COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno); - DPRINTF("no need to rebalance page %"Z"u, above fill threshold", - pgno); + DPRINTF(("no need to rebalance page %"Z"u, above fill threshold", + pgno)); #endif return MDB_SUCCESS; } @@ -6998,13 +7193,11 @@ mdb_rebalance(MDB_cursor *mc) /* Adjust cursors pointing to mp */ mc->mc_snum = 0; mc->mc_top = 0; + mc->mc_flags &= ~C_INITIALIZED; { MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7014,6 +7207,7 @@ mdb_rebalance(MDB_cursor *mc) if (m3->mc_pg[0] == mp) { m3->mc_snum = 0; m3->mc_top = 0; + m3->mc_flags &= ~C_INITIALIZED; } } } @@ -7034,9 +7228,6 @@ mdb_rebalance(MDB_cursor *mc) MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7044,10 +7235,13 @@ mdb_rebalance(MDB_cursor *mc) m3 = m2; if (m3 == mc || m3->mc_snum < mc->mc_snum) continue; if (m3->mc_pg[0] == mp) { - m3->mc_pg[0] = mc->mc_pg[0]; - m3->mc_snum = 1; - m3->mc_top = 0; - m3->mc_ki[0] = m3->mc_ki[1]; + int i; + m3->mc_snum--; + m3->mc_top--; + for (i=0; imc_snum; i++) { + m3->mc_pg[i] = m3->mc_pg[i+1]; + m3->mc_ki[i] = m3->mc_ki[i+1]; + } } } } @@ -7096,8 +7290,9 @@ mdb_rebalance(MDB_cursor *mc) mc->mc_ki[mc->mc_top] = 0; } - DPRINTF("found neighbor page %"Z"u (%u keys, %.1f%% full)", - mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10); + DPRINTF(("found neighbor page %"Z"u (%u keys, %.1f%% full)", + mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), + (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10)); /* If the neighbor page is above threshold and has enough keys, * move one key from it. Otherwise we should try to merge them. @@ -7109,8 +7304,11 @@ mdb_rebalance(MDB_cursor *mc) else { if (mc->mc_ki[ptop] == 0) rc = mdb_page_merge(&mn, mc); - else + else { + mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; rc = mdb_page_merge(mc, &mn); + mdb_cursor_copy(&mn, mc); + } mc->mc_flags &= ~(C_INITIALIZED|C_EOF); } return rc; @@ -7123,6 +7321,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) int rc; MDB_page *mp; indx_t ki; + unsigned int nkeys; mp = mc->mc_pg[mc->mc_top]; ki = mc->mc_ki[mc->mc_top]; @@ -7142,30 +7341,34 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) rc = mdb_rebalance(mc); if (rc != MDB_SUCCESS) mc->mc_txn->mt_flags |= MDB_TXN_ERROR; - /* if mc points past last node in page, invalidate */ - else if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top])) - mc->mc_flags &= ~(C_INITIALIZED|C_EOF); - - { - /* Adjust other cursors pointing to mp */ + else { MDB_cursor *m2; - unsigned int nkeys; MDB_dbi dbi = mc->mc_dbi; mp = mc->mc_pg[mc->mc_top]; nkeys = NUMKEYS(mp); + + /* if mc points past last node in page, find next sibling */ + if (mc->mc_ki[mc->mc_top] >= nkeys) + mdb_cursor_sibling(mc, 1); + + /* Adjust other cursors pointing to mp */ for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { - if (m2 == mc) + if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; if (!(m2->mc_flags & C_INITIALIZED)) continue; if (m2->mc_pg[mc->mc_top] == mp) { - if (m2->mc_ki[mc->mc_top] > ki) - m2->mc_ki[mc->mc_top]--; + if (m2->mc_ki[mc->mc_top] >= ki) { + m2->mc_flags |= C_DEL; + if (m2->mc_ki[mc->mc_top] > ki) + m2->mc_ki[mc->mc_top]--; + } if (m2->mc_ki[mc->mc_top] >= nkeys) - m2->mc_flags &= ~(C_INITIALIZED|C_EOF); + mdb_cursor_sibling(m2, 1); } } + mc->mc_flags |= C_DEL; } return rc; @@ -7184,16 +7387,15 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, assert(key != NULL); - DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key)); + DPRINTF(("====> delete db %u key [%s]", dbi, DKEY(key))); if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - return EACCES; - } + if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (key->mv_size > MDB_MAXKEYSIZE) { return MDB_BAD_VALSIZE; } @@ -7246,29 +7448,31 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno unsigned int nflags) { unsigned int flags; - int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1, did_split = 0; + int rc = MDB_SUCCESS, new_root = 0, did_split = 0; indx_t newindx; pgno_t pgno = 0; - unsigned int i, j, split_indx, nkeys, pmax; + int i, j, split_indx, nkeys, pmax; + MDB_env *env = mc->mc_txn->mt_env; MDB_node *node; MDB_val sepkey, rkey, xdata, *rdata = &xdata; - MDB_page *copy; + MDB_page *copy = NULL; MDB_page *mp, *rp, *pp; - unsigned int ptop; + int ptop; MDB_cursor mn; DKBUF; mp = mc->mc_pg[mc->mc_top]; newindx = mc->mc_ki[mc->mc_top]; + nkeys = NUMKEYS(mp); - DPRINTF("-----> splitting %s page %"Z"u and adding [%s] at index %i", + DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i/%i", IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, - DKEY(newkey), mc->mc_ki[mc->mc_top]); + DKEY(newkey), mc->mc_ki[mc->mc_top], nkeys)); /* Create a right sibling. */ if ((rc = mdb_page_new(mc, mp->mp_flags, 1, &rp))) return rc; - DPRINTF("new right sibling: page %"Z"u", rp->mp_pgno); + DPRINTF(("new right sibling: page %"Z"u", rp->mp_pgno)); if (mc->mc_snum < 2) { if ((rc = mdb_page_new(mc, P_BRANCH, 1, &pp))) @@ -7279,7 +7483,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno mc->mc_pg[0] = pp; mc->mc_ki[0] = 0; mc->mc_db->md_root = pp->mp_pgno; - DPRINTF("root split! new root = %"Z"u", pp->mp_pgno); + DPRINTF(("root split! new root = %"Z"u", pp->mp_pgno)); mc->mc_db->md_depth++; new_root = 1; @@ -7297,7 +7501,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno ptop = 0; } else { ptop = mc->mc_top-1; - DPRINTF("parent branch page is %"Z"u", mc->mc_pg[ptop]->mp_pgno); + DPRINTF(("parent branch page is %"Z"u", mc->mc_pg[ptop]->mp_pgno)); } mc->mc_flags |= C_SPLITTING; @@ -7310,141 +7514,139 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno sepkey = *newkey; split_indx = newindx; nkeys = 0; - goto newsep; - } + } else { - nkeys = NUMKEYS(mp); - split_indx = nkeys / 2; - if (newindx < split_indx) - newpos = 0; - - if (IS_LEAF2(rp)) { - char *split, *ins; - int x; - unsigned int lsize, rsize, ksize; - /* Move half of the keys to the right sibling */ - copy = NULL; - x = mc->mc_ki[mc->mc_top] - split_indx; - ksize = mc->mc_db->md_pad; - split = LEAF2KEY(mp, split_indx, ksize); - rsize = (nkeys - split_indx) * ksize; - lsize = (nkeys - split_indx) * sizeof(indx_t); - mp->mp_lower -= lsize; - rp->mp_lower += lsize; - mp->mp_upper += rsize - lsize; - rp->mp_upper -= rsize - lsize; - sepkey.mv_size = ksize; - if (newindx == split_indx) { - sepkey.mv_data = newkey->mv_data; - } else { - sepkey.mv_data = split; - } - if (x<0) { - ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize); - memcpy(rp->mp_ptrs, split, rsize); - sepkey.mv_data = rp->mp_ptrs; - memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize); - memcpy(ins, newkey->mv_data, ksize); - mp->mp_lower += sizeof(indx_t); - mp->mp_upper -= ksize - sizeof(indx_t); + split_indx = (nkeys+1) / 2; + + if (IS_LEAF2(rp)) { + char *split, *ins; + int x; + unsigned int lsize, rsize, ksize; + /* Move half of the keys to the right sibling */ + copy = NULL; + x = mc->mc_ki[mc->mc_top] - split_indx; + ksize = mc->mc_db->md_pad; + split = LEAF2KEY(mp, split_indx, ksize); + rsize = (nkeys - split_indx) * ksize; + lsize = (nkeys - split_indx) * sizeof(indx_t); + mp->mp_lower -= lsize; + rp->mp_lower += lsize; + mp->mp_upper += rsize - lsize; + rp->mp_upper -= rsize - lsize; + sepkey.mv_size = ksize; + if (newindx == split_indx) { + sepkey.mv_data = newkey->mv_data; + } else { + sepkey.mv_data = split; + } + if (x<0) { + ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize); + memcpy(rp->mp_ptrs, split, rsize); + sepkey.mv_data = rp->mp_ptrs; + memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize); + memcpy(ins, newkey->mv_data, ksize); + mp->mp_lower += sizeof(indx_t); + mp->mp_upper -= ksize - sizeof(indx_t); + } else { + if (x) + memcpy(rp->mp_ptrs, split, x * ksize); + ins = LEAF2KEY(rp, x, ksize); + memcpy(ins, newkey->mv_data, ksize); + memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); + rp->mp_lower += sizeof(indx_t); + rp->mp_upper -= ksize - sizeof(indx_t); + mc->mc_ki[mc->mc_top] = x; + mc->mc_pg[mc->mc_top] = rp; + } } else { - if (x) - memcpy(rp->mp_ptrs, split, x * ksize); - ins = LEAF2KEY(rp, x, ksize); - memcpy(ins, newkey->mv_data, ksize); - memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); - rp->mp_lower += sizeof(indx_t); - rp->mp_upper -= ksize - sizeof(indx_t); - mc->mc_ki[mc->mc_top] = x; - mc->mc_pg[mc->mc_top] = rp; - } - goto newsep; - } + int psize, nsize, k; + /* Maximum free space in an empty page */ + pmax = env->me_psize - PAGEHDRSZ; + if (IS_LEAF(mp)) + nsize = mdb_leaf_size(env, newkey, newdata); + else + nsize = mdb_branch_size(env, newkey); + nsize += nsize & 1; - /* For leaf pages, check the split point based on what - * fits where, since otherwise mdb_node_add can fail. - * - * This check is only needed when the data items are - * relatively large, such that being off by one will - * make the difference between success or failure. - * - * It's also relevant if a page happens to be laid out - * such that one half of its nodes are all "small" and - * the other half of its nodes are "large." If the new - * item is also "large" and falls on the half with - * "large" nodes, it also may not fit. - */ - if (IS_LEAF(mp)) { - unsigned int psize, nsize; - /* Maximum free space in an empty page */ - pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ; - nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata); - if ((nkeys < 20) || (nsize > pmax/16)) { - if (newindx <= split_indx) { - psize = nsize; - newpos = 0; - for (i=0; imn_flags, F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += NODEDSZ(node); - psize += psize & 1; - if (psize > pmax) { - if (i <= newindx) { - split_indx = newindx; - if (i < newindx) - newpos = 1; + /* grab a page to hold a temporary copy */ + copy = mdb_page_malloc(mc->mc_txn, 1); + if (copy == NULL) + return ENOMEM; + copy->mp_pgno = mp->mp_pgno; + copy->mp_flags = mp->mp_flags; + copy->mp_lower = PAGEHDRSZ; + copy->mp_upper = env->me_psize; + + /* prepare to insert */ + for (i=0, j=0; imp_ptrs[j++] = 0; + } + copy->mp_ptrs[j++] = mp->mp_ptrs[i]; + } + + /* When items are relatively large the split point needs + * to be checked, because being off-by-one will make the + * difference between success or failure in mdb_node_add. + * + * It's also relevant if a page happens to be laid out + * such that one half of its nodes are all "small" and + * the other half of its nodes are "large." If the new + * item is also "large" and falls on the half with + * "large" nodes, it also may not fit. + * + * As a final tweak, if the new item goes on the last + * spot on the page (and thus, onto the new page), bias + * the split so the new page is emptier than the old page. + * This yields better packing during sequential inserts. + */ + if (nkeys < 20 || nsize > pmax/16 || newindx >= nkeys) { + /* Find split point */ + psize = 0; + if (newindx <= split_indx || newindx >= nkeys) { + i = 0; j = 1; + k = newindx >= nkeys ? nkeys : split_indx+2; + } else { + i = nkeys; j = -1; + k = split_indx-1; + } + for (; i!=k; i+=j) { + if (i == newindx) { + psize += nsize; + node = NULL; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]); + psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t); + if (IS_LEAF(mp)) { + if (F_ISSET(node->mn_flags, F_BIGDATA)) + psize += sizeof(pgno_t); + else + psize += NODEDSZ(node); } - else - split_indx = i; - break; + psize += psize & 1; } - } - } else { - psize = nsize; - for (i=nkeys-1; i>=split_indx; i--) { - node = NODEPTR(mp, i); - psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t); - if (F_ISSET(node->mn_flags, F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += NODEDSZ(node); - psize += psize & 1; - if (psize > pmax) { - if (i >= newindx) { - split_indx = newindx; - newpos = 0; - } else - split_indx = i+1; + if (psize > pmax || i == k-j) { + split_indx = i + (j<0); break; } } } + if (split_indx == newindx) { + sepkey.mv_size = newkey->mv_size; + sepkey.mv_data = newkey->mv_data; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[split_indx]); + sepkey.mv_size = node->mn_ksize; + sepkey.mv_data = NODEKEY(node); + } } } - /* First find the separating key between the split pages. - * The case where newindx == split_indx is ambiguous; the - * new item could go to the new page or stay on the original - * page. If newpos == 1 it goes to the new page. - */ - if (newindx == split_indx && newpos) { - sepkey.mv_size = newkey->mv_size; - sepkey.mv_data = newkey->mv_data; - } else { - node = NODEPTR(mp, split_indx); - sepkey.mv_size = node->mn_ksize; - sepkey.mv_data = NODEKEY(node); - } - -newsep: - DPRINTF("separator is [%s]", DKEY(&sepkey)); + DPRINTF(("separator is %d [%s]", split_indx, DKEY(&sepkey))); /* Copy separator key to the parent. */ - if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) { + if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) { mn.mc_snum--; mn.mc_top--; did_split = 1; @@ -7489,117 +7691,97 @@ newsep: return rc; for (i=0; imc_top; i++) mc->mc_ki[i] = mn.mc_ki[i]; - goto done; - } - if (IS_LEAF2(rp)) { - goto done; - } - - /* Move half of the keys to the right sibling. */ + } else if (!IS_LEAF2(mp)) { + /* Move nodes */ + mc->mc_pg[mc->mc_top] = rp; + i = split_indx; + j = 0; + do { + if (i == newindx) { + rkey.mv_data = newkey->mv_data; + rkey.mv_size = newkey->mv_size; + if (IS_LEAF(mp)) { + rdata = newdata; + } else + pgno = newpgno; + flags = nflags; + /* Update index for the new key. */ + mc->mc_ki[mc->mc_top] = j; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]); + rkey.mv_data = NODEKEY(node); + rkey.mv_size = node->mn_ksize; + if (IS_LEAF(mp)) { + xdata.mv_data = NODEDATA(node); + xdata.mv_size = NODEDSZ(node); + rdata = &xdata; + } else + pgno = NODEPGNO(node); + flags = node->mn_flags; + } - /* grab a page to hold a temporary copy */ - copy = mdb_page_malloc(mc->mc_txn, 1); - if (copy == NULL) - return ENOMEM; + if (!IS_LEAF(mp) && j == 0) { + /* First branch index doesn't need key data. */ + rkey.mv_size = 0; + } - copy->mp_pgno = mp->mp_pgno; - copy->mp_flags = mp->mp_flags; - copy->mp_lower = PAGEHDRSZ; - copy->mp_upper = mc->mc_txn->mt_env->me_psize; - mc->mc_pg[mc->mc_top] = copy; - for (i = j = 0; i <= nkeys; j++) { - if (i == split_indx) { - /* Insert in right sibling. */ - /* Reset insert index for right sibling. */ - if (i != newindx || (newpos ^ ins_new)) { + rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags); + if (rc) { + /* return tmp page to freelist */ + mdb_page_free(env, copy); + return rc; + } + if (i == nkeys) { + i = 0; j = 0; - mc->mc_pg[mc->mc_top] = rp; + mc->mc_pg[mc->mc_top] = copy; + } else { + i++; + j++; + } + } while (i != split_indx); + + nkeys = NUMKEYS(copy); + for (i=0; imp_ptrs[i] = copy->mp_ptrs[i]; + mp->mp_lower = copy->mp_lower; + mp->mp_upper = copy->mp_upper; + memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1), + env->me_psize - copy->mp_upper); + + /* reset back to original page */ + if (newindx < split_indx) { + mc->mc_pg[mc->mc_top] = mp; + if (nflags & MDB_RESERVE) { + node = NODEPTR(mp, mc->mc_ki[mc->mc_top]); + if (!(node->mn_flags & F_BIGDATA)) + newdata->mv_data = NODEDATA(node); } - } - - if (i == newindx && !ins_new) { - /* Insert the original entry that caused the split. */ - rkey.mv_data = newkey->mv_data; - rkey.mv_size = newkey->mv_size; - if (IS_LEAF(mp)) { - rdata = newdata; - } else - pgno = newpgno; - flags = nflags; - - ins_new = 1; - - /* Update index for the new key. */ - mc->mc_ki[mc->mc_top] = j; - } else if (i == nkeys) { - break; } else { - node = NODEPTR(mp, i); - rkey.mv_data = NODEKEY(node); - rkey.mv_size = node->mn_ksize; - if (IS_LEAF(mp)) { - xdata.mv_data = NODEDATA(node); - xdata.mv_size = NODEDSZ(node); - rdata = &xdata; - } else - pgno = NODEPGNO(node); - flags = node->mn_flags; - - i++; - } - - if (!IS_LEAF(mp) && j == 0) { - /* First branch index doesn't need key data. */ - rkey.mv_size = 0; - } - - rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags); - if (rc) break; - } - - nkeys = NUMKEYS(copy); - for (i=0; imp_ptrs[i] = copy->mp_ptrs[i]; - mp->mp_lower = copy->mp_lower; - mp->mp_upper = copy->mp_upper; - memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1), - mc->mc_txn->mt_env->me_psize - copy->mp_upper); - - /* reset back to original page */ - if (newindx < split_indx || (!newpos && newindx == split_indx)) { - mc->mc_pg[mc->mc_top] = mp; - if (nflags & MDB_RESERVE) { - node = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (!(node->mn_flags & F_BIGDATA)) - newdata->mv_data = NODEDATA(node); - } - } else { - mc->mc_ki[ptop]++; - /* Make sure mc_ki is still valid. - */ - if (mn.mc_pg[ptop] != mc->mc_pg[ptop] && - mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) { - for (i=0; imc_pg[i] = mn.mc_pg[i]; - mc->mc_ki[i] = mn.mc_ki[i]; + mc->mc_pg[mc->mc_top] = rp; + mc->mc_ki[ptop]++; + /* Make sure mc_ki is still valid. + */ + if (mn.mc_pg[ptop] != mc->mc_pg[ptop] && + mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) { + for (i=0; imc_pg[i] = mn.mc_pg[i]; + mc->mc_ki[i] = mn.mc_ki[i]; + } + mc->mc_pg[ptop] = mn.mc_pg[ptop]; + mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1; } - mc->mc_pg[ptop] = mn.mc_pg[ptop]; - mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1; } + /* return tmp page to freelist */ + mdb_page_free(env, copy); } - /* return tmp page to freelist */ - mdb_page_free(mc->mc_txn->mt_env, copy); -done: { /* Adjust other cursors pointing to mp */ MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; int fixup = NUMKEYS(mp); - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7627,7 +7809,7 @@ done: m3->mc_snum++; m3->mc_top++; } - if (m3->mc_pg[mc->mc_top] == mp) { + if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp) { if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE)) m3->mc_ki[mc->mc_top]++; if (m3->mc_ki[mc->mc_top] >= fixup) { @@ -7635,12 +7817,13 @@ done: m3->mc_ki[mc->mc_top] -= fixup; m3->mc_ki[ptop] = mn.mc_ki[ptop]; } - } else if (!did_split && m3->mc_pg[ptop] == mc->mc_pg[ptop] && + } else if (!did_split && m3->mc_top >= ptop && m3->mc_pg[ptop] == mc->mc_pg[ptop] && m3->mc_ki[ptop] >= mc->mc_ki[ptop]) { m3->mc_ki[ptop]++; } } } + DPRINTF(("mp left: %d, rp left: %d", SIZELEFT(mp), SIZELEFT(rp))); return rc; } @@ -7657,14 +7840,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - return EACCES; - } - - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { - return MDB_BAD_VALSIZE; - } - if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags) return EINVAL; @@ -7704,6 +7879,16 @@ mdb_env_get_path(MDB_env *env, const char **arg) return MDB_SUCCESS; } +int +mdb_env_get_fd(MDB_env *env, mdb_filehandle_t *arg) +{ + if (!env || !arg) + return EINVAL; + + *arg = env->me_fd; + return MDB_SUCCESS; +} + /** Common code for #mdb_stat() and #mdb_env_stat(). * @param[in] env the environment to operate in. * @param[in] db the #MDB_db record containing the stats to return. @@ -7796,6 +7981,8 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db if ((flags & VALID_FLAGS) != flags) return EINVAL; + if (txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; /* main DB? */ if (!name) { @@ -7850,7 +8037,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db /* make sure this is actually a DB */ MDB_node *node = NODEPTR(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]); if (!(node->mn_flags & F_SUBDATA)) - return EINVAL; + return MDB_INCOMPATIBLE; } else if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) { /* Create if requested */ MDB_db dummy; @@ -7872,7 +8059,6 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db txn->mt_dbflags[slot] = dbflag; memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db)); *dbi = slot; - txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags; mdb_default_cmp(txn, slot); if (!unused) { txn->mt_numdbs++; @@ -7908,12 +8094,12 @@ void mdb_dbi_close(MDB_env *env, MDB_dbi dbi) free(ptr); } -int mdb_dbi_flags(MDB_env *env, MDB_dbi dbi, unsigned int *flags) +int mdb_dbi_flags(MDB_txn *txn, MDB_dbi dbi, unsigned int *flags) { /* We could return the flags for the FREE_DBI too but what's the point? */ - if (dbi < MAIN_DBI || dbi >= env->me_numdbs) + if (txn == NULL || dbi < MAIN_DBI || dbi >= txn->mt_numdbs) return EINVAL; - *flags = env->me_dbflags[dbi]; + *flags = txn->mt_dbs[dbi].md_flags & PERSISTENT_FLAGS; return MDB_SUCCESS; } @@ -7927,7 +8113,7 @@ mdb_drop0(MDB_cursor *mc, int subs) { int rc; - rc = mdb_page_search(mc, NULL, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_FIRST); if (rc == MDB_SUCCESS) { MDB_txn *txn = mc->mc_txn; MDB_node *ni; @@ -8125,10 +8311,10 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) return 0; } -/* insert pid into list if not already present. +/** Insert pid into list if not already present. * return -1 if already present. */ -static int mdb_pid_insert(pid_t *ids, pid_t pid) +static int mdb_pid_insert(MDB_PID_T *ids, MDB_PID_T pid) { /* binary search of pid in list */ unsigned base = 0; @@ -8153,7 +8339,7 @@ static int mdb_pid_insert(pid_t *ids, pid_t pid) return -1; } } - + if( val > 0 ) { ++cursor; } @@ -8168,7 +8354,7 @@ int mdb_reader_check(MDB_env *env, int *dead) { unsigned int i, j, rdrs; MDB_reader *mr; - pid_t *pids, pid; + MDB_PID_T *pids, pid; int count = 0; if (!env) @@ -8178,7 +8364,7 @@ int mdb_reader_check(MDB_env *env, int *dead) if (!env->me_txns) return MDB_SUCCESS; rdrs = env->me_txns->mti_numreaders; - pids = malloc((rdrs+1) * sizeof(pid_t)); + pids = malloc((rdrs+1) * sizeof(MDB_PID_T)); if (!pids) return ENOMEM; pids[0] = 0; @@ -8194,6 +8380,8 @@ int mdb_reader_check(MDB_env *env, int *dead) if (!mdb_reader_pid(env, Pidcheck, pid)) { for (j=i; j