]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Add mdb_del for sorted dups
[openldap] / libraries / libmdb / mdb.c
index 545c7632db6cf7c96bc0211f6834049ae55fa841..979d9c0df20376aeaf6d01e48bd705c57b1c5e5f 100644 (file)
@@ -74,7 +74,7 @@ typedef ULONG         pgno_t;
 #define MDB_MINKEYS     4
 #define MDB_MAGIC       0xBEEFC0DE
 #define MDB_VERSION     1
-#define MAXKEYSIZE      255
+#define MAXKEYSIZE      511
 
 #define P_INVALID       (~0UL)
 
@@ -2147,6 +2147,56 @@ mdb_del_node(MDB_page *mp, indx_t indx)
        mp->mp_upper += sz;
 }
 
+static void
+mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       MDB_dbi dbn;
+
+       mx->mx_cursor.mc_txn = &mx->mx_txn;
+       mx->mx_txn = *txn;
+       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
+       mx->mx_txn.mt_dbs = mx->mx_dbs;
+       mx->mx_dbxs[0] = txn->mt_dbxs[0];
+       mx->mx_dbxs[1] = txn->mt_dbxs[1];
+       if (dbi > 1) {
+               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
+               dbn = 2;
+       } else {
+               dbn = 1;
+       }
+       mx->mx_dbxs[dbn+1].md_parent = dbn;
+       mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
+       mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
+       mx->mx_dbxs[dbn+1].md_dirty = 0;
+       mx->mx_txn.mt_numdbs = dbn+2;
+}
+
+static void
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
+{
+       mx->mx_dbs[0] = txn->mt_dbs[0];
+       mx->mx_dbs[1] = txn->mt_dbs[1];
+       if (dbi > 1) {
+               mx->mx_dbs[2] = txn->mt_dbs[dbi];
+               mx->mx_dbs[3] = *db;
+       } else {
+               mx->mx_dbs[2] = *db;
+       }
+}
+
+static void
+mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       txn->mt_dbs[0] = mx->mx_dbs[0];
+       txn->mt_dbs[1] = mx->mx_dbs[1];
+       txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
+       txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
+       if (dbi > 1) {
+               txn->mt_dbs[dbi] = mx->mx_dbs[2];
+               txn->mt_dbxs[2].md_dirty = mx->mx_dbxs[2].md_dirty;
+       }
+}
+
 int
 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 {
@@ -2166,18 +2216,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
                if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                        MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
                        cursor->mc_xcursor = mx;
-                       mx->mx_cursor.mc_txn = &mx->mx_txn;
-                       mx->mx_txn = *txn;
-                       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
-                       mx->mx_txn.mt_dbs = mx->mx_dbs;
-                       mx->mx_dbxs[0] = txn->mt_dbxs[0];
-                       mx->mx_dbxs[1] = txn->mt_dbxs[1];
-                       if (dbi > 1) {
-                               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
-                               mx->mx_txn.mt_numdbs = 4;
-                       } else {
-                               mx->mx_txn.mt_numdbs = 3;
-                       }
+                       mdb_xcursor_init0(txn, dbi, mx);
                }
        } else {
                return ENOMEM;
@@ -2535,6 +2574,47 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
        if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+       /* add all the child DB's pages to the free list */
+               MDB_cursor mc;
+               MDB_xcursor mx;
+               MDB_pageparent mp2;
+
+               mdb_xcursor_init0(txn, dbi, &mx);
+               mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+               SLIST_INIT(&mc.mc_stack);
+               mc.mc_dbi = mx.mx_txn.mt_numdbs-1;
+               mc.mc_txn = &mx.mx_txn;
+               rc = mdb_search_page(&mx.mx_txn, mx.mx_txn.mt_numdbs - 1, NULL, &mc, 0, &mp2);
+               if (rc == MDB_SUCCESS) {
+                       MDB_ppage *top, *parent;
+                       MDB_node *ni;
+                       unsigned int i;
+
+                       cursor_pop_page(&mc);
+                       top = CURSOR_TOP(&mc);
+                       parent = SLIST_NEXT(top, mp_entry);
+                       do {
+                               for (i=0; i<NUMKEYS(top->mp_page); i++) {
+                                       ni = NODEPTR(top->mp_page, i);
+                                       mdb_idl_insert(txn->mt_free_pgs, ni->mn_pgno);
+                               }
+                               if (parent) {
+                                       parent->mp_ki++;
+                                       if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
+                                               cursor_pop_page(&mc);
+                                               top = CURSOR_TOP(&mc);
+                                               parent = SLIST_NEXT(top, mp_entry);
+                                       } else {
+                                               ni = NODEPTR(parent->mp_page, parent->mp_ki);
+                                               top = mdb_get_page(mc.mc_txn, ni->mn_pgno);
+                                       }
+                               }
+                       }
+                       mdb_idl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mc.mc_dbi].md_root);
+               }
+       }
+
        return mdb_del0(txn, dbi, ki, &mpp, leaf);
 }
 
@@ -2703,6 +2783,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        unsigned int     ki;
        MDB_node        *leaf;
        MDB_pageparent  mpp;
+       MDB_val xdata, *rdata;
+       MDB_db dummy;
 
        assert(key != NULL);
        assert(data != NULL);
@@ -2731,6 +2813,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        if (rc == MDB_SUCCESS) {
                leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
                if (leaf && exact) {
+                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                               goto put_sub;
+                       }
                        if (F_ISSET(flags, MDB_NOOVERWRITE)) {
                                DPRINTF("duplicate key %.*s",
                                    (int)key->mv_size, (char *)key->mv_data);
@@ -2766,6 +2851,20 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("there are %u keys, should insert new key at index %i",
                NUMKEYS(mpp.mp_page), ki);
 
+       /* For sorted dups, the data item at this level is a DB record
+        * for a child DB; the actual data elements are stored as keys
+        * in the child DB.
+        */
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               rdata = &xdata;
+               xdata.mv_size = sizeof(MDB_db);
+               xdata.mv_data = &dummy;
+               memset(&dummy, 0, sizeof(dummy));
+               dummy.md_root = P_INVALID;
+       } else {
+               rdata = data;
+       }
+
        if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
                rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
        } else {
@@ -2775,8 +2874,26 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 
        if (rc != MDB_SUCCESS)
                txn->mt_flags |= MDB_TXN_ERROR;
-       else
+       else {
                txn->mt_dbs[dbi].md_entries++;
+               /* Now store the actual data in the child DB. Note that we're
+                * storing the user data in the keys field, so there are strict
+                * size limits on dupdata. The actual data fields of the child
+                * DB are all zero size.
+                */
+               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                       MDB_xcursor mx;
+
+                       leaf = NODEPTR(mpp.mp_page, ki);
+put_sub:
+                       mdb_xcursor_init0(txn, dbi, &mx);
+                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       xdata.mv_size = 0;
+                       xdata.mv_data = "";
+                       rc = mdb_put(&mx.mx_txn, mx.mx_txn.mt_numdbs-1, data, &xdata, flags);
+                       mdb_xcursor_fini(txn, dbi, &mx);
+               }
+       }
 
 done:
        return rc;