]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Add explicit md_dirty flag to dbxs
[openldap] / libraries / libmdb / mdb.c
index 2e914d8342703e6add595515c438ab6eecabc791..474119f723e5ee20524d808ca615e5092417ebb0 100644 (file)
@@ -1,3 +1,32 @@
+/* mdb.c - memory-mapped database library */
+/*
+ * Copyright 2011 Howard Chu, Symas Corp.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ *
+ * This code is derived from btree.c written by Martin Hedenfalk.
+ *
+ * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/queue.h>
@@ -172,7 +201,7 @@ typedef struct MDB_dpage {
        MDB_page        p;
 } MDB_dpage;
 
-STAILQ_HEAD(dirty_queue, MDB_dpage);
+STAILQ_HEAD(dirty_queue, MDB_dpage);   /* FIXME: use a sorted data structure */
 
 typedef struct MDB_oldpages {
        struct MDB_oldpages *mo_next;
@@ -201,6 +230,9 @@ SLIST_HEAD(page_stack, MDB_ppage);
 #define CURSOR_POP(c)           SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
 #define CURSOR_PUSH(c,p)        SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
 
+struct MDB_fakeenv {
+};
+
 struct MDB_cursor {
        MDB_txn         *mc_txn;
        struct page_stack        mc_stack;              /* stack of parent pages */
@@ -231,6 +263,8 @@ typedef struct MDB_dbx {
        MDB_cmp_func    *md_cmp;                /* user compare function */
        MDB_cmp_func    *md_dcmp;               /* user dupsort function */
        MDB_rel_func    *md_rel;                /* user relocate function */
+       MDB_dbi md_parent;
+       unsigned int    md_dirty;
 } MDB_dbx;
 
 struct MDB_txn {
@@ -333,6 +367,8 @@ static int           mdb_set_key(MDB_node *node, MDB_val *key);
 static int              mdb_sibling(MDB_cursor *cursor, int move_right);
 static int              mdb_cursor_next(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data);
+static int              mdb_cursor_prev(MDB_cursor *cursor,
+                           MDB_val *key, MDB_val *data);
 static int              mdb_cursor_set(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data, int *exactp);
 static int              mdb_cursor_first(MDB_cursor *cursor,
@@ -433,6 +469,16 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
                        txn->mt_env->me_pghead = mop;
                        memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
 
+#if DEBUG > 1
+                       {
+                               unsigned int i;
+                               DPRINTF("IDL read txn %lu root %lu num %lu",
+                                       mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+                               for (i=0; i<idl[0]; i++) {
+                                       DPRINTF("IDL %lu", idl[i+1]);
+                               }
+                       }
+#endif
                        /* drop this IDL from the DB */
                        mpp.mp_parent = NULL;
                        mpp.mp_pi = 0;
@@ -640,6 +686,9 @@ mdb_txn_abort(MDB_txn *txn)
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
                txn->mt_u.reader->mr_txnid = 0;
        } else {
+               MDB_oldpages *mop;
+               unsigned int i;
+
                /* Discard all dirty pages. */
                while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
                        dp = STAILQ_FIRST(txn->mt_u.dirty_queue);
@@ -648,8 +697,16 @@ mdb_txn_abort(MDB_txn *txn)
                }
                free(txn->mt_free_pgs);
                free(txn->mt_u.dirty_queue);
+
+               while ((mop = txn->mt_env->me_pghead)) {
+                       txn->mt_env->me_pghead = mop->mo_next;
+                       free(mop);
+               }
+
                env->me_txn = NULL;
                env->me_txns->mt_txnid--;
+               for (i=2; i<env->me_numdbs; i++)
+                       env->me_dbxs[i].md_dirty = 0;
                pthread_mutex_unlock(&env->me_txns->mt_wmutex);
        }
 
@@ -723,6 +780,17 @@ mdb_txn_commit(MDB_txn *txn)
                mpp.mp_pi = 0;
                mdb_search_page(txn, FREE_DBI, &key, NULL, 1, &mpp);
 
+#if DEBUG > 1
+               {
+                       unsigned int i;
+                       ULONG *idl = txn->mt_free_pgs;
+                       DPRINTF("IDL write txn %lu root %lu num %lu",
+                               txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+                       for (i=0; i<idl[0]; i++) {
+                               DPRINTF("IDL %lu", idl[i+1]);
+                       }
+               }
+#endif
                /* write to last page of freeDB */
                key.mv_size = sizeof(pgno_t);
                key.mv_data = (char *)&txn->mt_txnid;
@@ -738,16 +806,12 @@ mdb_txn_commit(MDB_txn *txn)
                MDB_val data;
                data.mv_size = sizeof(MDB_db);
 
-               for (i = 2; i < env->me_numdbs; i++) {
-                       if (env->me_dbs[env->me_db_toggle][i].md_root != txn->mt_dbs[i].md_root) {
+               for (i = 2; i < txn->mt_numdbs; i++) {
+                       if (txn->mt_dbxs[i].md_dirty) {
                                data.mv_data = &txn->mt_dbs[i];
                                mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0);
                        }
                }
-               for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
-                       data.mv_data = &txn->mt_dbs[i];
-                       mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0);
-               }
        }
 
        /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
@@ -827,11 +891,14 @@ mdb_txn_commit(MDB_txn *txn)
        {
                int toggle = !env->me_db_toggle;
 
-               for (i = 0; i < env->me_numdbs; i++) {
-                       if (env->me_dbs[toggle][i].md_root != txn->mt_dbs[i].md_root)
+               for (i = 2; i < env->me_numdbs; i++) {
+                       if (txn->mt_dbxs[i].md_dirty) {
                                env->me_dbs[toggle][i] = txn->mt_dbs[i];
+                               txn->mt_dbxs[i].md_dirty = 0;
+                       }
                }
                for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
+                       txn->mt_dbxs[i].md_dirty = 0;
                        env->me_dbxs[i] = txn->mt_dbxs[i];
                        env->me_dbs[toggle][i] = txn->mt_dbs[i];
                }
@@ -1521,13 +1588,13 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 
        if (modify) {
                /* For sub-databases, update main root first */
-               if (dbi > MAIN_DBI && txn->mt_dbs[dbi].md_root ==
-                       txn->mt_env->me_dbs[txn->mt_env->me_db_toggle][dbi].md_root) {
+               if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) {
                        MDB_pageparent mp2;
                        rc = mdb_search_page(txn, 0, &txn->mt_dbxs[dbi].md_name,
                                NULL, 1, &mp2);
                        if (rc)
                                return rc;
+                       txn->mt_dbxs[dbi].md_dirty = 1;
                }
                if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
                        mpp->mp_parent = NULL;
@@ -1696,6 +1763,46 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        return mdb_set_key(leaf, key);
 }
 
+static int
+mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
+{
+       MDB_ppage       *top;
+       MDB_page        *mp;
+       MDB_node        *leaf;
+
+       assert(cursor->mc_initialized);
+
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
+       DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
+
+       if (top->mp_ki == 0)  {
+               DPRINTF("=====> move to prev sibling page");
+               if (mdb_sibling(cursor, 0) != MDB_SUCCESS) {
+                       return ENOENT;
+               }
+               top = CURSOR_TOP(cursor);
+               mp = top->mp_page;
+               top->mp_ki = NUMKEYS(mp) - 1;
+               DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
+       } else
+               top->mp_ki--;
+
+       cursor->mc_eof = 0;
+
+       DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
+           mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
+
+       assert(IS_LEAF(mp));
+       leaf = NODEPTR(mp, top->mp_ki);
+
+       if (data && mdb_read_data(cursor->mc_txn->mt_env, leaf, data) != MDB_SUCCESS)
+               return MDB_FAIL;
+
+       return mdb_set_key(leaf, key);
+}
+
 static int
 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
     int *exactp)
@@ -1717,7 +1824,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        top = CURSOR_TOP(cursor);
        leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki);
        if (exactp != NULL && !*exactp) {
-               /* MDB_CURSOR_EXACT specified and not an exact match. */
+               /* MDB_SET specified and not an exact match. */
                return ENOENT;
        }
 
@@ -1774,6 +1881,7 @@ static int
 mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
 {
        int              rc;
+       MDB_ppage       *top;
        MDB_pageparent  mpp;
        MDB_node        *leaf;
        MDB_val lkey;
@@ -1788,7 +1896,10 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
 
        leaf = NODEPTR(mpp.mp_page, NUMKEYS(mpp.mp_page)-1);
        cursor->mc_initialized = 1;
-       cursor->mc_eof = 1;
+       cursor->mc_eof = 0;
+
+       top = CURSOR_TOP(cursor);
+       top->mp_ki = NUMKEYS(top->mp_page) - 1;
 
        if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
                return rc;
@@ -1806,13 +1917,13 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        assert(cursor);
 
        switch (op) {
-       case MDB_CURSOR:
-       case MDB_CURSOR_EXACT:
+       case MDB_SET:
+       case MDB_SET_RANGE:
                while (CURSOR_TOP(cursor) != NULL)
                        cursor_pop_page(cursor);
                if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
                        rc = EINVAL;
-               } else if (op == MDB_CURSOR_EXACT)
+               } else if (op == MDB_SET)
                        rc = mdb_cursor_set(cursor, key, data, &exact);
                else
                        rc = mdb_cursor_set(cursor, key, data, NULL);
@@ -1823,6 +1934,14 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
                else
                        rc = mdb_cursor_next(cursor, key, data);
                break;
+       case MDB_PREV:
+               if (!cursor->mc_initialized || cursor->mc_eof) {
+                       while (CURSOR_TOP(cursor) != NULL)
+                               cursor_pop_page(cursor);
+                       rc = mdb_cursor_last(cursor, key, data);
+               } else
+                       rc = mdb_cursor_prev(cursor, key, data);
+               break;
        case MDB_FIRST:
                while (CURSOR_TOP(cursor) != NULL)
                        cursor_pop_page(cursor);
@@ -2335,7 +2454,6 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_no
 {
        int rc;
 
-       mdb_del_node(mpp->mp_page, ki);
        /* add overflow pages to free list */
        if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
                int i, ovpages;
@@ -2344,10 +2462,12 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_no
                memcpy(&pg, NODEDATA(leaf), sizeof(pg));
                ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize);
                for (i=0; i<ovpages; i++) {
+                       DPRINTF("freed ov page %lu", pg);
                        mdb_idl_insert(txn->mt_free_pgs, pg);
                        pg++;
                }
        }
+       mdb_del_node(mpp->mp_page, ki);
        txn->mt_dbs[dbi].md_entries--;
        rc = mdb_rebalance(txn, dbi, mpp);
        if (rc != MDB_SUCCESS)
@@ -2725,6 +2845,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
                txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL;
                txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL;
                txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
+               txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI;
+               txn->mt_dbxs[txn->mt_numdbs].md_dirty = 0;
                memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
                *dbi = txn->mt_numdbs;
                txn->mt_numdbs++;