X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Fliblmdb%2Fmdb.c;h=7a8ca3fde60a35eed55bf31e35f15a12c27678e3;hb=550bbe3788fafdf0fddcb2130173c1349fbc477f;hp=ceda38510b32b780700981985270a522bd979431;hpb=6ecee1cbf9c6cad8665e2e3233e5a581f16a402b;p=openldap diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index ceda38510b..7a8ca3fde6 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -75,6 +75,7 @@ #ifndef _WIN32 #include #ifdef MDB_USE_POSIX_SEM +# define MDB_USE_HASH 1 #include #endif #endif @@ -140,6 +141,7 @@ * @{ */ #ifdef _WIN32 +#define MDB_USE_HASH 1 #define MDB_PIDLOCK 0 #define pthread_t DWORD #define pthread_mutex_t HANDLE @@ -171,7 +173,7 @@ #define Z "I" #else -#define Z "z" +#define Z "z" /**< printf format modifier for size_t */ /** For MDB_LOCK_FORMAT: True if readers take a pid lock in the lockfile */ #define MDB_PIDLOCK 1 @@ -317,6 +319,9 @@ static txnid_t mdb_debug_start; * The string is printed literally, with no format processing. */ #define DPUTS(arg) DPRINTF(("%s", arg)) + /** Debuging output value of a cursor DBI: Negative in a sub-cursor. */ +#define DDBI(mc) \ + (((mc)->mc_flags & C_SUB) ? -(int)(mc)->mc_dbi : (int)(mc)->mc_dbi) /** @} */ /** A default memory page size. @@ -392,7 +397,7 @@ static txnid_t mdb_debug_start; */ #define DKEY(x) mdb_dkey(x, kbuf) #else -#define DKBUF typedef int dummy_kbuf /* so we can put ';' after */ +#define DKBUF #define DKEY(x) 0 #endif @@ -425,7 +430,8 @@ typedef uint16_t indx_t; * * If #MDB_NOTLS is set, the slot address is not saved in thread-specific data. * - * No reader table is used if the database is on a read-only filesystem. + * No reader table is used if the database is on a read-only filesystem, or + * if #MDB_NOLOCK is set. * * Since the database uses multi-version concurrency control, readers don't * actually need any locking. This table is used to keep track of which @@ -600,7 +606,7 @@ typedef struct MDB_page { #define P_LEAF 0x02 /**< leaf page */ #define P_OVERFLOW 0x04 /**< overflow page */ #define P_META 0x08 /**< meta page */ -#define P_DIRTY 0x10 /**< dirty page */ +#define P_DIRTY 0x10 /**< dirty page, also set for #P_SUBP pages */ #define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */ #define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */ #define P_KEEP 0x8000 /**< leave this page alone during spill */ @@ -756,9 +762,12 @@ typedef struct MDB_node { */ #define LEAF2KEY(p, i, ks) ((char *)(p) + PAGEHDRSZ + ((i)*(ks))) - /** Set the \b node's key into \b key, if requested. */ -#define MDB_GET_KEY(node, key) { if ((key) != NULL) { \ - (key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node); } } + /** Set the \b node's key into \b keyptr, if requested. */ +#define MDB_GET_KEY(node, keyptr) { if ((keyptr) != NULL) { \ + (keyptr)->mv_size = NODEKSZ(node); (keyptr)->mv_data = NODEKEY(node); } } + + /** Set the \b node's key into \b key. */ +#define MDB_GET_KEY2(node, key) { key.mv_size = NODEKSZ(node); key.mv_data = NODEKEY(node); } /** Information about a single database in the environment. */ typedef struct MDB_db { @@ -783,7 +792,10 @@ typedef struct MDB_db { /** Handle for the default DB. */ #define MAIN_DBI 1 - /** Meta page content. */ + /** Meta page content. + * A meta page is the start point for accessing a database snapshot. + * Pages 0-1 are meta pages. Transaction N writes meta page #(N % 2). + */ typedef struct MDB_meta { /** Stamp identifying this as an MDB file. It must be set * to #MDB_MAGIC. */ @@ -844,7 +856,8 @@ struct MDB_txn { */ MDB_IDL mt_free_pgs; /** The sorted list of dirty pages we temporarily wrote to disk - * because the dirty list was full. + * because the dirty list was full. page numbers in here are + * shifted left by 1, deleted slots have the LSB set. */ MDB_IDL mt_spill_pgs; union { @@ -861,9 +874,9 @@ struct MDB_txn { * @ingroup internal * @{ */ -#define DB_DIRTY 0x01 /**< DB was written in this txn */ -#define DB_STALE 0x02 /**< DB record is older than txnID */ -#define DB_NEW 0x04 /**< DB handle opened in this txn */ +#define DB_DIRTY 0x01 /**< DB was modified or is DUPSORT data */ +#define DB_STALE 0x02 /**< Named-DB record is older than txnID */ +#define DB_NEW 0x04 /**< Named-DB handle opened in this txn */ #define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */ /** @} */ /** In write txns, array of cursors for each DB */ @@ -885,12 +898,12 @@ struct MDB_txn { #define MDB_TXN_SPILLS 0x08 /**< txn or a parent has spilled pages */ /** @} */ unsigned int mt_flags; /**< @ref mdb_txn */ - /** dirty_list maxsize - # of allocated pages allowed, including in parent txns */ - unsigned int mt_dirty_room; - /** Tracks which of the two meta pages was used at the start - * of this transaction. + /** dirty_list room: Array size - #dirty pages visible to this txn. + * Includes ancestor txns' dirty pages not hidden by other txns' + * dirty/spilled pages. Thus commit(nested txn) has room to merge + * dirty_list into mt_parent after freeing hidden mt_parent pages. */ - unsigned int mt_toggle; + unsigned int mt_dirty_room; }; /** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty. @@ -901,7 +914,14 @@ struct MDB_txn { struct MDB_xcursor; - /** Cursors are used for all DB operations */ + /** Cursors are used for all DB operations. + * A cursor holds a path of (page pointer, key index) from the DB + * root to a position in the DB, plus other state. #MDB_DUPSORT + * cursors include an xcursor to the current data item. Write txns + * track their cursors and keep them up to date when data moves. + * Exception: An xcursor's pointer to a #P_SUBP page can be stale. + * (A node with #F_DUPDATA but no #F_SUBDATA contains a subpage). + */ struct MDB_cursor { /** Next cursor on this DB in this txn */ MDB_cursor *mc_next; @@ -929,6 +949,7 @@ struct MDB_cursor { #define C_INITIALIZED 0x01 /**< cursor has been initialized and is valid */ #define C_EOF 0x02 /**< No more data */ #define C_SUB 0x04 /**< Cursor is a sub-cursor */ +#define C_DEL 0x08 /**< last op was a cursor_del */ #define C_SPLITTING 0x20 /**< Cursor is in page_split */ #define C_UNTRACK 0x40 /**< Un-track cursor when closing */ /** @} */ @@ -1014,8 +1035,8 @@ struct MDB_env { /** Nested transaction */ typedef struct MDB_ntxn { - MDB_txn mnt_txn; /* the transaction */ - MDB_pgstate mnt_pgstate; /* parent transaction's saved freestate */ + MDB_txn mnt_txn; /**< the transaction */ + MDB_pgstate mnt_pgstate; /**< parent transaction's saved freestate */ } MDB_ntxn; /** max number of pages to commit in one writev() call */ @@ -1037,6 +1058,8 @@ static int mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify); #define MDB_PS_MODIFY 1 #define MDB_PS_ROOTONLY 2 +#define MDB_PS_FIRST 4 +#define MDB_PS_LAST 8 static int mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags); static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst); @@ -1250,7 +1273,7 @@ static void mdb_audit(MDB_txn *txn) txn->mt_dbs[i].md_leaf_pages + txn->mt_dbs[i].md_overflow_pages; if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) { - mdb_page_search(&mc, NULL, 0); + mdb_page_search(&mc, NULL, MDB_PS_FIRST); do { unsigned j; MDB_page *mp; @@ -1324,7 +1347,7 @@ mdb_page_free(MDB_env *env, MDB_page *mp) env->me_dpages = mp; } -/* Free a dirty page */ +/** Free a dirty page */ static void mdb_dpage_free(MDB_env *env, MDB_page *dp) { @@ -1351,52 +1374,83 @@ mdb_dlist_free(MDB_txn *txn) dl[0].mid = 0; } -/* Set or clear P_KEEP in non-overflow, non-sub pages in this txn's cursors. +/** Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn. * @param[in] mc A cursor handle for the current operation. * @param[in] pflags Flags of the pages to update: * P_DIRTY to set P_KEEP, P_DIRTY|P_KEEP to clear it. + * @param[in] all No shortcuts. Needed except after a full #mdb_page_flush(). + * @return 0 on success, non-zero on failure. */ -static void -mdb_cursorpages_mark(MDB_cursor *mc, unsigned pflags) +static int +mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) { + enum { Mask = P_SUBP|P_DIRTY|P_KEEP }; MDB_txn *txn = mc->mc_txn; MDB_cursor *m3; MDB_xcursor *mx; + MDB_page *dp, *mp; + MDB_node *leaf; unsigned i, j; + int rc = MDB_SUCCESS, level; + /* Mark pages seen by cursors */ if (mc->mc_flags & C_UNTRACK) mc = NULL; /* will find mc in mt_cursors */ for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) { for (; mc; mc=mc->mc_next) { - for (m3 = mc; m3->mc_flags & C_INITIALIZED; m3 = &mx->mx_cursor) { - for (j=0; jmc_snum; j++) - if ((m3->mc_pg[j]->mp_flags & (P_SUBP|P_DIRTY|P_KEEP)) - == pflags) - m3->mc_pg[j]->mp_flags ^= P_KEEP; - mx = m3->mc_xcursor; - if (mx == NULL) - break; + if (!(mc->mc_flags & C_INITIALIZED)) + continue; + for (m3 = mc;; m3 = &mx->mx_cursor) { + mp = NULL; + for (j=0; jmc_snum; j++) { + mp = m3->mc_pg[j]; + if ((mp->mp_flags & Mask) == pflags) + mp->mp_flags ^= P_KEEP; + } + mx = m3->mc_xcursor; + /* Proceed to mx if it is at a sub-database */ + if (! (mx && (mx->mx_cursor.mc_flags & C_INITIALIZED))) + break; + if (! (mp && (mp->mp_flags & P_LEAF))) + break; + leaf = NODEPTR(mp, m3->mc_ki[j-1]); + if (!(leaf->mn_flags & F_SUBDATA)) + break; } } if (i == 0) break; } + + if (all) { + /* Mark dirty root pages */ + for (i=0; imt_numdbs; i++) { + if (txn->mt_dbflags[i] & DB_DIRTY) { + pgno_t pgno = txn->mt_dbs[i].md_root; + if (pgno == P_INVALID) + continue; + if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) + break; + if ((dp->mp_flags & Mask) == pflags && level <= 1) + dp->mp_flags ^= P_KEEP; + } + } + } + + return rc; } -static int mdb_page_flush(MDB_txn *txn); +static int mdb_page_flush(MDB_txn *txn, int keep); /** Spill pages from the dirty list back to disk. * This is intended to prevent running into #MDB_TXN_FULL situations, * but note that they may still occur in a few cases: - * 1) pages in #MDB_DUPSORT sub-DBs are never spilled, so if there - * are too many of these dirtied in one txn, the txn may still get - * too full. + * 1) our estimate of the txn size could be too small. Currently this + * seems unlikely, except with a large number of #MDB_MULTIPLE items. * 2) child txns may run out of space if their parents dirtied a * lot of pages and never spilled them. TODO: we probably should do * a preemptive spill during #mdb_txn_begin() of a child txn, if * the parent's dirty_room is below a given threshold. - * 3) our estimate of the txn size could be too small. At the - * moment this seems unlikely. * * Otherwise, if not using nested txns, it is expected that apps will * not run into #MDB_TXN_FULL any more. The pages are flushed to disk @@ -1426,8 +1480,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) MDB_txn *txn = m0->mc_txn; MDB_page *dp; MDB_ID2L dl = txn->mt_u.dirty_list; - unsigned int i, j; - int rc, level; + unsigned int i, j, need; + int rc; if (m0->mc_flags & C_SUB) return MDB_SUCCESS; @@ -1441,6 +1495,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) if (key) i += (LEAFSIZE(key, data) + txn->mt_env->me_psize) / txn->mt_env->me_psize; i += i; /* double it for good measure */ + need = i; if (txn->mt_dirty_room > i) return MDB_SUCCESS; @@ -1449,26 +1504,36 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) txn->mt_spill_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX); if (!txn->mt_spill_pgs) return ENOMEM; - } - - /* Mark all the dirty root pages we want to preserve */ - for (i=0; imt_numdbs; i++) { - if (txn->mt_dbflags[i] & DB_DIRTY) { - pgno_t pgno = txn->mt_dbs[i].md_root; - if (pgno == P_INVALID) - continue; - if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) - goto done; - if ((dp->mp_flags & P_DIRTY) && level <= 1) - dp->mp_flags |= P_KEEP; + } else { + /* purge deleted slots */ + MDB_IDL sl = txn->mt_spill_pgs; + unsigned int num = sl[0]; + j=0; + for (i=1; i<=num; i++) { + if (!(sl[i] & 1)) + sl[++j] = sl[i]; } + sl[0] = j; } - /* Preserve pages used by cursors */ - mdb_cursorpages_mark(m0, P_DIRTY); + /* Preserve pages which may soon be dirtied again */ + if ((rc = mdb_pages_xkeep(m0, P_DIRTY, 1)) != MDB_SUCCESS) + goto done; + + /* Less aggressive spill - we originally spilled the entire dirty list, + * with a few exceptions for cursor pages and DB root pages. But this + * turns out to be a lot of wasted effort because in a large txn many + * of those pages will need to be used again. So now we spill only 1/8th + * of the dirty pages. Testing revealed this to be a good tradeoff, + * better than 1/2, 1/4, or 1/10. + */ + if (need < MDB_IDL_UM_MAX / 8) + need = MDB_IDL_UM_MAX / 8; /* Save the page IDs of all the pages we're flushing */ - for (i=1; i<=dl[0].mid; i++) { + /* flush from the tail forward, this saves a lot of shifting later on. */ + for (i=dl[0].mid; i && need; i--) { + MDB_ID pn = dl[i].mid << 1; dp = dl[i].mptr; if (dp->mp_flags & P_KEEP) continue; @@ -1479,8 +1544,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) MDB_txn *tx2; for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { if (tx2->mt_spill_pgs) { - j = mdb_midl_search(tx2->mt_spill_pgs, dl[i].mid); - if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == dl[i].mid) { + j = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == pn) { dp->mp_flags |= P_KEEP; break; } @@ -1489,41 +1554,21 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) if (tx2) continue; } - if ((rc = mdb_midl_append(&txn->mt_spill_pgs, dl[i].mid))) + if ((rc = mdb_midl_append(&txn->mt_spill_pgs, pn))) goto done; + need--; } mdb_midl_sort(txn->mt_spill_pgs); - rc = mdb_page_flush(txn); + /* Flush the spilled part of dirty list */ + if ((rc = mdb_page_flush(txn, i)) != MDB_SUCCESS) + goto done; - mdb_cursorpages_mark(m0, P_DIRTY|P_KEEP); + /* Reset any dirty pages we kept that page_flush didn't see */ + rc = mdb_pages_xkeep(m0, P_DIRTY|P_KEEP, i); done: - if (rc == 0) { - if (txn->mt_parent) { - MDB_txn *tx2; - pgno_t pgno = dl[i].mid; - txn->mt_dirty_room = txn->mt_parent->mt_dirty_room - dl[0].mid; - /* dirty pages that are dirty in an ancestor don't - * count against this txn's dirty_room. - */ - for (i=1; i<=dl[0].mid; i++) { - for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { - j = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); - if (j <= tx2->mt_u.dirty_list[0].mid && - tx2->mt_u.dirty_list[j].mid == pgno) { - txn->mt_dirty_room++; - break; - } - } - } - } else { - txn->mt_dirty_room = MDB_IDL_UM_MAX - dl[0].mid; - } - txn->mt_flags |= MDB_TXN_SPILLS; - } else { - txn->mt_flags |= MDB_TXN_ERROR; - } + txn->mt_flags |= rc ? MDB_TXN_ERROR : MDB_TXN_SPILLS; return rc; } @@ -1533,12 +1578,14 @@ mdb_find_oldest(MDB_txn *txn) { int i; txnid_t mr, oldest = txn->mt_txnid - 1; - MDB_reader *r = txn->mt_env->me_txns->mti_readers; - for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { - if (r[i].mr_pid) { - mr = r[i].mr_txnid; - if (oldest > mr) - oldest = mr; + if (txn->mt_env->me_txns) { + MDB_reader *r = txn->mt_env->me_txns->mti_readers; + for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { + if (r[i].mr_pid) { + mr = r[i].mr_txnid; + if (oldest > mr) + oldest = mr; + } } } return oldest; @@ -1748,26 +1795,28 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize) /** Pull a page off the txn's spill list, if present. * If a page being referenced was spilled to disk in this txn, bring * it back and make it dirty/writable again. - * @param[in] tx0 the transaction handle. + * @param[in] txn the transaction handle. * @param[in] mp the page being referenced. * @param[out] ret the writable page, if any. ret is unchanged if * mp wasn't spilled. */ static int -mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) +mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret) { - MDB_env *env = tx0->mt_env; - MDB_txn *txn; + MDB_env *env = txn->mt_env; + const MDB_txn *tx2; unsigned x; - pgno_t pgno = mp->mp_pgno; + pgno_t pgno = mp->mp_pgno, pn = pgno << 1; - for (txn = tx0; txn; txn=txn->mt_parent) { - if (!txn->mt_spill_pgs) + for (tx2 = txn; tx2; tx2=tx2->mt_parent) { + if (!tx2->mt_spill_pgs) continue; - x = mdb_midl_search(txn->mt_spill_pgs, pgno); - if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pgno) { + x = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { MDB_page *np; int num; + if (txn->mt_dirty_room == 0) + return MDB_TXN_FULL; if (IS_OVERFLOW(mp)) num = mp->mp_pages; else @@ -1783,31 +1832,20 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) else mdb_page_copy(np, mp, env->me_psize); } - if (txn == tx0) { - /* If in current txn, this page is no longer spilled */ - for (; x < txn->mt_spill_pgs[0]; x++) - txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1]; - txn->mt_spill_pgs[0]--; + if (tx2 == txn) { + /* If in current txn, this page is no longer spilled. + * If it happens to be the last page, truncate the spill list. + * Otherwise mark it as deleted by setting the LSB. + */ + if (x == txn->mt_spill_pgs[0]) + txn->mt_spill_pgs[0]--; + else + txn->mt_spill_pgs[x] |= 1; } /* otherwise, if belonging to a parent txn, the * page remains spilled until child commits */ - if (txn->mt_parent) { - MDB_txn *tx2; - /* If this page is also in a parent's dirty list, then - * it's already accounted in dirty_room, and we need to - * cancel out the decrement that mdb_page_dirty does. - */ - for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { - x = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); - if (x <= tx2->mt_u.dirty_list[0].mid && - tx2->mt_u.dirty_list[x].mid == pgno) { - txn->mt_dirty_room++; - break; - } - } - } - mdb_page_dirty(tx0, np); + mdb_page_dirty(txn, np); np->mp_flags |= P_DIRTY; *ret = np; break; @@ -1826,7 +1864,6 @@ mdb_page_touch(MDB_cursor *mc) MDB_page *mp = mc->mc_pg[mc->mc_top], *np; MDB_txn *txn = mc->mc_txn; MDB_cursor *m2, *m3; - MDB_dbi dbi; pgno_t pgno; int rc; @@ -1843,7 +1880,8 @@ mdb_page_touch(MDB_cursor *mc) (rc = mdb_page_alloc(mc, 1, &np))) return rc; pgno = np->mp_pgno; - DPRINTF(("touched db %u page %"Z"u -> %"Z"u", mc->mc_dbi,mp->mp_pgno,pgno)); + DPRINTF(("touched db %d page %"Z"u -> %"Z"u", DDBI(mc), + mp->mp_pgno, pgno)); assert(mp->mp_pgno != pgno); mdb_midl_xappend(txn->mt_free_pgs, mp->mp_pgno); /* Update the parent page, if any, to point to the new page */ @@ -1889,17 +1927,16 @@ mdb_page_touch(MDB_cursor *mc) done: /* Adjust cursors pointing to mp */ mc->mc_pg[mc->mc_top] = np; - dbi = mc->mc_dbi; + m2 = txn->mt_cursors[mc->mc_dbi]; if (mc->mc_flags & C_SUB) { - dbi--; - for (m2 = txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + for (; m2; m2=m2->mc_next) { m3 = &m2->mc_xcursor->mx_cursor; if (m3->mc_snum < mc->mc_snum) continue; if (m3->mc_pg[mc->mc_top] == mp) m3->mc_pg[mc->mc_top] = np; } } else { - for (m2 = txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + for (; m2; m2=m2->mc_next) { if (m2->mc_snum < mc->mc_snum) continue; if (m2->mc_pg[mc->mc_top] == mp) { m2->mc_pg[mc->mc_top] = np; @@ -2084,7 +2121,9 @@ static int mdb_txn_renew0(MDB_txn *txn) { MDB_env *env = txn->mt_env; - unsigned int i; + MDB_txninfo *ti = env->me_txns; + MDB_meta *meta; + unsigned int i, nr; uint16_t x; int rc, new_notls = 0; @@ -2093,9 +2132,9 @@ mdb_txn_renew0(MDB_txn *txn) txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */ if (txn->mt_flags & MDB_TXN_RDONLY) { - if (!env->me_txns) { - i = mdb_env_pick_meta(env); - txn->mt_txnid = env->me_metas[i]->mm_txnid; + if (!ti) { + meta = env->me_metas[ mdb_env_pick_meta(env) ]; + txn->mt_txnid = meta->mm_txnid; txn->mt_u.reader = NULL; } else { MDB_reader *r = (env->me_flags & MDB_NOTLS) ? txn->mt_u.reader : @@ -2117,36 +2156,43 @@ mdb_txn_renew0(MDB_txn *txn) } LOCK_MUTEX_R(env); - for (i=0; ime_txns->mti_numreaders; i++) - if (env->me_txns->mti_readers[i].mr_pid == 0) + nr = ti->mti_numreaders; + for (i=0; imti_readers[i].mr_pid == 0) break; if (i == env->me_maxreaders) { UNLOCK_MUTEX_R(env); return MDB_READERS_FULL; } - env->me_txns->mti_readers[i].mr_pid = pid; - env->me_txns->mti_readers[i].mr_tid = tid; - if (i >= env->me_txns->mti_numreaders) - env->me_txns->mti_numreaders = i+1; + ti->mti_readers[i].mr_pid = pid; + ti->mti_readers[i].mr_tid = tid; + if (i == nr) + ti->mti_numreaders = ++nr; /* Save numreaders for un-mutexed mdb_env_close() */ - env->me_numreaders = env->me_txns->mti_numreaders; + env->me_numreaders = nr; UNLOCK_MUTEX_R(env); - r = &env->me_txns->mti_readers[i]; + + r = &ti->mti_readers[i]; new_notls = (env->me_flags & MDB_NOTLS); if (!new_notls && (rc=pthread_setspecific(env->me_txkey, r))) { r->mr_pid = 0; return rc; } } - txn->mt_txnid = r->mr_txnid = env->me_txns->mti_txnid; + txn->mt_txnid = r->mr_txnid = ti->mti_txnid; txn->mt_u.reader = r; + meta = env->me_metas[txn->mt_txnid & 1]; } - txn->mt_toggle = txn->mt_txnid & 1; } else { - LOCK_MUTEX_W(env); + if (ti) { + LOCK_MUTEX_W(env); - txn->mt_txnid = env->me_txns->mti_txnid; - txn->mt_toggle = txn->mt_txnid & 1; + txn->mt_txnid = ti->mti_txnid; + meta = env->me_metas[txn->mt_txnid & 1]; + } else { + meta = env->me_metas[ mdb_env_pick_meta(env) ]; + txn->mt_txnid = meta->mm_txnid; + } txn->mt_txnid++; #if MDB_DEBUG if (txn->mt_txnid == mdb_debug_start) @@ -2162,10 +2208,10 @@ mdb_txn_renew0(MDB_txn *txn) } /* Copy the DB info and flags */ - memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db)); + memcpy(txn->mt_dbs, meta->mm_dbs, 2 * sizeof(MDB_db)); /* Moved to here to avoid a data race in read TXNs */ - txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1; + txn->mt_next_pgno = meta->mm_last_pg+1; for (i=2; imt_numdbs; i++) { x = env->me_dbflags[i]; @@ -2261,7 +2307,6 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) return ENOMEM; } txn->mt_txnid = parent->mt_txnid; - txn->mt_toggle = parent->mt_toggle; txn->mt_dirty_room = parent->mt_dirty_room; txn->mt_u.dirty_list[0].mid = 0; txn->mt_spill_pgs = NULL; @@ -2387,7 +2432,8 @@ mdb_txn_reset0(MDB_txn *txn, const char *act) env->me_txn = NULL; /* The writer mutex was locked in mdb_txn_begin. */ - UNLOCK_MUTEX_W(env); + if (env->me_txns) + UNLOCK_MUTEX_W(env); } } @@ -2442,7 +2488,7 @@ mdb_freelist_save(MDB_txn *txn) if (env->me_pghead) { /* Make sure first page of freeDB is touched and on freelist */ - rc = mdb_page_search(&mc, NULL, MDB_PS_MODIFY); + rc = mdb_page_search(&mc, NULL, MDB_PS_FIRST|MDB_PS_MODIFY); if (rc && rc != MDB_NOTFOUND) return rc; } @@ -2470,9 +2516,7 @@ mdb_freelist_save(MDB_txn *txn) if (freecnt < txn->mt_free_pgs[0]) { if (!freecnt) { /* Make sure last page of freeDB is touched and on freelist */ - key.mv_size = MDB_MAXKEYSIZE+1; - key.mv_data = NULL; - rc = mdb_page_search(&mc, &key, MDB_PS_MODIFY); + rc = mdb_page_search(&mc, NULL, MDB_PS_LAST|MDB_PS_MODIFY); if (rc && rc != MDB_NOTFOUND) return rc; } @@ -2539,7 +2583,7 @@ mdb_freelist_save(MDB_txn *txn) total_room += head_room; } - /* Fill in the reserved, touched me_pghead records */ + /* Fill in the reserved me_pghead records */ rc = MDB_SUCCESS; if (mop_len) { MDB_val key, data; @@ -2571,10 +2615,13 @@ mdb_freelist_save(MDB_txn *txn) return rc; } -/** Flush dirty pages to the map, after clearing their dirty flag. +/** Flush (some) dirty pages to the map, after clearing their dirty flag. + * @param[in] txn the transaction that's being committed + * @param[in] keep number of initial pages in dirty_list to keep dirty. + * @return 0 on success, non-zero on failure. */ static int -mdb_page_flush(MDB_txn *txn) +mdb_page_flush(MDB_txn *txn, int keep) { MDB_env *env = txn->mt_env; MDB_ID2L dl = txn->mt_u.dirty_list; @@ -2592,10 +2639,11 @@ mdb_page_flush(MDB_txn *txn) int n = 0; #endif - j = 0; + j = i = keep; + if (env->me_flags & MDB_WRITEMAP) { /* Clear dirty flags */ - for (i=1; i<=pagecount; i++) { + while (++i <= pagecount) { dp = dl[i].mptr; /* Don't flush this page yet */ if (dp->mp_flags & P_KEEP) { @@ -2605,13 +2653,12 @@ mdb_page_flush(MDB_txn *txn) } dp->mp_flags &= ~P_DIRTY; } - dl[0].mid = j; - return MDB_SUCCESS; + goto done; } /* Write the pages */ - for (i = 1;; i++) { - if (i <= pagecount) { + for (;;) { + if (++i <= pagecount) { dp = dl[i].mptr; /* Don't flush this page yet */ if (dp->mp_flags & P_KEEP) { @@ -2690,8 +2737,7 @@ mdb_page_flush(MDB_txn *txn) #endif /* _WIN32 */ } - j = 0; - for (i=1; i<=pagecount; i++) { + for (i = keep; ++i <= pagecount; ) { dp = dl[i].mptr; /* This is a page we skipped above */ if (!dl[i].mid) { @@ -2701,8 +2747,11 @@ mdb_page_flush(MDB_txn *txn) } mdb_dpage_free(env, dp); } - dl[0].mid = j; +done: + i--; + txn->mt_dirty_room += i - j; + dl[0].mid = j; return MDB_SUCCESS; } @@ -2742,14 +2791,18 @@ mdb_txn_commit(MDB_txn *txn) if (txn->mt_parent) { MDB_txn *parent = txn->mt_parent; - unsigned x, y, len; MDB_ID2L dst, src; + MDB_IDL pspill; + unsigned x, y, len, ps_len; /* Append our free list to parent's */ rc = mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs); if (rc) goto fail; mdb_midl_free(txn->mt_free_pgs); + /* Failures after this must either undo the changes + * to the parent or set MDB_TXN_ERROR in the parent. + */ parent->mt_next_pgno = txn->mt_next_pgno; parent->mt_flags = txn->mt_flags; @@ -2771,36 +2824,26 @@ mdb_txn_commit(MDB_txn *txn) dst = parent->mt_u.dirty_list; src = txn->mt_u.dirty_list; /* Remove anything in our dirty list from parent's spill list */ - if (parent->mt_spill_pgs) { - x = parent->mt_spill_pgs[0]; - len = x; - /* zero out our dirty pages in parent spill list */ - for (i=1; i<=src[0].mid; i++) { - if (src[i].mid < parent->mt_spill_pgs[x]) - continue; - if (src[i].mid > parent->mt_spill_pgs[x]) { - if (x <= 1) - break; + if ((pspill = parent->mt_spill_pgs) && (ps_len = pspill[0])) { + x = y = ps_len; + pspill[0] = (pgno_t)-1; + /* Mark our dirty pages as deleted in parent spill list */ + for (i=0, len=src[0].mid; ++i <= len; ) { + MDB_ID pn = src[i].mid << 1; + while (pn > pspill[x]) x--; - continue; - } - parent->mt_spill_pgs[x] = 0; - len--; - } - /* OK, we had a few hits, squash zeros from the spill list */ - if (len < parent->mt_spill_pgs[0]) { - x=1; - for (y=1; y<=parent->mt_spill_pgs[0]; y++) { - if (parent->mt_spill_pgs[y]) { - if (y != x) { - parent->mt_spill_pgs[x] = parent->mt_spill_pgs[y]; - } - x++; - } + if (pn == pspill[x]) { + pspill[x] = 1; + y = --x; } - parent->mt_spill_pgs[0] = len; } + /* Squash deleted pagenums if we deleted any */ + for (x=y; ++x <= ps_len; ) + if (!(pspill[x] & 1)) + pspill[++y] = pspill[x]; + pspill[0] = y; } + /* Find len = length of merging our dirty list with parent's */ x = dst[0].mid; dst[0].mid = 0; /* simplify loops */ @@ -2834,7 +2877,10 @@ mdb_txn_commit(MDB_txn *txn) parent->mt_dirty_room = txn->mt_dirty_room; if (txn->mt_spill_pgs) { if (parent->mt_spill_pgs) { - mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); + /* TODO: Prevent failure here, so parent does not fail */ + rc = mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); + if (rc) + parent->mt_flags |= MDB_TXN_ERROR; mdb_midl_free(txn->mt_spill_pgs); mdb_midl_sort(parent->mt_spill_pgs); } else { @@ -2845,7 +2891,7 @@ mdb_txn_commit(MDB_txn *txn) parent->mt_child = NULL; mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead); free(txn); - return MDB_SUCCESS; + return rc; } if (txn != env->me_txn) { @@ -2894,7 +2940,7 @@ mdb_txn_commit(MDB_txn *txn) mdb_audit(txn); #endif - if ((rc = mdb_page_flush(txn)) || + if ((rc = mdb_page_flush(txn, 0)) || (rc = mdb_env_sync(env, 0)) || (rc = mdb_env_write_meta(txn))) goto fail; @@ -2904,7 +2950,8 @@ done: env->me_txn = NULL; mdb_dbis_update(txn, 1); - UNLOCK_MUTEX_W(env); + if (env->me_txns) + UNLOCK_MUTEX_W(env); free(txn); return MDB_SUCCESS; @@ -3004,7 +3051,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta) DPUTS("writing new meta page"); - GET_PAGESIZE(psize); + psize = env->me_psize; meta->mm_magic = MDB_MAGIC; meta->mm_version = MDB_DATA_VERSION; @@ -3059,7 +3106,7 @@ mdb_env_write_meta(MDB_txn *txn) assert(txn != NULL); assert(txn->mt_env != NULL); - toggle = !txn->mt_toggle; + toggle = txn->mt_txnid & 1; DPRINTF(("writing meta page %d for root page %"Z"u", toggle, txn->mt_dbs[MAIN_DBI].md_root)); @@ -3137,6 +3184,7 @@ mdb_env_write_meta(MDB_txn *txn) WriteFile(env->me_fd, ptr, len, NULL, &ov); #else r2 = pwrite(env->me_fd, ptr, len, off); + (void)r2; /* Silence warnings. We don't care about pwrite's return value */ #endif fail: env->me_flags |= MDB_FATAL_ERROR; @@ -3149,7 +3197,8 @@ done: * readers will get consistent data regardless of how fresh or * how stale their view of these values is. */ - env->me_txns->mti_txnid = txn->mt_txnid; + if (env->me_txns) + env->me_txns->mti_txnid = txn->mt_txnid; return MDB_SUCCESS; } @@ -3188,11 +3237,108 @@ mdb_env_create(MDB_env **env) return MDB_SUCCESS; } +static int +mdb_env_map(MDB_env *env, void *addr, int newsize) +{ + MDB_page *p; + unsigned int flags = env->me_flags; +#ifdef _WIN32 + int rc; + HANDLE mh; + LONG sizelo, sizehi; + sizelo = env->me_mapsize & 0xffffffff; + sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */ + + /* Windows won't create mappings for zero length files. + * Just allocate the maxsize right now. + */ + if (newsize) { + if (SetFilePointer(env->me_fd, sizelo, &sizehi, 0) != (DWORD)sizelo + || !SetEndOfFile(env->me_fd) + || SetFilePointer(env->me_fd, 0, NULL, 0) != 0) + return ErrCode(); + } + mh = CreateFileMapping(env->me_fd, NULL, flags & MDB_WRITEMAP ? + PAGE_READWRITE : PAGE_READONLY, + sizehi, sizelo, NULL); + if (!mh) + return ErrCode(); + env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ? + FILE_MAP_WRITE : FILE_MAP_READ, + 0, 0, env->me_mapsize, addr); + rc = env->me_map ? 0 : ErrCode(); + CloseHandle(mh); + if (rc) + return rc; +#else + int prot = PROT_READ; + if (flags & MDB_WRITEMAP) { + prot |= PROT_WRITE; + if (ftruncate(env->me_fd, env->me_mapsize) < 0) + return ErrCode(); + } + env->me_map = mmap(addr, env->me_mapsize, prot, MAP_SHARED, + env->me_fd, 0); + if (env->me_map == MAP_FAILED) { + env->me_map = NULL; + return ErrCode(); + } + + if (flags & MDB_NORDAHEAD) { + /* Turn off readahead. It's harmful when the DB is larger than RAM. */ +#ifdef MADV_RANDOM + madvise(env->me_map, env->me_mapsize, MADV_RANDOM); +#else +#ifdef POSIX_MADV_RANDOM + posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM); +#endif /* POSIX_MADV_RANDOM */ +#endif /* MADV_RANDOM */ + } +#endif /* _WIN32 */ + + /* Can happen because the address argument to mmap() is just a + * hint. mmap() can pick another, e.g. if the range is in use. + * The MAP_FIXED flag would prevent that, but then mmap could + * instead unmap existing pages to make room for the new map. + */ + if (addr && env->me_map != addr) + return EBUSY; /* TODO: Make a new MDB_* error code? */ + + p = (MDB_page *)env->me_map; + env->me_metas[0] = METADATA(p); + env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + env->me_psize); + + return MDB_SUCCESS; +} + int mdb_env_set_mapsize(MDB_env *env, size_t size) { - if (env->me_map) - return EINVAL; + /* If env is already open, caller is responsible for making + * sure there are no active txns. + */ + if (env->me_map) { + int rc; + void *old; + if (env->me_txn) + return EINVAL; + if (!size) + size = env->me_metas[mdb_env_pick_meta(env)]->mm_mapsize; + else if (size < env->me_mapsize) { + /* If the configured size is smaller, make sure it's + * still big enough. Silently round up to minimum if not. + */ + size_t minsize = (env->me_metas[mdb_env_pick_meta(env)]->mm_last_pg + 1) * env->me_psize; + if (size < minsize) + size = minsize; + } + munmap(env->me_map, env->me_mapsize); + env->me_mapsize = size; + old = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : NULL; + rc = mdb_env_map(env, old, 1); + if (rc) + return rc; + } env->me_mapsize = size; if (env->me_psize) env->me_maxpg = env->me_mapsize / env->me_psize; @@ -3232,12 +3378,17 @@ static int mdb_env_open2(MDB_env *env) { unsigned int flags = env->me_flags; - int i, newenv = 0; + int i, newenv = 0, rc; MDB_meta meta; - MDB_page *p; -#ifndef _WIN32 - int prot; -#endif + +#ifdef _WIN32 + /* See if we should use QueryLimited */ + rc = GetVersion(); + if ((rc & 0xff) > 5) + env->me_pidquery = MDB_PROCESS_QUERY_LIMITED_INFORMATION; + else + env->me_pidquery = PROCESS_QUERY_INFORMATION; +#endif /* _WIN32 */ memset(&meta, 0, sizeof(meta)); @@ -3246,6 +3397,9 @@ mdb_env_open2(MDB_env *env) return i; DPUTS("new mdbenv"); newenv = 1; + GET_PAGESIZE(env->me_psize); + } else { + env->me_psize = meta.mm_psize; } /* Was a mapsize configured? */ @@ -3263,66 +3417,9 @@ mdb_env_open2(MDB_env *env) env->me_mapsize = minsize; } -#ifdef _WIN32 - { - int rc; - HANDLE mh; - LONG sizelo, sizehi; - sizelo = env->me_mapsize & 0xffffffff; - sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */ - - /* See if we should use QueryLimited */ - rc = GetVersion(); - if ((rc & 0xff) > 5) - env->me_pidquery = MDB_PROCESS_QUERY_LIMITED_INFORMATION; - else - env->me_pidquery = PROCESS_QUERY_INFORMATION; - - /* Windows won't create mappings for zero length files. - * Just allocate the maxsize right now. - */ - if (newenv) { - if (SetFilePointer(env->me_fd, sizelo, &sizehi, 0) != (DWORD)sizelo - || !SetEndOfFile(env->me_fd) - || SetFilePointer(env->me_fd, 0, NULL, 0) != 0) - return ErrCode(); - } - mh = CreateFileMapping(env->me_fd, NULL, flags & MDB_WRITEMAP ? - PAGE_READWRITE : PAGE_READONLY, - sizehi, sizelo, NULL); - if (!mh) - return ErrCode(); - env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ? - FILE_MAP_WRITE : FILE_MAP_READ, - 0, 0, env->me_mapsize, meta.mm_address); - rc = env->me_map ? 0 : ErrCode(); - CloseHandle(mh); - if (rc) - return rc; - } -#else - i = MAP_SHARED; - prot = PROT_READ; - if (flags & MDB_WRITEMAP) { - prot |= PROT_WRITE; - if (ftruncate(env->me_fd, env->me_mapsize) < 0) - return ErrCode(); - } - env->me_map = mmap(meta.mm_address, env->me_mapsize, prot, i, - env->me_fd, 0); - if (env->me_map == MAP_FAILED) { - env->me_map = NULL; - return ErrCode(); - } - /* Turn off readahead. It's harmful when the DB is larger than RAM. */ -#ifdef MADV_RANDOM - madvise(env->me_map, env->me_mapsize, MADV_RANDOM); -#else -#ifdef POSIX_MADV_RANDOM - posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM); -#endif /* POSIX_MADV_RANDOM */ -#endif /* MADV_RANDOM */ -#endif /* _WIN32 */ + rc = mdb_env_map(env, meta.mm_address, newenv); + if (rc) + return rc; if (newenv) { if (flags & MDB_FIXEDMAP) @@ -3331,24 +3428,11 @@ mdb_env_open2(MDB_env *env) if (i != MDB_SUCCESS) { return i; } - } else if (meta.mm_address && env->me_map != meta.mm_address) { - /* Can happen because the address argument to mmap() is just a - * hint. mmap() can pick another, e.g. if the range is in use. - * The MAP_FIXED flag would prevent that, but then mmap could - * instead unmap existing pages to make room for the new map. - */ - return EBUSY; /* TODO: Make a new MDB_* error code? */ } - env->me_psize = meta.mm_psize; env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1; env->me_nodemax = (env->me_psize - PAGEHDRSZ) / MDB_MINKEYS; env->me_maxpg = env->me_mapsize / env->me_psize; - - p = (MDB_page *)env->me_map; - env->me_metas[0] = METADATA(p); - env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize); - #if MDB_DEBUG { int toggle = mdb_env_pick_meta(env); @@ -3522,7 +3606,7 @@ mdb_env_excl_lock(MDB_env *env, int *excl) return rc; } -#if defined(_WIN32) || defined(MDB_USE_POSIX_SEM) +#ifdef MDB_USE_HASH /* * hash_64 - 64 bit Fowler/Noll/Vo-0 FNV-1a hash code * @@ -3607,10 +3691,9 @@ static void mdb_hash_enc(MDB_val *val, char *encbuf) { mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT); - unsigned long *l = (unsigned long *)&h; - mdb_pack85(l[0], encbuf); - mdb_pack85(l[1], encbuf+5); + mdb_pack85(h, encbuf); + mdb_pack85(h>>32, encbuf+5); encbuf[10] = '\0'; } #endif @@ -3846,7 +3929,7 @@ fail: * environment and re-opening it with the new flags. */ #define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC) -#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS) +#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD) int mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode) @@ -3898,9 +3981,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } - rc = mdb_env_setup_locks(env, lpath, mode, &excl); - if (rc) - goto leave; + /* For RDONLY, get lockfile after we know datafile exists */ + if (!(flags & (MDB_RDONLY|MDB_NOLOCK))) { + rc = mdb_env_setup_locks(env, lpath, mode, &excl); + if (rc) + goto leave; + } #ifdef _WIN32 if (F_ISSET(flags, MDB_RDONLY)) { @@ -3926,6 +4012,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } + if ((flags & (MDB_RDONLY|MDB_NOLOCK)) == MDB_RDONLY) { + rc = mdb_env_setup_locks(env, lpath, mode, &excl); + if (rc) + goto leave; + } + if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) { if (flags & (MDB_RDONLY|MDB_WRITEMAP)) { env->me_mfd = env->me_fd; @@ -3934,10 +4026,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode * MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset. */ #ifdef _WIN32 + len = OPEN_EXISTING; env->me_mfd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, len, mode | FILE_FLAG_WRITE_THROUGH, NULL); #else + oflags &= ~O_CREAT; env->me_mfd = open(dpath, oflags | MDB_DSYNC, mode); #endif if (env->me_mfd == INVALID_HANDLE_VALUE) { @@ -4161,17 +4255,18 @@ mdb_env_copy(MDB_env *env, const char *path) newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW, FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL); #else - newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL -#ifdef O_DIRECT - |O_DIRECT -#endif - , 0666); + newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666); #endif if (newfd == INVALID_HANDLE_VALUE) { rc = ErrCode(); goto leave; } +#ifdef O_DIRECT + /* Set O_DIRECT if the file system supports it */ + if ((rc = fcntl(newfd, F_GETFL)) != -1) + (void) fcntl(newfd, F_SETFL, rc | O_DIRECT); +#endif #ifdef F_NOCACHE /* __APPLE__ */ rc = fcntl(newfd, F_NOCACHE, 1); if (rc) { @@ -4219,7 +4314,7 @@ mdb_cmp_long(const MDB_val *a, const MDB_val *b) *(size_t *)a->mv_data > *(size_t *)b->mv_data; } -/** Compare two items pointing at aligned int's */ +/** Compare two items pointing at aligned unsigned int's */ static int mdb_cmp_int(const MDB_val *a, const MDB_val *b) { @@ -4227,7 +4322,7 @@ mdb_cmp_int(const MDB_val *a, const MDB_val *b) *(unsigned int *)a->mv_data > *(unsigned int *)b->mv_data; } -/** Compare two items pointing at ints of unknown alignment. +/** Compare two items pointing at unsigned ints of unknown alignment. * Nodes and keys are guaranteed to be 2-byte aligned. */ static int @@ -4425,8 +4520,8 @@ mdb_cursor_pop(MDB_cursor *mc) if (mc->mc_snum) mc->mc_top--; - DPRINTF(("popped page %"Z"u off db %u cursor %p", top->mp_pgno, - mc->mc_dbi, (void *) mc)); + DPRINTF(("popped page %"Z"u off db %d cursor %p", top->mp_pgno, + DDBI(mc), (void *) mc)); } } @@ -4434,8 +4529,8 @@ mdb_cursor_pop(MDB_cursor *mc) static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) { - DPRINTF(("pushing page %"Z"u on db %u cursor %p", mp->mp_pgno, - mc->mc_dbi, (void *) mc)); + DPRINTF(("pushing page %"Z"u on db %d cursor %p", mp->mp_pgno, + DDBI(mc), (void *) mc)); if (mc->mc_snum >= CURSOR_STACK) { assert(mc->mc_snum < CURSOR_STACK); @@ -4475,8 +4570,9 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) * leave that unless page_touch happens again). */ if (tx2->mt_spill_pgs) { - x = mdb_midl_search(tx2->mt_spill_pgs, pgno); - if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pgno) { + MDB_ID pn = pgno << 1; + x = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { p = (MDB_page *)(env->me_map + env->me_psize * pgno); goto done; } @@ -4508,23 +4604,15 @@ done: return MDB_SUCCESS; } -/** Search for the page a given key should be in. - * Pushes parent pages on the cursor stack. This function continues a - * search on a cursor that has already been initialized. (Usually by - * #mdb_page_search() but also by #mdb_node_move().) - * @param[in,out] mc the cursor for this operation. - * @param[in] key the key to search for. If NULL, search for the lowest - * page. (This is used by #mdb_cursor_first().) - * @param[in] modify If true, visited pages are updated with new page numbers. - * @return 0 on success, non-zero on failure. +/** Finish #mdb_page_search() / #mdb_page_search_lowest(). + * The cursor is at the root page, set up the rest of it. */ static int -mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) +mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags) { MDB_page *mp = mc->mc_pg[mc->mc_top]; - DKBUF; int rc; - + DKBUF; while (IS_BRANCH(mp)) { MDB_node *node; @@ -4534,11 +4622,10 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) assert(NUMKEYS(mp) > 1); DPRINTF(("found index 0 to page %"Z"u", NODEPGNO(NODEPTR(mp, 0)))); - if (key == NULL) /* Initialize cursor to first page. */ + if (flags & (MDB_PS_FIRST|MDB_PS_LAST)) { i = 0; - else if (key->mv_size > MDB_MAXKEYSIZE && key->mv_data == NULL) { - /* cursor to last page */ - i = NUMKEYS(mp)-1; + if (flags & MDB_PS_LAST) + i = NUMKEYS(mp) - 1; } else { int exact; node = mdb_node_search(mc, key, &exact); @@ -4551,10 +4638,9 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) i--; } } + DPRINTF(("following index %u for key [%s]", i, DKEY(key))); } - if (key) - DPRINTF(("following index %u for key [%s]", i, DKEY(key))); assert(i < NUMKEYS(mp)); node = NODEPTR(mp, i); @@ -4565,7 +4651,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) if ((rc = mdb_cursor_push(mc, mp))) return rc; - if (modify) { + if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc)) != 0) return rc; mp = mc->mc_pg[mc->mc_top]; @@ -4579,7 +4665,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) } DPRINTF(("found leaf page %"Z"u for key [%s]", mp->mp_pgno, - key ? DKEY(key) : NULL)); + key ? DKEY(key) : "null")); mc->mc_flags |= C_INITIALIZED; mc->mc_flags &= ~C_EOF; @@ -4605,18 +4691,17 @@ mdb_page_search_lowest(MDB_cursor *mc) mc->mc_ki[mc->mc_top] = 0; if ((rc = mdb_cursor_push(mc, mp))) return rc; - return mdb_page_search_root(mc, NULL, 0); + return mdb_page_search_root(mc, NULL, MDB_PS_FIRST); } /** Search for the page a given key should be in. - * Pushes parent pages on the cursor stack. This function just sets up - * the search; it finds the root page for \b mc's database and sets this - * as the root of the cursor's stack. Then #mdb_page_search_root() is - * called to complete the search. + * Push it and its parent pages on the cursor stack. * @param[in,out] mc the cursor for this operation. - * @param[in] key the key to search for. If NULL, search for the lowest - * page. (This is used by #mdb_cursor_first().) - * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers. + * @param[in] key the key to search for, or NULL for first/last page. + * @param[in] flags If MDB_PS_MODIFY is set, visited pages in the DB + * are touched (updated with new page numbers). + * If MDB_PS_FIRST or MDB_PS_LAST is set, find first or last leaf. + * This is used by #mdb_cursor_first() and #mdb_cursor_last(). * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups. * @return 0 on success, non-zero on failure. */ @@ -4627,23 +4712,20 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) pgno_t root; /* Make sure the txn is still viable, then find the root from - * the txn's db table. + * the txn's db table and set it as the root of the cursor's stack. */ if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_ERROR)) { DPUTS("transaction has failed, must abort"); return MDB_BAD_TXN; } else { /* Make sure we're using an up-to-date root */ - if (mc->mc_dbi > MAIN_DBI) { - if ((*mc->mc_dbflag & DB_STALE) || - ((flags & MDB_PS_MODIFY) && !(*mc->mc_dbflag & DB_DIRTY))) { + if (*mc->mc_dbflag & DB_STALE) { MDB_cursor mc2; - unsigned char dbflag = 0; mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL); - rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, flags & MDB_PS_MODIFY); + rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, 0); if (rc) return rc; - if (*mc->mc_dbflag & DB_STALE) { + { MDB_val data; int exact = 0; uint16_t flags; @@ -4663,11 +4745,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) return MDB_INCOMPATIBLE; memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db)); } - if (flags & MDB_PS_MODIFY) - dbflag = DB_DIRTY; *mc->mc_dbflag &= ~DB_STALE; - *mc->mc_dbflag |= dbflag; - } } root = mc->mc_db->md_root; @@ -4685,8 +4763,8 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) mc->mc_snum = 1; mc->mc_top = 0; - DPRINTF(("db %u root page %"Z"u has flags 0x%X", - mc->mc_dbi, root, mc->mc_pg[0]->mp_flags)); + DPRINTF(("db %d root page %"Z"u has flags 0x%X", + DDBI(mc), root, mc->mc_pg[0]->mp_flags)); if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc))) @@ -4707,6 +4785,7 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) unsigned x = 0, ovpages = mp->mp_pages; MDB_env *env = txn->mt_env; MDB_IDL sl = txn->mt_spill_pgs; + MDB_ID pn = pg << 1; int rc; DPRINTF(("free ov page %"Z"u (%d)", pg, ovpages)); @@ -4721,7 +4800,7 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) if (env->me_pghead && !txn->mt_parent && ((mp->mp_flags & P_DIRTY) || - (sl && (x = mdb_midl_search(sl, pg)) <= sl[0] && sl[x] == pg))) + (sl && (x = mdb_midl_search(sl, pn)) <= sl[0] && sl[x] == pn))) { unsigned i, j; pgno_t *mop; @@ -4731,9 +4810,10 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) return rc; if (!(mp->mp_flags & P_DIRTY)) { /* This page is no longer spilled */ - for (; x < sl[0]; x++) - sl[x] = sl[x+1]; - sl[0]--; + if (x == sl[0]) + sl[0]--; + else + sl[x] |= 1; goto release; } /* Remove from dirty list */ @@ -4823,7 +4903,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, if (txn->mt_flags & MDB_TXN_ERROR) return MDB_BAD_TXN; - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (key->mv_size > MDB_MAXKEYSIZE) { return MDB_BAD_VALSIZE; } @@ -4875,8 +4955,11 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) assert(IS_BRANCH(mc->mc_pg[mc->mc_top])); indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL) != 0)) + if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL)) != 0) { + /* mc will be inconsistent if caller does mc_snum++ as above */ + mc->mc_flags &= ~(C_INITIALIZED|C_EOF); return rc; + } mdb_cursor_push(mc, mp); if (!move_right) @@ -4906,8 +4989,11 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (op == MDB_NEXT || op == MDB_NEXT_DUP) { rc = mdb_cursor_next(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); - if (op != MDB_NEXT || rc != MDB_NOTFOUND) + if (op != MDB_NEXT || rc != MDB_NOTFOUND) { + if (rc == MDB_SUCCESS) + MDB_GET_KEY(leaf, key); return rc; + } } } else { mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); @@ -4917,6 +5003,8 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } DPRINTF(("cursor_next: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc)); + if (mc->mc_flags & C_DEL) + goto skip; if (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mp)) { DPUTS("=====> move to next sibling page"); @@ -4929,6 +5017,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } else mc->mc_ki[mc->mc_top]++; +skip: DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u", mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top])); @@ -4973,11 +5062,14 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) if (mc->mc_db->md_flags & MDB_DUPSORT) { leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (op == MDB_PREV || op == MDB_PREV_DUP) { - if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (op == MDB_PREV || op == MDB_PREV_DUP) { rc = mdb_cursor_prev(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); - if (op != MDB_PREV || rc != MDB_NOTFOUND) + if (op != MDB_PREV || rc != MDB_NOTFOUND) { + if (rc == MDB_SUCCESS) + MDB_GET_KEY(leaf, key); return rc; + } } else { mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); if (op == MDB_PREV_DUP) @@ -5043,7 +5135,8 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, assert(mc); assert(key); - assert(key->mv_size > 0); + if (key->mv_size == 0) + return MDB_BAD_VALSIZE; if (mc->mc_xcursor) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); @@ -5062,7 +5155,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size); } else { leaf = NODEPTR(mp, 0); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5083,7 +5176,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, nkeys-1, nodekey.mv_size); } else { leaf = NODEPTR(mp, nkeys-1); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5101,7 +5194,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, mc->mc_ki[mc->mc_top], nodekey.mv_size); } else { leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5131,7 +5224,11 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (!mc->mc_top) { /* There are no other pages */ mc->mc_ki[mc->mc_top] = 0; - return MDB_NOTFOUND; + if (op == MDB_SET_RANGE) { + rc = 0; + goto set1; + } else + return MDB_NOTFOUND; } } @@ -5195,6 +5292,7 @@ set1: if (rc) { if (op == MDB_GET_BOTH || rc > 0) return MDB_NOTFOUND; + rc = 0; } } else { @@ -5224,7 +5322,7 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { - rc = mdb_page_search(mc, NULL, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_FIRST); if (rc != MDB_SUCCESS) return rc; } @@ -5270,11 +5368,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data) if (!(mc->mc_flags & C_EOF)) { if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { - MDB_val lkey; - - lkey.mv_size = MDB_MAXKEYSIZE+1; - lkey.mv_data = NULL; - rc = mdb_page_search(mc, &lkey, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_LAST); if (rc != MDB_SUCCESS) return rc; } @@ -5326,8 +5420,9 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, rc = EINVAL; } else { MDB_page *mp = mc->mc_pg[mc->mc_top]; - if (!NUMKEYS(mp)) { - mc->mc_ki[mc->mc_top] = 0; + int nkeys = NUMKEYS(mp); + if (!nkeys || mc->mc_ki[mc->mc_top] >= nkeys) { + mc->mc_ki[mc->mc_top] = nkeys; rc = MDB_NOTFOUND; break; } @@ -5340,6 +5435,8 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_GET_KEY(leaf, key); if (data) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (mc->mc_flags & C_DEL) + mdb_xcursor_init1(mc, leaf); rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT); } else { rc = mdb_node_read(mc->mc_txn, leaf, data); @@ -5364,7 +5461,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, case MDB_SET_RANGE: if (key == NULL) { rc = EINVAL; - } else if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + } else if (key->mv_size > MDB_MAXKEYSIZE) { rc = MDB_BAD_VALSIZE; } else if (op == MDB_SET_RANGE) rc = mdb_cursor_set(mc, key, data, op, NULL); @@ -5464,17 +5561,20 @@ fetchm: break; } + if (mc->mc_flags & C_DEL) + mc->mc_flags ^= C_DEL; + return rc; } -/** Touch all the pages in the cursor stack. +/** Touch all the pages in the cursor stack. Set mc_top. * Makes sure all the pages are writable, before attempting a write operation. * @param[in] mc The cursor to operate on. */ static int mdb_cursor_touch(MDB_cursor *mc) { - int rc; + int rc = MDB_SUCCESS; if (mc->mc_dbi > MAIN_DBI && !(*mc->mc_dbflag & DB_DIRTY)) { MDB_cursor mc2; @@ -5485,13 +5585,14 @@ mdb_cursor_touch(MDB_cursor *mc) return rc; *mc->mc_dbflag |= DB_DIRTY; } - for (mc->mc_top = 0; mc->mc_top < mc->mc_snum; mc->mc_top++) { - rc = mdb_page_touch(mc); - if (rc) - return rc; + mc->mc_top = 0; + if (mc->mc_snum) { + do { + rc = mdb_page_touch(mc); + } while (!rc && ++(mc->mc_top) < mc->mc_snum); + mc->mc_top = mc->mc_snum-1; } - mc->mc_top = mc->mc_snum-1; - return MDB_SUCCESS; + return rc; } /** Do not spill pages to disk if txn is getting full, may fail instead */ @@ -5542,8 +5643,8 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return MDB_BAD_VALSIZE; #endif - DPRINTF(("==> put db %u key [%s], size %"Z"u, data size %"Z"u", - mc->mc_dbi, DKEY(key), key ? key->mv_size:0, data->mv_size)); + DPRINTF(("==> put db %d key [%s], size %"Z"u, data size %"Z"u", + DDBI(mc), DKEY(key), key ? key->mv_size : 0, data->mv_size)); dkey.mv_size = 0; @@ -5554,6 +5655,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, } else if (mc->mc_db->md_root == P_INVALID) { /* new database, cursor has nothing to point to */ mc->mc_snum = 0; + mc->mc_top = 0; mc->mc_flags &= ~C_INITIALIZED; rc = MDB_NO_ROOT; } else { @@ -5584,6 +5686,9 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return rc; } + if (mc->mc_flags & C_DEL) + mc->mc_flags ^= C_DEL; + /* Cursor is positioned, check for room in the dirty list */ if (!nospill) { if (flags & MDB_MULTIPLE) { @@ -5869,9 +5974,6 @@ new_sub: unsigned i = mc->mc_top; MDB_page *mp = mc->mc_pg[i]; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -5968,6 +6070,7 @@ int mdb_cursor_del(MDB_cursor *mc, unsigned int flags) { MDB_node *leaf; + MDB_page *mp; int rc; if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) @@ -5976,17 +6079,20 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) if (!(mc->mc_flags & C_INITIALIZED)) return EINVAL; + if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top])) + return MDB_NOTFOUND; + if (!(flags & MDB_NOSPILL) && (rc = mdb_page_spill(mc, NULL, NULL))) return rc; - flags &= ~MDB_NOSPILL; /* TODO: Or change (flags != MDB_NODUPDATA) to ~(flags & MDB_NODUPDATA), not looking at the logic of that code just now */ rc = mdb_cursor_touch(mc); if (rc) return rc; - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + mp = mc->mc_pg[mc->mc_top]; + leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (!IS_LEAF2(mp) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (!(flags & MDB_NODUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); @@ -6001,18 +6107,19 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) } else { MDB_cursor *m2; /* shrink fake page */ - mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + mdb_node_shrink(mp, mc->mc_ki[mc->mc_top]); + leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); /* fix other sub-DB cursors pointed at this fake page */ for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) { if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; - if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top] && + if (m2->mc_pg[mc->mc_top] == mp && m2->mc_ki[mc->mc_top] == mc->mc_ki[mc->mc_top]) m2->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); } } mc->mc_db->md_entries--; + mc->mc_flags |= C_DEL; return rc; } /* otherwise fall thru and delete the sub-DB */ @@ -6138,6 +6245,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, { unsigned int i; size_t node_size = NODESIZE; + ssize_t room; indx_t ofs; MDB_node *node; MDB_page *mp = mc->mc_pg[mc->mc_top]; @@ -6150,7 +6258,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, data ? data->mv_size : 0, - key ? key->mv_size : 0, key ? DKEY(key) : NULL)); + key ? key->mv_size : 0, key ? DKEY(key) : "null")); if (IS_LEAF2(mp)) { /* Move higher keys up one slot. */ @@ -6168,9 +6276,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, return MDB_SUCCESS; } + room = (ssize_t)SIZELEFT(mp) - (ssize_t)sizeof(indx_t); if (key != NULL) node_size += key->mv_size; - if (IS_LEAF(mp)) { assert(data); if (F_ISSET(flags, F_BIGDATA)) { @@ -6182,26 +6290,23 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, /* Put data on overflow page. */ DPRINTF(("data size is %"Z"u, node would be %"Z"u, put data on overflow page", data->mv_size, node_size+data->mv_size)); - node_size += sizeof(pgno_t); + node_size += sizeof(pgno_t) + (node_size & 1); + if ((ssize_t)node_size > room) + goto full; if ((rc = mdb_page_new(mc, P_OVERFLOW, ovpages, &ofp))) return rc; DPRINTF(("allocated overflow page %"Z"u", ofp->mp_pgno)); flags |= F_BIGDATA; + goto update; } else { node_size += data->mv_size; } } node_size += node_size & 1; + if ((ssize_t)node_size > room) + goto full; - if (node_size + sizeof(indx_t) > SIZELEFT(mp)) { - DPRINTF(("not enough room in page %"Z"u, got %u ptrs", - mp->mp_pgno, NUMKEYS(mp))); - DPRINTF(("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower, - mp->mp_upper - mp->mp_lower)); - DPRINTF(("node size = %"Z"u", node_size)); - return MDB_PAGE_FULL; - } - +update: /* Move higher pointers up one slot. */ for (i = NUMKEYS(mp); i > indx; i--) mp->mp_ptrs[i] = mp->mp_ptrs[i - 1]; @@ -6247,6 +6352,13 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, } return MDB_SUCCESS; + +full: + DPRINTF(("not enough room in page %"Z"u, got %u ptrs", + mp->mp_pgno, NUMKEYS(mp))); + DPRINTF(("upper-lower = %u - %u = %"Z"d", mp->mp_upper,mp->mp_lower,room)); + DPRINTF(("node size = %"Z"u", node_size)); + return MDB_PAGE_FULL; } /** Delete the specified node from a page. @@ -6381,11 +6493,13 @@ mdb_xcursor_init0(MDB_cursor *mc) mx->mx_cursor.mc_txn = mc->mc_txn; mx->mx_cursor.mc_db = &mx->mx_db; mx->mx_cursor.mc_dbx = &mx->mx_dbx; - mx->mx_cursor.mc_dbi = mc->mc_dbi+1; + mx->mx_cursor.mc_dbi = mc->mc_dbi; mx->mx_cursor.mc_dbflag = &mx->mx_dbflag; mx->mx_cursor.mc_snum = 0; mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags = C_SUB; + mx->mx_dbx.md_name.mv_size = 0; + mx->mx_dbx.md_name.mv_data = NULL; mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp; mx->mx_dbx.md_dcmp = NULL; mx->mx_dbx.md_rel = mc->mc_dbx->md_rel; @@ -6406,6 +6520,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDB_db)); mx->mx_cursor.mc_pg[0] = 0; mx->mx_cursor.mc_snum = 0; + mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags = C_SUB; } else { MDB_page *fp = NODEDATA(node); @@ -6418,8 +6533,8 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_entries = NUMKEYS(fp); COPY_PGNO(mx->mx_db.md_root, fp->mp_pgno); mx->mx_cursor.mc_snum = 1; - mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB; mx->mx_cursor.mc_top = 0; + mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB; mx->mx_cursor.mc_pg[0] = fp; mx->mx_cursor.mc_ki[0] = 0; if (mc->mc_db->md_flags & MDB_DUPFIXED) { @@ -6429,12 +6544,9 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_flags |= MDB_INTEGERKEY; } } - DPRINTF(("Sub-db %u for db %u root page %"Z"u", mx->mx_cursor.mc_dbi, mc->mc_dbi, + DPRINTF(("Sub-db -%u root page %"Z"u", mx->mx_cursor.mc_dbi, mx->mx_db.md_root)); - mx->mx_dbflag = DB_VALID | (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY) ? - DB_DIRTY : 0); - mx->mx_dbx.md_name.mv_data = NODEKEY(node); - mx->mx_dbx.md_name.mv_size = node->mn_ksize; + mx->mx_dbflag = DB_VALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ #if UINT_MAX < SIZE_MAX if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) #ifdef MISALIGNED_OK @@ -6750,9 +6862,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) MDB_dbi dbi = csrc->mc_dbi; MDB_page *mp = csrc->mc_pg[csrc->mc_top]; - if (csrc->mc_flags & C_SUB) - dbi--; - for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -6927,9 +7036,6 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) MDB_dbi dbi = csrc->mc_dbi; MDB_page *mp = cdst->mc_pg[cdst->mc_top]; - if (csrc->mc_flags & C_SUB) - dbi--; - for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7024,13 +7130,11 @@ mdb_rebalance(MDB_cursor *mc) /* Adjust cursors pointing to mp */ mc->mc_snum = 0; mc->mc_top = 0; + mc->mc_flags &= ~C_INITIALIZED; { MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7040,6 +7144,7 @@ mdb_rebalance(MDB_cursor *mc) if (m3->mc_pg[0] == mp) { m3->mc_snum = 0; m3->mc_top = 0; + m3->mc_flags &= ~C_INITIALIZED; } } } @@ -7060,9 +7165,6 @@ mdb_rebalance(MDB_cursor *mc) MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7070,10 +7172,13 @@ mdb_rebalance(MDB_cursor *mc) m3 = m2; if (m3 == mc || m3->mc_snum < mc->mc_snum) continue; if (m3->mc_pg[0] == mp) { - m3->mc_pg[0] = mc->mc_pg[0]; - m3->mc_snum = 1; - m3->mc_top = 0; - m3->mc_ki[0] = m3->mc_ki[1]; + int i; + m3->mc_snum--; + m3->mc_top--; + for (i=0; imc_snum; i++) { + m3->mc_pg[i] = m3->mc_pg[i+1]; + m3->mc_ki[i] = m3->mc_ki[i+1]; + } } } } @@ -7136,8 +7241,11 @@ mdb_rebalance(MDB_cursor *mc) else { if (mc->mc_ki[ptop] == 0) rc = mdb_page_merge(&mn, mc); - else + else { + mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; rc = mdb_page_merge(mc, &mn); + mdb_cursor_copy(&mn, mc); + } mc->mc_flags &= ~(C_INITIALIZED|C_EOF); } return rc; @@ -7150,6 +7258,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) int rc; MDB_page *mp; indx_t ki; + unsigned int nkeys; mp = mc->mc_pg[mc->mc_top]; ki = mc->mc_ki[mc->mc_top]; @@ -7169,30 +7278,34 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) rc = mdb_rebalance(mc); if (rc != MDB_SUCCESS) mc->mc_txn->mt_flags |= MDB_TXN_ERROR; - /* if mc points past last node in page, invalidate */ - else if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top])) - mc->mc_flags &= ~(C_INITIALIZED|C_EOF); - - { - /* Adjust other cursors pointing to mp */ + else { MDB_cursor *m2; - unsigned int nkeys; MDB_dbi dbi = mc->mc_dbi; mp = mc->mc_pg[mc->mc_top]; nkeys = NUMKEYS(mp); + + /* if mc points past last node in page, find next sibling */ + if (mc->mc_ki[mc->mc_top] >= nkeys) + mdb_cursor_sibling(mc, 1); + + /* Adjust other cursors pointing to mp */ for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { - if (m2 == mc) + if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; if (!(m2->mc_flags & C_INITIALIZED)) continue; if (m2->mc_pg[mc->mc_top] == mp) { - if (m2->mc_ki[mc->mc_top] > ki) - m2->mc_ki[mc->mc_top]--; + if (m2->mc_ki[mc->mc_top] >= ki) { + m2->mc_flags |= C_DEL; + if (m2->mc_ki[mc->mc_top] > ki) + m2->mc_ki[mc->mc_top]--; + } if (m2->mc_ki[mc->mc_top] >= nkeys) - m2->mc_flags &= ~(C_INITIALIZED|C_EOF); + mdb_cursor_sibling(m2, 1); } } + mc->mc_flags |= C_DEL; } return rc; @@ -7219,7 +7332,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (key->mv_size > MDB_MAXKEYSIZE) { return MDB_BAD_VALSIZE; } @@ -7272,24 +7385,26 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno unsigned int nflags) { unsigned int flags; - int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1, did_split = 0; + int rc = MDB_SUCCESS, new_root = 0, did_split = 0; indx_t newindx; pgno_t pgno = 0; - unsigned int i, j, split_indx, nkeys, pmax; + int i, j, split_indx, nkeys, pmax; + MDB_env *env = mc->mc_txn->mt_env; MDB_node *node; MDB_val sepkey, rkey, xdata, *rdata = &xdata; - MDB_page *copy; + MDB_page *copy = NULL; MDB_page *mp, *rp, *pp; - unsigned int ptop; + int ptop; MDB_cursor mn; DKBUF; mp = mc->mc_pg[mc->mc_top]; newindx = mc->mc_ki[mc->mc_top]; + nkeys = NUMKEYS(mp); - DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i", + DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i/%i", IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, - DKEY(newkey), mc->mc_ki[mc->mc_top])); + DKEY(newkey), mc->mc_ki[mc->mc_top], nkeys)); /* Create a right sibling. */ if ((rc = mdb_page_new(mc, mp->mp_flags, 1, &rp))) @@ -7336,141 +7451,139 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno sepkey = *newkey; split_indx = newindx; nkeys = 0; - goto newsep; - } + } else { - nkeys = NUMKEYS(mp); - split_indx = nkeys / 2; - if (newindx < split_indx) - newpos = 0; - - if (IS_LEAF2(rp)) { - char *split, *ins; - int x; - unsigned int lsize, rsize, ksize; - /* Move half of the keys to the right sibling */ - copy = NULL; - x = mc->mc_ki[mc->mc_top] - split_indx; - ksize = mc->mc_db->md_pad; - split = LEAF2KEY(mp, split_indx, ksize); - rsize = (nkeys - split_indx) * ksize; - lsize = (nkeys - split_indx) * sizeof(indx_t); - mp->mp_lower -= lsize; - rp->mp_lower += lsize; - mp->mp_upper += rsize - lsize; - rp->mp_upper -= rsize - lsize; - sepkey.mv_size = ksize; - if (newindx == split_indx) { - sepkey.mv_data = newkey->mv_data; - } else { - sepkey.mv_data = split; - } - if (x<0) { - ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize); - memcpy(rp->mp_ptrs, split, rsize); - sepkey.mv_data = rp->mp_ptrs; - memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize); - memcpy(ins, newkey->mv_data, ksize); - mp->mp_lower += sizeof(indx_t); - mp->mp_upper -= ksize - sizeof(indx_t); + split_indx = (nkeys+1) / 2; + + if (IS_LEAF2(rp)) { + char *split, *ins; + int x; + unsigned int lsize, rsize, ksize; + /* Move half of the keys to the right sibling */ + copy = NULL; + x = mc->mc_ki[mc->mc_top] - split_indx; + ksize = mc->mc_db->md_pad; + split = LEAF2KEY(mp, split_indx, ksize); + rsize = (nkeys - split_indx) * ksize; + lsize = (nkeys - split_indx) * sizeof(indx_t); + mp->mp_lower -= lsize; + rp->mp_lower += lsize; + mp->mp_upper += rsize - lsize; + rp->mp_upper -= rsize - lsize; + sepkey.mv_size = ksize; + if (newindx == split_indx) { + sepkey.mv_data = newkey->mv_data; + } else { + sepkey.mv_data = split; + } + if (x<0) { + ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize); + memcpy(rp->mp_ptrs, split, rsize); + sepkey.mv_data = rp->mp_ptrs; + memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize); + memcpy(ins, newkey->mv_data, ksize); + mp->mp_lower += sizeof(indx_t); + mp->mp_upper -= ksize - sizeof(indx_t); + } else { + if (x) + memcpy(rp->mp_ptrs, split, x * ksize); + ins = LEAF2KEY(rp, x, ksize); + memcpy(ins, newkey->mv_data, ksize); + memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); + rp->mp_lower += sizeof(indx_t); + rp->mp_upper -= ksize - sizeof(indx_t); + mc->mc_ki[mc->mc_top] = x; + mc->mc_pg[mc->mc_top] = rp; + } } else { - if (x) - memcpy(rp->mp_ptrs, split, x * ksize); - ins = LEAF2KEY(rp, x, ksize); - memcpy(ins, newkey->mv_data, ksize); - memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); - rp->mp_lower += sizeof(indx_t); - rp->mp_upper -= ksize - sizeof(indx_t); - mc->mc_ki[mc->mc_top] = x; - mc->mc_pg[mc->mc_top] = rp; - } - goto newsep; - } + int psize, nsize, k; + /* Maximum free space in an empty page */ + pmax = env->me_psize - PAGEHDRSZ; + if (IS_LEAF(mp)) + nsize = mdb_leaf_size(env, newkey, newdata); + else + nsize = mdb_branch_size(env, newkey); + nsize += nsize & 1; - /* For leaf pages, check the split point based on what - * fits where, since otherwise mdb_node_add can fail. - * - * This check is only needed when the data items are - * relatively large, such that being off by one will - * make the difference between success or failure. - * - * It's also relevant if a page happens to be laid out - * such that one half of its nodes are all "small" and - * the other half of its nodes are "large." If the new - * item is also "large" and falls on the half with - * "large" nodes, it also may not fit. - */ - if (IS_LEAF(mp)) { - unsigned int psize, nsize; - /* Maximum free space in an empty page */ - pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ; - nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata); - if ((nkeys < 20) || (nsize > pmax/16)) { - if (newindx <= split_indx) { - psize = nsize; - newpos = 0; - for (i=0; imn_flags, F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += NODEDSZ(node); - psize += psize & 1; - if (psize > pmax) { - if (i <= newindx) { - split_indx = newindx; - if (i < newindx) - newpos = 1; + /* grab a page to hold a temporary copy */ + copy = mdb_page_malloc(mc->mc_txn, 1); + if (copy == NULL) + return ENOMEM; + copy->mp_pgno = mp->mp_pgno; + copy->mp_flags = mp->mp_flags; + copy->mp_lower = PAGEHDRSZ; + copy->mp_upper = env->me_psize; + + /* prepare to insert */ + for (i=0, j=0; imp_ptrs[j++] = 0; + } + copy->mp_ptrs[j++] = mp->mp_ptrs[i]; + } + + /* When items are relatively large the split point needs + * to be checked, because being off-by-one will make the + * difference between success or failure in mdb_node_add. + * + * It's also relevant if a page happens to be laid out + * such that one half of its nodes are all "small" and + * the other half of its nodes are "large." If the new + * item is also "large" and falls on the half with + * "large" nodes, it also may not fit. + * + * As a final tweak, if the new item goes on the last + * spot on the page (and thus, onto the new page), bias + * the split so the new page is emptier than the old page. + * This yields better packing during sequential inserts. + */ + if (nkeys < 20 || nsize > pmax/16 || newindx >= nkeys) { + /* Find split point */ + psize = 0; + if (newindx <= split_indx || newindx >= nkeys) { + i = 0; j = 1; + k = newindx >= nkeys ? nkeys : split_indx+2; + } else { + i = nkeys; j = -1; + k = split_indx-1; + } + for (; i!=k; i+=j) { + if (i == newindx) { + psize += nsize; + node = NULL; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]); + psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t); + if (IS_LEAF(mp)) { + if (F_ISSET(node->mn_flags, F_BIGDATA)) + psize += sizeof(pgno_t); + else + psize += NODEDSZ(node); } - else - split_indx = i; - break; + psize += psize & 1; } - } - } else { - psize = nsize; - for (i=nkeys-1; i>=split_indx; i--) { - node = NODEPTR(mp, i); - psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t); - if (F_ISSET(node->mn_flags, F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += NODEDSZ(node); - psize += psize & 1; - if (psize > pmax) { - if (i >= newindx) { - split_indx = newindx; - newpos = 0; - } else - split_indx = i+1; + if (psize > pmax || i == k-j) { + split_indx = i + (j<0); break; } } } + if (split_indx == newindx) { + sepkey.mv_size = newkey->mv_size; + sepkey.mv_data = newkey->mv_data; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[split_indx]); + sepkey.mv_size = node->mn_ksize; + sepkey.mv_data = NODEKEY(node); + } } } - /* First find the separating key between the split pages. - * The case where newindx == split_indx is ambiguous; the - * new item could go to the new page or stay on the original - * page. If newpos == 1 it goes to the new page. - */ - if (newindx == split_indx && newpos) { - sepkey.mv_size = newkey->mv_size; - sepkey.mv_data = newkey->mv_data; - } else { - node = NODEPTR(mp, split_indx); - sepkey.mv_size = node->mn_ksize; - sepkey.mv_data = NODEKEY(node); - } - -newsep: - DPRINTF(("separator is [%s]", DKEY(&sepkey))); + DPRINTF(("separator is %d [%s]", split_indx, DKEY(&sepkey))); /* Copy separator key to the parent. */ - if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) { + if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) { mn.mc_snum--; mn.mc_top--; did_split = 1; @@ -7515,117 +7628,97 @@ newsep: return rc; for (i=0; imc_top; i++) mc->mc_ki[i] = mn.mc_ki[i]; - goto done; - } - if (IS_LEAF2(rp)) { - goto done; - } - - /* Move half of the keys to the right sibling. */ + } else if (!IS_LEAF2(mp)) { + /* Move nodes */ + mc->mc_pg[mc->mc_top] = rp; + i = split_indx; + j = 0; + do { + if (i == newindx) { + rkey.mv_data = newkey->mv_data; + rkey.mv_size = newkey->mv_size; + if (IS_LEAF(mp)) { + rdata = newdata; + } else + pgno = newpgno; + flags = nflags; + /* Update index for the new key. */ + mc->mc_ki[mc->mc_top] = j; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]); + rkey.mv_data = NODEKEY(node); + rkey.mv_size = node->mn_ksize; + if (IS_LEAF(mp)) { + xdata.mv_data = NODEDATA(node); + xdata.mv_size = NODEDSZ(node); + rdata = &xdata; + } else + pgno = NODEPGNO(node); + flags = node->mn_flags; + } - /* grab a page to hold a temporary copy */ - copy = mdb_page_malloc(mc->mc_txn, 1); - if (copy == NULL) - return ENOMEM; + if (!IS_LEAF(mp) && j == 0) { + /* First branch index doesn't need key data. */ + rkey.mv_size = 0; + } - copy->mp_pgno = mp->mp_pgno; - copy->mp_flags = mp->mp_flags; - copy->mp_lower = PAGEHDRSZ; - copy->mp_upper = mc->mc_txn->mt_env->me_psize; - mc->mc_pg[mc->mc_top] = copy; - for (i = j = 0; i <= nkeys; j++) { - if (i == split_indx) { - /* Insert in right sibling. */ - /* Reset insert index for right sibling. */ - if (i != newindx || (newpos ^ ins_new)) { + rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags); + if (rc) { + /* return tmp page to freelist */ + mdb_page_free(env, copy); + return rc; + } + if (i == nkeys) { + i = 0; j = 0; - mc->mc_pg[mc->mc_top] = rp; + mc->mc_pg[mc->mc_top] = copy; + } else { + i++; + j++; + } + } while (i != split_indx); + + nkeys = NUMKEYS(copy); + for (i=0; imp_ptrs[i] = copy->mp_ptrs[i]; + mp->mp_lower = copy->mp_lower; + mp->mp_upper = copy->mp_upper; + memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1), + env->me_psize - copy->mp_upper); + + /* reset back to original page */ + if (newindx < split_indx) { + mc->mc_pg[mc->mc_top] = mp; + if (nflags & MDB_RESERVE) { + node = NODEPTR(mp, mc->mc_ki[mc->mc_top]); + if (!(node->mn_flags & F_BIGDATA)) + newdata->mv_data = NODEDATA(node); } - } - - if (i == newindx && !ins_new) { - /* Insert the original entry that caused the split. */ - rkey.mv_data = newkey->mv_data; - rkey.mv_size = newkey->mv_size; - if (IS_LEAF(mp)) { - rdata = newdata; - } else - pgno = newpgno; - flags = nflags; - - ins_new = 1; - - /* Update index for the new key. */ - mc->mc_ki[mc->mc_top] = j; - } else if (i == nkeys) { - break; } else { - node = NODEPTR(mp, i); - rkey.mv_data = NODEKEY(node); - rkey.mv_size = node->mn_ksize; - if (IS_LEAF(mp)) { - xdata.mv_data = NODEDATA(node); - xdata.mv_size = NODEDSZ(node); - rdata = &xdata; - } else - pgno = NODEPGNO(node); - flags = node->mn_flags; - - i++; - } - - if (!IS_LEAF(mp) && j == 0) { - /* First branch index doesn't need key data. */ - rkey.mv_size = 0; - } - - rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags); - if (rc) break; - } - - nkeys = NUMKEYS(copy); - for (i=0; imp_ptrs[i] = copy->mp_ptrs[i]; - mp->mp_lower = copy->mp_lower; - mp->mp_upper = copy->mp_upper; - memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1), - mc->mc_txn->mt_env->me_psize - copy->mp_upper); - - /* reset back to original page */ - if (newindx < split_indx || (!newpos && newindx == split_indx)) { - mc->mc_pg[mc->mc_top] = mp; - if (nflags & MDB_RESERVE) { - node = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (!(node->mn_flags & F_BIGDATA)) - newdata->mv_data = NODEDATA(node); - } - } else { - mc->mc_ki[ptop]++; - /* Make sure mc_ki is still valid. - */ - if (mn.mc_pg[ptop] != mc->mc_pg[ptop] && - mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) { - for (i=0; imc_pg[i] = mn.mc_pg[i]; - mc->mc_ki[i] = mn.mc_ki[i]; + mc->mc_pg[mc->mc_top] = rp; + mc->mc_ki[ptop]++; + /* Make sure mc_ki is still valid. + */ + if (mn.mc_pg[ptop] != mc->mc_pg[ptop] && + mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) { + for (i=0; imc_pg[i] = mn.mc_pg[i]; + mc->mc_ki[i] = mn.mc_ki[i]; + } + mc->mc_pg[ptop] = mn.mc_pg[ptop]; + mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1; } - mc->mc_pg[ptop] = mn.mc_pg[ptop]; - mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1; } + /* return tmp page to freelist */ + mdb_page_free(env, copy); } - /* return tmp page to freelist */ - mdb_page_free(mc->mc_txn->mt_env, copy); -done: { /* Adjust other cursors pointing to mp */ MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; int fixup = NUMKEYS(mp); - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7653,7 +7746,7 @@ done: m3->mc_snum++; m3->mc_top++; } - if (m3->mc_pg[mc->mc_top] == mp) { + if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp) { if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE)) m3->mc_ki[mc->mc_top]++; if (m3->mc_ki[mc->mc_top] >= fixup) { @@ -7661,12 +7754,13 @@ done: m3->mc_ki[mc->mc_top] -= fixup; m3->mc_ki[ptop] = mn.mc_ki[ptop]; } - } else if (!did_split && m3->mc_pg[ptop] == mc->mc_pg[ptop] && + } else if (!did_split && m3->mc_top >= ptop && m3->mc_pg[ptop] == mc->mc_pg[ptop] && m3->mc_ki[ptop] >= mc->mc_ki[ptop]) { m3->mc_ki[ptop]++; } } } + DPRINTF(("mp left: %d, rp left: %d", SIZELEFT(mp), SIZELEFT(rp))); return rc; } @@ -7683,13 +7777,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) - return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { - return MDB_BAD_VALSIZE; - } - if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags) return EINVAL; @@ -7729,6 +7816,16 @@ mdb_env_get_path(MDB_env *env, const char **arg) return MDB_SUCCESS; } +int +mdb_env_get_fd(MDB_env *env, mdb_filehandle_t *arg) +{ + if (!env || !arg) + return EINVAL; + + *arg = env->me_fd; + return MDB_SUCCESS; +} + /** Common code for #mdb_stat() and #mdb_env_stat(). * @param[in] env the environment to operate in. * @param[in] db the #MDB_db record containing the stats to return. @@ -7899,7 +7996,6 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db txn->mt_dbflags[slot] = dbflag; memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db)); *dbi = slot; - txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags; mdb_default_cmp(txn, slot); if (!unused) { txn->mt_numdbs++; @@ -7935,12 +8031,12 @@ void mdb_dbi_close(MDB_env *env, MDB_dbi dbi) free(ptr); } -int mdb_dbi_flags(MDB_env *env, MDB_dbi dbi, unsigned int *flags) +int mdb_dbi_flags(MDB_txn *txn, MDB_dbi dbi, unsigned int *flags) { /* We could return the flags for the FREE_DBI too but what's the point? */ - if (dbi < MAIN_DBI || dbi >= env->me_numdbs) + if (txn == NULL || dbi < MAIN_DBI || dbi >= txn->mt_numdbs) return EINVAL; - *flags = env->me_dbflags[dbi]; + *flags = txn->mt_dbs[dbi].md_flags & PERSISTENT_FLAGS; return MDB_SUCCESS; } @@ -7954,7 +8050,7 @@ mdb_drop0(MDB_cursor *mc, int subs) { int rc; - rc = mdb_page_search(mc, NULL, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_FIRST); if (rc == MDB_SUCCESS) { MDB_txn *txn = mc->mc_txn; MDB_node *ni; @@ -8152,7 +8248,7 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) return 0; } -/* insert pid into list if not already present. +/** Insert pid into list if not already present. * return -1 if already present. */ static int mdb_pid_insert(pid_t *ids, pid_t pid) @@ -8180,7 +8276,7 @@ static int mdb_pid_insert(pid_t *ids, pid_t pid) return -1; } } - + if( val > 0 ) { ++cursor; } @@ -8221,6 +8317,8 @@ int mdb_reader_check(MDB_env *env, int *dead) if (!mdb_reader_pid(env, Pidcheck, pid)) { for (j=i; j