git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
ITS#7210 fix leak of overflow pages in freelist
[openldap] / libraries / libmdb / mdb.c
index 6abadd2fbe17d1146430552d2c327e18af13c0ab..7129d0a1bc0ba55ab6117e41671ac57a9999c8e2 100644 (file)
 
 #if defined(_WIN32) || defined(__APPLE__)
 #define MNAME_LEN      32
+#else
+#define MNAME_LEN      (sizeof(pthread_mutex_t))
 #endif
 
 /** @} */
@@ -543,7 +545,7 @@ typedef struct MDB_txninfo {
                pthread_mutex_t mt2_wmutex;
 #define mti_wmutex     mt2.mt2_wmutex
 #endif
-               char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
+               char pad[(MNAME_LEN+CACHELINE-1) & ~(CACHELINE-1)];
        } mt2;
        MDB_reader      mti_readers[1];
 } MDB_txninfo;
@@ -939,6 +941,8 @@ struct MDB_env {
        unsigned int    me_psize;       /**< size of a page, from #GET_PAGESIZE */
        unsigned int    me_db_toggle;   /**< which DB table is current */
        txnid_t         me_wtxnid;              /**< ID of last txn we committed */
+       txnid_t         me_pgfirst;             /**< ID of first old page record we used */
+       txnid_t         me_pglast;              /**< ID of last old page record we used */
        MDB_dbx         *me_dbxs;               /**< array of static DB info */
        MDB_db          *me_dbs[2];             /**< two arrays of MDB_db info */
        MDB_oldpages *me_pghead;        /**< list of old page records */
@@ -1104,6 +1108,55 @@ mdb_page_keys(MDB_page *mp)
 }
 #endif
 
+#if MDB_DEBUG > 2
+/** Debug audit: add up the free DB records' leading ID values and the
+ *  branch/leaf/overflow counters of every DB (and DUPSORT sub-DB records),
+ *  then assert that total + 2 >= txn->mt_next_pgno - 1.
+ */
+static void mdb_audit(MDB_txn *txn)
+{
+       MDB_cursor mc;
+       MDB_val key, data;
+       int rc, i; /* rc is only used as the loop condition of the free DB scan */
+       ID freecount, count;
+
+       freecount = 0;
+       mdb_cursor_init(&mc, txn, FREE_DBI, NULL); /* walk the free DB */
+       while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
+               freecount += *(ID *)data.mv_data; /* first ID of the record; presumably the IDL's length slot - confirm */
+       freecount += txn->mt_dbs[0].md_branch_pages + txn->mt_dbs[0].md_leaf_pages + /* pages of DB 0 itself */
+               txn->mt_dbs[0].md_overflow_pages; /* NOTE(review): DB 0 is also summed into count below (loop starts at 0) */
+
+       count = 0;
+       for (i = 0; i<txn->mt_numdbs; i++) {
+               count += txn->mt_dbs[i].md_branch_pages + /* per-DB page counters kept in the txn */
+                       txn->mt_dbs[i].md_leaf_pages +
+                       txn->mt_dbs[i].md_overflow_pages;
+               if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) { /* sub-DB records carry their own counters */
+                       MDB_xcursor mx;
+                       mdb_cursor_init(&mc, txn, i, &mx);
+                       mdb_page_search(&mc, NULL, 0); /* NOTE(review): return value unchecked; presumably lands on the first leaf - confirm */
+                       do {
+                               int j;
+                               MDB_page *mp;
+                               mp = mc.mc_pg[mc.mc_top]; /* page at the top of the cursor stack */
+                               for (j=0; j<NUMKEYS(mp); j++) {
+                                       MDB_node *leaf = NODEPTR(mp, j);
+                                       if (leaf->mn_flags & F_SUBDATA) { /* node data is an MDB_db record */
+                                               MDB_db db;
+                                               memcpy(&db, NODEDATA(leaf), sizeof(db)); /* work on a local copy of the record */
+                                               count += db.md_branch_pages + db.md_leaf_pages +
+                                                       db.md_overflow_pages;
+                                       }
+                               }
+                       }
+                       while (mdb_cursor_sibling(&mc, 1) == 0); /* repeat until no further sibling is returned */
+               }
+       }
+       assert(freecount + count + 2 >= txn->mt_next_pgno - 1); /* the "+ 2" slack is presumably the meta pages - confirm */
+}
+#endif
+
 int
 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 {
@@ -1126,13 +1179,11 @@ static MDB_page *
 mdb_page_malloc(MDB_cursor *mc) {
        MDB_page *ret;
        size_t sz = mc->mc_txn->mt_env->me_psize;
-       if (mc->mc_txn->mt_env->me_dpages) {
-               ret = mc->mc_txn->mt_env->me_dpages;
+       if ((ret = mc->mc_txn->mt_env->me_dpages) != NULL) {
                VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
                VGMEMP_DEFINED(ret, sizeof(ret->mp_next));
                mc->mc_txn->mt_env->me_dpages = ret->mp_next;
-       } else {
-               ret = malloc(sz);
+       } else if ((ret = malloc(sz)) != NULL) {
                VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
        }
        return ret;
@@ -1157,17 +1208,32 @@ mdb_page_alloc(MDB_cursor *mc, int num)
 
        if (txn->mt_txnid > 2) {
 
-               if (!txn->mt_env->me_pghead && mc->mc_dbi != FREE_DBI &&
+               if (!txn->mt_env->me_pghead &&
                        txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
                        /* See if there's anything in the free DB */
                        MDB_cursor m2;
                        MDB_node *leaf;
-                       txnid_t *kptr, oldest;
+                       MDB_val data;
+                       txnid_t *kptr, oldest, last;
 
                        mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
-                       mdb_page_search(&m2, NULL, 0);
-                       leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
-                       kptr = (txnid_t *)NODEKEY(leaf);
+                       if (!txn->mt_env->me_pgfirst) {
+                               mdb_page_search(&m2, NULL, 0);
+                               leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
+                               kptr = (txnid_t *)NODEKEY(leaf);
+                               last = *kptr;
+                       } else {
+                               MDB_val key;
+                               int rc, exact = 0;
+                               last = txn->mt_env->me_pglast + 1;
+                               leaf = NULL;
+                               key.mv_data = &last;
+                               key.mv_size = sizeof(last);
+                               rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
+                               if (rc)
+                                       goto none;
+                               last = *(txnid_t *)key.mv_data;
+                       }
 
                        {
                                unsigned int i;
@@ -1179,18 +1245,22 @@ mdb_page_alloc(MDB_cursor *mc, int num)
                                }
                        }
 
-                       if (oldest > *kptr) {
+                       if (oldest > last) {
                                /* It's usable, grab it.
                                 */
                                MDB_oldpages *mop;
-                               MDB_val data;
                                pgno_t *idl;
 
-                               mdb_node_read(txn, leaf, &data);
+                               if (!txn->mt_env->me_pgfirst) {
+                                       mdb_node_read(txn, leaf, &data);
+                               }
                                idl = (ID *) data.mv_data;
                                mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
                                mop->mo_next = txn->mt_env->me_pghead;
-                               mop->mo_txnid = *kptr;
+                               mop->mo_txnid = last;
+                               txn->mt_env->me_pglast = last;
+                               if (!txn->mt_env->me_pgfirst)
+                                       txn->mt_env->me_pgfirst = last;
                                txn->mt_env->me_pghead = mop;
                                memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
 
@@ -1204,12 +1274,9 @@ mdb_page_alloc(MDB_cursor *mc, int num)
                                        }
                                }
 #endif
-                               /* drop this IDL from the DB */
-                               m2.mc_ki[m2.mc_top] = 0;
-                               m2.mc_flags = C_INITIALIZED;
-                               mdb_cursor_del(&m2, 0);
                        }
                }
+none:
                if (txn->mt_env->me_pghead) {
                        MDB_oldpages *mop = txn->mt_env->me_pghead;
                        if (num > 1) {
@@ -1678,6 +1745,8 @@ mdb_txn_reset0(MDB_txn *txn)
                        txn->mt_env->me_pghead = mop->mo_next;
                        free(mop);
                }
+               txn->mt_env->me_pgfirst = 0;
+               txn->mt_env->me_pglast = 0;
 
                env->me_txn = NULL;
                /* The writer mutex was locked in mdb_txn_begin. */
@@ -1724,7 +1793,7 @@ mdb_txn_commit(MDB_txn *txn)
        off_t            size;
        MDB_page        *dp;
        MDB_env *env;
-       pgno_t  next;
+       pgno_t  next, freecnt;
        MDB_cursor mc;
 
        assert(txn != NULL);
@@ -1836,10 +1905,29 @@ mdb_txn_commit(MDB_txn *txn)
                /* make sure first page of freeDB is touched and on freelist */
                mdb_page_search(&mc, NULL, 1);
        }
+
+       /* Delete IDLs we used from the free list */
+       if (env->me_pgfirst) {
+               txnid_t cur;
+               MDB_val key;
+               int exact = 0;
+
+               key.mv_size = sizeof(cur);
+               for (cur = env->me_pgfirst; cur <= env->me_pglast; cur++) {
+                       key.mv_data = &cur;
+
+                       mdb_cursor_set(&mc, &key, NULL, MDB_SET, &exact);
+                       mdb_cursor_del(&mc, 0);
+               }
+               env->me_pgfirst = 0;
+               env->me_pglast = 0;
+       }
+
        /* save to free list */
+free2:
+       freecnt = txn->mt_free_pgs[0];
        if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
                MDB_val key, data;
-               pgno_t i;
 
                /* make sure last page of freeDB is touched and on freelist */
                key.mv_size = MAXKEYSIZE+1;
@@ -1867,30 +1955,60 @@ mdb_txn_commit(MDB_txn *txn)
                 * and make sure the entire thing got written.
                 */
                do {
-                       i = txn->mt_free_pgs[0];
+                       freecnt = txn->mt_free_pgs[0];
                        data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
                        rc = mdb_cursor_put(&mc, &key, &data, 0);
                        if (rc) {
                                mdb_txn_abort(txn);
                                return rc;
                        }
-               } while (i != txn->mt_free_pgs[0]);
-               if (mdb_midl_shrink(&txn->mt_free_pgs))
-                       env->me_free_pgs = txn->mt_free_pgs;
+               } while (freecnt != txn->mt_free_pgs[0]);
        }
        /* should only be one record now */
+again:
        if (env->me_pghead) {
                MDB_val key, data;
                MDB_oldpages *mop;
+               pgno_t orig;
+               txnid_t id;
 
                mop = env->me_pghead;
-               env->me_pghead = NULL;
-               key.mv_size = sizeof(pgno_t);
-               key.mv_data = &mop->mo_txnid;
+               id = mop->mo_txnid;
+               key.mv_size = sizeof(id);
+               key.mv_data = &id;
                data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
                data.mv_data = mop->mo_pages;
+               orig = mop->mo_pages[0];
+               /* These steps may grow the freelist again
+                * due to freed overflow pages...
+                */
                mdb_cursor_put(&mc, &key, &data, 0);
-               free(mop);
+               if (mop == env->me_pghead) {
+                       /* could have been used again here */
+                       if (mop->mo_pages[0] != orig) {
+                               data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
+                               data.mv_data = mop->mo_pages;
+                               id = mop->mo_txnid;
+                               mdb_cursor_put(&mc, &key, &data, 0);
+                       }
+                       env->me_pghead = NULL;
+                       free(mop);
+               } else {
+                       /* was completely used up */
+                       mdb_cursor_del(&mc, 0);
+                       if (env->me_pghead)
+                               goto again;
+               }
+               env->me_pgfirst = 0;
+               env->me_pglast = 0;
+       }
+       /* Check for growth of freelist again */
+       if (freecnt != txn->mt_free_pgs[0])
+               goto free2;
+
+       if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
+               if (mdb_midl_shrink(&txn->mt_free_pgs))
+                       env->me_free_pgs = txn->mt_free_pgs;
        }
 
        /* Update DB root pointers. Their pages have already been
@@ -1909,6 +2027,9 @@ mdb_txn_commit(MDB_txn *txn)
                        }
                }
        }
+#if MDB_DEBUG > 2
+       mdb_audit(txn);
+#endif
 
        /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
         */
@@ -1955,7 +2076,6 @@ mdb_txn_commit(MDB_txn *txn)
                        dp = txn->mt_u.dirty_list[i].mptr;
                        if (dp->mp_pgno != next) {
                                if (n) {
-                                       DPRINTF("committing %u dirty pages", n);
                                        rc = writev(env->me_fd, iov, n);
                                        if (rc != size) {
                                                n = ErrCode();
@@ -1990,7 +2110,6 @@ mdb_txn_commit(MDB_txn *txn)
                if (n == 0)
                        break;
 
-               DPRINTF("committing %u dirty pages", n);
                rc = writev(env->me_fd, iov, n);
                if (rc != size) {
                        n = ErrCode();
@@ -4205,9 +4324,42 @@ more:
                        goto put_sub;
                }
 current:
-               /* same size, just replace it */
-               if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
-                       NODEDSZ(leaf) == data->mv_size) {
+               /* overflow page overwrites need special handling */
+               if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
+                       MDB_page *omp;
+                       pgno_t pg;
+                       int ovpages, dpages;
+
+                       ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
+                       dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
+                       memcpy(&pg, NODEDATA(leaf), sizeof(pg));
+                       mdb_page_get(mc->mc_txn, pg, &omp);
+                       /* Is the ov page writable and large enough? */
+                       if ((omp->mp_flags & P_DIRTY) && ovpages >= dpages) {
+                               /* yes, overwrite it. Note in this case we don't
+                                * bother to try shrinking the node if the new data
+                                * is smaller than the overflow threshold.
+                                */
+                               if (F_ISSET(flags, MDB_RESERVE))
+                                       data->mv_data = METADATA(omp);
+                               else
+                                       memcpy(METADATA(omp), data->mv_data, data->mv_size);
+                               goto done;
+                       } else {
+                               /* no, free ovpages */
+                               int i;
+                               mc->mc_db->md_overflow_pages -= ovpages;
+                               for (i=0; i<ovpages; i++) {
+                                       DPRINTF("freed ov page %zu", pg);
+                                       mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
+                                       pg++;
+                               }
+                       }
+               } else if (NODEDSZ(leaf) == data->mv_size) {
+                       /* same size, just replace it. Note that we could
+                        * also reuse this node if the new data is smaller,
+                        * but instead we opt to shrink the node in that case.
+                        */
                        if (F_ISSET(flags, MDB_RESERVE))
                                data->mv_data = NODEDATA(leaf);
                        else
@@ -4215,6 +4367,7 @@ current:
                        goto done;
                }
                mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
+               mc->mc_db->md_entries--;
        } else {
                DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]);
        }
@@ -4720,6 +4873,7 @@ mdb_xcursor_init0(MDB_cursor *mc)
        mx->mx_cursor.mc_dbi = mc->mc_dbi+1;
        mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
        mx->mx_cursor.mc_snum = 0;
+       mx->mx_cursor.mc_top = 0;
        mx->mx_cursor.mc_flags = C_SUB;
        mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp;
        mx->mx_dbx.md_dcmp = NULL;
@@ -4808,7 +4962,11 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
        MDB_xcursor     *mx = NULL;
        size_t size = sizeof(MDB_cursor);
 
-       if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
+       if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs)
+               return EINVAL;
+
+       /* Allow read access to the freelist */
+       if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
                return EINVAL;
 
        if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
@@ -5399,6 +5557,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
 
                memcpy(&pg, NODEDATA(leaf), sizeof(pg));
                ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
+               mc->mc_db->md_overflow_pages -= ovpages;
                for (i=0; i<ovpages; i++) {
                        DPRINTF("freed ov page %zu", pg);
                        mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
@@ -5490,6 +5649,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
            IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno,
            DKEY(newkey), mc->mc_ki[mc->mc_top]);
 
+       /* Create a right sibling. */
+       if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
+               return ENOMEM;
+       DPRINTF("new right sibling: page %zu", rp->mp_pgno);
+
        if (mc->mc_snum < 2) {
                if ((pp = mdb_page_new(mc, P_BRANCH, 1)) == NULL)
                        return ENOMEM;
@@ -5520,11 +5684,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
        }
 
-       /* Create a right sibling. */
-       if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
-               return ENOMEM;
-       DPRINTF("new right sibling: page %zu", rp->mp_pgno);
-
        mdb_cursor_copy(mc, &mn);
        mn.mc_pg[mn.mc_top] = rp;
        mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
@@ -5664,7 +5823,10 @@ newsep:
        if (nflags & MDB_APPEND) {
                mc->mc_pg[mc->mc_top] = rp;
                mc->mc_ki[mc->mc_top] = 0;
-               return mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
+               rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
+               if (rc)
+                       return rc;
+               goto done;
        }
        if (IS_LEAF2(rp)) {
                goto done;
@@ -5771,10 +5933,11 @@ done:
                        if (!(m3->mc_flags & C_INITIALIZED))
                                continue;
                        if (new_root) {
+                               int k;
                                /* root split */
-                               for (i=m3->mc_top; i>0; i--) {
-                                       m3->mc_ki[i+1] = m3->mc_ki[i];
-                                       m3->mc_pg[i+1] = m3->mc_pg[i];
+                               for (k=m3->mc_top; k>=0; k--) {
+                                       m3->mc_ki[k+1] = m3->mc_ki[k];
+                                       m3->mc_pg[k+1] = m3->mc_pg[k];
                                }
                                m3->mc_ki[0] = mc->mc_ki[0];
                                m3->mc_pg[0] = mc->mc_pg[0];