]> git.sur5r.net Git - openldap/blobdiff - libraries/liblmdb/mdb.c
Simplify last commits
[openldap] / libraries / liblmdb / mdb.c
index 9b8b675279e14e618908a97f8f5aa57b951766c4..640419c6e746cd9495054977e9bc5639b4c950f4 100644 (file)
@@ -392,7 +392,7 @@ static txnid_t mdb_debug_start;
         */
 #define        DKEY(x) mdb_dkey(x, kbuf)
 #else
-#define        DKBUF   typedef int dummy_kbuf  /* so we can put ';' after */
+#define        DKBUF
 #define DKEY(x)        0
 #endif
 
@@ -756,9 +756,12 @@ typedef struct MDB_node {
         */
 #define LEAF2KEY(p, i, ks)     ((char *)(p) + PAGEHDRSZ + ((i)*(ks)))
 
-       /** Set the \b node's key into \b key, if requested. */
-#define MDB_GET_KEY(node, key) { if ((key) != NULL) { \
-       (key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node); } }
+       /** Set the \b node's key into \b keyptr, if requested. */
+#define MDB_GET_KEY(node, keyptr)      { if ((keyptr) != NULL) { \
+       (keyptr)->mv_size = NODEKSZ(node); (keyptr)->mv_data = NODEKEY(node); } }
+
+       /** Set the \b node's key into \b key. */
+#define MDB_GET_KEY2(node, key)        { key.mv_size = NODEKSZ(node); key.mv_data = NODEKEY(node); }
 
        /** Information about a single database in the environment. */
 typedef struct MDB_db {
@@ -1351,19 +1354,24 @@ mdb_dlist_free(MDB_txn *txn)
        dl[0].mid = 0;
 }
 
-/* Set or clear P_KEEP in non-overflow, non-sub pages in this txn's cursors.
+/* Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn.
  * @param[in] mc A cursor handle for the current operation.
  * @param[in] pflags Flags of the pages to update:
  * P_DIRTY to set P_KEEP, P_DIRTY|P_KEEP to clear it.
+ * @param[in] all No shortcuts. Needed except after a full #mdb_page_flush().
+ * @return 0 on success, non-zero on failure.
  */
-static void
-mdb_cursorpages_mark(MDB_cursor *mc, unsigned pflags)
+static int
+mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all)
 {
        MDB_txn *txn = mc->mc_txn;
        MDB_cursor *m3;
        MDB_xcursor *mx;
+       MDB_page *dp;
        unsigned i, j;
+       int rc = MDB_SUCCESS, level;
 
+       /* Mark pages seen by cursors */
        if (mc->mc_flags & C_UNTRACK)
                mc = NULL;                              /* will find mc in mt_cursors */
        for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) {
@@ -1381,9 +1389,26 @@ mdb_cursorpages_mark(MDB_cursor *mc, unsigned pflags)
                if (i == 0)
                        break;
        }
+
+       if (all) {
+               /* Mark dirty root pages */
+               for (i=0; i<txn->mt_numdbs; i++) {
+                       if (txn->mt_dbflags[i] & DB_DIRTY) {
+                               pgno_t pgno = txn->mt_dbs[i].md_root;
+                               if (pgno == P_INVALID)
+                                       continue;
+                               if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS)
+                                       break;
+                               if ((dp->mp_flags & (P_DIRTY|P_KEEP)) == pflags && level <= 1)
+                                       dp->mp_flags ^= P_KEEP;
+                       }
+               }
+       }
+
+       return rc;
 }
 
-static int mdb_page_flush(MDB_txn *txn);
+static int mdb_page_flush(MDB_txn *txn, int keep);
 
 /**    Spill pages from the dirty list back to disk.
  * This is intended to prevent running into #MDB_TXN_FULL situations,
@@ -1426,8 +1451,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
        MDB_txn *txn = m0->mc_txn;
        MDB_page *dp;
        MDB_ID2L dl = txn->mt_u.dirty_list;
-       unsigned int i, j;
-       int rc, level;
+       unsigned int i, j, need;
+       int rc;
 
        if (m0->mc_flags & C_SUB)
                return MDB_SUCCESS;
@@ -1441,6 +1466,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
        if (key)
                i += (LEAFSIZE(key, data) + txn->mt_env->me_psize) / txn->mt_env->me_psize;
        i += i; /* double it for good measure */
+       need = i;
 
        if (txn->mt_dirty_room > i)
                return MDB_SUCCESS;
@@ -1451,24 +1477,23 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
                        return ENOMEM;
        }
 
-       /* Mark all the dirty root pages we want to preserve */
-       for (i=0; i<txn->mt_numdbs; i++) {
-               if (txn->mt_dbflags[i] & DB_DIRTY) {
-                       pgno_t pgno = txn->mt_dbs[i].md_root;
-                       if (pgno == P_INVALID)
-                               continue;
-                       if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS)
-                               goto done;
-                       if ((dp->mp_flags & P_DIRTY) && level <= 1)
-                               dp->mp_flags |= P_KEEP;
-               }
-       }
+       /* Preserve pages which may soon be dirtied again */
+       if ((rc = mdb_pages_xkeep(m0, P_DIRTY, 1)) != MDB_SUCCESS)
+               goto done;
 
-       /* Preserve pages used by cursors */
-       mdb_cursorpages_mark(m0, P_DIRTY);
+       /* Less aggressive spill - we originally spilled the entire dirty list,
+        * with a few exceptions for cursor pages and DB root pages. But this
+        * turns out to be a lot of wasted effort because in a large txn many
+        * of those pages will need to be used again. So now we spill only 1/8th
+        * of the dirty pages. Testing revealed this to be a good tradeoff,
+        * better than 1/2, 1/4, or 1/10.
+        */
+       if (need < MDB_IDL_UM_MAX / 8)
+               need = MDB_IDL_UM_MAX / 8;
 
        /* Save the page IDs of all the pages we're flushing */
-       for (i=1; i<=dl[0].mid; i++) {
+       /* flush from the tail forward, this saves a lot of shifting later on. */
+       for (i=dl[0].mid; i && need; i--) {
                dp = dl[i].mptr;
                if (dp->mp_flags & P_KEEP)
                        continue;
@@ -1491,12 +1516,16 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
                }
                if ((rc = mdb_midl_append(&txn->mt_spill_pgs, dl[i].mid)))
                        goto done;
+               need--;
        }
        mdb_midl_sort(txn->mt_spill_pgs);
 
-       rc = mdb_page_flush(txn);
+       /* Flush the spilled part of dirty list */
+       if ((rc = mdb_page_flush(txn, i)) != MDB_SUCCESS)
+               goto done;
 
-       mdb_cursorpages_mark(m0, P_DIRTY|P_KEEP);
+       /* Reset any dirty pages we kept that page_flush didn't see */
+       rc = mdb_pages_xkeep(m0, P_DIRTY|P_KEEP, i);
 
 done:
        if (rc == 0) {
@@ -2571,10 +2600,13 @@ mdb_freelist_save(MDB_txn *txn)
        return rc;
 }
 
-/** Flush dirty pages to the map, after clearing their dirty flag.
+/** Flush (some) dirty pages to the map, after clearing their dirty flag.
+ * @param[in] txn the transaction that's being committed
+ * @param[in] keep number of initial pages in dirty_list to keep dirty.
+ * @return 0 on success, non-zero on failure.
  */
 static int
-mdb_page_flush(MDB_txn *txn)
+mdb_page_flush(MDB_txn *txn, int keep)
 {
        MDB_env         *env = txn->mt_env;
        MDB_ID2L        dl = txn->mt_u.dirty_list;
@@ -2592,10 +2624,11 @@ mdb_page_flush(MDB_txn *txn)
        int                     n = 0;
 #endif
 
-       j = 0;
+       j = i = keep;
+
        if (env->me_flags & MDB_WRITEMAP) {
                /* Clear dirty flags */
-               for (i=1; i<=pagecount; i++) {
+               while (++i <= pagecount) {
                        dp = dl[i].mptr;
                        /* Don't flush this page yet */
                        if (dp->mp_flags & P_KEEP) {
@@ -2610,8 +2643,8 @@ mdb_page_flush(MDB_txn *txn)
        }
 
        /* Write the pages */
-       for (i = 1;; i++) {
-               if (i <= pagecount) {
+       for (;;) {
+               if (++i <= pagecount) {
                        dp = dl[i].mptr;
                        /* Don't flush this page yet */
                        if (dp->mp_flags & P_KEEP) {
@@ -2690,8 +2723,7 @@ mdb_page_flush(MDB_txn *txn)
 #endif /* _WIN32 */
        }
 
-       j = 0;
-       for (i=1; i<=pagecount; i++) {
+       for (i = keep; ++i <= pagecount; ) {
                dp = dl[i].mptr;
                /* This is a page we skipped above */
                if (!dl[i].mid) {
@@ -2894,7 +2926,7 @@ mdb_txn_commit(MDB_txn *txn)
        mdb_audit(txn);
 #endif
 
-       if ((rc = mdb_page_flush(txn)) ||
+       if ((rc = mdb_page_flush(txn, 0)) ||
                (rc = mdb_env_sync(env, 0)) ||
                (rc = mdb_env_write_meta(txn)))
                goto fail;
@@ -3137,6 +3169,7 @@ mdb_env_write_meta(MDB_txn *txn)
                WriteFile(env->me_fd, ptr, len, NULL, &ov);
 #else
                r2 = pwrite(env->me_fd, ptr, len, off);
+               (void)r2;       /* Silence warnings. We don't care about pwrite's return value */
 #endif
 fail:
                env->me_flags |= MDB_FATAL_ERROR;
@@ -3607,10 +3640,9 @@ static void
 mdb_hash_enc(MDB_val *val, char *encbuf)
 {
        mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT);
-       unsigned long *l = (unsigned long *)&h;
 
-       mdb_pack85(l[0], encbuf);
-       mdb_pack85(l[1], encbuf+5);
+       mdb_pack85(h, encbuf);
+       mdb_pack85(h>>32, encbuf+5);
        encbuf[10] = '\0';
 }
 #endif
@@ -3898,9 +3930,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
                goto leave;
        }
 
-       rc = mdb_env_setup_locks(env, lpath, mode, &excl);
-       if (rc)
-               goto leave;
+       /* For RDONLY, get lockfile after we know datafile exists */
+       if (!F_ISSET(flags, MDB_RDONLY)) {
+               rc = mdb_env_setup_locks(env, lpath, mode, &excl);
+               if (rc)
+                       goto leave;
+       }
 
 #ifdef _WIN32
        if (F_ISSET(flags, MDB_RDONLY)) {
@@ -3926,6 +3961,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
                goto leave;
        }
 
+       if (F_ISSET(flags, MDB_RDONLY)) {
+               rc = mdb_env_setup_locks(env, lpath, mode, &excl);
+               if (rc)
+                       goto leave;
+       }
+
        if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) {
                if (flags & (MDB_RDONLY|MDB_WRITEMAP)) {
                        env->me_mfd = env->me_fd;
@@ -3934,10 +3975,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
                         * MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset.
                         */
 #ifdef _WIN32
+                       len = OPEN_EXISTING;
                        env->me_mfd = CreateFile(dpath, oflags,
                                FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, len,
                                mode | FILE_FLAG_WRITE_THROUGH, NULL);
 #else
+                       oflags &= ~O_CREAT;
                        env->me_mfd = open(dpath, oflags | MDB_DSYNC, mode);
 #endif
                        if (env->me_mfd == INVALID_HANDLE_VALUE) {
@@ -4522,9 +4565,8 @@ static int
 mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify)
 {
        MDB_page        *mp = mc->mc_pg[mc->mc_top];
-       DKBUF;
        int rc;
-
+       DKBUF;
 
        while (IS_BRANCH(mp)) {
                MDB_node        *node;
@@ -5062,7 +5104,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                        nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size);
                } else {
                        leaf = NODEPTR(mp, 0);
-                       MDB_GET_KEY(leaf, &nodekey);
+                       MDB_GET_KEY2(leaf, nodekey);
                }
                rc = mc->mc_dbx->md_cmp(key, &nodekey);
                if (rc == 0) {
@@ -5083,7 +5125,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                                                 nkeys-1, nodekey.mv_size);
                                } else {
                                        leaf = NODEPTR(mp, nkeys-1);
-                                       MDB_GET_KEY(leaf, &nodekey);
+                                       MDB_GET_KEY2(leaf, nodekey);
                                }
                                rc = mc->mc_dbx->md_cmp(key, &nodekey);
                                if (rc == 0) {
@@ -5101,7 +5143,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                                                                 mc->mc_ki[mc->mc_top], nodekey.mv_size);
                                                } else {
                                                        leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
-                                                       MDB_GET_KEY(leaf, &nodekey);
+                                                       MDB_GET_KEY2(leaf, nodekey);
                                                }
                                                rc = mc->mc_dbx->md_cmp(key, &nodekey);
                                                if (rc == 0) {
@@ -5653,9 +5695,16 @@ more:
                                        mc->mc_dbx->md_dcmp = mdb_cmp_cint;
 #endif
 #endif
-                               /* if data matches, ignore it */
-                               if (!mc->mc_dbx->md_dcmp(data, &dkey))
-                                       return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
+                               /* if data matches, skip it */
+                               if (!mc->mc_dbx->md_dcmp(data, &dkey)) {
+                                       if (flags & MDB_NODUPDATA)
+                                               rc = MDB_KEYEXIST;
+                                       else if (flags & MDB_MULTIPLE)
+                                               goto next_mult;
+                                       else
+                                               rc = MDB_SUCCESS;
+                                       return rc;
+                               }
 
                                /* create a fake page for the dup items */
                                memcpy(dbuf, dkey.mv_data, dkey.mv_size);
@@ -5936,15 +5985,16 @@ put_sub:
                        mc->mc_db->md_entries++;
                if (flags & MDB_MULTIPLE) {
                        if (!rc) {
+next_mult:
                                mcount++;
+                               /* let caller know how many succeeded, if any */
+                               data[1].mv_size = mcount;
                                if (mcount < dcount) {
                                        data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
                                        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
                                        goto more;
                                }
                        }
-                       /* let caller know how many succeeded, if any */
-                       data[1].mv_size = mcount;
                }
        }
 done:
@@ -5979,7 +6029,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
 
        if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
-               if (flags != MDB_NODUPDATA) {
+               if (!(flags & MDB_NODUPDATA)) {
                        if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
                                mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
                        }
@@ -7645,7 +7695,7 @@ done:
                                m3->mc_snum++;
                                m3->mc_top++;
                        }
-                       if (m3->mc_pg[mc->mc_top] == mp) {
+                       if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp) {
                                if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
                                        m3->mc_ki[mc->mc_top]++;
                                if (m3->mc_ki[mc->mc_top] >= fixup) {
@@ -7653,7 +7703,7 @@ done:
                                        m3->mc_ki[mc->mc_top] -= fixup;
                                        m3->mc_ki[ptop] = mn.mc_ki[ptop];
                                }
-                       } else if (!did_split && m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
+                       } else if (!did_split && m3->mc_top >= ptop && m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
                                m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {
                                m3->mc_ki[ptop]++;
                        }