]> git.sur5r.net Git - openldap/commitdiff
ITS#7515 Fix tracking of parent txn's cursors.
authorHallvard Furuseth <hallvard@openldap.org>
Sun, 7 Jul 2013 15:13:27 +0000 (17:13 +0200)
committerHallvard Furuseth <hallvard@openldap.org>
Sun, 7 Jul 2013 15:13:27 +0000 (17:13 +0200)
Restore mc_flags and xcursors, they were tracked but not merged.
Simplify: Track parent txn's original cursors after backing them
up, instead of tracking copies and merging them back at commit.

libraries/liblmdb/mdb.c

index 08690fbd83a169746d104d0189688aef6e3fca1f..7647ea3e34cccde20fb239b46a0c4280a5dc8e4c 100644 (file)
@@ -879,8 +879,8 @@ struct MDB_xcursor;
 struct MDB_cursor {
        /** Next cursor on this DB in this txn */
        MDB_cursor      *mc_next;
-       /** Original cursor if this is a shadow */
-       MDB_cursor      *mc_orig;
+       /** Backup of the original cursor if this cursor is a shadow */
+       MDB_cursor      *mc_backup;
        /** Context used for databases with #MDB_DUPSORT, otherwise NULL */
        struct MDB_xcursor      *mc_xcursor;
        /** The transaction that owns this cursor */
@@ -1633,57 +1633,35 @@ mdb_env_sync(MDB_env *env, int force)
        return rc;
 }
 
-/** Make shadow copies of all of parent txn's cursors */
+/** Back up parent txn's cursors, then grab the originals for tracking */
 static int
 mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
 {
-       MDB_cursor *mc, *m2;
-       unsigned int i, j, size;
+       MDB_cursor *mc, *bk;
+       MDB_xcursor *mx;
+       size_t size;
+       int i;
 
-       for (i=0;i<src->mt_numdbs; i++) {
-               if (src->mt_cursors[i]) {
+       for (i = src->mt_numdbs; --i >= 0; ) {
+               if ((mc = src->mt_cursors[i]) != NULL) {
                        size = sizeof(MDB_cursor);
-                       if (src->mt_cursors[i]->mc_xcursor)
+                       if (mc->mc_xcursor)
                                size += sizeof(MDB_xcursor);
-                       for (m2 = src->mt_cursors[i]; m2; m2=m2->mc_next) {
-                               mc = malloc(size);
-                               if (!mc)
+                       for (; mc; mc = bk->mc_next) {
+                               bk = malloc(size);
+                               if (!bk)
                                        return ENOMEM;
-                               mc->mc_orig = m2;
-                               mc->mc_txn = dst;
-                               mc->mc_dbi = i;
+                               *bk = *mc;
+                               mc->mc_backup = bk;
                                mc->mc_db = &dst->mt_dbs[i];
-                               mc->mc_dbx = m2->mc_dbx;
-                               mc->mc_dbflag = &dst->mt_dbflags[i];
-                               mc->mc_snum = m2->mc_snum;
-                               mc->mc_top = m2->mc_top;
-                               mc->mc_flags = m2->mc_flags;
-                               for (j=0; j<mc->mc_snum; j++) {
-                                       mc->mc_pg[j] = m2->mc_pg[j];
-                                       mc->mc_ki[j] = m2->mc_ki[j];
-                               }
-                               if (m2->mc_xcursor) {
-                                       MDB_xcursor *mx, *mx2;
-                                       mx = (MDB_xcursor *)(mc+1);
-                                       mc->mc_xcursor = mx;
-                                       mx2 = m2->mc_xcursor;
-                                       mx->mx_db = mx2->mx_db;
-                                       mx->mx_dbx = mx2->mx_dbx;
-                                       mx->mx_dbflag = mx2->mx_dbflag;
-                                       mx->mx_cursor.mc_txn = dst;
-                                       mx->mx_cursor.mc_dbi = mx2->mx_cursor.mc_dbi;
-                                       mx->mx_cursor.mc_db = &mx->mx_db;
-                                       mx->mx_cursor.mc_dbx = &mx->mx_dbx;
-                                       mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
-                                       mx->mx_cursor.mc_snum = mx2->mx_cursor.mc_snum;
-                                       mx->mx_cursor.mc_top = mx2->mx_cursor.mc_top;
-                                       mx->mx_cursor.mc_flags = mx2->mx_cursor.mc_flags;
-                                       for (j=0; j<mx2->mx_cursor.mc_snum; j++) {
-                                               mx->mx_cursor.mc_pg[j] = mx2->mx_cursor.mc_pg[j];
-                                               mx->mx_cursor.mc_ki[j] = mx2->mx_cursor.mc_ki[j];
-                                       }
-                               } else {
-                                       mc->mc_xcursor = NULL;
+                               /* Kill pointers into src - and dst to reduce abuse: The
+                                * user may not use mc until dst ends. Otherwise we'd...
+                                */
+                               mc->mc_txn    = NULL;   /* ...set this to dst */
+                               mc->mc_dbflag = NULL;   /* ...and &dst->mt_dbflags[i] */
+                               if ((mx = mc->mc_xcursor) != NULL) {
+                                       *(MDB_xcursor *)(bk+1) = *mx;
+                                       mx->mx_cursor.mc_txn = NULL; /* ...and dst. */
                                }
                                mc->mc_next = dst->mt_cursors[i];
                                dst->mt_cursors[i] = mc;
@@ -1693,32 +1671,40 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
        return MDB_SUCCESS;
 }
 
-/** Close this write txn's cursors, after optionally merging its shadow
- * cursors back into parent's.
+/** Close this write txn's cursors, give parent txn's cursors back to parent.
  * @param[in] txn the transaction handle.
- * @param[in] merge zero to not merge cursors, non-zero to merge.
+ * @param[in] merge true to keep changes to parent cursors, false to revert.
  * @return 0 on success, non-zero on failure.
  */
 static void
 mdb_cursors_close(MDB_txn *txn, unsigned merge)
 {
-       MDB_cursor **cursors = txn->mt_cursors, *mc, *next;
-       int i, j;
+       MDB_cursor **cursors = txn->mt_cursors, *mc, *next, *bk;
+       MDB_xcursor *mx;
+       int i;
 
        for (i = txn->mt_numdbs; --i >= 0; ) {
                for (mc = cursors[i]; mc; mc = next) {
-                               next = mc->mc_next;
-                               if (merge && mc->mc_orig) {
-                                       MDB_cursor *m2 = mc->mc_orig;
-                                       m2->mc_snum = mc->mc_snum;
-                                       m2->mc_top = mc->mc_top;
-                                       for (j = mc->mc_snum; --j >= 0; ) {
-                                               m2->mc_pg[j] = mc->mc_pg[j];
-                                               m2->mc_ki[j] = mc->mc_ki[j];
-                                       }
+                       next = mc->mc_next;
+                       if ((bk = mc->mc_backup) != NULL) {
+                               if (merge) {
+                                       /* Commit changes to parent txn */
+                                       mc->mc_next = bk->mc_next;
+                                       mc->mc_backup = bk->mc_backup;
+                                       mc->mc_txn = bk->mc_txn;
+                                       mc->mc_db = bk->mc_db;
+                                       mc->mc_dbflag = bk->mc_dbflag;
+                                       if ((mx = mc->mc_xcursor) != NULL)
+                                               mx->mx_cursor.mc_txn = bk->mc_txn;
+                               } else {
+                                       /* Abort nested txn */
+                                       *mc = *bk;
+                                       if ((mx = mc->mc_xcursor) != NULL)
+                                               *mx = *(MDB_xcursor *)(bk+1);
                                }
-                               /* Only malloced cursors are permanently tracked. */
-                               free(mc);
+                               mc = bk;
+                       }
+                       free(mc);
                }
                cursors[i] = NULL;
        }
@@ -5867,7 +5853,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
 static void
 mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
 {
-       mc->mc_orig = NULL;
+       mc->mc_backup = NULL;
        mc->mc_dbi = dbi;
        mc->mc_txn = txn;
        mc->mc_db = &txn->mt_dbs[dbi];
@@ -5961,7 +5947,7 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp)
 void
 mdb_cursor_close(MDB_cursor *mc)
 {
-       if (mc != NULL) {
+       if (mc && !mc->mc_backup) {
                /* remove from txn, if tracked */
                if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
                        MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];