X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Fliblmdb%2Fmdb.c;h=c98247c2b9937afd26a15fa592a53a9ec14d5bbc;hb=7fe85f5c8291f531f93c28b98865367aeca75484;hp=0e5c3618874d083c67e643f8843fe6433abe7079;hpb=86d9675543b82856523826c5c16fde0f8dc2b745;p=openldap diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 0e5c361887..c98247c2b9 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -1,11 +1,11 @@ /** @file mdb.c - * @brief memory-mapped database library + * @brief Lightning memory-mapped database library * * A Btree-based database management library modeled loosely on the * BerkeleyDB API, but much simplified. */ /* - * Copyright 2011-2013 Howard Chu, Symas Corp. + * Copyright 2011-2014 Howard Chu, Symas Corp. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -145,7 +145,7 @@ # error "Two's complement, reasonably sized integer types, please" #endif -/** @defgroup internal MDB Internals +/** @defgroup internal LMDB Internals * @{ */ /** @defgroup compat Compatibility Macros @@ -381,7 +381,7 @@ static txnid_t mdb_debug_start; */ #define MDB_MINKEYS 2 - /** A stamp that identifies a file as an MDB file. + /** A stamp that identifies a file as an LMDB file. * There's nothing special about this value other than that it is easily * recognizable, and it will reflect any byte order mismatches. */ @@ -568,7 +568,7 @@ typedef struct MDB_reader { * unlikely. If a collision occurs, the results are unpredictable. */ typedef struct MDB_txbody { - /** Stamp identifying this as an MDB file. It must be set + /** Stamp identifying this as an LMDB file. It must be set * to #MDB_MAGIC. */ uint32_t mtb_magic; /** Format of this lock file. Must be set to #MDB_LOCK_FORMAT. */ @@ -650,6 +650,7 @@ typedef struct MDB_page { #define P_DIRTY 0x10 /**< dirty page, also set for #P_SUBP pages */ #define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */ #define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */ +#define P_LOOSE 0x4000 /**< page was dirtied then freed, can be reused */ #define P_KEEP 0x8000 /**< leave this page alone during spill */ /** @} */ uint16_t mp_flags; /**< @ref mdb_page */ @@ -841,7 +842,7 @@ typedef struct MDB_db { * Pages 0-1 are meta pages. Transaction N writes meta page #(N % 2). */ typedef struct MDB_meta { - /** Stamp identifying this as an MDB file. It must be set + /** Stamp identifying this as an LMDB file. It must be set * to #MDB_MAGIC. */ uint32_t mm_magic; /** Version number of this lock file. Must be set to #MDB_DATA_VERSION. */ @@ -1021,6 +1022,7 @@ typedef struct MDB_xcursor { typedef struct MDB_pgstate { pgno_t *mf_pghead; /**< Reclaimed freeDB pages, or NULL before use */ txnid_t mf_pglast; /**< ID of last used record, or 0 if !mf_pghead */ + MDB_page *mf_pgloose; /**< Dirty pages that can be reused */ } MDB_pgstate; /** The database environment. */ @@ -1057,6 +1059,7 @@ struct MDB_env { MDB_pgstate me_pgstate; /**< state of old pages from freeDB */ # define me_pglast me_pgstate.mf_pglast # define me_pghead me_pgstate.mf_pghead +# define me_pgloose me_pgstate.mf_pgloose MDB_page *me_dpages; /**< list of malloc'd blocks for re-use */ /** IDL of pages that became unused in a write txn */ MDB_IDL me_free_pgs; @@ -1182,7 +1185,7 @@ mdb_version(int *major, int *minor, int *patch) return MDB_VERSION_STRING; } -/** Table of descriptions for MDB @ref errors */ +/** Table of descriptions for LMDB @ref errors */ static char *const mdb_errstr[] = { "MDB_KEYEXIST: Key/data pair already exists", "MDB_NOTFOUND: No matching key/data pair found", @@ -1190,7 +1193,7 @@ static char *const mdb_errstr[] = { "MDB_CORRUPTED: Located page was wrong type", "MDB_PANIC: Update of meta page failed", "MDB_VERSION_MISMATCH: Database environment version mismatch", - "MDB_INVALID: File is not an MDB file", + "MDB_INVALID: File is not an LMDB file", "MDB_MAP_FULL: Environment mapsize limit reached", "MDB_DBS_FULL: Environment maxdbs limit reached", "MDB_READERS_FULL: Environment maxreaders limit reached", @@ -1202,7 +1205,7 @@ static char *const mdb_errstr[] = { "MDB_INCOMPATIBLE: Operation and DB incompatible, or DB flags changed", "MDB_BAD_RSLOT: Invalid reuse of reader locktable slot", "MDB_BAD_TXN: Transaction cannot recover - it must be aborted", - "MDB_BAD_VALSIZE: Too big key/data, key is empty, or wrong DUPFIXED size", + "MDB_BAD_VALSIZE: Unsupported size of key/DB name/data, or wrong DUPFIXED size", }; char * @@ -1485,7 +1488,6 @@ mdb_page_malloc(MDB_txn *txn, unsigned num) } return ret; } - /** Free a single page. * Saves single pages to a list, for future reuse. * (This is not used for multi-page overflow pages.) @@ -1525,6 +1527,23 @@ mdb_dlist_free(MDB_txn *txn) dl[0].mid = 0; } +/** Loosen a single page. + * Saves single pages to a list for future reuse + * in this same txn. It has been pulled from the freeDB + * and already resides on the dirty list, but has been + * deleted. Use these pages first before pulling again + * from the freeDB. + */ +static void +mdb_page_loose(MDB_env *env, MDB_page *mp) +{ + pgno_t *pp = (pgno_t *)mp->mp_ptrs; + *pp = mp->mp_pgno; + mp->mp_next = env->me_pgloose; + env->me_pgloose = mp; + mp->mp_flags |= P_LOOSE; +} + /** Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn. * @param[in] mc A cursor handle for the current operation. * @param[in] pflags Flags of the pages to update: @@ -1573,6 +1592,12 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) break; } + /* Loose pages shouldn't be spilled */ + for (dp = txn->mt_env->me_pgloose; dp; dp=dp->mp_next) { + if ((dp->mp_flags & Mask) == pflags) + dp->mp_flags ^= P_KEEP; + } + if (all) { /* Mark dirty root pages */ for (i=0; imt_numdbs; i++) { @@ -1790,7 +1815,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) #else enum { Paranoid = 0, Max_retries = INT_MAX /*infinite*/ }; #endif - int rc, retry = Max_retries; + int rc, retry = num * 20; MDB_txn *txn = mc->mc_txn; MDB_env *env = txn->mt_env; pgno_t pgno, *mop = env->me_pghead; @@ -1800,6 +1825,17 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) MDB_cursor_op op; MDB_cursor m2; + /* If there are any loose pages, just use them */ + if (num == 1 && env->me_pgloose) { + pgno_t *pp; + np = env->me_pgloose; + env->me_pgloose = np->mp_next; + pp = (pgno_t *)np->mp_ptrs; + np->mp_pgno = *pp; + *mp = np; + return MDB_SUCCESS; + } + *mp = NULL; /* If our dirty list is already full, we can't do anything */ @@ -1823,7 +1859,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) if (mop[i-n2] == pgno+n2) goto search_done; } while (--i > n2); - if (Max_retries < INT_MAX && --retry < 0) + if (--retry < 0) break; } @@ -2661,6 +2697,38 @@ mdb_freelist_save(MDB_txn *txn) return rc; } + /* Dispose of loose pages. Usually they will have all + * been used up by the time we get here. + */ + if (env->me_pgloose) { + MDB_page *mp = env->me_pgloose; + pgno_t *pp; + /* Just return them to freeDB */ + if (env->me_pghead) { + int i, j; + mop = env->me_pghead; + while(mp) { + pgno_t pg; + pp = (pgno_t *)mp->mp_ptrs; + pg = *pp; + j = mop[0] + 1; + for (i = mop[0]; i && mop[i] < pg; i--) + mop[j--] = mop[i]; + mop[j] = pg; + mop[0] += 1; + mp = mp->mp_next; + } + } else { + /* Oh well, they were wasted. Put on freelist */ + while(mp) { + pp = (pgno_t *)mp->mp_ptrs; + mdb_midl_append(&txn->mt_free_pgs, *pp); + mp = mp->mp_next; + } + } + env->me_pgloose = NULL; + } + /* MDB_RESERVE cancels meminit in ovpage malloc (when no WRITEMAP) */ clean_limit = (env->me_flags & (MDB_NOMEMINIT|MDB_WRITEMAP)) ? SSIZE_MAX : maxfree_1pg; @@ -2770,23 +2838,20 @@ mdb_freelist_save(MDB_txn *txn) mop += mop_len; rc = mdb_cursor_first(&mc, &key, &data); for (; !rc; rc = mdb_cursor_next(&mc, &key, &data, MDB_NEXT)) { - unsigned flags = MDB_CURRENT; txnid_t id = *(txnid_t *)key.mv_data; ssize_t len = (ssize_t)(data.mv_size / sizeof(MDB_ID)) - 1; MDB_ID save; mdb_tassert(txn, len >= 0 && id <= env->me_pglast); + key.mv_data = &id; if (len > mop_len) { len = mop_len; data.mv_size = (len + 1) * sizeof(MDB_ID); - /* Drop MDB_CURRENT when changing the data size */ - key.mv_data = &id; - flags = 0; } data.mv_data = mop -= len; save = mop[0]; mop[0] = len; - rc = mdb_cursor_put(&mc, &key, &data, flags); + rc = mdb_cursor_put(&mc, &key, &data, MDB_CURRENT); mop[0] = save; if (rc || !(mop_len -= len)) break; @@ -3558,7 +3623,7 @@ mdb_env_get_maxreaders(MDB_env *env, unsigned int *readers) return MDB_SUCCESS; } -/** Further setup required for opening an MDB environment +/** Further setup required for opening an LMDB environment */ static int mdb_env_open2(MDB_env *env) @@ -3680,7 +3745,9 @@ static void NTAPI mdb_tls_callback(PVOID module, DWORD reason, PVOID ptr) case DLL_THREAD_DETACH: for (i=0; i (unsigned short *)a->mv_data); return x; #else - return memcmp(a->mv_data, b->mv_data, a->mv_size); + unsigned short *u, *c, *end; + int x; + + end = (unsigned short *) ((char *) a->mv_data + a->mv_size); + u = (unsigned short *)a->mv_data; + c = (unsigned short *)b->mv_data; + do { + x = *u++ - *c++; + } while(!x && u < end); + return x; #endif } +/** Compare two items pointing at size_t's of unknown alignment. */ +#ifdef MISALIGNED_OK +# define mdb_cmp_clong mdb_cmp_long +#else +# define mdb_cmp_clong mdb_cmp_cint +#endif + /** Compare two items lexically */ static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) @@ -5415,7 +5498,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (!mc->mc_top) { /* There are no other pages */ mc->mc_ki[mc->mc_top] = 0; - if (op == MDB_SET_RANGE) { + if (op == MDB_SET_RANGE && !exactp) { rc = 0; goto set1; } else @@ -5451,8 +5534,10 @@ set1: mc->mc_flags &= ~C_EOF; if (IS_LEAF2(mp)) { - key->mv_size = mc->mc_db->md_pad; - key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size); + if (op == MDB_SET_RANGE || op == MDB_SET_KEY) { + key->mv_size = mc->mc_db->md_pad; + key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size); + } return MDB_SUCCESS; } @@ -5800,14 +5885,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, uint16_t fp_flags; MDB_val xdata, *rdata, dkey, olddata; MDB_db dummy; - int do_sub = 0, insert; + int do_sub = 0, insert_key, insert_data; unsigned int mcount = 0, dcount = 0, nospill; size_t nsize; int rc, rc2; unsigned int nflags; DKBUF; - if (mc == NULL) + if (mc == NULL || key == NULL) return EINVAL; env = mc->mc_txn->mt_env; @@ -5828,16 +5913,8 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - if (flags != MDB_CURRENT) { - if (key == NULL) - return EINVAL; - if (key->mv_size-1 >= ENV_MAXKEY(env)) - return MDB_BAD_VALSIZE; - } else { - /* Ignore key except in sub-cursor, where key holds the data */ - if (!(mc->mc_flags & C_SUB)) - key = NULL; - } + if (key->mv_size-1 >= ENV_MAXKEY(env)) + return MDB_BAD_VALSIZE; #if SIZE_MAX > MAXDATASIZE if (data->mv_size > ((mc->mc_db->md_flags & MDB_DUPSORT) ? ENV_MAXKEY(env) : MAXDATASIZE)) @@ -5927,8 +6004,8 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return rc2; } - insert = rc; - if (insert) { + insert_key = insert_data = rc; + if (insert_key) { /* The key does not exist */ DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); if ((mc->mc_db->md_flags & MDB_DUPSORT) && @@ -5944,13 +6021,12 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, } else { /* there's only a key anyway, so this is a no-op */ if (IS_LEAF2(mc->mc_pg[mc->mc_top])) { + char *ptr; unsigned int ksize = mc->mc_db->md_pad; if (key->mv_size != ksize) return MDB_BAD_VALSIZE; - if (flags == MDB_CURRENT) { - char *ptr = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], ksize); - memcpy(ptr, key->mv_data, ksize); - } + ptr = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], ksize); + memcpy(ptr, key->mv_data, ksize); return MDB_SUCCESS; } @@ -5978,21 +6054,14 @@ more: #if UINT_MAX < SIZE_MAX if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t)) -#ifdef MISALIGNED_OK - mc->mc_dbx->md_dcmp = mdb_cmp_long; -#else - mc->mc_dbx->md_dcmp = mdb_cmp_cint; -#endif + mc->mc_dbx->md_dcmp = mdb_cmp_clong; #endif - /* if data matches, skip it */ + /* does data match? */ if (!mc->mc_dbx->md_dcmp(data, &olddata)) { if (flags & MDB_NODUPDATA) - rc = MDB_KEYEXIST; - else if (flags & MDB_MULTIPLE) - goto next_mult; - else - rc = MDB_SUCCESS; - return rc; + return MDB_KEYEXIST; + /* overwrite it */ + goto current; } /* Back up original data item */ @@ -6089,7 +6158,7 @@ prep_subDB: rdata = &xdata; flags |= F_DUPDATA; do_sub = 1; - if (!insert) + if (!insert_key) mdb_node_del(mc, 0); goto new_sub; } @@ -6149,7 +6218,7 @@ current: data->mv_data = METADATA(omp); else memcpy(METADATA(omp), data->mv_data, data->mv_size); - goto done; + return MDB_SUCCESS; } } if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS) @@ -6165,10 +6234,9 @@ current: memcpy(olddata.mv_data, data->mv_data, data->mv_size); else memcpy(NODEKEY(leaf), key->mv_data, key->mv_size); - goto done; + return MDB_SUCCESS; } mdb_node_del(mc, 0); - mc->mc_db->md_entries--; } rdata = data; @@ -6178,14 +6246,14 @@ new_sub: nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata); if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA ) - nflags &= ~MDB_APPEND; - if (!insert) + nflags &= ~MDB_APPEND; /* sub-page may need room to grow */ + if (!insert_key) nflags |= MDB_SPLIT_REPLACE; rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags); } else { /* There is room already in this leaf page. */ rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags); - if (rc == 0 && !do_sub && insert) { + if (rc == 0 && insert_key) { /* Adjust other cursors pointing to mp */ MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; @@ -6205,9 +6273,7 @@ new_sub: } } - if (rc != MDB_SUCCESS) - mc->mc_txn->mt_flags |= MDB_TXN_ERROR; - else { + if (rc == MDB_SUCCESS) { /* Now store the actual data in the child DB. Note that we're * storing the user data in the keys field, so there are strict * size limits on dupdata. The actual data fields of the child @@ -6215,6 +6281,7 @@ new_sub: */ if (do_sub) { int xflags; + size_t ecount; put_sub: xdata.mv_size = 0; xdata.mv_data = ""; @@ -6229,10 +6296,8 @@ put_sub: /* converted, write the original data first */ if (dkey.mv_size) { rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); - if (rc) { - mc->mc_txn->mt_flags |= MDB_TXN_ERROR; - return rc == MDB_KEYEXIST ? MDB_CORRUPTED : rc; - } + if (rc) + goto bad_sub; { /* Adjust other cursors pointing to mp */ MDB_cursor *m2; @@ -6250,6 +6315,7 @@ put_sub: /* we've done our job */ dkey.mv_size = 0; } + ecount = mc->mc_xcursor->mx_db.md_entries; if (flags & MDB_APPENDDUP) xflags |= MDB_APPEND; rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); @@ -6257,31 +6323,38 @@ put_sub: void *db = NODEDATA(leaf); memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); } + insert_data = mc->mc_xcursor->mx_db.md_entries - ecount; } - /* sub-writes might have failed so check rc again. - * Don't increment count if we just replaced an existing item. - */ - if (!rc && !(flags & MDB_CURRENT)) + /* Increment count unless we just replaced an existing item. */ + if (insert_data) mc->mc_db->md_entries++; + if (insert_key) { + /* Invalidate txn if we created an empty sub-DB */ + if (rc) + goto bad_sub; + /* If we succeeded and the key didn't exist before, + * make sure the cursor is marked valid. + */ + mc->mc_flags |= C_INITIALIZED; + } if (flags & MDB_MULTIPLE) { if (!rc) { -next_mult: mcount++; /* let caller know how many succeeded, if any */ data[1].mv_size = mcount; if (mcount < dcount) { data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; + insert_key = insert_data = 0; goto more; } } } + return rc; +bad_sub: + if (rc == MDB_KEYEXIST) /* should not happen, we deleted that item */ + rc = MDB_CORRUPTED; } -done: - /* If we succeeded and the key didn't exist before, make sure - * the cursor is marked valid. - */ - if (!rc && insert) - mc->mc_flags |= C_INITIALIZED; + mc->mc_txn->mt_flags |= MDB_TXN_ERROR; return rc; } @@ -6314,7 +6387,10 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { - if (!(flags & MDB_NODUPDATA)) { + if (flags & MDB_NODUPDATA) { + /* mdb_cursor_del0() will subtract the final entry */ + mc->mc_db->md_entries -= mc->mc_xcursor->mx_db.md_entries - 1; + } else { if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); } @@ -6353,7 +6429,6 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) rc = mdb_drop0(&mc->mc_xcursor->mx_cursor, 0); if (rc) goto fail; - mc->mc_db->md_entries -= mc->mc_xcursor->mx_db.md_entries; } } @@ -6782,11 +6857,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_dbflag = DB_VALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ #if UINT_MAX < SIZE_MAX if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) -#ifdef MISALIGNED_OK - mx->mx_dbx.md_cmp = mdb_cmp_long; -#else - mx->mx_dbx.md_cmp = mdb_cmp_cint; -#endif + mx->mx_dbx.md_cmp = mdb_cmp_clong; #endif } @@ -7060,20 +7131,20 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) MDB_node *s2; MDB_val bkey; /* must find the lowest key below dst */ - rc = mdb_page_search_lowest(cdst); + mdb_cursor_copy(cdst, &mn); + rc = mdb_page_search_lowest(&mn); if (rc) return rc; - if (IS_LEAF2(cdst->mc_pg[cdst->mc_top])) { - bkey.mv_size = cdst->mc_db->md_pad; - bkey.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, bkey.mv_size); + if (IS_LEAF2(mn.mc_pg[mn.mc_top])) { + bkey.mv_size = mn.mc_db->md_pad; + bkey.mv_data = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, bkey.mv_size); } else { - s2 = NODEPTR(cdst->mc_pg[cdst->mc_top], 0); + s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0); bkey.mv_size = NODEKSZ(s2); bkey.mv_data = NODEKEY(s2); } - cdst->mc_snum = snum--; - cdst->mc_top = snum; - mdb_cursor_copy(cdst, &mn); + mn.mc_snum = snum--; + mn.mc_top = snum; mn.mc_ki[snum] = 0; rc = mdb_update_key(&mn, &bkey); if (rc) @@ -7189,14 +7260,17 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) { - int rc; - indx_t i, j; - MDB_node *srcnode; + MDB_page *psrc, *pdst; + MDB_node *srcnode; MDB_val key, data; - unsigned nkeys; + unsigned nkeys; + int rc; + indx_t i, j; - DPRINTF(("merging page %"Z"u into %"Z"u", csrc->mc_pg[csrc->mc_top]->mp_pgno, - cdst->mc_pg[cdst->mc_top]->mp_pgno)); + psrc = csrc->mc_pg[csrc->mc_top]; + pdst = cdst->mc_pg[cdst->mc_top]; + + DPRINTF(("merging page %"Z"u into %"Z"u", psrc->mp_pgno, pdst->mp_pgno)); mdb_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */ mdb_cassert(csrc, cdst->mc_snum > 1); @@ -7207,36 +7281,35 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) /* Move all nodes from src to dst. */ - j = nkeys = NUMKEYS(cdst->mc_pg[cdst->mc_top]); - if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { + j = nkeys = NUMKEYS(pdst); + if (IS_LEAF2(psrc)) { key.mv_size = csrc->mc_db->md_pad; - key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]); - for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) { + key.mv_data = METADATA(psrc); + for (i = 0; i < NUMKEYS(psrc); i++, j++) { rc = mdb_node_add(cdst, j, &key, NULL, 0, 0); if (rc != MDB_SUCCESS) return rc; key.mv_data = (char *)key.mv_data + key.mv_size; } } else { - for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) { - srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], i); - if (i == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { - unsigned int snum = csrc->mc_snum; + for (i = 0; i < NUMKEYS(psrc); i++, j++) { + srcnode = NODEPTR(psrc, i); + if (i == 0 && IS_BRANCH(psrc)) { + MDB_cursor mn; MDB_node *s2; + mdb_cursor_copy(csrc, &mn); /* must find the lowest key below src */ - rc = mdb_page_search_lowest(csrc); + rc = mdb_page_search_lowest(&mn); if (rc) return rc; - if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { - key.mv_size = csrc->mc_db->md_pad; - key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size); + if (IS_LEAF2(mn.mc_pg[mn.mc_top])) { + key.mv_size = mn.mc_db->md_pad; + key.mv_data = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, key.mv_size); } else { - s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0); + s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0); key.mv_size = NODEKSZ(s2); key.mv_data = NODEKEY(s2); } - csrc->mc_snum = snum--; - csrc->mc_top = snum; } else { key.mv_size = srcnode->mn_ksize; key.mv_data = NODEKEY(srcnode); @@ -7251,8 +7324,8 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) } DPRINTF(("dst page %"Z"u now has %u keys (%.1f%% filled)", - cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]), - (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10)); + pdst->mp_pgno, NUMKEYS(pdst), + (float)PAGEFILL(cdst->mc_txn->mt_env, pdst) / 10)); /* Unlink the src page from parent and add to free list. */ @@ -7268,11 +7341,18 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) } csrc->mc_top++; - rc = mdb_midl_append(&csrc->mc_txn->mt_free_pgs, - csrc->mc_pg[csrc->mc_top]->mp_pgno); - if (rc) - return rc; - if (IS_LEAF(csrc->mc_pg[csrc->mc_top])) + psrc = csrc->mc_pg[csrc->mc_top]; + /* If not operating on FreeDB, allow this page to be reused + * in this txn. + */ + if ((psrc->mp_flags & P_DIRTY) && csrc->mc_dbi != FREE_DBI) { + mdb_page_loose(csrc->mc_txn->mt_env, psrc); + } else { + rc = mdb_midl_append(&csrc->mc_txn->mt_free_pgs, psrc->mp_pgno); + if (rc) + return rc; + } + if (IS_LEAF(psrc)) csrc->mc_db->md_leaf_pages--; else csrc->mc_db->md_branch_pages--; @@ -7280,7 +7360,6 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) /* Adjust other cursors pointing to mp */ MDB_cursor *m2, *m3; MDB_dbi dbi = csrc->mc_dbi; - MDB_page *mp = cdst->mc_pg[cdst->mc_top]; for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) @@ -7289,15 +7368,24 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) m3 = m2; if (m3 == csrc) continue; if (m3->mc_snum < csrc->mc_snum) continue; - if (m3->mc_pg[csrc->mc_top] == csrc->mc_pg[csrc->mc_top]) { - m3->mc_pg[csrc->mc_top] = mp; + if (m3->mc_pg[csrc->mc_top] == psrc) { + m3->mc_pg[csrc->mc_top] = pdst; m3->mc_ki[csrc->mc_top] += nkeys; } } } - mdb_cursor_pop(csrc); - - return mdb_rebalance(csrc); + { + unsigned int snum = cdst->mc_snum; + uint16_t depth = cdst->mc_db->md_depth; + mdb_cursor_pop(cdst); + rc = mdb_rebalance(cdst); + /* Did the tree shrink? */ + if (depth > cdst->mc_db->md_depth) + snum--; + cdst->mc_snum = snum; + cdst->mc_top = snum-1; + } + return rc; } /** Copy the contents of a cursor. @@ -7335,6 +7423,7 @@ mdb_rebalance(MDB_cursor *mc) int rc; unsigned int ptop, minkeys; MDB_cursor mn; + indx_t oldki; minkeys = 1 + (IS_BRANCH(mc->mc_pg[mc->mc_top])); DPRINTF(("rebalancing %s page %"Z"u (has %u keys, %.1f%% full)", @@ -7385,6 +7474,7 @@ mdb_rebalance(MDB_cursor *mc) } } } else if (IS_BRANCH(mp) && NUMKEYS(mp) == 1) { + int i; DPUTS("collapsing root page!"); rc = mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno); if (rc) @@ -7396,6 +7486,10 @@ mdb_rebalance(MDB_cursor *mc) mc->mc_db->md_depth--; mc->mc_db->md_branch_pages--; mc->mc_ki[0] = mc->mc_ki[1]; + for (i = 1; imc_db->md_depth; i++) { + mc->mc_pg[i] = mc->mc_pg[i+1]; + mc->mc_ki[i] = mc->mc_ki[i+1]; + } { /* Adjust other cursors pointing to mp */ MDB_cursor *m2, *m3; @@ -7408,7 +7502,6 @@ mdb_rebalance(MDB_cursor *mc) m3 = m2; if (m3 == mc || m3->mc_snum < mc->mc_snum) continue; if (m3->mc_pg[0] == mp) { - int i; m3->mc_snum--; m3->mc_top--; for (i=0; imc_snum; i++) { @@ -7439,6 +7532,7 @@ mdb_rebalance(MDB_cursor *mc) mdb_cursor_copy(mc, &mn); mn.mc_xcursor = NULL; + oldki = mc->mc_ki[mc->mc_top]; if (mc->mc_ki[ptop] == 0) { /* We're the leftmost leaf in our parent. */ @@ -7472,18 +7566,23 @@ mdb_rebalance(MDB_cursor *mc) * (A branch page must never have less than 2 keys.) */ minkeys = 1 + (IS_BRANCH(mn.mc_pg[mn.mc_top])); - if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) > minkeys) - return mdb_node_move(&mn, mc); - else { - if (mc->mc_ki[ptop] == 0) + if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) > minkeys) { + rc = mdb_node_move(&mn, mc); + if (mc->mc_ki[ptop]) { + oldki++; + } + } else { + if (mc->mc_ki[ptop] == 0) { rc = mdb_page_merge(&mn, mc); - else { + } else { + oldki += NUMKEYS(mn.mc_pg[mn.mc_top]); mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; rc = mdb_page_merge(mc, &mn); mdb_cursor_copy(&mn, mc); } - mc->mc_flags &= ~(C_INITIALIZED|C_EOF); + mc->mc_flags &= ~C_EOF; } + mc->mc_ki[mc->mc_top] = oldki; return rc; }