]> git.sur5r.net Git - openldap/commitdiff
Cleanup, fix freelist alloc
authorHoward Chu <hyc@symas.com>
Mon, 29 Aug 2011 23:55:41 +0000 (16:55 -0700)
committerHoward Chu <hyc@symas.com>
Thu, 1 Sep 2011 23:31:10 +0000 (16:31 -0700)
Don't allow new pages for free list to come from the free list.
Otherwise a nasty data self-reference occurs that is too much
trouble to unwind.

libraries/libmdb/mdb.c

index e53732200a1438b6c885bda17637272e053c06ad..adfe542cf84cc7c86e3e93231101439892024d68 100644 (file)
@@ -268,8 +268,8 @@ typedef struct MDB_pageparent {
        unsigned mp_pi;
 } MDB_pageparent;
 
-static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num);
-static int             mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
+static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_dbi dbi, MDB_page *parent, unsigned int parent_idx, int num);
+static int             mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
 
 typedef struct MDB_ppage {                                     /* ordered list of pages */
        MDB_page                *mp_page;
@@ -321,7 +321,6 @@ typedef struct MDB_dbx {
 struct MDB_txn {
        pgno_t          mt_next_pgno;   /* next unallocated page */
        ULONG           mt_txnid;
-       ULONG           mt_oldest;
        MDB_env         *mt_env;        
        pgno_t          *mt_free_pgs;   /* this is an IDL */
        union {
@@ -570,73 +569,72 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 
 /* Allocate new page(s) for writing */
 static MDB_dpage *
-mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
+mdb_alloc_page(MDB_txn *txn, MDB_dbi dbi, MDB_page *parent, unsigned int parent_idx, int num)
 {
        MDB_dpage *dp;
        pgno_t pgno = P_INVALID;
-       ULONG oldest;
        MIDL2 mid;
 
        if (txn->mt_txnid > 2) {
 
-       oldest = txn->mt_txnid - 1;
-       if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
-               /* See if there's anything in the free DB */
-               MDB_pageparent mpp;
-               MDB_node *leaf;
-               ULONG *kptr;
-
-               mpp.mp_parent = NULL;
-               mpp.mp_pi = 0;
-               mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp);
-               leaf = NODEPTR(mpp.mp_page, 0);
-               kptr = (ULONG *)NODEKEY(leaf);
+               if (!txn->mt_env->me_pghead && dbi != FREE_DBI &&
+                       txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
+                       /* See if there's anything in the free DB */
+                       MDB_pageparent mpp;
+                       MDB_node *leaf;
+                       ULONG *kptr, oldest;
 
-               /* It's potentially usable, unless there are still
-                * older readers outstanding. Grab it.
-                */
-               if (oldest > *kptr) {
-                       MDB_oldpages *mop;
-                       MDB_val data;
-                       pgno_t *idl;
-
-                       mdb_read_data(txn, leaf, &data);
-                       idl = (ULONG *)data.mv_data;
-                       mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
-                       mop->mo_next = txn->mt_env->me_pghead;
-                       mop->mo_txnid = *kptr;
-                       txn->mt_env->me_pghead = mop;
-                       memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
+                       mpp.mp_parent = NULL;
+                       mpp.mp_pi = 0;
+                       mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp);
+                       leaf = NODEPTR(mpp.mp_page, 0);
+                       kptr = (ULONG *)NODEKEY(leaf);
 
-#if DEBUG > 1
                        {
                                unsigned int i;
-                               DPRINTF("IDL read txn %lu root %lu num %lu",
-                                       mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
-                               for (i=0; i<idl[0]; i++) {
-                                       DPRINTF("IDL %lu", idl[i+1]);
+                               oldest = txn->mt_txnid - 1;
+                               for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
+                                       ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
+                                       if (mr && mr < oldest)
+                                               oldest = mr;
                                }
                        }
+
+                       if (oldest > *kptr) {
+                               /* It's usable, grab it.
+                                */
+                               MDB_oldpages *mop;
+                               MDB_val data;
+                               pgno_t *idl;
+
+                               mdb_read_data(txn, leaf, &data);
+                               idl = (ULONG *)data.mv_data;
+                               mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
+                               mop->mo_next = txn->mt_env->me_pghead;
+                               mop->mo_txnid = *kptr;
+                               txn->mt_env->me_pghead = mop;
+                               memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
+
+#if DEBUG > 1
+                               {
+                                       unsigned int i;
+                                       DPRINTF("IDL read txn %lu root %lu num %lu",
+                                               mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+                                       for (i=0; i<idl[0]; i++) {
+                                               DPRINTF("IDL %lu", idl[i+1]);
+                                       }
+                               }
 #endif
-                       /* drop this IDL from the DB */
-                       mpp.mp_parent = NULL;
-                       mpp.mp_pi = 0;
-                       mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
-                       leaf = NODEPTR(mpp.mp_page, 0);
-                       mdb_del0(txn, FREE_DBI, 0, &mpp, leaf);
-               }
-       }
-       if (txn->mt_env->me_pghead) {
-               unsigned int i;
-               for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
-                       ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
-                       if (!mr) continue;
-                       if (mr < oldest)
-                               oldest = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
+                               /* drop this IDL from the DB */
+                               mpp.mp_parent = NULL;
+                               mpp.mp_pi = 0;
+                               mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
+                               leaf = NODEPTR(mpp.mp_page, 0);
+                               mdb_del0(txn, FREE_DBI, 0, &mpp, leaf);
+                       }
                }
-               if (oldest > txn->mt_env->me_pghead->mo_txnid) {
+               if (txn->mt_env->me_pghead) {
                        MDB_oldpages *mop = txn->mt_env->me_pghead;
-                       txn->mt_oldest = oldest;
                        if (num > 1) {
                                /* FIXME: For now, always use fresh pages. We
                                 * really ought to search the free list for a
@@ -660,7 +658,6 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
                        }
                }
        }
-       }
 
        if (pgno == P_INVALID) {
                /* DB size is maxed out */
@@ -693,7 +690,7 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
  */
 static int
-mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
+mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *pp)
 {
        MDB_page *mp = pp->mp_page;
        pgno_t  pgno;
@@ -702,9 +699,9 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
 
        if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
                MDB_dpage *dp;
-               if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
+               if ((dp = mdb_alloc_page(txn, dbi, pp->mp_parent, pp->mp_pi, 1)) == NULL)
                        return ENOMEM;
-               DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno);
+               DPRINTF("touched db %u page %lu -> %lu", dbi, mp->mp_pgno, dp->p.mp_pgno);
                assert(mp->mp_pgno != dp->p.mp_pgno);
                mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno);
                pgno = dp->p.mp_pgno;
@@ -740,11 +737,6 @@ mdb_txn_renew0(MDB_txn *txn)
 {
        MDB_env *env = txn->mt_env;
 
-       if (env->me_flags & MDB_FATAL_ERROR) {
-               DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!");
-               return MDB_PANIC;
-       }
-
        if (txn->mt_flags & MDB_TXN_RDONLY) {
                MDB_reader *r = pthread_getspecific(env->me_txkey);
                if (!r) {
@@ -795,10 +787,6 @@ mdb_txn_renew0(MDB_txn *txn)
                        (txn->mt_numdbs - 2) * sizeof(MDB_db));
        LAZY_RWLOCK_UNLOCK(&env->me_dblock);
 
-       DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn,
-               txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
-               (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
-
        return MDB_SUCCESS;
 }
 
@@ -810,10 +798,15 @@ mdb_txn_renew(MDB_txn *txn)
        if (!txn)
                return EINVAL;
 
+       if (txn->mt_env->me_flags & MDB_FATAL_ERROR) {
+               DPUTS("environment had fatal error, must shutdown!");
+               return MDB_PANIC;
+       }
+
        rc = mdb_txn_renew0(txn);
        if (rc == MDB_SUCCESS) {
-               DPRINTF("reset txn %p %lu%c on mdbenv %p, root page %lu", txn,
-                       txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
+               DPRINTF("renew txn %lu%c %p on mdbenv %p, root page %lu",
+                       txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
                        (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
        }
        return rc;
@@ -826,7 +819,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
        int rc;
 
        if (env->me_flags & MDB_FATAL_ERROR) {
-               DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!");
+               DPUTS("environment had fatal error, must shutdown!");
                return MDB_PANIC;
        }
        if ((txn = calloc(1, sizeof(MDB_txn) + env->me_maxdbs * sizeof(MDB_db))) == NULL) {
@@ -844,8 +837,8 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
                free(txn);
        else {
                *ret = txn;
-               DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn,
-                       txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
+               DPRINTF("begin txn %lu%c %p on mdbenv %p, root page %lu",
+                       txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
                        (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
        }
 
@@ -894,8 +887,8 @@ mdb_txn_reset(MDB_txn *txn)
        if (txn == NULL)
                return;
 
-       DPRINTF("reset txn %p %lu%c on mdbenv %p, root page %lu", txn,
-               txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
+       DPRINTF("reset txn %lu%c %p on mdbenv %p, root page %lu",
+               txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
                (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
 
        mdb_txn_reset0(txn);
@@ -907,8 +900,8 @@ mdb_txn_abort(MDB_txn *txn)
        if (txn == NULL)
                return;
 
-       DPRINTF("abort txn %p %lu%c on mdbenv %p, root page %lu", txn,
-               txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
+       DPRINTF("abort txn %lu%c %p on mdbenv %p, root page %lu",
+               txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
                (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
 
        mdb_txn_reset0(txn);
@@ -952,27 +945,23 @@ mdb_txn_commit(MDB_txn *txn)
        if (!txn->mt_u.dirty_list[0].mid)
                goto done;
 
-       DPRINTF("committing txn %p %lu on mdbenv %p, root page %lu", txn,
-           txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
+       DPRINTF("committing txn %lu %p on mdbenv %p, root page %lu",
+           txn->mt_txnid, txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
 
        /* should only be one record now */
        if (env->me_pghead) {
-               MDB_val key, data;
-               MDB_oldpages *mop;
+               MDB_pageparent mpp;
 
-               mop = env->me_pghead;
-               key.mv_size = sizeof(pgno_t);
-               key.mv_data = (char *)&mop->mo_txnid;
-               data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
-               data.mv_data = mop->mo_pages;
-               mdb_put0(txn, FREE_DBI, &key, &data, 0);
-               free(env->me_pghead);
-               env->me_pghead = NULL;
+               /* make sure first page of freeDB is touched and on freelist */
+               mpp.mp_parent = NULL;
+               mpp.mp_pi = 0;
+               mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
        }
        /* save to free list */
        if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
                MDB_val key, data;
                MDB_pageparent mpp;
+               ULONG i;
 
                /* make sure last page of freeDB is touched and on freelist */
                key.mv_size = MAXKEYSIZE+1;
@@ -995,9 +984,34 @@ mdb_txn_commit(MDB_txn *txn)
                /* write to last page of freeDB */
                key.mv_size = sizeof(pgno_t);
                key.mv_data = (char *)&txn->mt_txnid;
-               data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
                data.mv_data = txn->mt_free_pgs;
+               /* The free list can still grow during this call,
+                * despite the pre-emptive touches above. So check
+                * and make sure the entire thing got written.
+                */
+               do {
+                       i = txn->mt_free_pgs[0];
+                       data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
+                       rc = mdb_put0(txn, FREE_DBI, &key, &data, 0);
+                       if (rc) {
+                               mdb_txn_abort(txn);
+                               return rc;
+                       }
+               } while (i != txn->mt_free_pgs[0]);
+       }
+       /* should only be one record now */
+       if (env->me_pghead) {
+               MDB_val key, data;
+               MDB_oldpages *mop;
+
+               mop = env->me_pghead;
+               key.mv_size = sizeof(pgno_t);
+               key.mv_data = (char *)&mop->mo_txnid;
+               data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
+               data.mv_data = mop->mo_pages;
                mdb_put0(txn, FREE_DBI, &key, &data, 0);
+               free(env->me_pghead);
+               env->me_pghead = NULL;
        }
 
        /* Update DB root pointers. Their pages have already been
@@ -1819,7 +1833,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 
                if (modify) {
                        MDB_dhead *dh = ((MDB_dhead *)mp)-1;
-                       if ((rc = mdb_touch(txn, mpp)) != 0)
+                       if ((rc = mdb_touch(txn, dbi, mpp)) != 0)
                                return rc;
                        dh = ((MDB_dhead *)mpp->mp_page)-1;
                        dh->md_parent = mpp->mp_parent;
@@ -1888,7 +1902,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
                if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
                        mpp->mp_parent = NULL;
                        mpp->mp_pi = 0;
-                       if ((rc = mdb_touch(txn, mpp)))
+                       if ((rc = mdb_touch(txn, dbi, mpp)))
                                return rc;
                        txn->mt_dbs[dbi].md_root = mpp->mp_page->mp_pgno;
                }
@@ -2475,7 +2489,7 @@ mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num)
 {
        MDB_dpage       *dp;
 
-       if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL)
+       if ((dp = mdb_alloc_page(txn, dbi, NULL, 0, num)) == NULL)
                return NULL;
        DPRINTF("allocated new mpage %lu, page size %u",
            dp->p.mp_pgno, txn->mt_env->me_psize);
@@ -2728,7 +2742,6 @@ mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_page *mp, MDB_
        mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
        mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
        mx->mx_txn.mt_next_pgno = txn->mt_next_pgno;
-       mx->mx_txn.mt_oldest = txn->mt_oldest;
        mx->mx_txn.mt_u = txn->mt_u;
        mx->mx_cursor.mc_initialized = 0;
        mx->mx_cursor.mc_eof = 0;
@@ -2738,7 +2751,6 @@ static void
 mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
 {
        txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
-       txn->mt_oldest = mx->mx_txn.mt_oldest;
        txn->mt_u = mx->mx_txn.mt_u;
        txn->mt_dbs[0] = mx->mx_dbs[0];
        txn->mt_dbs[1] = mx->mx_dbs[1];
@@ -2870,8 +2882,8 @@ mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx,
        DKBUF;
 
        /* Mark src and dst as dirty. */
-       if ((rc = mdb_touch(txn, src)) ||
-           (rc = mdb_touch(txn, dst)))
+       if ((rc = mdb_touch(txn, dbi, src)) ||
+           (rc = mdb_touch(txn, dbi, dst)))
                return rc;;
 
        if (IS_LEAF2(src->mp_page)) {
@@ -2964,8 +2976,8 @@ mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst)
        assert(dst->mp_parent);
 
        /* Mark src and dst as dirty. */
-       if ((rc = mdb_touch(txn, src)) ||
-           (rc = mdb_touch(txn, dst)))
+       if ((rc = mdb_touch(txn, dbi, src)) ||
+           (rc = mdb_touch(txn, dbi, dst)))
                return rc;
 
        /* Move all nodes from src to dst.