]> git.sur5r.net Git - openldap/blobdiff - libraries/liblmdb/mdb.c
Wrap __func__ in mdb_func_
[openldap] / libraries / liblmdb / mdb.c
index 46e4490c47b352fea77e00b1bde8ed2c0665068f..a61dba26a54ee808100c699c52909329d894ccba 100644 (file)
 /** @defgroup internal MDB Internals
  *     @{
  */
-/** @defgroup compat   Windows Compatibility Macros
+/** @defgroup compat   Compatibility Macros
  *     A bunch of macros to minimize the amount of platform-specific ifdefs
  *     needed throughout the rest of the code. When the features this library
  *     needs are similar enough to POSIX to be hidden in a one-or-two line
  *     replacement, this macro approach is used.
  *     @{
  */
+
+       /** Wrapper around __func__, which is a C99 feature */
+#if __STDC_VERSION__ >= 199901L
+# define mdb_func_     __func__
+#elif __GNUC__ >= 2 || _MSC_VER >= 1300
+# define mdb_func_     __FUNCTION__
+#else
+/* If a debug message says <mdb_unknown>(), update the #if statements above */
+# define mdb_func_     "<mdb_unknown>"
+#endif
+
 #ifdef _WIN32
 #define MDB_USE_HASH   1
 #define MDB_PIDLOCK    0
@@ -327,7 +338,7 @@ static txnid_t mdb_debug_start;
         */
 # define DPRINTF(args) ((void) ((mdb_debug) && DPRINTF0 args))
 # define DPRINTF0(fmt, ...) \
-       fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__)
+       fprintf(stderr, "%s:%d " fmt "\n", mdb_func_, __LINE__, __VA_ARGS__)
 #else
 # define DPRINTF(args) ((void) 0)
 #endif
@@ -382,20 +393,25 @@ static txnid_t mdb_debug_start;
        /**     The version number for a database's lockfile format. */
 #define MDB_LOCK_VERSION        1
 
-       /**     @brief The maximum size of a key in the database.
+       /**     @brief The max size of a key we can write, or 0 for dynamic max.
         *
-        *      The library rejects bigger keys, and cannot deal with records
-        *      with bigger keys stored by a library with bigger max keysize.
+        *      Define this as 0 to compute the max from the page size.  511
+        *      is default for backwards compat: liblmdb <= 0.9.10 can break
+        *      when modifying a DB with keys/dupsort data bigger than its max.
         *
-        *      We require that keys all fit onto a regular page. This limit
-        *      could be raised a bit further if needed; to something just
-        *      under (page size / #MDB_MINKEYS / 3).
-        *
-        *      Note that data items in an #MDB_DUPSORT database are actually keys
-        *      of a subDB, so they're also limited to this size.
+        *      Data items in an #MDB_DUPSORT database are also limited to
+        *      this size, since they're actually keys of a sub-DB.  Keys and
+        *      #MDB_DUPSORT data items must fit on a node in a regular page.
         */
 #ifndef MDB_MAXKEYSIZE
 #define MDB_MAXKEYSIZE  511
+#endif
+
+       /**     The maximum size of a key we can write to the environment. */
+#if MDB_MAXKEYSIZE
+#define ENV_MAXKEY(env)        (MDB_MAXKEYSIZE)
+#else
+#define ENV_MAXKEY(env)        ((env)->me_maxkey)
 #endif
 
        /**     @brief The maximum size of a data item.
@@ -405,11 +421,15 @@ static txnid_t mdb_debug_start;
 #define MAXDATASIZE    0xffffffffUL
 
 #if MDB_DEBUG
+       /**     Key size which fits in a #DKBUF.
+        *      @ingroup debug
+        */
+#define DKBUF_MAXKEYSIZE ((MDB_MAXKEYSIZE) > 0 ? (MDB_MAXKEYSIZE) : 511)
        /**     A key buffer.
         *      @ingroup debug
         *      This is used for printing a hex dump of a key's contents.
         */
-#define DKBUF  char kbuf[(MDB_MAXKEYSIZE*2+1)]
+#define DKBUF  char kbuf[DKBUF_MAXKEYSIZE*2+1]
        /**     Display a key in hex.
         *      @ingroup debug
         *      Invoke a function to display a key in hex.
@@ -428,6 +448,9 @@ static txnid_t mdb_debug_start;
        /** Test if the flags \b f are set in a flag word \b w. */
 #define F_ISSET(w, f)   (((w) & (f)) == (f))
 
+       /** Round \b n up to an even number. */
+#define EVEN(n)                (((n) + 1U) & -2) /* sign-extending -2 to match n+1U */
+
        /**     Used for offsets within a single page.
         *      Since memory pages are typically 4 or 8KB in size, 12-13 bits,
         *      this is plenty.
@@ -679,7 +702,8 @@ typedef struct MDB_page {
 #define OVPAGES(size, psize)   ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
 
        /** Header for a single key/data pair within a page.
-        * We guarantee 2-byte alignment for nodes.
+        * Used in pages of type #P_BRANCH and #P_LEAF without #P_LEAF2.
+        * We guarantee 2-byte alignment for 'MDB_node's.
         */
 typedef struct MDB_node {
        /** lo and hi are used for data size on leaf nodes and for
@@ -688,9 +712,11 @@ typedef struct MDB_node {
         * They are in host byte order in case that lets some
         * accesses be optimized into a 32-bit word access.
         */
-#define mn_lo mn_offset[BYTE_ORDER!=LITTLE_ENDIAN]
-#define mn_hi mn_offset[BYTE_ORDER==LITTLE_ENDIAN] /**< part of dsize or pgno */
-       unsigned short  mn_offset[2];   /**< storage for #mn_lo and #mn_hi */
+#if BYTE_ORDER == LITTLE_ENDIAN
+       unsigned short  mn_lo, mn_hi;   /**< part of data size or pgno */
+#else
+       unsigned short  mn_hi, mn_lo;
+#endif
 /** @defgroup mdb_node Node Flags
  *     @ingroup internal
  *     Flags for node headers.
@@ -1043,6 +1069,9 @@ struct MDB_env {
        int                     me_maxfree_1pg;
        /** Max size of a node on a page */
        unsigned int    me_nodemax;
+#if !(MDB_MAXKEYSIZE)
+       unsigned int    me_maxkey;      /**< max size of a key */
+#endif
 #ifdef _WIN32
        int             me_pidquery;            /**< Used in OpenProcess */
        HANDLE          me_rmutex;              /* Windows mutexes don't reside in shared mem */
@@ -1187,6 +1216,15 @@ mdb_strerror(int err)
 }
 
 #if MDB_DEBUG
+/** Return the page number of \b mp which may be sub-page, for debug output */
+static pgno_t
+mdb_dbg_pgno(MDB_page *mp)
+{
+       pgno_t ret;
+       COPY_PGNO(ret, mp->mp_pgno);
+       return ret;
+}
+
 /** Display a key in hexadecimal and return the address of the result.
  * @param[in] key the key to display
  * @param[in] buf the buffer to write into. Should always be #DKBUF.
@@ -1202,7 +1240,7 @@ mdb_dkey(MDB_val *key, char *buf)
        if (!key)
                return "";
 
-       if (key->mv_size > MDB_MAXKEYSIZE)
+       if (key->mv_size > DKBUF_MAXKEYSIZE)
                return "MDB_MAXKEYSIZE";
        /* may want to make this a dynamic check: if the key is mostly
         * printable characters, print it as-is instead of converting to hex.
@@ -1227,7 +1265,7 @@ mdb_page_list(MDB_page *mp)
        DKBUF;
 
        nkeys = NUMKEYS(mp);
-       fprintf(stderr, "Page %"Z"u numkeys %d\n", mp->mp_pgno, nkeys);
+       fprintf(stderr, "Page %"Z"u numkeys %d\n", mdb_dbg_pgno(mp), nkeys);
        for (i=0; i<nkeys; i++) {
                node = NODEPTR(mp, i);
                key.mv_size = node->mn_ksize;
@@ -1246,7 +1284,7 @@ mdb_page_list(MDB_page *mp)
                        nsize += sizeof(indx_t);
                        fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
                }
-               total += (total & 1);
+               total = EVEN(total);
        }
        fprintf(stderr, "Total: %d\n", total);
 }
@@ -1362,11 +1400,11 @@ mdb_page_malloc(MDB_txn *txn, unsigned num)
                off = sz - psize;
        }
        if ((ret = malloc(sz)) != NULL) {
+               VGMEMP_ALLOC(env, ret, sz);
                if (!(env->me_flags & MDB_NOMEMINIT)) {
                        memset((char *)ret + off, 0, psize);
                        ret->mp_pad = 0;
                }
-               VGMEMP_ALLOC(env, ret, sz);
        }
        return ret;
 }
@@ -1674,11 +1712,11 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
 #else
        enum { Paranoid = 0, Max_retries = INT_MAX /*infinite*/ };
 #endif
-       int rc, n2 = num-1, retry = Max_retries;
+       int rc, retry = Max_retries;
        MDB_txn *txn = mc->mc_txn;
        MDB_env *env = txn->mt_env;
        pgno_t pgno, *mop = env->me_pghead;
-       unsigned i, j, k, mop_len = mop ? mop[0] : 0;
+       unsigned i, j, k, mop_len = mop ? mop[0] : 0, n2 = num-1;
        MDB_page *np;
        txnid_t oldest = 0, last;
        MDB_cursor_op op;
@@ -1698,13 +1736,13 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
                /* Seek a big enough contiguous page range. Prefer
                 * pages at the tail, just truncating the list.
                 */
-               if (mop_len >= (unsigned)num) {
+               if (mop_len > n2) {
                        i = mop_len;
                        do {
                                pgno = mop[i];
                                if (mop[i-n2] == pgno+n2)
                                        goto search_done;
-                       } while (--i >= (unsigned)num);
+                       } while (--i > n2);
                        if (Max_retries < INT_MAX && --retry < 0)
                                break;
                }
@@ -1832,7 +1870,7 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
  * If a page being referenced was spilled to disk in this txn, bring
  * it back and make it dirty/writable again.
  * @param[in] txn the transaction handle.
- * @param[in] mp the page being referenced.
+ * @param[in] mp the page being referenced. It must not be dirty.
  * @param[out] ret the writable page, if any. ret is unchanged if
  * mp wasn't spilled.
  */
@@ -2184,10 +2222,8 @@ mdb_txn_renew0(MDB_txn *txn)
 
                                if (!(env->me_flags & MDB_LIVE_READER)) {
                                        rc = mdb_reader_pid(env, Pidset, pid);
-                                       if (rc) {
-                                               UNLOCK_MUTEX_R(env);
+                                       if (rc)
                                                return rc;
-                                       }
                                        env->me_flags |= MDB_LIVE_READER;
                                }
 
@@ -2809,8 +2845,8 @@ mdb_txn_commit(MDB_txn *txn)
        unsigned int i;
        MDB_env *env;
 
-       assert(txn != NULL);
-       assert(txn->mt_env != NULL);
+       if (txn == NULL || txn->mt_env == NULL)
+               return EINVAL;
 
        if (txn->mt_child) {
                rc = mdb_txn_commit(txn->mt_child);
@@ -3151,9 +3187,6 @@ mdb_env_write_meta(MDB_txn *txn)
        int r2;
 #endif
 
-       assert(txn != NULL);
-       assert(txn->mt_env != NULL);
-
        toggle = txn->mt_txnid & 1;
        DPRINTF(("writing meta page %d for root page %"Z"u",
                toggle, txn->mt_dbs[MAIN_DBI].md_root));
@@ -3487,10 +3520,15 @@ mdb_env_open2(MDB_env *env)
                        return i;
                }
        }
-       env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
-       env->me_nodemax = (env->me_psize - PAGEHDRSZ) / MDB_MINKEYS;
 
+       env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
+       env->me_nodemax = (((env->me_psize - PAGEHDRSZ) / MDB_MINKEYS) & -2)
+               - sizeof(indx_t);
+#if !(MDB_MAXKEYSIZE)
+       env->me_maxkey = env->me_nodemax - (NODESIZE + sizeof(MDB_db));
+#endif
        env->me_maxpg = env->me_mapsize / env->me_psize;
+
 #if MDB_DEBUG
        {
                int toggle = mdb_env_pick_meta(env);
@@ -3990,6 +4028,10 @@ fail:
 #define        CHANGELESS      (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP| \
        MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD)
 
+#if VALID_FLAGS & PERSISTENT_FLAGS & (CHANGEABLE|CHANGELESS)
+# error "Persistent DB flags & env flags overlap, but both go in mm_flags"
+#endif
+
 int
 mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode)
 {
@@ -4474,17 +4516,9 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
 
        nkeys = NUMKEYS(mp);
 
-#if MDB_DEBUG
-       {
-       pgno_t pgno;
-       COPY_PGNO(pgno, mp->mp_pgno);
        DPRINTF(("searching %u keys in %s %spage %"Z"u",
            nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
-           pgno));
-       }
-#endif
-
-       assert(nkeys > 0);
+           mdb_dbg_pgno(mp)));
 
        low = IS_LEAF(mp) ? 0 : 1;
        high = nkeys - 1;
@@ -4548,7 +4582,7 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
                        node = NODEPTR(mp, i);
        }
        if (exactp)
-               *exactp = (rc == 0);
+               *exactp = (rc == 0 && nkeys > 0);
        /* store the key index */
        mc->mc_ki[mc->mc_top] = i;
        if (i >= nkeys)
@@ -4598,7 +4632,7 @@ mdb_cursor_push(MDB_cursor *mc, MDB_page *mp)
                DDBI(mc), (void *) mc));
 
        if (mc->mc_snum >= CURSOR_STACK) {
-               assert(mc->mc_snum < CURSOR_STACK);
+               mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
                return MDB_CURSOR_FULL;
        }
 
@@ -4658,7 +4692,7 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl)
                p = (MDB_page *)(env->me_map + env->me_psize * pgno);
        } else {
                DPRINTF(("page %"Z"u not found", pgno));
-               assert(p != NULL);
+               txn->mt_flags |= MDB_TXN_ERROR;
                return MDB_PAGE_NOTFOUND;
        }
 
@@ -4726,6 +4760,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags)
        if (!IS_LEAF(mp)) {
                DPRINTF(("internal error, index points to a %02X page!?",
                    mp->mp_flags));
+               mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
                return MDB_CORRUPTED;
        }
 
@@ -4958,8 +4993,9 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
        int exact = 0;
        DKBUF;
 
-       assert(key);
-       assert(data);
+       if (key == NULL || data == NULL)
+               return EINVAL;
+
        DPRINTF(("===> get db %u key [%s]", dbi, DKEY(key)));
 
        if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
@@ -4968,10 +5004,6 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
        if (txn->mt_flags & MDB_TXN_ERROR)
                return MDB_BAD_TXN;
 
-       if (key->mv_size > MDB_MAXKEYSIZE) {
-               return MDB_BAD_VALSIZE;
-       }
-
        mdb_cursor_init(&mc, txn, dbi, &mx);
        return mdb_cursor_set(&mc, key, data, MDB_SET, &exact);
 }
@@ -5067,7 +5099,8 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
                }
        }
 
-       DPRINTF(("cursor_next: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc));
+       DPRINTF(("cursor_next: top page is %"Z"u in cursor %p",
+               mdb_dbg_pgno(mp), (void *) mc));
        if (mc->mc_flags & C_DEL)
                goto skip;
 
@@ -5084,7 +5117,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
 
 skip:
        DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u",
-           mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]));
+           mdb_dbg_pgno(mp), NUMKEYS(mp), mc->mc_ki[mc->mc_top]));
 
        if (IS_LEAF2(mp)) {
                key->mv_size = mc->mc_db->md_pad;
@@ -5143,7 +5176,8 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
                }
        }
 
-       DPRINTF(("cursor_prev: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc));
+       DPRINTF(("cursor_prev: top page is %"Z"u in cursor %p",
+               mdb_dbg_pgno(mp), (void *) mc));
 
        if (mc->mc_ki[mc->mc_top] == 0)  {
                DPUTS("=====> move to prev sibling page");
@@ -5159,7 +5193,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
        mc->mc_flags &= ~C_EOF;
 
        DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u",
-           mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]));
+           mdb_dbg_pgno(mp), NUMKEYS(mp), mc->mc_ki[mc->mc_top]));
 
        if (IS_LEAF2(mp)) {
                key->mv_size = mc->mc_db->md_pad;
@@ -5358,6 +5392,7 @@ set1:
                                if (op == MDB_GET_BOTH || rc > 0)
                                        return MDB_NOTFOUND;
                                rc = 0;
+                               *data = d2;
                        }
 
                } else {
@@ -5474,7 +5509,8 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        int              exact = 0;
        int              (*mfunc)(MDB_cursor *mc, MDB_val *key, MDB_val *data);
 
-       assert(mc);
+       if (mc == NULL)
+               return EINVAL;
 
        if (mc->mc_txn->mt_flags & MDB_TXN_ERROR)
                return MDB_BAD_TXN;
@@ -5526,12 +5562,10 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        case MDB_SET_RANGE:
                if (key == NULL) {
                        rc = EINVAL;
-               } else if (key->mv_size > MDB_MAXKEYSIZE) {
-                       rc = MDB_BAD_VALSIZE;
-               } else if (op == MDB_SET_RANGE)
-                       rc = mdb_cursor_set(mc, key, data, op, NULL);
-               else
-                       rc = mdb_cursor_set(mc, key, data, op, &exact);
+               } else {
+                       rc = mdb_cursor_set(mc, key, data, op,
+                               op == MDB_SET_RANGE ? NULL : &exact);
+               }
                break;
        case MDB_GET_MULTIPLE:
                if (data == NULL || !(mc->mc_flags & C_INITIALIZED)) {
@@ -5670,13 +5704,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */
        MDB_env         *env = mc->mc_txn->mt_env;
        MDB_node        *leaf = NULL;
-       MDB_val xdata, *rdata, dkey;
+       MDB_page        *fp, *mp;
+       uint16_t        fp_flags;
+       MDB_val         xdata, *rdata, dkey, olddata;
        MDB_db dummy;
-       int do_sub = 0, insert = 0;
+       int do_sub = 0, insert;
        unsigned int mcount = 0, dcount = 0, nospill;
        size_t nsize;
        int rc, rc2;
-       char dbuf[MDB_MAXKEYSIZE+1];
        unsigned int nflags;
        DKBUF;
 
@@ -5696,14 +5731,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR))
                return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
 
-       if (flags != MDB_CURRENT && (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE))
-               return MDB_BAD_VALSIZE;
-
-       if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT) && data->mv_size > MDB_MAXKEYSIZE)
+       if (flags != MDB_CURRENT && key->mv_size-1 >= ENV_MAXKEY(env))
                return MDB_BAD_VALSIZE;
 
 #if SIZE_MAX > MAXDATASIZE
-       if (data->mv_size > MAXDATASIZE)
+       if (data->mv_size > ((mc->mc_db->md_flags & MDB_DUPSORT) ? ENV_MAXKEY(env) : MAXDATASIZE))
+               return MDB_BAD_VALSIZE;
+#else
+       if ((mc->mc_db->md_flags & MDB_DUPSORT) && data->mv_size > ENV_MAXKEY(env))
                return MDB_BAD_VALSIZE;
 #endif
 
@@ -5787,11 +5822,21 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                        return rc2;
        }
 
-       /* The key already exists */
-       if (rc == MDB_SUCCESS) {
-               MDB_page        *fp, *mp;
-               MDB_val         olddata;
-
+       insert = rc;
+       if (insert) {
+               /* The key does not exist */
+               DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
+               if ((mc->mc_db->md_flags & MDB_DUPSORT) &&
+                       LEAFSIZE(key, data) > env->me_nodemax)
+               {
+                       /* Too big for a node, insert in sub-DB */
+                       fp_flags = P_LEAF|P_DIRTY;
+                       fp = env->me_pbuf;
+                       fp->mp_pad = data->mv_size; /* used if MDB_DUPFIXED */
+                       fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ;
+                       goto prep_subDB;
+               }
+       } else {
                /* there's only a key anyway, so this is a no-op */
                if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
                        unsigned int ksize = mc->mc_db->md_pad;
@@ -5811,6 +5856,12 @@ more:
 
                /* DB has dups? */
                if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
+                       /* Prepare (sub-)page/sub-DB to accept the new item,
+                        * if needed.  fp: old sub-page or a header faking
+                        * it.  mp: new (sub-)page.  offset: growth in page
+                        * size.  xdata: node data with new page or DB.
+                        */
+                       ssize_t         i, offset = 0;
                        mp = fp = xdata.mv_data = env->me_pbuf;
                        mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
 
@@ -5820,9 +5871,8 @@ more:
                                if (flags == MDB_CURRENT)
                                        goto current;
 
-                               dkey = olddata;
 #if UINT_MAX < SIZE_MAX
-                               if (mc->mc_dbx->md_dcmp == mdb_cmp_int && dkey.mv_size == sizeof(size_t))
+                               if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t))
 #ifdef MISALIGNED_OK
                                        mc->mc_dbx->md_dcmp = mdb_cmp_long;
 #else
@@ -5830,7 +5880,7 @@ more:
 #endif
 #endif
                                /* if data matches, skip it */
-                               if (!mc->mc_dbx->md_dcmp(data, &dkey)) {
+                               if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
                                        if (flags & MDB_NODUPDATA)
                                                rc = MDB_KEYEXIST;
                                        else if (flags & MDB_MULTIPLE)
@@ -5840,9 +5890,11 @@ more:
                                        return rc;
                                }
 
-                               /* create a fake page for the dup items */
-                               memcpy(dbuf, dkey.mv_data, dkey.mv_size);
-                               dkey.mv_data = dbuf;
+                               /* Back up original data item */
+                               dkey.mv_size = olddata.mv_size;
+                               dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size);
+
+                               /* Make sub-page header for the dup items, with dummy body */
                                fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
                                fp->mp_lower = PAGEHDRSZ;
                                xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size;
@@ -5855,30 +5907,27 @@ more:
                                                (dkey.mv_size & 1) + (data->mv_size & 1);
                                }
                                fp->mp_upper = xdata.mv_size;
+                               olddata.mv_size = fp->mp_upper; /* pretend olddata is fp */
                        } else if (leaf->mn_flags & F_SUBDATA) {
                                /* Data is on sub-DB, just store it */
                                flags |= F_DUPDATA|F_SUBDATA;
                                goto put_sub;
                        } else {
-                               /* See if we need to convert from fake page to subDB */
-                               unsigned int offset;
-                               unsigned int i;
-                               uint16_t fp_flags;
-
+                               /* Data is on sub-page */
                                fp = olddata.mv_data;
                                switch (flags) {
                                default:
+                                       i = -(ssize_t)SIZELEFT(fp);
                                        if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) {
-                                               offset = NODESIZE + sizeof(indx_t) + data->mv_size;
-                                               offset += offset & 1;
-                                               break;
-                                       }
-                                       offset = fp->mp_pad;
-                                       if (SIZELEFT(fp) < offset) {
+                                               offset = i += (ssize_t) EVEN(
+                                                       sizeof(indx_t) + NODESIZE + data->mv_size);
+                                       } else {
+                                               i += offset = fp->mp_pad;
                                                offset *= 4; /* space for 4 more */
-                                               break;
                                        }
-                                       /* FALLTHRU: Big enough MDB_DUPFIXED sub-page */
+                                       if (i > 0)
+                                               break;
+                                       /* FALLTHRU: Sub-page is big enough */
                                case MDB_CURRENT:
                                        fp->mp_flags |= P_DIRTY;
                                        COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
@@ -5886,12 +5935,16 @@ more:
                                        flags |= F_DUPDATA;
                                        goto put_sub;
                                }
-                               fp_flags = fp->mp_flags;
                                xdata.mv_size = olddata.mv_size + offset;
-                               if (NODESIZE + sizeof(indx_t) + NODEKSZ(leaf) + xdata.mv_size
-                                       >= env->me_nodemax) {
-                                       /* yes, convert it */
+                       }
+
+                       fp_flags = fp->mp_flags;
+                       if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) {
+                                       /* Too big for a sub-page, convert to sub-DB */
+                                       fp_flags &= ~P_SUBP;
+prep_subDB:
                                        if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+                                               fp_flags |= P_LEAF2;
                                                dummy.md_pad = fp->mp_pad;
                                                dummy.md_flags = MDB_DUPFIXED;
                                                if (mc->mc_db->md_flags & MDB_INTEGERDUP)
@@ -5912,18 +5965,18 @@ more:
                                        offset = env->me_psize - olddata.mv_size;
                                        flags |= F_DUPDATA|F_SUBDATA;
                                        dummy.md_root = mp->mp_pgno;
-                                       fp_flags &= ~P_SUBP;
-                               }
+                       }
+                       if (mp != fp) {
                                mp->mp_flags = fp_flags | P_DIRTY;
                                mp->mp_pad   = fp->mp_pad;
                                mp->mp_lower = fp->mp_lower;
                                mp->mp_upper = fp->mp_upper + offset;
-                               if (IS_LEAF2(fp)) {
+                               if (fp_flags & P_LEAF2) {
                                        memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
                                } else {
                                        memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper,
                                                olddata.mv_size - fp->mp_upper);
-                                       for (i=0; i<NUMKEYS(fp); i++)
+                                       for (i = NUMKEYS(fp); --i >= 0; )
                                                mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
                                }
                        }
@@ -5931,7 +5984,8 @@ more:
                        rdata = &xdata;
                        flags |= F_DUPDATA;
                        do_sub = 1;
-                       mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+                       if (!insert)
+                               mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
                        goto new_sub;
                }
 current:
@@ -6009,9 +6063,6 @@ current:
                }
                mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
                mc->mc_db->md_entries--;
-       } else {
-               DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
-               insert = 1;
        }
 
        rdata = data;
@@ -6250,13 +6301,12 @@ mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data)
        size_t           sz;
 
        sz = LEAFSIZE(key, data);
-       if (sz >= env->me_nodemax) {
+       if (sz > env->me_nodemax) {
                /* put on overflow page */
                sz -= data->mv_size - sizeof(pgno_t);
        }
-       sz += sz & 1;
 
-       return sz + sizeof(indx_t);
+       return EVEN(sz + sizeof(indx_t));
 }
 
 /** Calculate the size of a branch node.
@@ -6275,7 +6325,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key)
        size_t           sz;
 
        sz = INDXSIZE(key);
-       if (sz >= env->me_nodemax) {
+       if (sz > env->me_nodemax) {
                /* put on overflow page */
                /* not implemented */
                /* sz -= key->size - sizeof(pgno_t); */
@@ -6317,7 +6367,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
        DPRINTF(("add to %s %spage %"Z"u index %i, data size %"Z"u key size %"Z"u [%s]",
            IS_LEAF(mp) ? "leaf" : "branch",
                IS_SUBP(mp) ? "sub-" : "",
-           mp->mp_pgno, indx, data ? data->mv_size : 0,
+               mdb_dbg_pgno(mp), indx, data ? data->mv_size : 0,
                key ? key->mv_size : 0, key ? DKEY(key) : "null"));
 
        if (IS_LEAF2(mp)) {
@@ -6344,13 +6394,13 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
                if (F_ISSET(flags, F_BIGDATA)) {
                        /* Data already on overflow page. */
                        node_size += sizeof(pgno_t);
-               } else if (node_size + data->mv_size >= mc->mc_txn->mt_env->me_nodemax) {
+               } else if (node_size + data->mv_size > mc->mc_txn->mt_env->me_nodemax) {
                        int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
                        int rc;
                        /* Put data on overflow page. */
                        DPRINTF(("data size is %"Z"u, node would be %"Z"u, put data on overflow page",
                            data->mv_size, node_size+data->mv_size));
-                       node_size += sizeof(pgno_t) + (node_size & 1);
+                       node_size = EVEN(node_size + sizeof(pgno_t));
                        if ((ssize_t)node_size > room)
                                goto full;
                        if ((rc = mdb_page_new(mc, P_OVERFLOW, ovpages, &ofp)))
@@ -6362,7 +6412,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
                        node_size += data->mv_size;
                }
        }
-       node_size += node_size & 1;
+       node_size = EVEN(node_size);
        if ((ssize_t)node_size > room)
                goto full;
 
@@ -6415,9 +6465,10 @@ update:
 
 full:
        DPRINTF(("not enough room in page %"Z"u, got %u ptrs",
-               mp->mp_pgno, NUMKEYS(mp)));
+               mdb_dbg_pgno(mp), NUMKEYS(mp)));
        DPRINTF(("upper-lower = %u - %u = %"Z"d", mp->mp_upper,mp->mp_lower,room));
        DPRINTF(("node size = %"Z"u", node_size));
+       mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
        return MDB_PAGE_FULL;
 }
 
@@ -6435,14 +6486,8 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
        MDB_node        *node;
        char            *base;
 
-#if MDB_DEBUG
-       {
-       pgno_t pgno;
-       COPY_PGNO(pgno, mp->mp_pgno);
        DPRINTF(("delete node %u on %s page %"Z"u", indx,
-           IS_LEAF(mp) ? "leaf" : "branch", pgno));
-       }
-#endif
+           IS_LEAF(mp) ? "leaf" : "branch", mdb_dbg_pgno(mp)));
        assert(indx < NUMKEYS(mp));
 
        if (IS_LEAF2(mp)) {
@@ -6463,7 +6508,7 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
                else
                        sz += NODEDSZ(node);
        }
-       sz += sz & 1;
+       sz = EVEN(sz);
 
        ptr = mp->mp_ptrs[indx];
        numkeys = NUMKEYS(mp);
@@ -6493,25 +6538,22 @@ mdb_node_shrink(MDB_page *mp, indx_t indx)
        MDB_node *node;
        MDB_page *sp, *xp;
        char *base;
-       int osize, nsize;
-       int delta;
+       int nsize, delta;
        indx_t           i, numkeys, ptr;
 
        node = NODEPTR(mp, indx);
        sp = (MDB_page *)NODEDATA(node);
-       osize = NODEDSZ(node);
-
-       delta = sp->mp_upper - sp->mp_lower;
-       SETDSZ(node, osize - delta);
+       delta = SIZELEFT(sp);
        xp = (MDB_page *)((char *)sp + delta);
 
        /* shift subpage upward */
        if (IS_LEAF2(sp)) {
                nsize = NUMKEYS(sp) * sp->mp_pad;
+               if (nsize & 1)
+                       return;         /* do not make the node uneven-sized */
                memmove(METADATA(xp), METADATA(sp), nsize);
        } else {
                int i;
-               nsize = osize - sp->mp_upper;
                numkeys = NUMKEYS(sp);
                for (i=numkeys-1; i>=0; i--)
                        xp->mp_ptrs[i] = sp->mp_ptrs[i] - delta;
@@ -6522,6 +6564,9 @@ mdb_node_shrink(MDB_page *mp, indx_t indx)
        xp->mp_pad = sp->mp_pad;
        COPY_PGNO(xp->mp_pgno, mp->mp_pgno);
 
+       nsize = NODEDSZ(node) - delta;
+       SETDSZ(node, nsize);
+
        /* shift lower nodes upward */
        ptr = mp->mp_ptrs[indx];
        numkeys = NUMKEYS(mp);
@@ -6745,7 +6790,7 @@ mdb_cursor_dbi(MDB_cursor *mc)
        return mc->mc_dbi;
 }
 
-/** Replace the key for a node with a new key.
+/** Replace the key for a branch node with a new key.
  * @param[in] mc Cursor pointing to the node to operate on.
  * @param[in] key The new key to use.
  * @return 0 on success, non-zero on failure.
@@ -6768,7 +6813,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key)
 #if MDB_DEBUG
        {
                MDB_val k2;
-               char kbuf2[(MDB_MAXKEYSIZE*2+1)];
+               char kbuf2[DKBUF_MAXKEYSIZE*2+1];
                k2.mv_data = NODEKEY(node);
                k2.mv_size = node->mn_ksize;
                DPRINTF(("update key %u (ofs %u) [%s] to [%s] on page %"Z"u",
@@ -6779,15 +6824,12 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key)
        }
 #endif
 
-       ksize = key->mv_size;
-       ksize += (ksize & 1);
-       oksize = node->mn_ksize;
-       oksize += (oksize & 1);
+       /* Sizes must be 2-byte aligned. */
+       ksize = EVEN(key->mv_size);
+       oksize = EVEN(node->mn_ksize);
        delta = ksize - oksize;
 
-       /* Must be 2-byte aligned. If new key is
-        * shorter by 1, the shift will be skipped.
-        */
+       /* Shift node contents if EVEN(key length) changed. */
        if (delta) {
                if (delta > 0 && SIZELEFT(mp) < delta) {
                        pgno_t pgno;
@@ -6845,7 +6887,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
                return rc;
 
        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
-               srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);        /* fake */
                key.mv_size = csrc->mc_db->md_pad;
                key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
                data.mv_size = 0;
@@ -7154,25 +7195,15 @@ mdb_rebalance(MDB_cursor *mc)
        MDB_cursor      mn;
 
        minkeys = 1 + (IS_BRANCH(mc->mc_pg[mc->mc_top]));
-#if MDB_DEBUG
-       {
-       pgno_t pgno;
-       COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
        DPRINTF(("rebalancing %s page %"Z"u (has %u keys, %.1f%% full)",
            IS_LEAF(mc->mc_pg[mc->mc_top]) ? "leaf" : "branch",
-           pgno, NUMKEYS(mc->mc_pg[mc->mc_top]),
+           mdb_dbg_pgno(mc->mc_pg[mc->mc_top]), NUMKEYS(mc->mc_pg[mc->mc_top]),
                (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10));
-       }
-#endif
 
        if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD &&
                NUMKEYS(mc->mc_pg[mc->mc_top]) >= minkeys) {
-#if MDB_DEBUG
-               pgno_t pgno;
-               COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno);
                DPRINTF(("no need to rebalance page %"Z"u, above fill threshold",
-                   pgno));
-#endif
+                   mdb_dbg_pgno(mc->mc_pg[mc->mc_top])));
                return MDB_SUCCESS;
        }
 
@@ -7342,7 +7373,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
        if (rc != MDB_SUCCESS)
                mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
        else {
-               MDB_cursor *m2;
+               MDB_cursor *m2, *m3;
                MDB_dbi dbi = mc->mc_dbi;
 
                mp = mc->mc_pg[mc->mc_top];
@@ -7354,18 +7385,19 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
 
                /* Adjust other cursors pointing to mp */
                for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
-                       if (m2 == mc || m2->mc_snum < mc->mc_snum)
+                       m3 = (mc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
+                       if (! (m2->mc_flags & m3->mc_flags & C_INITIALIZED))
                                continue;
-                       if (!(m2->mc_flags & C_INITIALIZED))
+                       if (m3 == mc || m3->mc_snum < mc->mc_snum)
                                continue;
-                       if (m2->mc_pg[mc->mc_top] == mp) {
-                               if (m2->mc_ki[mc->mc_top] >= ki) {
-                                       m2->mc_flags |= C_DEL;
-                                       if (m2->mc_ki[mc->mc_top] > ki)
-                                               m2->mc_ki[mc->mc_top]--;
+                       if (m3->mc_pg[mc->mc_top] == mp) {
+                               if (m3->mc_ki[mc->mc_top] >= ki) {
+                                       m3->mc_flags |= C_DEL;
+                                       if (m3->mc_ki[mc->mc_top] > ki)
+                                               m3->mc_ki[mc->mc_top]--;
                                }
-                               if (m2->mc_ki[mc->mc_top] >= nkeys)
-                                       mdb_cursor_sibling(m2, 1);
+                               if (m3->mc_ki[mc->mc_top] >= nkeys)
+                                       mdb_cursor_sibling(m3, 1);
                        }
                }
                mc->mc_flags |= C_DEL;
@@ -7385,7 +7417,8 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
        int              rc, exact;
        DKBUF;
 
-       assert(key != NULL);
+       if (key == NULL)
+               return EINVAL;
 
        DPRINTF(("====> delete db %u key [%s]", dbi, DKEY(key)));
 
@@ -7395,10 +7428,6 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
        if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR))
                return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
 
-       if (key->mv_size > MDB_MAXKEYSIZE) {
-               return MDB_BAD_VALSIZE;
-       }
-
        mdb_cursor_init(&mc, txn, dbi, &mx);
 
        exact = 0;
@@ -7566,7 +7595,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                nsize = mdb_leaf_size(env, newkey, newdata);
                        else
                                nsize = mdb_branch_size(env, newkey);
-                       nsize += nsize & 1;
+                       nsize = EVEN(nsize);
 
                        /* grab a page to hold a temporary copy */
                        copy = mdb_page_malloc(mc->mc_txn, 1);
@@ -7623,7 +7652,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                                        else
                                                                psize += NODEDSZ(node);
                                                }
-                                               psize += psize & 1;
+                                               psize = EVEN(psize);
                                        }
                                        if (psize > pmax || i == k-j) {
                                                split_indx = i + (j<0);
@@ -7834,8 +7863,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        MDB_cursor mc;
        MDB_xcursor mx;
 
-       assert(key != NULL);
-       assert(data != NULL);
+       if (key == NULL || data == NULL)
+               return EINVAL;
 
        if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID))
                return EINVAL;
@@ -8269,7 +8298,7 @@ int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx)
 
 int mdb_env_get_maxkeysize(MDB_env *env)
 {
-       return MDB_MAXKEYSIZE;
+       return ENV_MAXKEY(env);
 }
 
 int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx)
@@ -8277,7 +8306,7 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx)
        unsigned int i, rdrs;
        MDB_reader *mr;
        char buf[64];
-       int first = 1;
+       int rc = 0, first = 1;
 
        if (!env || !func)
                return -1;
@@ -8288,27 +8317,25 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx)
        mr = env->me_txns->mti_readers;
        for (i=0; i<rdrs; i++) {
                if (mr[i].mr_pid) {
-                       size_t tid;
-                       int rc;
-                       tid = mr[i].mr_tid;
-                       if (mr[i].mr_txnid == (txnid_t)-1) {
-                               sprintf(buf, "%10d %"Z"x -\n", mr[i].mr_pid, tid);
-                       } else {
-                               sprintf(buf, "%10d %"Z"x %"Z"u\n", mr[i].mr_pid, tid, mr[i].mr_txnid);
-                       }
+                       txnid_t txnid = mr[i].mr_txnid;
+                       sprintf(buf, txnid == (txnid_t)-1 ?
+                               "%10d %"Z"x -\n" : "%10d %"Z"x %"Z"u\n",
+                               (int)mr[i].mr_pid, (size_t)mr[i].mr_tid, txnid);
                        if (first) {
                                first = 0;
-                               func("    pid     thread     txnid\n", ctx);
+                               rc = func("    pid     thread     txnid\n", ctx);
+                               if (rc < 0)
+                                       break;
                        }
                        rc = func(buf, ctx);
                        if (rc < 0)
-                               return rc;
+                               break;
                }
        }
        if (first) {
-               func("(no active readers)\n", ctx);
+               rc = func("(no active readers)\n", ctx);
        }
-       return 0;
+       return rc;
 }
 
 /** Insert pid into list if not already present.
@@ -8369,7 +8396,6 @@ int mdb_reader_check(MDB_env *env, int *dead)
                return ENOMEM;
        pids[0] = 0;
        mr = env->me_txns->mti_readers;
-       j = 0;
        for (i=0; i<rdrs; i++) {
                if (mr[i].mr_pid && mr[i].mr_pid != env->me_pid) {
                        pid = mr[i].mr_pid;