]> git.sur5r.net Git - openldap/commitdiff
Spill pages, take 3
authorHoward Chu <hyc@symas.com>
Thu, 11 Jul 2013 20:09:47 +0000 (22:09 +0200)
committerHoward Chu <hyc@symas.com>
Fri, 12 Jul 2013 20:17:04 +0000 (13:17 -0700)
libraries/liblmdb/mdb.c
libraries/liblmdb/midl.c
libraries/liblmdb/midl.h

index a533ed4d8930772ec2a960e263998e4a5450428a..cd99d67added76476fa83b611aa6d5354beb26bd 100644 (file)
@@ -585,6 +585,7 @@ typedef struct MDB_page {
 #define        P_DIRTY          0x10           /**< dirty page */
 #define        P_LEAF2          0x20           /**< for #MDB_DUPFIXED records */
 #define        P_SUBP           0x40           /**< for #MDB_DUPSORT sub-pages */
+#define        P_KEEP           0x8000         /**< leave this page alone during spill */
 /** @} */
        uint16_t        mp_flags;               /**< @ref mdb_page */
 #define mp_lower       mp_pb.pb.pb_lower
@@ -824,6 +825,10 @@ struct MDB_txn {
        /** The list of pages that became unused during this transaction.
         */
        MDB_IDL         mt_free_pgs;
+       /** The list of dirty pages we temporarily wrote to disk
+        *      because the dirty list was full.
+        */
+       MDB_IDL         mt_spill_pgs;
        union {
                MDB_ID2L        dirty_list;     /**< for write txns: modified pages */
                MDB_reader      *reader;        /**< this thread's reader table slot or NULL */
@@ -857,6 +862,7 @@ struct MDB_txn {
 #define MDB_TXN_RDONLY         0x01            /**< read-only transaction */
 #define MDB_TXN_ERROR          0x02            /**< an error has occurred */
 #define MDB_TXN_DIRTY          0x04            /**< must write, even if dirty list is empty */
+#define MDB_TXN_SPILLS         0x08            /**< txn or a parent has spilled pages */
 /** @} */
        unsigned int    mt_flags;               /**< @ref mdb_txn */
        /** dirty_list maxsize - # of allocated pages allowed, including in parent txns */
@@ -1306,7 +1312,7 @@ mdb_dpage_free(MDB_env *env, MDB_page *dp)
        }
 }
 
-/* Return all dirty pages to dpage list */
+/**    Return all dirty pages to dpage list */
 static void
 mdb_dlist_free(MDB_txn *txn)
 {
@@ -1320,6 +1326,148 @@ mdb_dlist_free(MDB_txn *txn)
        dl[0].mid = 0;
 }
 
+static int mdb_page_flush(MDB_txn *txn);
+
+/**    Spill pages from the dirty list back to disk.
+ * This is intended to prevent running into #MDB_TXN_FULL situations,
+ * but note that they may still occur in a few cases:
+ *     1) pages in #MDB_DUPSORT sub-DBs are never spilled, so if there
+ *      are too many of these dirtied in one txn, the txn may still get
+ *      too full.
+ *     2) child txns may run out of space if their parents dirtied a
+ *      lot of pages and never spilled them. TODO: we probably should do
+ *      a preemptive spill during #mdb_txn_begin() of a child txn, if
+ *      the parent's dirty_room is below a given threshold.
+ *     3) our estimate of the txn size could be too small. At the
+ *      moment this seems unlikely.
+ *
+ * Otherwise, if not using nested txns, it is expected that apps will
+ * not run into #MDB_TXN_FULL any more. The pages are flushed to disk
+ * the same way as for a txn commit, e.g. their P_DIRTY flag is cleared.
+ * If the txn never references them again, they can be left alone.
+ * If the txn only reads them, they can be used without any fuss.
+ * If the txn writes them again, they can be dirtied immediately without
+ * going thru all of the work of #mdb_page_touch(). Such references are
+ * handled by #mdb_page_unspill().
+ *
+ * Also note, we never spill DB root pages, nor pages of active cursors,
+ * because we'll need these back again soon anyway. And in nested txns,
+ * we can't spill a page in a child txn if it was already spilled in a
+ * parent txn. That would alter the parent txns' data even though
+ * the child hasn't committed yet, and we'd have no way to undo it if
+ * the child aborted.
+ *
+ * @param[in] m0 cursor A cursor handle identifying the transaction and
+ *     database for which we are checking space.
+ * @param[in] key For a put operation, the key being stored.
+ * @param[in] data For a put operation, the data being stored.
+ * @return 0 on success, non-zero on failure.
+ */
+static int
+mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
+{
+       MDB_txn *txn = m0->mc_txn;
+       MDB_page *dp;
+       MDB_ID2L dl = txn->mt_u.dirty_list;
+       unsigned int i, j;
+       int rc;
+
+       if (m0->mc_flags & C_SUB)
+               return MDB_SUCCESS;
+
+       /* Estimate how much space this op will take */
+       i = m0->mc_db->md_depth;
+       /* Named DBs also dirty the main DB */
+       if (m0->mc_dbi > MAIN_DBI)
+               i += txn->mt_dbs[MAIN_DBI].md_depth;
+       /* For puts, roughly factor in the key+data size */
+       if (key)
+               i += (LEAFSIZE(key, data) + txn->mt_env->me_psize) / txn->mt_env->me_psize;
+       i += i; /* double it for good measure */
+
+       if (txn->mt_dirty_room > i)
+               return MDB_SUCCESS;
+
+       if (!txn->mt_spill_pgs) {
+               txn->mt_spill_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX);
+               if (!txn->mt_spill_pgs)
+                       return ENOMEM;
+       }
+
+       /* Mark all the dirty root pages we want to preserve */
+       for (i=0; i<txn->mt_numdbs; i++) {
+               if (txn->mt_dbflags[i] & DB_DIRTY) {
+                       j = mdb_mid2l_search(dl, txn->mt_dbs[i].md_root);
+                       if (j <= dl[0].mid) {
+                               dp = dl[j].mptr;
+                               dp->mp_flags |= P_KEEP;
+                       }
+               }
+       }
+       /* Mark all the pages of active cursors we want to preserve */
+       for (i=0; i<txn->mt_numdbs; i++) {
+               MDB_cursor *mc = txn->mt_cursors[i];
+               /* See if m0 is tracked or not */
+               if (i == m0->mc_dbi && !(m0->mc_flags & C_UNTRACK)) {
+                               /* nope. tack it on in front */
+                               m0->mc_next = mc;
+                               mc = m0;
+               }
+               for (; mc; mc=mc->mc_next) {
+                       if (mc->mc_flags & C_INITIALIZED) {
+                               for (j=0; j<mc->mc_snum; j++) {
+                                       if (mc->mc_pg[j]->mp_flags & P_DIRTY)
+                                               mc->mc_pg[j]->mp_flags |= P_KEEP;
+                               }
+                               if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) {
+                                       MDB_cursor *mx = &mc->mc_xcursor->mx_cursor;
+                                       if (mx->mc_flags & C_INITIALIZED) {
+                                               for (j=0; j<mx->mc_snum; j++) {
+                                                       if ((mx->mc_pg[j]->mp_flags & (P_SUBP|P_DIRTY))
+                                                                       == P_DIRTY)
+                                                               mx->mc_pg[j]->mp_flags |= P_KEEP;
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /* Save the page IDs of all the pages we're flushing */
+       for (i=1; i<=dl[0].mid; i++) {
+               dp = dl[i].mptr;
+               if (dp->mp_flags & P_KEEP)
+                       continue;
+               /* Can't spill twice, make sure it's not already in a parent's
+                * spill list.
+                */
+               if (txn->mt_parent) {
+                       MDB_txn *tx2;
+                       for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) {
+                               if (tx2->mt_spill_pgs) {
+                                       j = mdb_midl_search(tx2->mt_spill_pgs, dl[i].mid);
+                                       if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == dl[i].mid) {
+                                               dp->mp_flags |= P_KEEP;
+                                               break;
+                                       }
+                               }
+                       }
+                       if (tx2)
+                               continue;
+               }
+               if ((rc = mdb_midl_append(&txn->mt_spill_pgs, dl[i].mid)))
+                       return rc;
+       }
+       mdb_midl_sort(txn->mt_spill_pgs);
+
+       rc = mdb_page_flush(txn);
+       if (rc == 0) {
+               txn->mt_dirty_room = MDB_IDL_UM_MAX - dl[0].mid;
+               txn->mt_flags |= MDB_TXN_SPILLS;
+       }
+       return rc;
+}
+
 /** Find oldest txnid still referenced. Expects txn->mt_txnid > 0. */
 static txnid_t
 mdb_find_oldest(MDB_txn *txn)
@@ -1533,6 +1681,61 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
        }
 }
 
+/** Pull a page off the txn's spill list, if present.
+ * If a page being referenced was spilled to disk in this txn, bring
+ * it back and make it dirty/writable again.
+ * @param[in] tx0 the transaction handle.
+ * @param[in] mp the page being referenced.
+ * @param[out] ret the writable page, if any. ret is unchanged if
+ * mp wasn't spilled.
+ */
+static int
+mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret)
+{
+       MDB_env *env = tx0->mt_env;
+       MDB_txn *txn;
+       unsigned x;
+       pgno_t pgno = mp->mp_pgno;
+
+       for (txn = tx0; txn; txn=txn->mt_parent) {
+               if (!txn->mt_spill_pgs)
+                       continue;
+               x = mdb_midl_search(txn->mt_spill_pgs, pgno);
+               if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pgno) {
+                       MDB_page *np;
+                       int num;
+                       if (IS_OVERFLOW(mp))
+                               num = mp->mp_pages;
+                       else
+                               num = 1;
+                       if (env->me_flags & MDB_WRITEMAP) {
+                               np = mp;
+                       } else {
+                               np = mdb_page_malloc(txn, num);
+                               if (!np)
+                                       return ENOMEM;
+                               if (num > 1)
+                                       memcpy(np, mp, num * env->me_psize);
+                               else
+                                       mdb_page_copy(np, mp, env->me_psize);
+                       }
+                       if (txn == tx0) {
+                               /* If in current txn, this page is no longer spilled */
+                               for (; x < txn->mt_spill_pgs[0]; x++)
+                                       txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1];
+                               txn->mt_spill_pgs[0]--;
+                       }       /* otherwise, if belonging to a parent txn, the
+                                * page remains spilled until child commits
+                                */
+                       mdb_page_dirty(tx0, np);
+                       np->mp_flags |= P_DIRTY;
+                       *ret = np;
+                       break;
+               }
+       }
+       return MDB_SUCCESS;
+}
+
 /** Touch a page: make it dirty and re-insert into tree with updated pgno.
  * @param[in] mc cursor pointing to the page to be touched
  * @return 0 on success, non-zero on failure.
@@ -1548,6 +1751,14 @@ mdb_page_touch(MDB_cursor *mc)
        int rc;
 
        if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
+               if (txn->mt_flags & MDB_TXN_SPILLS) {
+                       np = NULL;
+                       rc = mdb_page_unspill(txn, mp, &np);
+                       if (rc)
+                               return rc;
+                       if (np)
+                               goto done;
+               }
                if ((rc = mdb_midl_need(&txn->mt_free_pgs, 1)) ||
                        (rc = mdb_page_alloc(mc, 1, &np)))
                        return rc;
@@ -1595,6 +1806,7 @@ mdb_page_touch(MDB_cursor *mc)
        np->mp_pgno = pgno;
        np->mp_flags |= P_DIRTY;
 
+done:
        /* Adjust cursors pointing to mp */
        mc->mc_pg[mc->mc_top] = np;
        dbi = mc->mc_dbi;
@@ -1801,6 +2013,7 @@ mdb_txn_renew0(MDB_txn *txn)
                txn->mt_u.dirty_list[0].mid = 0;
                txn->mt_free_pgs = env->me_free_pgs;
                txn->mt_free_pgs[0] = 0;
+               txn->mt_spill_pgs = NULL;
                env->me_txn = txn;
        }
 
@@ -1906,6 +2119,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                txn->mt_toggle = parent->mt_toggle;
                txn->mt_dirty_room = parent->mt_dirty_room;
                txn->mt_u.dirty_list[0].mid = 0;
+               txn->mt_spill_pgs = NULL;
                txn->mt_next_pgno = parent->mt_next_pgno;
                parent->mt_child = txn;
                txn->mt_parent = parent;
@@ -2008,6 +2222,7 @@ mdb_txn_reset0(MDB_txn *txn, const char *act)
                        txn->mt_parent->mt_child = NULL;
                        env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
                        mdb_midl_free(txn->mt_free_pgs);
+                       mdb_midl_free(txn->mt_spill_pgs);
                        free(txn->mt_u.dirty_list);
                        return;
                }
@@ -2210,7 +2425,7 @@ mdb_page_flush(MDB_txn *txn)
 {
        MDB_env         *env = txn->mt_env;
        MDB_ID2L        dl = txn->mt_u.dirty_list;
-       unsigned        psize = env->me_psize;
+       unsigned        psize = env->me_psize, j;
        int                     i, pagecount = dl[0].mid, rc;
        size_t          size = 0, pos = 0;
        pgno_t          pgno = 0;
@@ -2224,13 +2439,20 @@ mdb_page_flush(MDB_txn *txn)
        int                     n = 0;
 #endif
 
+       j = 0;
        if (env->me_flags & MDB_WRITEMAP) {
                /* Clear dirty flags */
                for (i = pagecount; i; i--) {
                        dp = dl[i].mptr;
+                       /* Don't flush this page yet */
+                       if (dp->mp_flags & P_KEEP) {
+                               dp->mp_flags ^= P_KEEP;
+                               dl[++j] = dl[i];
+                               continue;
+                       }
                        dp->mp_flags &= ~P_DIRTY;
                }
-               dl[0].mid = 0;
+               dl[0].mid = j;
                return MDB_SUCCESS;
        }
 
@@ -2238,6 +2460,12 @@ mdb_page_flush(MDB_txn *txn)
        for (i = 1;; i++) {
                if (i <= pagecount) {
                        dp = dl[i].mptr;
+                       /* Don't flush this page yet */
+                       if (dp->mp_flags & P_KEEP) {
+                               dp->mp_flags ^= P_KEEP;
+                               dl[i].mid = 0;
+                               continue;
+                       }
                        pgno = dl[i].mid;
                        /* clear dirty flag */
                        dp->mp_flags &= ~P_DIRTY;
@@ -2309,7 +2537,18 @@ mdb_page_flush(MDB_txn *txn)
 #endif /* _WIN32 */
        }
 
-       mdb_dlist_free(txn);
+       j = 0;
+       for (i=1; i<=pagecount; i++) {
+               dp = dl[i].mptr;
+               /* This is a page we skipped above */
+               if (!dl[i].mid) {
+                       dl[++j] = dl[i];
+                       dl[j].mid = dp->mp_pgno;
+                       continue;
+               }
+               mdb_dpage_free(env, dp);
+       }
+       dl[0].mid = j;
 
        return MDB_SUCCESS;
 }
@@ -2378,6 +2617,37 @@ mdb_txn_commit(MDB_txn *txn)
 
                dst = parent->mt_u.dirty_list;
                src = txn->mt_u.dirty_list;
+               /* Remove anything in our dirty list from parent's spill list */
+               if (parent->mt_spill_pgs) {
+                       x = parent->mt_spill_pgs[0];
+                       len = x;
+                       /* zero out our dirty pages in parent spill list */
+                       for (i=1; i<=src[0].mid; i++) {
+                               if (src[i].mid < parent->mt_spill_pgs[x])
+                                       continue;
+                               if (src[i].mid > parent->mt_spill_pgs[x]) {
+                                       if (x <= 1)
+                                               break;
+                                       x--;
+                                       continue;
+                               }
+                               parent->mt_spill_pgs[x] = 0;
+                               len--;
+                       }
+                       /* OK, we had a few hits, squash zeros from the spill list */
+                       if (len < parent->mt_spill_pgs[0]) {
+                               x=1;
+                               for (y=1; y<=parent->mt_spill_pgs[0]; y++) {
+                                       if (parent->mt_spill_pgs[y]) {
+                                               if (y != x) {
+                                                       parent->mt_spill_pgs[x] = parent->mt_spill_pgs[y];
+                                               }
+                                               x++;
+                                       }
+                               }
+                               parent->mt_spill_pgs[0] = len;
+                       }
+               }
                /* Find len = length of merging our dirty list with parent's */
                x = dst[0].mid;
                dst[0].mid = 0;         /* simplify loops */
@@ -2409,6 +2679,15 @@ mdb_txn_commit(MDB_txn *txn)
                dst[0].mid = len;
                free(txn->mt_u.dirty_list);
                parent->mt_dirty_room = txn->mt_dirty_room;
+               if (txn->mt_spill_pgs) {
+                       if (parent->mt_spill_pgs) {
+                               mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs);
+                               mdb_midl_free(txn->mt_spill_pgs);
+                               mdb_midl_sort(parent->mt_spill_pgs);
+                       } else {
+                               parent->mt_spill_pgs = txn->mt_spill_pgs;
+                       }
+               }
 
                parent->mt_child = NULL;
                mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead);
@@ -3991,6 +4270,19 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl)
                level = 1;
                do {
                        MDB_ID2L dl = tx2->mt_u.dirty_list;
+                       unsigned x;
+                       /* Spilled pages were dirtied in this txn and flushed
+                        * because the dirty list got full. Bring this page
+                        * back in from the map (but don't unspill it here,
+                        * leave that unless page_touch happens again).
+                        */
+                       if (tx2->mt_spill_pgs) {
+                               x = mdb_midl_search(tx2->mt_spill_pgs, pgno);
+                               if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pgno) {
+                                       p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+                                       goto done;
+                               }
+                       }
                        if (dl[0].mid) {
                                unsigned x = mdb_mid2l_search(dl, pgno);
                                if (x <= dl[0].mid && dl[x].mid == pgno) {
@@ -4091,6 +4383,8 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify)
 
        DPRINTF("found leaf page %zu for key [%s]", mp->mp_pgno,
            key ? DKEY(key) : NULL);
+       mc->mc_flags |= C_INITIALIZED;
+       mc->mc_flags &= ~C_EOF;
 
        return MDB_SUCCESS;
 }
@@ -4218,11 +4512,21 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp)
        int rc;
 
        DPRINTF("free ov page %zu (%d)", pg, ovpages);
-       /* If the page is dirty we just acquired it, so we should
-        * give it back to our current free list, if any.
+       /* If the page is dirty or on the spill list we just acquired it,
+        * so we should give it back to our current free list, if any.
         * Not currently supported in nested txns.
         * Otherwise put it onto the list of pages we freed in this txn.
         */
+       if (!(mp->mp_flags & P_DIRTY) && txn->mt_spill_pgs) {
+               unsigned x = mdb_midl_search(txn->mt_spill_pgs, pg);
+               if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pg) {
+                       /* This page is no longer spilled */
+                       for (; x < txn->mt_spill_pgs[0]; x++)
+                               txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1];
+                       txn->mt_spill_pgs[0]--;
+                       goto release;
+               }
+       }
        if ((mp->mp_flags & P_DIRTY) && !txn->mt_parent && env->me_pghead) {
                unsigned j, x;
                pgno_t *mop;
@@ -4248,6 +4552,7 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp)
                }
                if (!(env->me_flags & MDB_WRITEMAP))
                        mdb_dpage_free(env, mp);
+release:
                /* Insert in me_pghead */
                mop = env->me_pghead;
                j = mop[0] + ovpages;
@@ -4964,6 +5269,9 @@ mdb_cursor_touch(MDB_cursor *mc)
        return MDB_SUCCESS;
 }
 
+/** Do not spill pages to disk if txn is getting full, may fail instead */
+#define MDB_NOSPILL    0x8000
+
 int
 mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
     unsigned int flags)
@@ -4974,7 +5282,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
        MDB_page        *fp;
        MDB_db dummy;
        int do_sub = 0, insert = 0;
-       unsigned int mcount = 0, dcount = 0;
+       unsigned int mcount = 0, dcount = 0, nospill;
        size_t nsize;
        int rc, rc2;
        MDB_pagebuf pbuf;
@@ -4992,6 +5300,9 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                        return EINVAL;
        }
 
+       nospill = flags & MDB_NOSPILL;
+       flags &= ~MDB_NOSPILL;
+
        if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
                return EACCES;
 
@@ -5048,7 +5359,17 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
                        return rc;
        }
 
-       /* Cursor is positioned */
+       /* Cursor is positioned, check for room in the dirty list */
+       if (!nospill) {
+               if (flags & MDB_MULTIPLE) {
+                       rdata = &xdata;
+                       xdata.mv_size = data->mv_size * dcount;
+               } else {
+                       rdata = data;
+               }
+               if ((rc2 = mdb_page_spill(mc, key, rdata)))
+                       return rc2;
+       }
 
        if (rc == MDB_NO_ROOT) {
                MDB_page *np;
@@ -5227,8 +5548,18 @@ current:
                                return rc2;
                        ovpages = omp->mp_pages;
 
-                       /* Is the ov page writable and large enough? */
-                       if ((omp->mp_flags & P_DIRTY) && ovpages >= dpages) {
+                       /* Is the ov page large enough? */
+                       if (ovpages >= dpages) {
+                         if (!(omp->mp_flags & P_DIRTY) &&
+                                 (level || (mc->mc_txn->mt_env->me_flags & MDB_WRITEMAP)))
+                         {
+                               rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
+                               if (rc)
+                                       return rc;
+                               level = 0;              /* dirty in this txn or clean */
+                         }
+                         /* Is it dirty? */
+                         if (omp->mp_flags & P_DIRTY) {
                                /* yes, overwrite it. Note in this case we don't
                                 * bother to try shrinking the page if the new data
                                 * is smaller than the overflow threshold.
@@ -5261,10 +5592,10 @@ current:
                                else
                                        memcpy(METADATA(omp), data->mv_data, data->mv_size);
                                goto done;
-                       } else {
-                               if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
-                                       return rc2;
+                         }
                        }
+                       if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
+                               return rc2;
                } else if (NODEDSZ(leaf) == data->mv_size) {
                        /* same size, just replace it. Note that we could
                         * also reuse this node if the new data is smaller,
@@ -5337,10 +5668,11 @@ put_sub:
                        xdata.mv_data = "";
                        leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
                        if (flags & MDB_CURRENT) {
-                               xflags = MDB_CURRENT;
+                               xflags = MDB_CURRENT|MDB_NOSPILL;
                        } else {
                                mdb_xcursor_init1(mc, leaf);
-                               xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0;
+                               xflags = (flags & MDB_NODUPDATA) ?
+                                       MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
                        }
                        /* converted, write the original data first */
                        if (dkey.mv_size) {
@@ -5411,6 +5743,10 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
        if (!(mc->mc_flags & C_INITIALIZED))
                return EINVAL;
 
+       if (!(flags & MDB_NOSPILL) && (rc = mdb_page_spill(mc, NULL, NULL)))
+               return rc;
+       flags &= ~MDB_NOSPILL; /* TODO: Or change (flags != MDB_NODUPDATA) to ~(flags & MDB_NODUPDATA), not looking at the logic of that code just now */
+
        rc = mdb_cursor_touch(mc);
        if (rc)
                return rc;
@@ -5422,7 +5758,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
                        if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
                                mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
                        }
-                       rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
+                       rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL);
                        /* If sub-DB still has entries, we're done */
                        if (mc->mc_xcursor->mx_db.md_entries) {
                                if (leaf->mn_flags & F_SUBDATA) {
index e7bd680cb015f5dad2cc9d0923c7d09f0d832e9e..86e4592d2dbcda7dbc2e3fda618980e584eff93a 100644 (file)
@@ -31,8 +31,7 @@
  */
 #define CMP(x,y)        ( (x) < (y) ? -1 : (x) > (y) )
 
-#if 0  /* superseded by append/sort */
-static unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id )
+unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id )
 {
        /*
         * binary search of id in ids
@@ -67,6 +66,7 @@ static unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id )
        return cursor;
 }
 
+#if 0  /* superseded by append/sort */
 int mdb_midl_insert( MDB_IDL ids, MDB_ID id )
 {
        unsigned x, i;
index 9ce7133c6e9f2f3cebf4000d6384dfbfd63b28b6..b0bdff3f4918fb2d8a2a916ed59f1b5267a61845 100644 (file)
@@ -74,14 +74,12 @@ typedef MDB_ID *MDB_IDL;
                xidl[xlen] = (id); \
        } while (0)
 
-#if 0  /* superseded by append/sort */
-       /** Insert an ID into an IDL.
-        * @param[in,out] ids   The IDL to insert into.
-        * @param[in] id        The ID to insert.
-        * @return      0 on success, -1 if ID was already present, -2 on error.
+       /** Search for an ID in an IDL.
+        * @param[in] ids       The IDL to search.
+        * @param[in] id        The ID to search for.
+        * @return      The index of the first ID greater than or equal to \b id.
         */
-int mdb_midl_insert( MDB_IDL ids, MDB_ID id );
-#endif
+unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id );
 
        /** Allocate an IDL.
         * Allocates memory for an IDL of the given size.