]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Don't malloc the free_pgs list every time
[openldap] / libraries / libmdb / mdb.c
index aa7f3d8ea6a959844425aadfb7da1168ba00e00b..7cb64417a46981bfe13b96c50cb479f9e0c2f2a0 100644 (file)
@@ -230,12 +230,15 @@ SLIST_HEAD(page_stack, MDB_ppage);
 #define CURSOR_POP(c)           SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
 #define CURSOR_PUSH(c,p)        SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
 
+struct MDB_xcursor;
+
 struct MDB_cursor {
        MDB_txn         *mc_txn;
        struct page_stack        mc_stack;              /* stack of parent pages */
        MDB_dbi         mc_dbi;
        short           mc_initialized; /* 1 if initialized */
        short           mc_eof;         /* 1 if end is reached */
+       struct MDB_xcursor      *mc_xcursor;
 };
 
 #define METAHASHLEN     offsetof(MDB_meta, mm_hash)
@@ -260,6 +263,8 @@ typedef struct MDB_dbx {
        MDB_cmp_func    *md_cmp;                /* user compare function */
        MDB_cmp_func    *md_dcmp;               /* user dupsort function */
        MDB_rel_func    *md_rel;                /* user relocate function */
+       MDB_dbi md_parent;
+       unsigned int    md_dirty;
 } MDB_dbx;
 
 struct MDB_txn {
@@ -279,9 +284,17 @@ struct MDB_txn {
 #define MDB_TXN_RDONLY          0x01           /* read-only transaction */
 #define MDB_TXN_ERROR           0x02           /* an error has occurred */
 #define MDB_TXN_METOGGLE       0x04            /* used meta page 1 */
-       unsigned int             mt_flags;
+       unsigned int    mt_flags;
 };
 
+/* Context for sorted-dup records */
+typedef struct MDB_xcursor {
+       MDB_cursor mx_cursor;
+       MDB_txn mx_txn;
+       MDB_dbx mx_dbxs[4];
+       MDB_db  mx_dbs[4];
+} MDB_xcursor;
+
 struct MDB_env {
        int                     me_fd;
        int                     me_lfd;
@@ -303,6 +316,7 @@ struct MDB_env {
        MDB_db          *me_dbs[2];
        MDB_oldpages *me_pghead;
        pthread_key_t   me_txkey;       /* thread-key for readers */
+       pgno_t          me_free_pgs[MDB_IDL_UM_SIZE];
 };
 
 #define NODESIZE        offsetof(MDB_node, mn_data)
@@ -330,7 +344,7 @@ static int  mdb_search_page(MDB_txn *txn,
 static int  mdbenv_read_header(MDB_env *env, MDB_meta *meta);
 static int  mdbenv_read_meta(MDB_env *env, int *which);
 static int  mdbenv_write_meta(MDB_txn *txn);
-static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno);
+static MDB_page *mdb_get_page(MDB_txn *txn, pgno_t pgno);
 
 static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
                            MDB_val *key, int *exactp, unsigned int *kip);
@@ -340,7 +354,7 @@ static int  mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
 static void mdb_del_node(MDB_page *mp, indx_t indx);
 static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki,
     MDB_pageparent *mpp, MDB_node *leaf);
-static int  mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data);
+static int  mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
 
 static int              mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
 static int              mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
@@ -362,6 +376,8 @@ static int           mdb_set_key(MDB_node *node, MDB_val *key);
 static int              mdb_sibling(MDB_cursor *cursor, int move_right);
 static int              mdb_cursor_next(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data);
+static int              mdb_cursor_prev(MDB_cursor *cursor,
+                           MDB_val *key, MDB_val *data);
 static int              mdb_cursor_set(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data, int *exactp);
 static int              mdb_cursor_first(MDB_cursor *cursor,
@@ -454,7 +470,7 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
                        MDB_val data;
                        pgno_t *idl;
 
-                       mdb_read_data(txn->mt_env, leaf, &data);
+                       mdb_read_data(txn, leaf, &data);
                        idl = (ULONG *)data.mv_data;
                        mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
                        mop->mo_next = txn->mt_env->me_pghead;
@@ -595,12 +611,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 
                pthread_mutex_lock(&env->me_txns->mt_wmutex);
                env->me_txns->mt_txnid++;
-               txn->mt_free_pgs = malloc(MDB_IDL_UM_SIZEOF);
-               if (txn->mt_free_pgs == NULL) {
-                       free(txn->mt_u.dirty_queue);
-                       free(txn);
-                       return ENOMEM;
-               }
+               txn->mt_free_pgs = env->me_free_pgs;
                txn->mt_free_pgs[0] = 0;
        }
 
@@ -680,13 +691,14 @@ mdb_txn_abort(MDB_txn *txn)
                txn->mt_u.reader->mr_txnid = 0;
        } else {
                MDB_oldpages *mop;
+               unsigned int i;
+
                /* Discard all dirty pages. */
                while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
                        dp = STAILQ_FIRST(txn->mt_u.dirty_queue);
                        STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
                        free(dp);
                }
-               free(txn->mt_free_pgs);
                free(txn->mt_u.dirty_queue);
 
                while ((mop = txn->mt_env->me_pghead)) {
@@ -696,6 +708,8 @@ mdb_txn_abort(MDB_txn *txn)
 
                env->me_txn = NULL;
                env->me_txns->mt_txnid--;
+               for (i=2; i<env->me_numdbs; i++)
+                       env->me_dbxs[i].md_dirty = 0;
                pthread_mutex_unlock(&env->me_txns->mt_wmutex);
        }
 
@@ -795,16 +809,12 @@ mdb_txn_commit(MDB_txn *txn)
                MDB_val data;
                data.mv_size = sizeof(MDB_db);
 
-               for (i = 2; i < env->me_numdbs; i++) {
-                       if (env->me_dbs[env->me_db_toggle][i].md_root != txn->mt_dbs[i].md_root) {
+               for (i = 2; i < txn->mt_numdbs; i++) {
+                       if (txn->mt_dbxs[i].md_dirty) {
                                data.mv_data = &txn->mt_dbs[i];
                                mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0);
                        }
                }
-               for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
-                       data.mv_data = &txn->mt_dbs[i];
-                       mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0);
-               }
        }
 
        /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
@@ -884,11 +894,14 @@ mdb_txn_commit(MDB_txn *txn)
        {
                int toggle = !env->me_db_toggle;
 
-               for (i = 0; i < env->me_numdbs; i++) {
-                       if (env->me_dbs[toggle][i].md_root != txn->mt_dbs[i].md_root)
+               for (i = 2; i < env->me_numdbs; i++) {
+                       if (txn->mt_dbxs[i].md_dirty) {
                                env->me_dbs[toggle][i] = txn->mt_dbs[i];
+                               txn->mt_dbxs[i].md_dirty = 0;
+                       }
                }
                for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
+                       txn->mt_dbxs[i].md_dirty = 0;
                        env->me_dbxs[i] = txn->mt_dbxs[i];
                        env->me_dbs[toggle][i] = txn->mt_dbs[i];
                }
@@ -899,7 +912,6 @@ mdb_txn_commit(MDB_txn *txn)
        }
 
        pthread_mutex_unlock(&env->me_txns->mt_wmutex);
-       free(txn->mt_free_pgs);
        free(txn->mt_u.dirty_queue);
        free(txn);
        txn = NULL;
@@ -1445,13 +1457,12 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
 }
 
 static MDB_page *
-mdbenv_get_page(MDB_env *env, pgno_t pgno)
+mdb_get_page(MDB_txn *txn, pgno_t pgno)
 {
        MDB_page *p = NULL;
-       MDB_txn *txn = env->me_txn;
        int found = 0;
 
-       if (txn && !STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
+       if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && !STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
                MDB_dpage *dp;
                STAILQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
                        if (dp->p.mp_pgno == pgno) {
@@ -1462,7 +1473,7 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno)
                }
        }
        if (!found) {
-               p = (MDB_page *)(env->me_map + env->me_psize * pgno);
+               p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
        }
        return p;
 }
@@ -1511,7 +1522,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
                        CURSOR_TOP(cursor)->mp_ki = i;
 
                mpp->mp_parent = mp;
-               if ((mp = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
+               if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL)
                        return MDB_FAIL;
                mpp->mp_pi = i;
                mpp->mp_page = mp;
@@ -1571,20 +1582,20 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
                return ENOENT;
        }
 
-       if ((mpp->mp_page = mdbenv_get_page(txn->mt_env, root)) == NULL)
+       if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL)
                return MDB_FAIL;
 
        DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
 
        if (modify) {
                /* For sub-databases, update main root first */
-               if (dbi > MAIN_DBI && txn->mt_dbs[dbi].md_root ==
-                       txn->mt_env->me_dbs[txn->mt_env->me_db_toggle][dbi].md_root) {
+               if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) {
                        MDB_pageparent mp2;
                        rc = mdb_search_page(txn, 0, &txn->mt_dbxs[dbi].md_name,
                                NULL, 1, &mp2);
                        if (rc)
                                return rc;
+                       txn->mt_dbxs[dbi].md_dirty = 1;
                }
                if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
                        mpp->mp_parent = NULL;
@@ -1599,7 +1610,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 }
 
 static int
-mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data)
+mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
 {
        MDB_page        *omp;           /* overflow mpage */
        pgno_t           pgno;
@@ -1614,7 +1625,7 @@ mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data)
         */
        data->mv_size = leaf->mn_dsize;
        memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
-       if ((omp = mdbenv_get_page(env, pgno)) == NULL) {
+       if ((omp = mdb_get_page(txn, pgno)) == NULL) {
                DPRINTF("read overflow page %lu failed", pgno);
                return MDB_FAIL;
        }
@@ -1644,7 +1655,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
 
        leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
        if (leaf && exact)
-               rc = mdb_read_data(txn->mt_env, leaf, data);
+               rc = mdb_read_data(txn, leaf, data);
        else {
                rc = ENOENT;
        }
@@ -1687,7 +1698,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right)
        assert(IS_BRANCH(parent->mp_page));
 
        indx = NODEPTR(parent->mp_page, parent->mp_ki);
-       if ((mp = mdbenv_get_page(cursor->mc_txn->mt_env, indx->mn_pgno)) == NULL)
+       if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL)
                return MDB_FAIL;
 #if 0
        mp->parent = parent->mp_page;
@@ -1747,7 +1758,47 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        assert(IS_LEAF(mp));
        leaf = NODEPTR(mp, top->mp_ki);
 
-       if (data && mdb_read_data(cursor->mc_txn->mt_env, leaf, data) != MDB_SUCCESS)
+       if (data && mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)
+               return MDB_FAIL;
+
+       return mdb_set_key(leaf, key);
+}
+
+static int
+mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
+{
+       MDB_ppage       *top;
+       MDB_page        *mp;
+       MDB_node        *leaf;
+
+       assert(cursor->mc_initialized);
+
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
+       DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
+
+       if (top->mp_ki == 0)  {
+               DPRINTF("=====> move to prev sibling page");
+               if (mdb_sibling(cursor, 0) != MDB_SUCCESS) {
+                       return ENOENT;
+               }
+               top = CURSOR_TOP(cursor);
+               mp = top->mp_page;
+               top->mp_ki = NUMKEYS(mp) - 1;
+               DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
+       } else
+               top->mp_ki--;
+
+       cursor->mc_eof = 0;
+
+       DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
+           mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
+
+       assert(IS_LEAF(mp));
+       leaf = NODEPTR(mp, top->mp_ki);
+
+       if (data && mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)
                return MDB_FAIL;
 
        return mdb_set_key(leaf, key);
@@ -1792,7 +1843,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        cursor->mc_initialized = 1;
        cursor->mc_eof = 0;
 
-       if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
+       if (data && (rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
        rc = mdb_set_key(leaf, key);
@@ -1821,7 +1872,7 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        cursor->mc_initialized = 1;
        cursor->mc_eof = 0;
 
-       if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
+       if (data && (rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
        return mdb_set_key(leaf, key);
@@ -1831,6 +1882,7 @@ static int
 mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
 {
        int              rc;
+       MDB_ppage       *top;
        MDB_pageparent  mpp;
        MDB_node        *leaf;
        MDB_val lkey;
@@ -1845,9 +1897,12 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
 
        leaf = NODEPTR(mpp.mp_page, NUMKEYS(mpp.mp_page)-1);
        cursor->mc_initialized = 1;
-       cursor->mc_eof = 1;
+       cursor->mc_eof = 0;
 
-       if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
+       top = CURSOR_TOP(cursor);
+       top->mp_ki = NUMKEYS(top->mp_page) - 1;
+
+       if (data && (rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
        return mdb_set_key(leaf, key);
@@ -1880,6 +1935,14 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
                else
                        rc = mdb_cursor_next(cursor, key, data);
                break;
+       case MDB_PREV:
+               if (!cursor->mc_initialized || cursor->mc_eof) {
+                       while (CURSOR_TOP(cursor) != NULL)
+                               cursor_pop_page(cursor);
+                       rc = mdb_cursor_last(cursor, key, data);
+               } else
+                       rc = mdb_cursor_prev(cursor, key, data);
+               break;
        case MDB_FIRST:
                while (CURSOR_TOP(cursor) != NULL)
                        cursor_pop_page(cursor);
@@ -2089,14 +2152,36 @@ int
 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 {
        MDB_cursor      *cursor;
+       size_t size = sizeof(MDB_cursor);
 
-       if (txn == NULL || ret == NULL)
+       if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
                return EINVAL;
 
-       if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
+       if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
+               size += sizeof(MDB_xcursor);
+
+       if ((cursor = calloc(1, size)) != NULL) {
                SLIST_INIT(&cursor->mc_stack);
                cursor->mc_dbi = dbi;
                cursor->mc_txn = txn;
+               if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
+                       MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
+                       cursor->mc_xcursor = mx;
+                       mx->mx_cursor.mc_txn = &mx->mx_txn;
+                       mx->mx_txn = *txn;
+                       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
+                       mx->mx_txn.mt_dbs = mx->mx_dbs;
+                       mx->mx_dbxs[0] = txn->mt_dbxs[0];
+                       mx->mx_dbxs[1] = txn->mt_dbxs[1];
+                       if (dbi > 1) {
+                               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
+                               mx->mx_txn.mt_numdbs = 4;
+                       } else {
+                               mx->mx_txn.mt_numdbs = 3;
+                       }
+               }
+       } else {
+               return ENOMEM;
        }
 
        *ret = cursor;
@@ -2325,7 +2410,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
                        DPRINTF("collapsing root page!");
                        txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
-                       if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbs[dbi].md_root)) == NULL)
+                       if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL)
                                return MDB_FAIL;
                        txn->mt_dbs[dbi].md_depth--;
                        txn->mt_dbs[dbi].md_branch_pages--;
@@ -2351,7 +2436,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                 */
                DPRINTF("reading right neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
-               if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
+               if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
                        return MDB_FAIL;
                npp.mp_pi = mpp->mp_pi + 1;
                si = 0;
@@ -2361,7 +2446,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                 */
                DPRINTF("reading left neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
-               if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
+               if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
                        return MDB_FAIL;
                npp.mp_pi = mpp->mp_pi - 1;
                si = NUMKEYS(npp.mp_page) - 1;
@@ -2448,7 +2533,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                return ENOENT;
        }
 
-       if (data && (rc = mdb_read_data(txn->mt_env, leaf, data)) != MDB_SUCCESS)
+       if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
        return mdb_del0(txn, dbi, ki, &mpp, leaf);
@@ -2783,6 +2868,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
                txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL;
                txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL;
                txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
+               txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI;
+               txn->mt_dbxs[txn->mt_numdbs].md_dirty = 0;
                memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
                *dbi = txn->mt_numdbs;
                txn->mt_numdbs++;