]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Fix multi-page commits
[openldap] / libraries / libmdb / mdb.c
index 6faa1f5bdd5ecfffe1e80d45839583f803188be1..85535f89e29f740f09bc75c8ea84e3cd9fb94e08 100644 (file)
@@ -110,6 +110,7 @@ typedef struct MDB_txbody {
        pthread_mutex_t mtb_mutex;
        ULONG           mtb_txnid;
        uint32_t        mtb_numreaders;
+       uint32_t        mtb_me_toggle;
 } MDB_txbody;
 
 typedef struct MDB_txninfo {
@@ -120,6 +121,7 @@ typedef struct MDB_txninfo {
 #define mti_mutex      mt1.mtb.mtb_mutex
 #define mti_txnid      mt1.mtb.mtb_txnid
 #define mti_numreaders mt1.mtb.mtb_numreaders
+#define mti_me_toggle  mt1.mtb.mtb_me_toggle
                char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
        } mt1;
        union {
@@ -138,7 +140,7 @@ typedef struct MDB_page {           /* represents a page of storage */
 #define        mp_pgno         mp_p.p_pgno
        union padded {
                pgno_t          p_pgno;         /* page number */
-               void *          p_pad;
+               void *          p_align;        /* for IL32P64 */
        } mp_p;
 #define        P_BRANCH         0x01           /* branch page */
 #define        P_LEAF           0x02           /* leaf page */
@@ -311,9 +313,9 @@ struct MDB_env {
        int                     me_fd;
        int                     me_lfd;
        int                     me_mfd;                 /* just for writing the meta pages */
-       uint16_t        me_flags;
-       uint16_t                me_db_toggle;
-       unsigned int    me_psize;
+#define        MDB_FATAL_ERROR 0x80000000U
+       uint32_t        me_flags;
+       uint32_t        me_extrapad;    /* unused for now */
        unsigned int    me_maxreaders;
        unsigned int    me_numdbs;
        unsigned int    me_maxdbs;
@@ -325,6 +327,9 @@ struct MDB_env {
        MDB_txn         *me_txn;                /* current write transaction */
        size_t          me_mapsize;
        off_t           me_size;                /* current file size */
+       pgno_t          me_maxpg;               /* me_mapsize / me_psize */
+       unsigned int    me_psize;
+       unsigned int    me_db_toggle;
        MDB_dbx         *me_dbxs;               /* array */
        MDB_db          *me_dbs[2];
        MDB_oldpages *me_pghead;
@@ -455,6 +460,27 @@ mdb_version(int *maj, int *min, int *pat)
        return MDB_VERSION_STRING;
 }
 
+static const char *errstr[] = {
+       "MDB_KEYEXIST: Key/data pair already exists",
+       "MDB_NOTFOUND: No matching key/data pair found",
+       "MDB_PAGE_NOTFOUND: Requested page not found",
+       "MDB_CORRUPTED: Located page was wrong type",
+       "MDB_PANIC: Update of meta page failed",
+       "MDB_VERSION_MISMATCH: Database environment version mismatch"
+};
+
+char *
+mdb_strerror(int err)
+{
+       if (!err)
+               return ("Successful return: 0");
+
+       if (err >= MDB_KEYEXIST && err <= MDB_VERSION_MISMATCH)
+               return (char *)errstr[err - MDB_KEYEXIST];
+
+       return strerror(err);
+}
+
 int
 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 {
@@ -568,6 +594,11 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
        }
        }
 
+       if (pgno == P_INVALID) {
+               /* DB size is maxed out */
+               if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg)
+                       return NULL;
+       }
        if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL)
                return NULL;
        dp->h.md_num = num;
@@ -633,6 +664,10 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
        MDB_txn *txn;
        int rc, toggle;
 
+       if (env->me_flags & MDB_FATAL_ERROR) {
+               DPRINTF("mdb_txn_begin: environment had fatal error, must shutdown!");
+               return MDB_PANIC;
+       }
        if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) {
                DPRINTF("calloc: %s", strerror(errno));
                return ENOMEM;
@@ -679,6 +714,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 
        txn->mt_env = env;
 
+       toggle = env->me_txns->mti_me_toggle;
        if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) {
                mdb_txn_abort(txn);
                return rc;
@@ -763,9 +799,8 @@ mdb_txn_commit(MDB_txn *txn)
        env = txn->mt_env;
 
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
-               DPRINTF("attempt to commit read-only transaction");
                mdb_txn_abort(txn);
-               return EPERM;
+               return MDB_SUCCESS;
        }
 
        if (txn != env->me_txn) {
@@ -849,11 +884,12 @@ mdb_txn_commit(MDB_txn *txn)
        /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
         */
        next = 0;
+       i = 1;
        do {
                n = 0;
                done = 1;
                size = 0;
-               for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
+               for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
                        dp = txn->mt_u.dirty_list[i].mptr;
                        if (dp->p.mp_pgno != next) {
                                if (n) {
@@ -914,8 +950,9 @@ mdb_txn_commit(MDB_txn *txn)
                mdb_txn_abort(txn);
                return n;
        }
-       env->me_txn = NULL;
 
+done:
+       env->me_txn = NULL;
        /* update the DB tables */
        {
                int toggle = !env->me_db_toggle;
@@ -939,10 +976,6 @@ mdb_txn_commit(MDB_txn *txn)
 
        pthread_mutex_unlock(&env->me_txns->mti_wmutex);
        free(txn);
-       txn = NULL;
-
-done:
-       mdb_txn_abort(txn);
 
        return MDB_SUCCESS;
 }
@@ -1036,19 +1069,23 @@ static int
 mdb_env_write_meta(MDB_txn *txn)
 {
        MDB_env *env;
-       MDB_meta        meta;
+       MDB_meta        meta, metab;
        off_t off;
-       int rc, len;
+       int rc, len, toggle;
        char *ptr;
 
        assert(txn != NULL);
        assert(txn->mt_env != NULL);
 
+       toggle = !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE);
        DPRINTF("writing meta page %d for root page %lu",
-               !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE), txn->mt_dbs[MAIN_DBI].md_root);
+               toggle, txn->mt_dbs[MAIN_DBI].md_root);
 
        env = txn->mt_env;
 
+       metab.mm_txnid = env->me_metas[toggle]->mm_txnid;
+       metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg;
+
        ptr = (char *)&meta;
        off = offsetof(MDB_meta, mm_dbs[0].md_depth);
        len = sizeof(MDB_meta) - off;
@@ -1059,15 +1096,25 @@ mdb_env_write_meta(MDB_txn *txn)
        meta.mm_last_pg = txn->mt_next_pgno - 1;
        meta.mm_txnid = txn->mt_txnid;
 
-       if (!F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE))
+       if (toggle)
                off += env->me_psize;
        off += PAGEHDRSZ;
 
-       rc = pwrite(env->me_fd, ptr, len, off);
+       /* Write to the SYNC fd */
+       rc = pwrite(env->me_mfd, ptr, len, off);
        if (rc != len) {
                DPRINTF("write failed, disk error?");
+               /* On a failure, the pagecache still contains the new data.
+                * Write some old data back, to prevent it from being used.
+                * Use the non-SYNC fd; we know it will fail anyway.
+                */
+               meta.mm_last_pg = metab.mm_last_pg;
+               meta.mm_txnid = metab.mm_txnid;
+               rc = pwrite(env->me_fd, ptr, len, off);
+               env->me_flags |= MDB_FATAL_ERROR;
                return errno;
        }
+       txn->mt_env->me_txns->mti_me_toggle = toggle;
 
        return MDB_SUCCESS;
 }
@@ -1079,13 +1126,13 @@ mdb_env_read_meta(MDB_env *env, int *which)
 
        assert(env != NULL);
 
-       if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
+       if (which)
+               toggle = *which;
+       else if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
                toggle = 1;
 
        if (env->me_meta != env->me_metas[toggle])
                env->me_meta = env->me_metas[toggle];
-       if (which)
-               *which = toggle;
 
        DPRINTF("Using meta page %d", toggle);
 
@@ -1183,7 +1230,9 @@ mdb_env_open2(MDB_env *env, unsigned int flags)
        }
        env->me_psize = meta.mm_psize;
 
-       p = (MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)env->me_map;
+       env->me_maxpg = env->me_mapsize / env->me_psize;
+
+       p = (MDB_page *)env->me_map;
        env->me_metas[0] = METADATA(p);
        env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
 
@@ -1219,6 +1268,8 @@ mdb_env_share_locks(MDB_env *env)
        struct flock lock_info;
 
        env->me_txns->mti_txnid = env->me_meta->mm_txnid;
+       if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
+               env->me_txns->mti_me_toggle = 1;
 
        memset((void *)&lock_info, 0, sizeof(lock_info));
        lock_info.l_type = F_RDLCK;
@@ -1292,6 +1343,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
                env->me_txns->mti_magic = MDB_MAGIC;
                env->me_txns->mti_txnid = 0;
                env->me_txns->mti_numreaders = 0;
+               env->me_txns->mti_me_toggle = 0;
 
        } else {
                if (env->me_txns->mti_magic != MDB_MAGIC) {
@@ -1546,7 +1598,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
        int rc;
 
        if (cursor && cursor_push_page(cursor, mp) == NULL)
-               return MDB_FAIL;
+               return ENOMEM;
 
        while (IS_BRANCH(mp)) {
                unsigned int     i = 0;
@@ -1583,12 +1635,12 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 
                mpp->mp_parent = mp;
                if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL)
-                       return MDB_FAIL;
+                       return MDB_PAGE_NOTFOUND;
                mpp->mp_pi = i;
                mpp->mp_page = mp;
 
                if (cursor && cursor_push_page(cursor, mp) == NULL)
-                       return MDB_FAIL;
+                       return ENOMEM;
 
                if (modify) {
                        MDB_dhead *dh = ((MDB_dhead *)mp)-1;
@@ -1605,7 +1657,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
        if (!IS_LEAF(mp)) {
                DPRINTF("internal error, index points to a %02X page!?",
                    mp->mp_flags);
-               return MDB_FAIL;
+               return MDB_CORRUPTED;
        }
 
        DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno,
@@ -1643,7 +1695,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
        }
 
        if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL)
-               return MDB_FAIL;
+               return MDB_PAGE_NOTFOUND;
 
        DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
 
@@ -1687,9 +1739,9 @@ mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
        memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
        if ((omp = mdb_get_page(txn, pgno)) == NULL) {
                DPRINTF("read overflow page %lu failed", pgno);
-               return MDB_FAIL;
+               return MDB_PAGE_NOTFOUND;
        }
-       data->mv_data = omp;
+       data->mv_data = METADATA(omp);
 
        return MDB_SUCCESS;
 }
@@ -1773,7 +1825,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right)
 
        indx = NODEPTR(parent->mp_page, parent->mp_ki);
        if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL)
-               return MDB_FAIL;
+               return MDB_PAGE_NOTFOUND;
 #if 0
        mp->parent = parent->mp_page;
        mp->parent_index = parent->mp_ki;
@@ -1957,7 +2009,8 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        cursor->mc_eof = 0;
 
        if (data) {
-               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+               MDB_val d2;
+               if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
                        return rc;
 
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
@@ -1978,7 +2031,10 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
                                if (rc != MDB_SUCCESS)
                                        return rc;
                        }
+               } else {
+                       *data = d2;
                }
+
        }
 
        rc = mdb_set_key(leaf, key);
@@ -2211,7 +2267,7 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
                            data->mv_size);
                        node_size += sizeof(pgno_t);
                        if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL)
-                               return MDB_FAIL;
+                               return ENOMEM;
                        DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
                        flags |= F_BIGDATA;
                } else {
@@ -2427,7 +2483,6 @@ mdb_cursor_close(MDB_cursor *cursor)
                while(!CURSOR_EMPTY(cursor))
                        cursor_pop_page(cursor);
                if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
-                       mdb_xcursor_fini(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor);
                        while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor))
                                cursor_pop_page(&cursor->mc_xcursor->mx_cursor);
                }
@@ -2646,7 +2701,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                        DPRINTF("collapsing root page!");
                        txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
                        if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL)
-                               return MDB_FAIL;
+                               return MDB_PAGE_NOTFOUND;
                        txn->mt_dbs[dbi].md_depth--;
                        txn->mt_dbs[dbi].md_branch_pages--;
                } else
@@ -2672,7 +2727,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                DPRINTF("reading right neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
                if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
-                       return MDB_FAIL;
+                       return MDB_PAGE_NOTFOUND;
                npp.mp_pi = mpp->mp_pi + 1;
                si = 0;
                di = NUMKEYS(mpp->mp_page);
@@ -2682,7 +2737,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                DPRINTF("reading left neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
                if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
-                       return MDB_FAIL;
+                       return MDB_PAGE_NOTFOUND;
                npp.mp_pi = mpp->mp_pi - 1;
                si = NUMKEYS(npp.mp_page) - 1;
                di = 0;
@@ -2861,7 +2916,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
 
        if (mdp->h.md_parent == NULL) {
                if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL)
-                       return MDB_FAIL;
+                       return ENOMEM;
                mdp->h.md_pi = 0;
                mdp->h.md_parent = &pdp->p;
                txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno;
@@ -2869,23 +2924,23 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
                txn->mt_dbs[dbi].md_depth++;
 
                /* Add left (implicit) pointer. */
-               if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
-                   mdp->p.mp_pgno, 0) != MDB_SUCCESS)
-                       return MDB_FAIL;
+               if ((rc = mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
+                   mdp->p.mp_pgno, 0)) != MDB_SUCCESS)
+                       return rc;
        } else {
                DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
        }
 
        /* Create a right sibling. */
        if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL)
-               return MDB_FAIL;
+               return ENOMEM;
        rdp->h.md_parent = mdp->h.md_parent;
        rdp->h.md_pi = mdp->h.md_pi + 1;
        DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
 
        /* Move half of the keys to the right sibling. */
        if ((copy = malloc(txn->mt_env->me_psize)) == NULL)
-               return MDB_FAIL;
+               return ENOMEM;
        memcpy(copy, &mdp->p, txn->mt_env->me_psize);
        memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ);
        mdp->p.mp_lower = PAGEHDRSZ;
@@ -2927,7 +2982,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
        }
        if (rc != MDB_SUCCESS) {
                free(copy);
-               return MDB_FAIL;
+               return rc;
        }
 
        for (i = j = 0; i <= NUMKEYS(copy); j++) {
@@ -3035,6 +3090,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi,
                mpp.mp_page = &dp->p;
                txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno;
                txn->mt_dbs[dbi].md_depth++;
+               txn->mt_dbxs[dbi].md_dirty = 1;
                ki = 0;
        }
        else
@@ -3127,6 +3183,19 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        return mdb_put0(txn, dbi, key, data, flags);
 }
 
+int
+mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
+{
+#define        CHANGEABLE      (MDB_NOSYNC)
+       if ((flag & CHANGEABLE) != flag)
+               return EINVAL;
+       if (onoff)
+               env->me_flags |= flag;
+       else
+               env->me_flags &= ~flag;
+       return MDB_SUCCESS;
+}
+
 int
 mdb_env_get_flags(MDB_env *env, unsigned int *arg)
 {
@@ -3224,6 +3293,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
                txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty;
                memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
                *dbi = txn->mt_numdbs;
+               txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
+               txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
                txn->mt_numdbs++;
        }