]> git.sur5r.net Git - openldap/blobdiff - libraries/liblmdb/mdb.c
ITS#7536 fix mdb_rebalance
[openldap] / libraries / liblmdb / mdb.c
index e1f6357c7f09985578e7b2c973f2681c9b5d361a..058c68b3ce8271009ffe2b3e1f6f1130549804a3 100644 (file)
@@ -431,9 +431,6 @@ typedef uint16_t     indx_t;
  *     the longer we delay reclaiming old pages, the more likely it is that a
  *     string of contiguous pages can be found after coalescing old pages from
  *     many old transactions together.
- *
- *     @todo We don't actually do such coalescing yet, we grab pages from one
- *     old transaction at a time.
  *     @{
  */
        /**     Number of slots in the reader table.
@@ -844,6 +841,8 @@ struct MDB_txn {
 #define MDB_TXN_DIRTY          0x04            /**< must write, even if dirty list is empty */
 /** @} */
        unsigned int    mt_flags;               /**< @ref mdb_txn */
+       /** dirty_list maxsize - #allocated pages including in parent txns */
+       unsigned int    mt_dirty_room;
        /** Tracks which of the two meta pages was used at the start
         *      of this transaction.
         */
@@ -911,6 +910,13 @@ typedef struct MDB_xcursor {
        unsigned char mx_dbflag;
 } MDB_xcursor;
 
+       /** State of FreeDB old pages, stored in the MDB_env */
+typedef struct MDB_pgstate {
+       txnid_t         mf_pglast;      /**< ID of last old page record we used */
+       pgno_t          *mf_pghead;     /**< old pages reclaimed from freelist */
+       pgno_t          *mf_pgfree;     /**< memory to free when dropping me_pghead */
+} MDB_pgstate;
+
        /** The database environment. */
 struct MDB_env {
        HANDLE          me_fd;          /**< The main data file */
@@ -937,16 +943,20 @@ struct MDB_env {
        size_t          me_mapsize;             /**< size of the data memory map */
        off_t           me_size;                /**< current file size */
        pgno_t          me_maxpg;               /**< me_mapsize / me_psize */
-       txnid_t         me_pglast;              /**< ID of last old page record we used */
        MDB_dbx         *me_dbxs;               /**< array of static DB info */
        uint16_t        *me_dbflags;    /**< array of flags from MDB_db.md_flags */
-       pgno_t          *me_pghead;     /**< old pages reclaimed from freelist */
        pthread_key_t   me_txkey;       /**< thread-key for readers */
+       MDB_pgstate     me_pgstate;             /**< state of old pages from freeDB */
+#      define          me_pglast       me_pgstate.mf_pglast
+#      define          me_pghead       me_pgstate.mf_pghead
+#      define          me_pgfree       me_pgstate.mf_pgfree
        MDB_page        *me_dpages;             /**< list of malloc'd blocks for re-use */
        /** IDL of pages that became unused in a write txn */
        MDB_IDL         me_free_pgs;
        /** ID2L of pages that were written during a write txn */
        MDB_ID2         me_dirty_list[MDB_IDL_UM_SIZE];
+       /** Max number of freelist items that can fit in a single overflow page */
+       unsigned int    me_maxfree_1pg;
 #ifdef _WIN32
        HANDLE          me_rmutex;              /* Windows mutexes don't reside in shared mem */
        HANDLE          me_wmutex;
@@ -955,6 +965,13 @@ struct MDB_env {
        sem_t           *me_wmutex;
 #endif
 };
+
+       /** Nested transaction */
+typedef struct MDB_ntxn {
+       MDB_txn         mnt_txn;                /* the transaction */
+       MDB_pgstate     mnt_pgstate;    /* parent transaction's saved freestate */
+} MDB_ntxn;
+
        /** max number of pages to commit in one writev() call */
 #define MDB_COMMIT_PAGES        64
 #if defined(IOV_MAX) && IOV_MAX < MDB_COMMIT_PAGES
@@ -1051,7 +1068,9 @@ static char *const mdb_errstr[] = {
        "MDB_TLS_FULL: Thread-local storage keys full - too many environments open",
        "MDB_TXN_FULL: Transaction has too many dirty pages - transaction too big",
        "MDB_CURSOR_FULL: Internal error - cursor stack limit reached",
-       "MDB_PAGE_FULL: Internal error - page has no more space"
+       "MDB_PAGE_FULL: Internal error - page has no more space",
+       "MDB_MAP_RESIZED: Database contents grew beyond environment mapsize",
+       "MDB_INCOMPATIBLE: Database flags changed or would change",
 };
 
 char *
@@ -1265,7 +1284,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
        *mp = NULL;
 
        /* If our dirty list is already full, we can't do anything */
-       if (txn->mt_u.dirty_list[0].mid >= MDB_IDL_UM_MAX)
+       if (txn->mt_dirty_room == 0)
                return MDB_TXN_FULL;
 
        /* The free list won't have any content at all until txn 2 has
@@ -1290,14 +1309,12 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
                                last = *kptr;
                        } else {
                                MDB_val key;
-                               int exact;
 again:
-                               exact = 0;
                                last = txn->mt_env->me_pglast + 1;
                                leaf = NULL;
                                key.mv_data = &last;
                                key.mv_size = sizeof(last);
-                               rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
+                               rc = mdb_cursor_set(&m2, &key, &data, MDB_SET_RANGE, NULL);
                                if (rc)
                                        goto none;
                                last = *(txnid_t *)key.mv_data;
@@ -1337,7 +1354,7 @@ again:
                                if (!mop)
                                        return ENOMEM;
                                txn->mt_env->me_pglast = last;
-                               txn->mt_env->me_pghead = mop;
+                               txn->mt_env->me_pghead = txn->mt_env->me_pgfree = mop;
                                memcpy(mop, idl, MDB_IDL_SIZEOF(idl));
 
 #if MDB_DEBUG > 1
@@ -1357,7 +1374,7 @@ none:
                        pgno_t *mop = txn->mt_env->me_pghead;
                        if (num > 1) {
                                MDB_cursor m2;
-                               int retry = 500, readit = 0, n2 = num-1;
+                               int retry = 1, readit = 0, n2 = num-1;
                                unsigned int i, j, k;
 
                                /* If current list is too short, must fetch more and coalesce */
@@ -1377,7 +1394,6 @@ none:
                                        if (readit) {
                                                MDB_val key, data;
                                                pgno_t *idl, *mop2;
-                                               int exact;
 
                                                last = txn->mt_env->me_pglast + 1;
 
@@ -1402,12 +1418,17 @@ none:
                                                if (oldest - last < 1)
                                                        break;
 
-                                               exact = 0;
                                                key.mv_data = &last;
                                                key.mv_size = sizeof(last);
-                                               rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
-                                               if (rc)
+                                               rc = mdb_cursor_set(&m2,&key,&data,MDB_SET_RANGE,NULL);
+                                               if (rc) {
+                                                       if (rc == MDB_NOTFOUND)
+                                                               break;
                                                        return rc;
+                                               }
+                                               last = *(txnid_t*)key.mv_data;
+                                               if (oldest <= last)
+                                                       break;
                                                idl = (MDB_ID *) data.mv_data;
                                                mop2 = malloc(MDB_IDL_SIZEOF(idl) + MDB_IDL_SIZEOF(mop));
                                                if (!mop2)
@@ -1422,8 +1443,8 @@ none:
                                                                mop2[k--] = mop[j--];
                                                }
                                                txn->mt_env->me_pglast = last;
-                                               txn->mt_env->me_pghead = mop2;
-                                               free(mop);
+                                               free(txn->mt_env->me_pgfree);
+                                               txn->mt_env->me_pghead = txn->mt_env->me_pgfree = mop2;
                                                mop = mop2;
                                                /* Keep trying to read until we have enough */
                                                if (mop[0] < (unsigned)num) {
@@ -1444,11 +1465,10 @@ none:
                                                }
                                        }
 
-                                       /* Stop if we succeeded, or no more retries */
+                                       /* Stop if we succeeded, or no retries */
                                        if (!retry || pgno != P_INVALID)
                                                break;
                                        readit = 1;
-                                       retry--;
 
                                } while (1);
                        } else {
@@ -1457,8 +1477,8 @@ none:
                                mop[0]--;
                        }
                        if (MDB_IDL_IS_ZERO(mop)) {
-                               txn->mt_env->me_pghead = NULL;
-                               free(mop);
+                               free(txn->mt_env->me_pgfree);
+                               txn->mt_env->me_pghead = txn->mt_env->me_pgfree = NULL;
                        }
                }
        }
@@ -1503,6 +1523,7 @@ none:
        } else {
                mdb_mid2l_insert(txn->mt_u.dirty_list, &mid);
        }
+       txn->mt_dirty_room--;
        *mp = np;
 
        return MDB_SUCCESS;
@@ -1607,8 +1628,7 @@ finish:
                                return 0;
                        }
                }
-               if (mc->mc_txn->mt_u.dirty_list[0].mid >= MDB_IDL_UM_MAX)
-                       return MDB_TXN_FULL;
+               assert(mc->mc_txn->mt_u.dirty_list[0].mid < MDB_IDL_UM_MAX);
                /* No - copy it */
                np = mdb_page_malloc(mc);
                if (!np)
@@ -1800,6 +1820,7 @@ mdb_txn_renew0(MDB_txn *txn)
                if (txn->mt_txnid == mdb_debug_start)
                        mdb_debug = 1;
 #endif
+               txn->mt_dirty_room = MDB_IDL_UM_MAX;
                txn->mt_u.dirty_list = env->me_dirty_list;
                txn->mt_u.dirty_list[0].mid = 0;
                txn->mt_free_pgs = env->me_free_pgs;
@@ -1815,6 +1836,11 @@ mdb_txn_renew0(MDB_txn *txn)
        if (txn->mt_numdbs > 2)
                memset(txn->mt_dbflags+2, DB_STALE, txn->mt_numdbs-2);
 
+       if (env->me_maxpg < txn->mt_next_pgno) {
+               mdb_txn_reset0(txn);
+               return MDB_MAP_RESIZED;
+       }
+
        return MDB_SUCCESS;
 }
 
@@ -1844,7 +1870,8 @@ int
 mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
 {
        MDB_txn *txn;
-       int rc, size;
+       MDB_ntxn *ntxn;
+       int rc, size, tsize = sizeof(MDB_txn);
 
        if (env->me_flags & MDB_FATAL_ERROR) {
                DPUTS("environment had fatal error, must shutdown!");
@@ -1860,8 +1887,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                {
                        return EINVAL;
                }
+               tsize = sizeof(MDB_ntxn);
        }
-       size = sizeof(MDB_txn) + env->me_maxdbs * (sizeof(MDB_db)+1);
+       size = tsize + env->me_maxdbs * (sizeof(MDB_db)+1);
        if (!(flags & MDB_RDONLY))
                size += env->me_maxdbs * sizeof(MDB_cursor *);
 
@@ -1869,7 +1897,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                DPRINTF("calloc: %s", strerror(ErrCode()));
                return ENOMEM;
        }
-       txn->mt_dbs = (MDB_db *)(txn+1);
+       txn->mt_dbs = (MDB_db *) ((char *)txn + tsize);
        if (flags & MDB_RDONLY) {
                txn->mt_flags |= MDB_TXN_RDONLY;
                txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
@@ -1893,6 +1921,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                }
                txn->mt_txnid = parent->mt_txnid;
                txn->mt_toggle = parent->mt_toggle;
+               txn->mt_dirty_room = parent->mt_dirty_room;
                txn->mt_u.dirty_list[0].mid = 0;
                txn->mt_free_pgs[0] = 0;
                txn->mt_next_pgno = parent->mt_next_pgno;
@@ -1902,8 +1931,22 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                txn->mt_dbxs = parent->mt_dbxs;
                memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
                memcpy(txn->mt_dbflags, parent->mt_dbflags, txn->mt_numdbs);
-               mdb_cursor_shadow(parent, txn);
                rc = 0;
+               ntxn = (MDB_ntxn *)txn;
+               ntxn->mnt_pgstate = env->me_pgstate; /* save parent me_pghead & co */
+               if (env->me_pghead) {
+                       size = MDB_IDL_SIZEOF(env->me_pghead);
+                       env->me_pghead = malloc(size);
+                       if (env->me_pghead)
+                               memcpy(env->me_pghead, ntxn->mnt_pgstate.mf_pghead, size);
+                       else
+                               rc = ENOMEM;
+               }
+               env->me_pgfree = env->me_pghead;
+               if (!rc)
+                       rc = mdb_cursor_shadow(parent, txn);
+               if (rc)
+                       mdb_txn_reset0(txn);
        } else {
                rc = mdb_txn_renew0(txn);
        }
@@ -1931,7 +1974,6 @@ mdb_txn_reset0(MDB_txn *txn)
                if (!(env->me_flags & MDB_ROFS))
                        txn->mt_u.reader->mr_txnid = (txnid_t)-1;
        } else {
-               pgno_t *mop;
                MDB_page *dp;
                unsigned int i;
 
@@ -1961,8 +2003,11 @@ mdb_txn_reset0(MDB_txn *txn)
                        }
                }
 
+               free(env->me_pgfree);
+
                if (txn->mt_parent) {
                        txn->mt_parent->mt_child = NULL;
+                       env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
                        mdb_midl_free(txn->mt_free_pgs);
                        free(txn->mt_u.dirty_list);
                        return;
@@ -1971,10 +2016,7 @@ mdb_txn_reset0(MDB_txn *txn)
                                env->me_free_pgs = txn->mt_free_pgs;
                }
 
-               if ((mop = txn->mt_env->me_pghead) != NULL) {
-                       txn->mt_env->me_pghead = NULL;
-                       free(mop);
-               }
+               txn->mt_env->me_pghead = txn->mt_env->me_pgfree = NULL;
                txn->mt_env->me_pglast = 0;
 
                env->me_txn = NULL;
@@ -2022,7 +2064,7 @@ mdb_txn_commit(MDB_txn *txn)
        off_t            size;
        MDB_page        *dp;
        MDB_env *env;
-       pgno_t  next, freecnt, maxfree_1pg;
+       pgno_t  next, freecnt;
        txnid_t oldpg_txnid, id;
        MDB_cursor mc;
 
@@ -2057,51 +2099,64 @@ mdb_txn_commit(MDB_txn *txn)
        }
 
        if (txn->mt_parent) {
-               MDB_db *ip, *jp;
-               MDB_dbi i;
-               unsigned x, y;
+               MDB_txn *parent = txn->mt_parent;
+               unsigned x, y, len;
                MDB_ID2L dst, src;
 
+               /* Append our free list to parent's */
+               if (mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs)) {
+                       mdb_txn_abort(txn);
+                       return ENOMEM;
+               }
+               mdb_midl_free(txn->mt_free_pgs);
+
+               parent->mt_next_pgno = txn->mt_next_pgno;
+               parent->mt_flags = txn->mt_flags;
+
                /* Merge (and close) our cursors with parent's */
                mdb_cursor_merge(txn);
 
-               /* Update parent's DB table */
-               ip = &txn->mt_parent->mt_dbs[2];
-               jp = &txn->mt_dbs[2];
-               for (i = 2; i < txn->mt_numdbs; i++) {
-                       if (ip->md_root != jp->md_root)
-                               *ip = *jp;
-                       ip++; jp++;
-               }
+               /* Update parent's DB table. */
+               memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
+               memcpy(parent->mt_dbflags, txn->mt_dbflags, txn->mt_numdbs);
                txn->mt_parent->mt_numdbs = txn->mt_numdbs;
 
-               /* Append our free list to parent's */
-               mdb_midl_append_list(&txn->mt_parent->mt_free_pgs,
-                       txn->mt_free_pgs);
-               mdb_midl_free(txn->mt_free_pgs);
-
-               /* Merge our dirty list with parent's */
                dst = txn->mt_parent->mt_u.dirty_list;
                src = txn->mt_u.dirty_list;
-               x = mdb_mid2l_search(dst, src[1].mid);
-               for (y=1; y<=src[0].mid; y++) {
-                       while (x <= dst[0].mid && dst[x].mid != src[y].mid) x++;
-                       if (x > dst[0].mid)
-                               break;
-                       free(dst[x].mptr);
-                       dst[x].mptr = src[y].mptr;
-               }
+               /* Find len = length of merging our dirty list with parent's */
                x = dst[0].mid;
-               for (; y<=src[0].mid; y++) {
-                       if (++x >= MDB_IDL_UM_MAX) {
-                               mdb_txn_abort(txn);
-                               return MDB_TXN_FULL;
+               dst[0].mid = 0;         /* simplify loops */
+               if (parent->mt_parent) {
+                       len = x + src[0].mid;
+                       y = mdb_mid2l_search(src, dst[x].mid + 1) - 1;
+                       for (i = x; y && i; y--) {
+                               pgno_t yp = src[y].mid;
+                               while (yp < dst[i].mid)
+                                       i--;
+                               if (yp == dst[i].mid) {
+                                       i--;
+                                       len--;
+                               }
                        }
-                       dst[x] = src[y];
+               } else { /* Simplify the above for single-ancestor case */
+                       len = MDB_IDL_UM_MAX - txn->mt_dirty_room;
                }
-               dst[0].mid = x;
+               /* Merge our dirty list with parent's */
+               y = src[0].mid;
+               for (i = len; y; dst[i--] = src[y--]) {
+                       pgno_t yp = src[y].mid;
+                       while (yp < dst[x].mid)
+                               dst[i--] = dst[x--];
+                       if (yp == dst[x].mid)
+                               free(dst[x--].mptr);
+               }
+               assert(i == x);
+               dst[0].mid = len;
                free(txn->mt_u.dirty_list);
+               parent->mt_dirty_room = txn->mt_dirty_room;
+
                txn->mt_parent->mt_child = NULL;
+               free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pgfree);
                free(txn);
                return MDB_SUCCESS;
        }
@@ -2118,9 +2173,7 @@ mdb_txn_commit(MDB_txn *txn)
        DPRINTF("committing txn %zu %p on mdbenv %p, root page %zu",
            txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
 
-       /* Update DB root pointers. Their pages have already been
-        * touched so this is all in-place and cannot fail.
-        */
+       /* Update DB root pointers */
        if (txn->mt_numdbs > 2) {
                MDB_dbi i;
                MDB_val data;
@@ -2130,7 +2183,9 @@ mdb_txn_commit(MDB_txn *txn)
                for (i = 2; i < txn->mt_numdbs; i++) {
                        if (txn->mt_dbflags[i] & DB_DIRTY) {
                                data.mv_data = &txn->mt_dbs[i];
-                               mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+                               rc = mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
+                               if (rc)
+                                       goto fail;
                        }
                }
        }
@@ -2148,12 +2203,6 @@ mdb_txn_commit(MDB_txn *txn)
        mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
        oldpg_txnid = id = 0;
        freecnt = 0;
-       /* Preferred max #items per freelist entry, to avoid overflow pages.
-        * Leave room for headers, key (txnid), pagecount (pageno_t), and
-        * FIXME: a bit more in case there is some delimiter I don't know about.
-        */
-       maxfree_1pg = (env->me_psize - (PAGEHDRSZ + NODESIZE + 3*sizeof(MDB_ID)))
-               / sizeof(pgno_t);
 
        /* should only be one record now */
        if (env->me_pghead || env->me_pglast) {
@@ -2246,8 +2295,8 @@ free2:
                i = 2;
                do {
                        orig = mop[0];
-                       if (orig > maxfree_1pg && id > 4)
-                               orig = maxfree_1pg; /* Do not use an overflow page */
+                       if (orig > env->me_maxfree_1pg && id > 4)
+                               orig = env->me_maxfree_1pg; /* Do not use more than 1 page */
                        data.mv_size = (orig + 1) * sizeof(pgno_t);
                        rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
                        if (rc)
@@ -2262,9 +2311,8 @@ free2:
                if (mop[0] <= orig)
                        break;
                *(pgno_t *)data.mv_data = orig;
-               mop[0] -= orig;
-               memmove(&mop[1], &mop[1 + orig],
-                       mop[0] * sizeof(pgno_t));
+               mop[orig] = mop[0] - orig;
+               env->me_pghead = mop += orig;
                /* Save more oldpages at the previous txnid. */
                assert(env->me_pglast == id && id == oldpg_txnid);
                env->me_pglast = --oldpg_txnid;
@@ -2275,10 +2323,8 @@ free2:
        if (freecnt != txn->mt_free_pgs[0])
                goto free2;
 
-       if (env->me_pghead) {
-               free(env->me_pghead);
-               env->me_pghead = NULL;
-       }
+       free(env->me_pgfree);
+       env->me_pghead = env->me_pgfree = NULL;
 
        if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
                if (mdb_midl_shrink(&txn->mt_free_pgs))
@@ -2842,6 +2888,7 @@ mdb_env_open2(MDB_env *env)
                return EBUSY;   /* TODO: Make a new MDB_* error code? */
        }
        env->me_psize = meta.mm_psize;
+       env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
 
        env->me_maxpg = env->me_mapsize / env->me_psize;
 
@@ -3912,28 +3959,31 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
 {
        MDB_page *p = NULL;
 
-       if (txn->mt_env->me_flags & MDB_WRITEMAP) {
-               if (pgno < txn->mt_next_pgno)
-                       p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
-               goto done;
-       }
-       if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
-               unsigned x;
-               x = mdb_mid2l_search(txn->mt_u.dirty_list, pgno);
-               if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
-                       p = txn->mt_u.dirty_list[x].mptr;
-               }
-       }
-       if (!p) {
-               if (pgno < txn->mt_next_pgno)
-                       p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+       if (!((txn->mt_flags & MDB_TXN_RDONLY) |
+                 (txn->mt_env->me_flags & MDB_WRITEMAP)))
+       {
+               MDB_txn *tx2 = txn;
+               do {
+                       MDB_ID2L dl = tx2->mt_u.dirty_list;
+                       if (dl[0].mid) {
+                               unsigned x = mdb_mid2l_search(dl, pgno);
+                               if (x <= dl[0].mid && dl[x].mid == pgno) {
+                                       p = dl[x].mptr;
+                                       goto done;
+                               }
+                       }
+               } while ((tx2 = tx2->mt_parent) != NULL);
        }
-done:
-       *ret = p;
-       if (!p) {
+
+       if (pgno < txn->mt_next_pgno) {
+               p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+       } else {
                DPRINTF("page %zu not found", pgno);
                assert(p != NULL);
        }
+
+done:
+       *ret = p;
        return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
 }
 
@@ -4057,6 +4107,12 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
                                        if (!exact)
                                                return MDB_NOTFOUND;
                                        mdb_node_read(mc->mc_txn, leaf, &data);
+                                       /* The txn may not know this DBI, or another process may
+                                        * have dropped and recreated the DB with other flags.
+                                        */
+                                       if (mc->mc_db->md_flags != *(uint16_t *)
+                                               ((char *) data.mv_data + offsetof(MDB_db, md_flags)))
+                                               return MDB_INCOMPATIBLE;
                                        memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db));
                                }
                                if (flags & MDB_PS_MODIFY)
@@ -4263,7 +4319,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
                mdb_xcursor_init1(mc, leaf);
        }
        if (data) {
-               if ((rc = mdb_node_read(mc->mc_txn, leaf, data) != MDB_SUCCESS))
+               if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
                        return rc;
 
                if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
@@ -4336,7 +4392,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
                mdb_xcursor_init1(mc, leaf);
        }
        if (data) {
-               if ((rc = mdb_node_read(mc->mc_txn, leaf, data) != MDB_SUCCESS))
+               if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
                        return rc;
 
                if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
@@ -6312,12 +6368,13 @@ mdb_rebalance(MDB_cursor *mc)
        DPRINTF("found neighbor page %zu (%u keys, %.1f%% full)",
            mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10);
 
-       /* If the neighbor page is above threshold and has at least two
-        * keys, move one key from it.
+       /* If the neighbor page is above threshold and has at least three
+        * keys, move one key from it. (A page must never have fewer than
+        * 2 keys.)
         *
         * Otherwise we should try to merge them.
         */
-       if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
+       if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) > 2)
                return mdb_node_move(&mn, mc);
        else {
                if (mc->mc_ki[ptop] == 0)
@@ -6954,6 +7011,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
        MDB_val key, data;
        MDB_dbi i;
        MDB_cursor mc;
+       uint16_t mdflags;
        int rc, dbflag, exact;
        unsigned int unused = 0;
        size_t len;
@@ -7036,12 +7094,19 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
                txn->mt_dbflags[slot] = dbflag;
                memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db));
                *dbi = slot;
-               txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags;
+               txn->mt_env->me_dbflags[slot] = mdflags = txn->mt_dbs[slot].md_flags;
                mdb_default_cmp(txn, slot);
                if (!unused) {
                        txn->mt_numdbs++;
                        txn->mt_env->me_numdbs++;
                }
+               /* Open the DB in parent txns as well */
+               while ((txn = txn->mt_parent) != NULL) {
+                       txn->mt_dbflags[slot] = DB_STALE;
+                       txn->mt_dbs[slot].md_flags = mdflags;
+                       if (!unused)
+                               txn->mt_numdbs++;
+               }
        }
 
        return rc;
@@ -7148,8 +7213,10 @@ int mdb_drop(MDB_txn *txn, MDB_dbi dbi, int del)
        /* Can't delete the main DB */
        if (del && dbi > MAIN_DBI) {
                rc = mdb_del(txn, MAIN_DBI, &mc->mc_dbx->md_name, NULL);
-               if (!rc)
+               if (!rc) {
+                       txn->mt_dbflags[dbi] = DB_STALE;
                        mdb_dbi_close(txn->mt_env, dbi);
+               }
        } else {
                /* reset the DB record, mark it dirty */
                txn->mt_dbflags[dbi] |= DB_DIRTY;