]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Add mdb_del for sorted dups
[openldap] / libraries / libmdb / mdb.c
index 7cb64417a46981bfe13b96c50cb479f9e0c2f2a0..979d9c0df20376aeaf6d01e48bd705c57b1c5e5f 100644 (file)
@@ -74,7 +74,7 @@ typedef ULONG         pgno_t;
 #define MDB_MINKEYS     4
 #define MDB_MAGIC       0xBEEFC0DE
 #define MDB_VERSION     1
-#define MAXKEYSIZE      255
+#define MAXKEYSIZE      511
 
 #define P_INVALID       (~0UL)
 
@@ -621,21 +621,20 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
                if (!r) {
                        unsigned int i;
                        pthread_mutex_lock(&env->me_txns->mt_mutex);
-                       for (i=0; i<env->me_maxreaders; i++) {
-                               if (env->me_txns->mt_readers[i].mr_pid == 0) {
-                                       env->me_txns->mt_readers[i].mr_pid = getpid();
-                                       env->me_txns->mt_readers[i].mr_tid = pthread_self();
-                                       r = &env->me_txns->mt_readers[i];
-                                       pthread_setspecific(env->me_txkey, r);
-                                       if (i >= env->me_txns->mt_numreaders)
-                                               env->me_txns->mt_numreaders = i+1;
+                       for (i=0; i<env->me_txns->mt_numreaders; i++)
+                               if (env->me_txns->mt_readers[i].mr_pid == 0)
                                        break;
-                               }
-                       }
-                       pthread_mutex_unlock(&env->me_txns->mt_mutex);
                        if (i == env->me_maxreaders) {
+                               pthread_mutex_unlock(&env->me_txns->mti_mutex);
                                return ENOSPC;
                        }
+                       env->me_txns->mt_readers[i].mr_pid = getpid();
+                       env->me_txns->mt_readers[i].mr_tid = pthread_self();
+                       r = &env->me_txns->mt_readers[i];
+                       pthread_setspecific(env->me_txkey, r);
+                       if (i >= env->me_txns->mt_numreaders)
+                               env->me_txns->mt_numreaders = i+1;
+                       pthread_mutex_unlock(&env->me_txns->mt_mutex);
                }
                r->mr_txnid = txn->mt_txnid;
                txn->mt_u.reader = r;
@@ -2148,6 +2147,56 @@ mdb_del_node(MDB_page *mp, indx_t indx)
        mp->mp_upper += sz;
 }
 
+static void
+mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       MDB_dbi dbn;
+
+       mx->mx_cursor.mc_txn = &mx->mx_txn;
+       mx->mx_txn = *txn;
+       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
+       mx->mx_txn.mt_dbs = mx->mx_dbs;
+       mx->mx_dbxs[0] = txn->mt_dbxs[0];
+       mx->mx_dbxs[1] = txn->mt_dbxs[1];
+       if (dbi > 1) {
+               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
+               dbn = 2;
+       } else {
+               dbn = 1;
+       }
+       mx->mx_dbxs[dbn+1].md_parent = dbn;
+       mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
+       mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
+       mx->mx_dbxs[dbn+1].md_dirty = 0;
+       mx->mx_txn.mt_numdbs = dbn+2;
+}
+
+static void
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
+{
+       mx->mx_dbs[0] = txn->mt_dbs[0];
+       mx->mx_dbs[1] = txn->mt_dbs[1];
+       if (dbi > 1) {
+               mx->mx_dbs[2] = txn->mt_dbs[dbi];
+               mx->mx_dbs[3] = *db;
+       } else {
+               mx->mx_dbs[2] = *db;
+       }
+}
+
+static void
+mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       txn->mt_dbs[0] = mx->mx_dbs[0];
+       txn->mt_dbs[1] = mx->mx_dbs[1];
+       txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
+       txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
+       if (dbi > 1) {
+               txn->mt_dbs[dbi] = mx->mx_dbs[2];
+               txn->mt_dbxs[2].md_dirty = mx->mx_dbxs[2].md_dirty;
+       }
+}
+
 int
 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 {
@@ -2167,18 +2216,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
                if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                        MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
                        cursor->mc_xcursor = mx;
-                       mx->mx_cursor.mc_txn = &mx->mx_txn;
-                       mx->mx_txn = *txn;
-                       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
-                       mx->mx_txn.mt_dbs = mx->mx_dbs;
-                       mx->mx_dbxs[0] = txn->mt_dbxs[0];
-                       mx->mx_dbxs[1] = txn->mt_dbxs[1];
-                       if (dbi > 1) {
-                               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
-                               mx->mx_txn.mt_numdbs = 4;
-                       } else {
-                               mx->mx_txn.mt_numdbs = 3;
-                       }
+                       mdb_xcursor_init0(txn, dbi, mx);
                }
        } else {
                return ENOMEM;
@@ -2536,6 +2574,47 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
        if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+       /* add all the child DB's pages to the free list */
+               MDB_cursor mc;
+               MDB_xcursor mx;
+               MDB_pageparent mp2;
+
+               mdb_xcursor_init0(txn, dbi, &mx);
+               mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+               SLIST_INIT(&mc.mc_stack);
+               mc.mc_dbi = mx.mx_txn.mt_numdbs-1;
+               mc.mc_txn = &mx.mx_txn;
+               rc = mdb_search_page(&mx.mx_txn, mx.mx_txn.mt_numdbs - 1, NULL, &mc, 0, &mp2);
+               if (rc == MDB_SUCCESS) {
+                       MDB_ppage *top, *parent;
+                       MDB_node *ni;
+                       unsigned int i;
+
+                       cursor_pop_page(&mc);
+                       top = CURSOR_TOP(&mc);
+                       parent = SLIST_NEXT(top, mp_entry);
+                       do {
+                               for (i=0; i<NUMKEYS(top->mp_page); i++) {
+                                       ni = NODEPTR(top->mp_page, i);
+                                       mdb_idl_insert(txn->mt_free_pgs, ni->mn_pgno);
+                               }
+                               if (parent) {
+                                       parent->mp_ki++;
+                                       if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
+                                               cursor_pop_page(&mc);
+                                               top = CURSOR_TOP(&mc);
+                                               parent = SLIST_NEXT(top, mp_entry);
+                                       } else {
+                                               ni = NODEPTR(parent->mp_page, parent->mp_ki);
+                                               top = mdb_get_page(mc.mc_txn, ni->mn_pgno);
+                                       }
+                               }
+                       }
+                       mdb_idl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mc.mc_dbi].md_root);
+               }
+       }
+
        return mdb_del0(txn, dbi, ki, &mpp, leaf);
 }
 
@@ -2704,6 +2783,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        unsigned int     ki;
        MDB_node        *leaf;
        MDB_pageparent  mpp;
+       MDB_val xdata, *rdata;
+       MDB_db dummy;
 
        assert(key != NULL);
        assert(data != NULL);
@@ -2732,6 +2813,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        if (rc == MDB_SUCCESS) {
                leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
                if (leaf && exact) {
+                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                               goto put_sub;
+                       }
                        if (F_ISSET(flags, MDB_NOOVERWRITE)) {
                                DPRINTF("duplicate key %.*s",
                                    (int)key->mv_size, (char *)key->mv_data);
@@ -2767,6 +2851,20 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("there are %u keys, should insert new key at index %i",
                NUMKEYS(mpp.mp_page), ki);
 
+       /* For sorted dups, the data item at this level is a DB record
+        * for a child DB; the actual data elements are stored as keys
+        * in the child DB.
+        */
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               rdata = &xdata;
+               xdata.mv_size = sizeof(MDB_db);
+               xdata.mv_data = &dummy;
+               memset(&dummy, 0, sizeof(dummy));
+               dummy.md_root = P_INVALID;
+       } else {
+               rdata = data;
+       }
+
        if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
                rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
        } else {
@@ -2776,8 +2874,26 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 
        if (rc != MDB_SUCCESS)
                txn->mt_flags |= MDB_TXN_ERROR;
-       else
+       else {
                txn->mt_dbs[dbi].md_entries++;
+               /* Now store the actual data in the child DB. Note that we're
+                * storing the user data in the keys field, so there are strict
+                * size limits on dupdata. The actual data fields of the child
+                * DB are all zero size.
+                */
+               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                       MDB_xcursor mx;
+
+                       leaf = NODEPTR(mpp.mp_page, ki);
+put_sub:
+                       mdb_xcursor_init0(txn, dbi, &mx);
+                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       xdata.mv_size = 0;
+                       xdata.mv_data = "";
+                       rc = mdb_put(&mx.mx_txn, mx.mx_txn.mt_numdbs-1, data, &xdata, flags);
+                       mdb_xcursor_fini(txn, dbi, &mx);
+               }
+       }
 
 done:
        return rc;