]> git.sur5r.net Git - openldap/blobdiff - libraries/liblmdb/mdb.c
ITS#8321 reorganize page_split fixups
[openldap] / libraries / liblmdb / mdb.c
index 77287b77281bcc169f9c35cb2b03a1e6f9becb8d..c5174e983acb0923da159fe7874f8337de8d3169 100644 (file)
@@ -1161,7 +1161,6 @@ struct MDB_cursor {
 #define C_EOF  0x02                    /**< No more data */
 #define C_SUB  0x04                    /**< Cursor is a sub-cursor */
 #define C_DEL  0x08                    /**< last op was a cursor_del */
-#define C_SPLITTING    0x20            /**< Cursor is in page_split */
 #define C_UNTRACK      0x40            /**< Un-track cursor when closing */
 /** @} */
        unsigned int    mc_flags;       /**< @ref mdb_cursor */
@@ -1605,7 +1604,7 @@ mdb_cursor_chk(MDB_cursor *mc)
        MDB_node *node;
        MDB_page *mp;
 
-       if (!mc->mc_snum && !(mc->mc_flags & C_INITIALIZED)) return;
+       if (!mc->mc_snum || !(mc->mc_flags & C_INITIALIZED)) return;
        for (i=0; i<mc->mc_top; i++) {
                mp = mc->mc_pg[i];
                node = NODEPTR(mp, mc->mc_ki[i]);
@@ -2490,14 +2489,15 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
                                *bk = *mc;
                                mc->mc_backup = bk;
                                mc->mc_db = &dst->mt_dbs[i];
-                               /* Kill pointers into src - and dst to reduce abuse: The
-                                * user may not use mc until dst ends. Otherwise we'd...
+                               /* Kill pointers into src to reduce abuse: The
+                                * user may not use mc until dst ends. But we need a valid
+                                * txn pointer here for cursor fixups to keep working.
                                 */
-                               mc->mc_txn    = NULL;   /* ...set this to dst */
-                               mc->mc_dbflag = NULL;   /* ...and &dst->mt_dbflags[i] */
+                               mc->mc_txn    = dst;
+                               mc->mc_dbflag = &dst->mt_dbflags[i];
                                if ((mx = mc->mc_xcursor) != NULL) {
                                        *(MDB_xcursor *)(bk+1) = *mx;
-                                       mx->mx_cursor.mc_txn = NULL; /* ...and dst. */
+                                       mx->mx_cursor.mc_txn = dst;
                                }
                                mc->mc_next = dst->mt_cursors[i];
                                dst->mt_cursors[i] = mc;
@@ -6710,6 +6710,7 @@ put_sub:
                                MDB_xcursor *mx = mc->mc_xcursor;
                                unsigned i = mc->mc_top;
                                MDB_page *mp = mc->mc_pg[i];
+                               int nkeys = NUMKEYS(mp);
 
                                for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
                                        if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
@@ -6717,9 +6718,9 @@ put_sub:
                                        if (m2->mc_pg[i] == mp) {
                                                if (m2->mc_ki[i] == mc->mc_ki[i]) {
                                                        mdb_xcursor_init2(m2, mx, new_dupdata);
-                                               } else if (!insert_key) {
+                                               } else if (!insert_key && m2->mc_ki[i] < nkeys) {
                                                        MDB_node *n2 = NODEPTR(mp, m2->mc_ki[i]);
-                                                       if (!(n2->mn_flags & F_SUBDATA))
+                                                       if ((n2->mn_flags & (F_SUBDATA|F_DUPDATA)) == F_DUPDATA)
                                                                m2->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(n2);
                                                }
                                        }
@@ -7523,6 +7524,22 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key)
 static void
 mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst);
 
+/** Track a temporary cursor */
+#define CURSOR_TMP_TRACK(mc, mn, dummy, tracked) \
+       if (mc->mc_flags & C_SUB) { \
+               dummy.mc_flags =  C_INITIALIZED; \
+               dummy.mc_xcursor = (MDB_xcursor *)&mn; \
+               tracked = &dummy; \
+       } else { \
+               tracked = &mn; \
+       } \
+       tracked->mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi]; \
+       mc->mc_txn->mt_cursors[mc->mc_dbi] = tracked
+
+/** Stop tracking a temporary cursor */
+#define CURSOR_TMP_UNTRACK(mc, tracked) \
+       mc->mc_txn->mt_cursors[mc->mc_dbi] = tracked->mc_next
+
 /** Move a node from csrc to cdst.
  */
 static int
@@ -7678,6 +7695,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
         */
        if (csrc->mc_ki[csrc->mc_top] == 0) {
                if (csrc->mc_ki[csrc->mc_top-1] != 0) {
+                       MDB_cursor dummy, *tracked;
                        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
                                key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
                        } else {
@@ -7690,7 +7708,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
                        mdb_cursor_copy(csrc, &mn);
                        mn.mc_snum--;
                        mn.mc_top--;
-                       if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
+                       /* We want mdb_rebalance to find mn when doing fixups */
+                       CURSOR_TMP_TRACK(csrc, mn, dummy, tracked);
+                       rc = mdb_update_key(&mn, &key);
+                       CURSOR_TMP_UNTRACK(csrc, tracked);
+                       if (rc)
                                return rc;
                }
                if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
@@ -7706,6 +7728,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
 
        if (cdst->mc_ki[cdst->mc_top] == 0) {
                if (cdst->mc_ki[cdst->mc_top-1] != 0) {
+                       MDB_cursor dummy, *tracked;
                        if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
                                key.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, key.mv_size);
                        } else {
@@ -7718,7 +7741,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
                        mdb_cursor_copy(cdst, &mn);
                        mn.mc_snum--;
                        mn.mc_top--;
-                       if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
+                       /* We want mdb_rebalance to find mn when doing fixups */
+                       CURSOR_TMP_TRACK(cdst, mn, dummy, tracked);
+                       rc = mdb_update_key(&mn, &key);
+                       CURSOR_TMP_UNTRACK(cdst, tracked);
+                       if (rc)
                                return rc;
                }
                if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
@@ -8076,24 +8103,13 @@ mdb_rebalance(MDB_cursor *mc)
                if (!fromleft) {
                        rc = mdb_page_merge(&mn, mc);
                } else {
-                       MDB_cursor dummy;
+                       MDB_cursor dummy, *tracked;
                        oldki += NUMKEYS(mn.mc_pg[mn.mc_top]);
                        mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1;
                        /* We want mdb_rebalance to find mn when doing fixups */
-                       if (mc->mc_flags & C_SUB) {
-                               dummy.mc_flags = C_INITIALIZED;
-                               dummy.mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi];
-                               mc->mc_txn->mt_cursors[mc->mc_dbi] = &dummy;
-                               dummy.mc_xcursor = (MDB_xcursor *)&mn;
-                       } else {
-                               mn.mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi];
-                               mc->mc_txn->mt_cursors[mc->mc_dbi] = &mn;
-                       }
+                       CURSOR_TMP_TRACK(mc, mn, dummy, tracked);
                        rc = mdb_page_merge(mc, &mn);
-                       if (mc->mc_flags & C_SUB)
-                               mc->mc_txn->mt_cursors[mc->mc_dbi] = dummy.mc_next;
-                       else
-                               mc->mc_txn->mt_cursors[mc->mc_dbi] = mn.mc_next;
+                       CURSOR_TMP_UNTRACK(mc, tracked);
                        mdb_cursor_copy(&mn, mc);
                }
                mc->mc_flags &= ~C_EOF;
@@ -8131,7 +8147,7 @@ mdb_cursor_del0(MDB_cursor *mc)
                                        if (m3->mc_ki[mc->mc_top] > ki)
                                                m3->mc_ki[mc->mc_top]--;
                                        else if (mc->mc_db->md_flags & MDB_DUPSORT)
-                                               m3->mc_xcursor->mx_cursor.mc_flags |= C_EOF;
+                                               m3->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
                                }
                        }
                }
@@ -8314,7 +8330,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                DPRINTF(("parent branch page is %"Z"u", mc->mc_pg[ptop]->mp_pgno));
        }
 
-       mc->mc_flags |= C_SPLITTING;
        mdb_cursor_copy(mc, &mn);
        mn.mc_pg[mn.mc_top] = rp;
        mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
@@ -8365,8 +8380,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                rp->mp_lower += sizeof(indx_t);
                                rp->mp_upper -= ksize - sizeof(indx_t);
                                mc->mc_ki[mc->mc_top] = x;
-                               mc->mc_pg[mc->mc_top] = rp;
-                               mc->mc_ki[ptop]++;
                        }
                } else {
                        int psize, nsize, k;
@@ -8460,10 +8473,14 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
         */
        if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) {
                int snum = mc->mc_snum;
+               MDB_cursor dummy, *tracked;
                mn.mc_snum--;
                mn.mc_top--;
                did_split = 1;
+               /* We want other splits to find mn when doing fixups */
+               CURSOR_TMP_TRACK(mc, mn, dummy, tracked);
                rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
+               CURSOR_TMP_UNTRACK(mc, tracked);
                if (rc)
                        goto done;
 
@@ -8494,7 +8511,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
                mn.mc_top++;
        }
-       mc->mc_flags ^= C_SPLITTING;
        if (rc != MDB_SUCCESS) {
                goto done;
        }
@@ -8564,11 +8580,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                /* reset back to original page */
                if (newindx < split_indx) {
                        mc->mc_pg[mc->mc_top] = mp;
-                       if (nflags & MDB_RESERVE) {
-                               node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
-                               if (!(node->mn_flags & F_BIGDATA))
-                                       newdata->mv_data = NODEDATA(node);
-                       }
                } else {
                        mc->mc_pg[mc->mc_top] = rp;
                        mc->mc_ki[ptop]++;
@@ -8582,13 +8593,32 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                }
                        }
                }
+               if (nflags & MDB_RESERVE) {
+                       node = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+                       if (!(node->mn_flags & F_BIGDATA))
+                               newdata->mv_data = NODEDATA(node);
+               }
+       } else {
+               if (newindx >= split_indx) {
+                       mc->mc_pg[mc->mc_top] = rp;
+                       mc->mc_ki[ptop]++;
+                       /* Make sure mc_ki is still valid.
+                        */
+                       if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
+                               mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
+                               for (i=0; i<=ptop; i++) {
+                                       mc->mc_pg[i] = mn.mc_pg[i];
+                                       mc->mc_ki[i] = mn.mc_ki[i];
+                               }
+                       }
+               }
        }
 
        {
                /* Adjust other cursors pointing to mp */
                MDB_cursor *m2, *m3;
                MDB_dbi dbi = mc->mc_dbi;
-               int fixup = NUMKEYS(mp);
+               nkeys = NUMKEYS(mp);
 
                for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
                        if (mc->mc_flags & C_SUB)
@@ -8601,12 +8631,15 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                continue;
                        if (new_root) {
                                int k;
+                               /* sub cursors may be on different DB */
+                               if (m3->mc_pg[0] != mp)
+                                       continue;
                                /* root split */
                                for (k=new_root; k>=0; k--) {
                                        m3->mc_ki[k+1] = m3->mc_ki[k];
                                        m3->mc_pg[k+1] = m3->mc_pg[k];
                                }
-                               if (m3->mc_ki[0] >= split_indx) {
+                               if (m3->mc_ki[0] > nkeys) {
                                        m3->mc_ki[0] = 1;
                                } else {
                                        m3->mc_ki[0] = 0;
@@ -8615,15 +8648,16 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
                                m3->mc_snum++;
                                m3->mc_top++;
                        }
-                       if (m3->mc_flags & C_SPLITTING)
-                               continue;
                        if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp) {
                                if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
                                        m3->mc_ki[mc->mc_top]++;
-                               if (m3->mc_ki[mc->mc_top] >= fixup) {
+                               if (m3->mc_ki[mc->mc_top] >= nkeys) {
                                        m3->mc_pg[mc->mc_top] = rp;
-                                       m3->mc_ki[mc->mc_top] -= fixup;
-                                       m3->mc_ki[ptop] = mn.mc_ki[ptop];
+                                       m3->mc_ki[mc->mc_top] -= nkeys;
+                                       for (i=0; i<mc->mc_top; i++) {
+                                               m3->mc_ki[i] = mn.mc_ki[i];
+                                               m3->mc_pg[i] = mn.mc_pg[i];
+                                       }
                                }
                        } else if (!did_split && m3->mc_top >= ptop && m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
                                m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {