]> git.sur5r.net Git - openldap/commitdiff
Add mdb_put for sorted dups
authorHoward Chu <hyc@symas.com>
Thu, 11 Aug 2011 02:26:06 +0000 (19:26 -0700)
committerHoward Chu <hyc@symas.com>
Thu, 1 Sep 2011 23:17:07 +0000 (16:17 -0700)
libraries/libmdb/mdb.c

index 545c7632db6cf7c96bc0211f6834049ae55fa841..319c1fc6c5a85a5327928b0e0baa3d05b22af061 100644 (file)
@@ -74,7 +74,7 @@ typedef ULONG         pgno_t;
 #define MDB_MINKEYS     4
 #define MDB_MAGIC       0xBEEFC0DE
 #define MDB_VERSION     1
-#define MAXKEYSIZE      255
+#define MAXKEYSIZE      511
 
 #define P_INVALID       (~0UL)
 
@@ -2147,6 +2147,56 @@ mdb_del_node(MDB_page *mp, indx_t indx)
        mp->mp_upper += sz;
 }
 
+static void
+mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       MDB_dbi dbn;
+
+       mx->mx_cursor.mc_txn = &mx->mx_txn;
+       mx->mx_txn = *txn;
+       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
+       mx->mx_txn.mt_dbs = mx->mx_dbs;
+       mx->mx_dbxs[0] = txn->mt_dbxs[0];
+       mx->mx_dbxs[1] = txn->mt_dbxs[1];
+       if (dbi > 1) {
+               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
+               dbn = 2;
+       } else {
+               dbn = 1;
+       }
+       mx->mx_dbxs[dbn+1].md_parent = dbn;
+       mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
+       mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
+       mx->mx_dbxs[dbn+1].md_dirty = 0;
+       mx->mx_txn.mt_numdbs = dbn+2;
+}
+
+static void
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
+{
+       mx->mx_dbs[0] = txn->mt_dbs[0];
+       mx->mx_dbs[1] = txn->mt_dbs[1];
+       if (dbi > 1) {
+               mx->mx_dbs[2] = txn->mt_dbs[dbi];
+               mx->mx_dbs[3] = *db;
+       } else {
+               mx->mx_dbs[2] = *db;
+       }
+}
+
+static void
+mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       txn->mt_dbs[0] = mx->mx_dbs[0];
+       txn->mt_dbs[1] = mx->mx_dbs[1];
+       txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
+       txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
+       if (dbi > 1) {
+               txn->mt_dbs[dbi] = mx->mx_dbs[2];
+               txn->mt_dbxs[2].md_dirty = mx->mx_dbxs[2].md_dirty;
+       }
+}
+
 int
 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 {
@@ -2166,18 +2216,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
                if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
                        MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
                        cursor->mc_xcursor = mx;
-                       mx->mx_cursor.mc_txn = &mx->mx_txn;
-                       mx->mx_txn = *txn;
-                       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
-                       mx->mx_txn.mt_dbs = mx->mx_dbs;
-                       mx->mx_dbxs[0] = txn->mt_dbxs[0];
-                       mx->mx_dbxs[1] = txn->mt_dbxs[1];
-                       if (dbi > 1) {
-                               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
-                               mx->mx_txn.mt_numdbs = 4;
-                       } else {
-                               mx->mx_txn.mt_numdbs = 3;
-                       }
+                       mdb_xcursor_init0(txn, dbi, mx);
                }
        } else {
                return ENOMEM;
@@ -2535,6 +2574,10 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
        if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
                return rc;
 
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+       /* add all the child DB's pages to the free list */
+       }
+
        return mdb_del0(txn, dbi, ki, &mpp, leaf);
 }
 
@@ -2703,6 +2746,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        unsigned int     ki;
        MDB_node        *leaf;
        MDB_pageparent  mpp;
+       MDB_val xdata, *rdata;
+       MDB_db dummy;
 
        assert(key != NULL);
        assert(data != NULL);
@@ -2731,6 +2776,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        if (rc == MDB_SUCCESS) {
                leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
                if (leaf && exact) {
+                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                               goto put_sub;
+                       }
                        if (F_ISSET(flags, MDB_NOOVERWRITE)) {
                                DPRINTF("duplicate key %.*s",
                                    (int)key->mv_size, (char *)key->mv_data);
@@ -2766,6 +2814,20 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("there are %u keys, should insert new key at index %i",
                NUMKEYS(mpp.mp_page), ki);
 
+       /* For sorted dups, the data item at this level is a DB record
+        * for a child DB; the actual data elements are stored as keys
+        * in the child DB.
+        */
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               rdata = &xdata;
+               xdata.mv_size = sizeof(MDB_db);
+               xdata.mv_data = &dummy;
+               memset(&dummy, 0, sizeof(dummy));
+               dummy.md_root = P_INVALID;
+       } else {
+               rdata = data;
+       }
+
        if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
                rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
        } else {
@@ -2775,8 +2837,26 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 
        if (rc != MDB_SUCCESS)
                txn->mt_flags |= MDB_TXN_ERROR;
-       else
+       else {
                txn->mt_dbs[dbi].md_entries++;
+               /* Now store the actual data in the child DB. Note that we're
+                * storing the user data in the keys field, so there are strict
+                * size limits on dupdata. The actual data fields of the child
+                * DB are all zero size.
+                */
+               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                       MDB_xcursor mx;
+
+                       leaf = NODEPTR(mpp.mp_page, ki);
+put_sub:
+                       mdb_xcursor_init0(txn, dbi, &mx);
+                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       xdata.mv_size = 0;
+                       xdata.mv_data = "";
+                       rc = mdb_put(&mx.mx_txn, mx.mx_txn.mt_numdbs-1, data, &xdata, flags);
+                       mdb_xcursor_fini(txn, dbi, &mx);
+               }
+       }
 
 done:
        return rc;