]> git.sur5r.net Git - openldap/blobdiff - libraries/liblmdb/mdb.c
mdb_txn_renew0(): Fix un-mutexed me_flags update.
[openldap] / libraries / liblmdb / mdb.c
index b447e9a64caee6085ff5c8bdb479a2133d352408..a3ac603f882f58cd01dad1807c35f7381c344b45 100644 (file)
@@ -65,7 +65,6 @@
 #include <fcntl.h>
 #endif
 
-#include <assert.h>
 #include <errno.h>
 #include <limits.h>
 #include <stddef.h>
@@ -353,7 +352,7 @@ static txnid_t mdb_debug_start;
 
        /**     @brief The maximum size of a database page.
         *
-        *      This is 32k, since it must fit in #MDB_page.#mp_upper.
+        *      This is 32k, since it must fit in #MDB_page.%mp_upper.
         *
         *      LMDB will use database pages < OS pages if needed.
         *      That causes more I/O in write transactions: The OS must
@@ -942,7 +941,7 @@ struct MDB_txn {
 #define MDB_TXN_SPILLS         0x08            /**< txn or a parent has spilled pages */
 /** @} */
        unsigned int    mt_flags;               /**< @ref mdb_txn */
-       /** dirty_list room: Array size - #dirty pages visible to this txn.
+       /** #dirty_list room: Array size - \#dirty pages visible to this txn.
         *      Includes ancestor txns' dirty pages not hidden by other txns'
         *      dirty/spilled pages. Thus commit(nested txn) has room to merge
         *      dirty_list into mt_parent after freeing hidden mt_parent pages.
@@ -1035,8 +1034,6 @@ struct MDB_env {
 #define        MDB_ENV_ACTIVE  0x20000000U
        /** me_txkey is set */
 #define        MDB_ENV_TXKEY   0x10000000U
-       /** Have liveness lock in reader table */
-#define        MDB_LIVE_READER 0x08000000U
        uint32_t        me_flags;               /**< @ref mdb_env */
        unsigned int    me_psize;       /**< DB page size, inited from me_os_psize */
        unsigned int    me_os_psize;    /**< OS page size, from #GET_PAGESIZE */
@@ -1072,6 +1069,7 @@ struct MDB_env {
 #if !(MDB_MAXKEYSIZE)
        unsigned int    me_maxkey;      /**< max size of a key */
 #endif
+       int             me_live_reader;         /**< have liveness lock in reader table */
 #ifdef _WIN32
        int             me_pidquery;            /**< Used in OpenProcess */
        HANDLE          me_rmutex;              /* Windows mutexes don't reside in shared mem */
@@ -1097,7 +1095,7 @@ typedef struct MDB_ntxn {
 #define MDB_COMMIT_PAGES       IOV_MAX
 #endif
 
-       /* max bytes to write in one call */
+       /** max bytes to write in one call */
 #define MAX_WRITE              (0x80000000U >> (sizeof(ssize_t) == 4))
 
 static int  mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp);
@@ -1130,7 +1128,7 @@ static void mdb_env_close0(MDB_env *env, int excl);
 static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
 static int  mdb_node_add(MDB_cursor *mc, indx_t indx,
                            MDB_val *key, MDB_val *data, pgno_t pgno, unsigned int flags);
-static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
+static void mdb_node_del(MDB_cursor *mc, int ksize);
 static void mdb_node_shrink(MDB_page *mp, indx_t indx);
 static int     mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
 static int  mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
@@ -1701,7 +1699,7 @@ static void
 mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
 {
        MDB_ID2 mid;
-       int (*insert)(MDB_ID2L, MDB_ID2 *);
+       int rc, (*insert)(MDB_ID2L, MDB_ID2 *);
 
        if (txn->mt_env->me_flags & MDB_WRITEMAP) {
                insert = mdb_mid2l_append;
@@ -1710,7 +1708,8 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
        }
        mid.mid = mp->mp_pgno;
        mid.mptr = mp;
-       insert(txn->mt_u.dirty_list, &mid);
+       rc = insert(txn->mt_u.dirty_list, &mid);
+       mdb_tassert(txn, rc == 0);
        txn->mt_dirty_room--;
 }
 
@@ -2032,7 +2031,8 @@ mdb_page_touch(MDB_cursor *mc)
                        return ENOMEM;
                mid.mid = pgno;
                mid.mptr = np;
-               mdb_mid2l_insert(dl, &mid);
+               rc = mdb_mid2l_insert(dl, &mid);
+               mdb_cassert(mc, rc == 0);
        } else {
                return 0;
        }
@@ -2267,11 +2267,11 @@ mdb_txn_renew0(MDB_txn *txn)
                                MDB_PID_T pid = env->me_pid;
                                pthread_t tid = pthread_self();
 
-                               if (!(env->me_flags & MDB_LIVE_READER)) {
+                               if (!env->me_live_reader) {
                                        rc = mdb_reader_pid(env, Pidset, pid);
                                        if (rc)
                                                return rc;
-                                       env->me_flags |= MDB_LIVE_READER;
+                                       env->me_live_reader = 1;
                                }
 
                                LOCK_MUTEX_R(env);
@@ -2727,10 +2727,11 @@ mdb_freelist_save(MDB_txn *txn)
                        MDB_ID save;
 
                        mdb_tassert(txn, len >= 0 && id <= env->me_pglast);
-                       key.mv_data = &id;
                        if (len > mop_len) {
                                len = mop_len;
                                data.mv_size = (len + 1) * sizeof(MDB_ID);
+                               /* Drop MDB_CURRENT when changing the data size */
+                               key.mv_data = &id;
                                flags = 0;
                        }
                        data.mv_data = mop -= len;
@@ -3555,7 +3556,7 @@ mdb_env_open2(MDB_env *env)
                        env->me_mapsize = minsize;
        }
 
-       rc = mdb_env_map(env, meta.mm_address, newenv);
+       rc = mdb_env_map(env, meta.mm_address, newenv || env->me_mapsize != meta.mm_mapsize);
        if (rc)
                return rc;
 
@@ -5279,8 +5280,6 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        MDB_node        *leaf = NULL;
        DKBUF;
 
-       assert(mc);
-       mdb_cassert(mc, key);
        if (key->mv_size == 0)
                return MDB_BAD_VALSIZE;
 
@@ -5749,7 +5748,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
     unsigned int flags)
 {
        enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */
-       MDB_env         *env = mc->mc_txn->mt_env;
+       MDB_env         *env;
        MDB_node        *leaf = NULL;
        MDB_page        *fp, *mp;
        uint16_t        fp_flags;
@@ -5762,6 +5761,11 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        unsigned int nflags;
        DKBUF;
 
+       if (mc == NULL)
+               return EINVAL;
+
+       env = mc->mc_txn->mt_env;
+
        /* Check this first so counter will always be zero on any
         * early failures.
         */
@@ -5778,8 +5782,16 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR))
                return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
 
-       if (flags != MDB_CURRENT && key->mv_size-1 >= ENV_MAXKEY(env))
-               return MDB_BAD_VALSIZE;
+       if (flags != MDB_CURRENT) {
+               if (key == NULL)
+                       return EINVAL;
+               if (key->mv_size-1 >= ENV_MAXKEY(env))
+                       return MDB_BAD_VALSIZE;
+       } else {
+               /* Ignore key except in sub-cursor, where key holds the data */
+               if (!(mc->mc_flags & C_SUB))
+                       key = NULL;
+       }
 
 #if SIZE_MAX > MAXDATASIZE
        if (data->mv_size > ((mc->mc_db->md_flags & MDB_DUPSORT) ? ENV_MAXKEY(env) : MAXDATASIZE))
@@ -5908,7 +5920,7 @@ more:
                         * it.  mp: new (sub-)page.  offset: growth in page
                         * size.  xdata: node data with new page or DB.
                         */
-                       ssize_t         i, offset = 0;
+                       unsigned        i, offset = 0;
                        mp = fp = xdata.mv_data = env->me_pbuf;
                        mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
 
@@ -5964,17 +5976,17 @@ more:
                                fp = olddata.mv_data;
                                switch (flags) {
                                default:
-                                       i = -(ssize_t)SIZELEFT(fp);
                                        if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) {
-                                               offset = i += (ssize_t) EVEN(
-                                                       sizeof(indx_t) + NODESIZE + data->mv_size);
-                                       } else {
-                                               i += offset = fp->mp_pad;
-                                               offset *= 4; /* space for 4 more */
+                                               offset = EVEN(NODESIZE + sizeof(indx_t) +
+                                                       data->mv_size);
+                                               break;
                                        }
-                                       if (i > 0)
+                                       offset = fp->mp_pad;
+                                       if (SIZELEFT(fp) < offset) {
+                                               offset *= 4; /* space for 4 more */
                                                break;
-                                       /* FALLTHRU: Sub-page is big enough */
+                                       }
+                                       /* FALLTHRU: Big enough MDB_DUPFIXED sub-page */
                                case MDB_CURRENT:
                                        fp->mp_flags |= P_DIRTY;
                                        COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
@@ -6023,7 +6035,7 @@ prep_subDB:
                                } else {
                                        memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper,
                                                olddata.mv_size - fp->mp_upper);
-                                       for (i = NUMKEYS(fp); --i >= 0; )
+                                       for (i=0; i<NUMKEYS(fp); i++)
                                                mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
                                }
                        }
@@ -6032,7 +6044,7 @@ prep_subDB:
                        flags |= F_DUPDATA;
                        do_sub = 1;
                        if (!insert)
-                               mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+                               mdb_node_del(mc, 0);
                        goto new_sub;
                }
 current:
@@ -6072,7 +6084,8 @@ current:
                                                return ENOMEM;
                                        id2.mid = pg;
                                        id2.mptr = np;
-                                       mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2);
+                                       rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2);
+                                       mdb_cassert(mc, rc == 0);
                                        if (!(flags & MDB_RESERVE)) {
                                                /* Copy end of page, adjusting alignment so
                                                 * compiler may copy words instead of bytes.
@@ -6102,13 +6115,13 @@ current:
                         */
                        if (F_ISSET(flags, MDB_RESERVE))
                                data->mv_data = olddata.mv_data;
-                       else if (data->mv_size)
+                       else if (!(mc->mc_flags & C_SUB))
                                memcpy(olddata.mv_data, data->mv_data, data->mv_size);
                        else
                                memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
                        goto done;
                }
-               mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+               mdb_node_del(mc, 0);
                mc->mc_db->md_entries--;
        }
 
@@ -6520,14 +6533,15 @@ full:
 }
 
 /** Delete the specified node from a page.
- * @param[in] mp The page to operate on.
- * @param[in] indx The index of the node to delete.
+ * @param[in] mc Cursor pointing to the node to delete.
  * @param[in] ksize The size of a node. Only used if the page is
  * part of a #MDB_DUPFIXED database.
  */
 static void
-mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
+mdb_node_del(MDB_cursor *mc, int ksize)
 {
+       MDB_page *mp = mc->mc_pg[mc->mc_top];
+       indx_t  indx = mc->mc_ki[mc->mc_top];
        unsigned int     sz;
        indx_t           i, j, numkeys, ptr;
        MDB_node        *node;
@@ -6535,10 +6549,11 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
 
        DPRINTF(("delete node %u on %s page %"Z"u", indx,
            IS_LEAF(mp) ? "leaf" : "branch", mdb_dbg_pgno(mp)));
-       assert(indx < NUMKEYS(mp));
+       numkeys = NUMKEYS(mp);
+       mdb_cassert(mc, indx < numkeys);
 
        if (IS_LEAF2(mp)) {
-               int x = NUMKEYS(mp) - 1 - indx;
+               int x = numkeys - 1 - indx;
                base = LEAF2KEY(mp, indx, ksize);
                if (x)
                        memmove(base, base + ksize, x * ksize);
@@ -6558,7 +6573,6 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
        sz = EVEN(sz);
 
        ptr = mp->mp_ptrs[indx];
-       numkeys = NUMKEYS(mp);
        for (i = j = 0; i < numkeys; i++) {
                if (i != indx) {
                        mp->mp_ptrs[j] = mp->mp_ptrs[i];
@@ -6833,7 +6847,6 @@ mdb_cursor_txn(MDB_cursor *mc)
 MDB_dbi
 mdb_cursor_dbi(MDB_cursor *mc)
 {
-       assert(mc != NULL);
        return mc->mc_dbi;
 }
 
@@ -6883,7 +6896,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key)
                        /* not enough space left, do a delete and split */
                        DPRINTF(("Not enough room, delta = %d, splitting...", delta));
                        pgno = NODEPGNO(node);
-                       mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+                       mdb_node_del(mc, 0);
                        return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE);
                }
 
@@ -7005,7 +7018,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
 
        /* Delete the node from the source page.
         */
-       mdb_node_del(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
+       mdb_node_del(csrc, key.mv_size);
 
        {
                /* Adjust other cursors pointing to mp */
@@ -7094,6 +7107,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
  *     the \b csrc page will be freed.
  * @param[in] csrc Cursor pointing to the source page.
  * @param[in] cdst Cursor pointing to the destination page.
+ * @return 0 on success, non-zero on failure.
  */
 static int
 mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
@@ -7163,15 +7177,17 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
 
        /* Unlink the src page from parent and add to free list.
         */
-       mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0);
-       if (csrc->mc_ki[csrc->mc_top-1] == 0) {
+       csrc->mc_top--;
+       mdb_node_del(csrc, 0);
+       if (csrc->mc_ki[csrc->mc_top] == 0) {
                key.mv_size = 0;
-               csrc->mc_top--;
                rc = mdb_update_key(csrc, &key);
-               csrc->mc_top++;
-               if (rc)
+               if (rc) {
+                       csrc->mc_top++;
                        return rc;
+               }
        }
+       csrc->mc_top++;
 
        rc = mdb_midl_append(&csrc->mc_txn->mt_free_pgs,
                csrc->mc_pg[csrc->mc_top]->mp_pgno);
@@ -7414,7 +7430,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
                        (rc = mdb_ovpage_free(mc, omp)))
                        return rc;
        }
-       mdb_node_del(mp, ki, mc->mc_db->md_pad);
+       mdb_node_del(mc, mc->mc_db->md_pad);
        mc->mc_db->md_entries--;
        rc = mdb_rebalance(mc);
        if (rc != MDB_SUCCESS)
@@ -7748,7 +7764,13 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                mc->mc_ki[i] = mn.mc_ki[i];
                        }
                        mc->mc_pg[ptop] = mn.mc_pg[ptop];
-                       mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
+                       if (mn.mc_ki[ptop]) {
+                               mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
+                       } else {
+                               /* find right page's left sibling */
+                               mc->mc_ki[ptop] = mn.mc_ki[ptop];
+                               mdb_cursor_sibling(mc, 0);
+                       }
                }
        } else {
                mn.mc_top--;
@@ -7840,12 +7862,10 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                         */
                        if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
                                mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
-                               for (i=0; i<ptop; i++) {
+                               for (i=0; i<=ptop; i++) {
                                        mc->mc_pg[i] = mn.mc_pg[i];
                                        mc->mc_ki[i] = mn.mc_ki[i];
                                }
-                               mc->mc_pg[ptop] = mn.mc_pg[ptop];
-                               mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
                        }
                }
                /* return tmp page to freelist */