]> git.sur5r.net Git - openldap/blobdiff - libraries/libmdb/mdb.c
Make sure subDB info is saved after writes
[openldap] / libraries / libmdb / mdb.c
index e26510465140d05ebfa366978d7ee12522dab666..9b0e4f18f7129c08bd1cd2e3a09d88d8a1e15ea5 100644 (file)
@@ -1,3 +1,32 @@
+/* mdb.c - memory-mapped database library */
+/*
+ * Copyright 2011 Howard Chu, Symas Corp.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ *
+ * This code is derived from btree.c written by Martin Hedenfalk.
+ *
+ * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/queue.h>
@@ -21,6 +50,7 @@
 #include <time.h>
 #include <unistd.h>
 #include <pthread.h>
+#include <endian.h>
 
 #include "mdb.h"
 
@@ -44,7 +74,7 @@ typedef ULONG         pgno_t;
 #define MDB_MINKEYS     4
 #define MDB_MAGIC       0xBEEFC0DE
 #define MDB_VERSION     1
-#define MAXKEYSIZE      255
+#define MAXKEYSIZE      511
 
 #define P_INVALID       (~0UL)
 
@@ -171,7 +201,7 @@ typedef struct MDB_dpage {
        MDB_page        p;
 } MDB_dpage;
 
-STAILQ_HEAD(dirty_queue, MDB_dpage);
+STAILQ_HEAD(dirty_queue, MDB_dpage);   /* FIXME: use a sorted data structure */
 
 typedef struct MDB_oldpages {
        struct MDB_oldpages *mo_next;
@@ -200,12 +230,15 @@ SLIST_HEAD(page_stack, MDB_ppage);
 #define CURSOR_POP(c)           SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
 #define CURSOR_PUSH(c,p)        SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
 
+struct MDB_xcursor;
+
 struct MDB_cursor {
        MDB_txn         *mc_txn;
        struct page_stack        mc_stack;              /* stack of parent pages */
        MDB_dbi         mc_dbi;
        short           mc_initialized; /* 1 if initialized */
        short           mc_eof;         /* 1 if end is reached */
+       struct MDB_xcursor      *mc_xcursor;
 };
 
 #define METAHASHLEN     offsetof(MDB_meta, mm_hash)
@@ -230,6 +263,8 @@ typedef struct MDB_dbx {
        MDB_cmp_func    *md_cmp;                /* user compare function */
        MDB_cmp_func    *md_dcmp;               /* user dupsort function */
        MDB_rel_func    *md_rel;                /* user relocate function */
+       MDB_dbi md_parent;
+       unsigned int    md_dirty;
 } MDB_dbx;
 
 struct MDB_txn {
@@ -249,9 +284,17 @@ struct MDB_txn {
 #define MDB_TXN_RDONLY          0x01           /* read-only transaction */
 #define MDB_TXN_ERROR           0x02           /* an error has occurred */
 #define MDB_TXN_METOGGLE       0x04            /* used meta page 1 */
-       unsigned int             mt_flags;
+       unsigned int    mt_flags;
 };
 
+/* Context for sorted-dup records */
+typedef struct MDB_xcursor {
+       MDB_cursor mx_cursor;
+       MDB_txn mx_txn;
+       MDB_dbx mx_dbxs[4];
+       MDB_db  mx_dbs[4];
+} MDB_xcursor;
+
 struct MDB_env {
        int                     me_fd;
        int                     me_lfd;
@@ -272,8 +315,8 @@ struct MDB_env {
        MDB_dbx         *me_dbxs;               /* array */
        MDB_db          *me_dbs[2];
        MDB_oldpages *me_pghead;
-       MDB_oldpages *me_pgtail;
        pthread_key_t   me_txkey;       /* thread-key for readers */
+       pgno_t          me_free_pgs[MDB_IDL_UM_SIZE];
 };
 
 #define NODESIZE        offsetof(MDB_node, mn_data)
@@ -301,7 +344,7 @@ static int  mdb_search_page(MDB_txn *txn,
 static int  mdbenv_read_header(MDB_env *env, MDB_meta *meta);
 static int  mdbenv_read_meta(MDB_env *env, int *which);
 static int  mdbenv_write_meta(MDB_txn *txn);
-static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno);
+static MDB_page *mdb_get_page(MDB_txn *txn, pgno_t pgno);
 
 static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
                            MDB_val *key, int *exactp, unsigned int *kip);
@@ -309,7 +352,9 @@ static int  mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
                            indx_t indx, MDB_val *key, MDB_val *data,
                            pgno_t pgno, uint8_t flags);
 static void mdb_del_node(MDB_page *mp, indx_t indx);
-static int  mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data);
+static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki,
+    MDB_pageparent *mpp, MDB_node *leaf);
+static int  mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
 
 static int              mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
 static int              mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
@@ -330,14 +375,20 @@ static MDB_ppage *cursor_push_page(MDB_cursor *cursor,
 static int              mdb_set_key(MDB_node *node, MDB_val *key);
 static int              mdb_sibling(MDB_cursor *cursor, int move_right);
 static int              mdb_cursor_next(MDB_cursor *cursor,
-                           MDB_val *key, MDB_val *data);
+                           MDB_val *key, MDB_val *data, MDB_cursor_op op);
+static int              mdb_cursor_prev(MDB_cursor *cursor,
+                           MDB_val *key, MDB_val *data, MDB_cursor_op op);
 static int              mdb_cursor_set(MDB_cursor *cursor,
-                           MDB_val *key, MDB_val *data, int *exactp);
+                           MDB_val *key, MDB_val *data, MDB_cursor_op op, int *exactp);
 static int              mdb_cursor_first(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data);
 static int              mdb_cursor_last(MDB_cursor *cursor,
                            MDB_val *key, MDB_val *data);
 
+static void            mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
+static void            mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db);
+static void            mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
+
 static size_t           mdb_leaf_size(MDB_env *env, MDB_val *key,
                            MDB_val *data);
 static size_t           mdb_branch_size(MDB_env *env, MDB_val *key);
@@ -401,9 +452,55 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
 {
        MDB_dpage *dp;
        pgno_t pgno = P_INVALID;
+       ULONG oldest = txn->mt_txnid - 2;
+
+       if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
+               /* See if there's anything in the free DB */
+               MDB_pageparent mpp;
+               MDB_node *leaf;
+               ULONG *kptr;
+
+               mpp.mp_parent = NULL;
+               mpp.mp_pi = 0;
+               mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp);
+               leaf = NODEPTR(mpp.mp_page, 0);
+               kptr = (ULONG *)NODEKEY(leaf);
 
+               /* It's potentially usable, unless there are still
+                * older readers outstanding. Grab it.
+                */
+               if (oldest > *kptr) {
+                       MDB_oldpages *mop;
+                       MDB_val data;
+                       pgno_t *idl;
+
+                       mdb_read_data(txn, leaf, &data);
+                       idl = (ULONG *)data.mv_data;
+                       mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
+                       mop->mo_next = txn->mt_env->me_pghead;
+                       mop->mo_txnid = *kptr;
+                       txn->mt_env->me_pghead = mop;
+                       memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
+
+#if DEBUG > 1
+                       {
+                               unsigned int i;
+                               DPRINTF("IDL read txn %lu root %lu num %lu",
+                                       mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+                               for (i=0; i<idl[0]; i++) {
+                                       DPRINTF("IDL %lu", idl[i+1]);
+                               }
+                       }
+#endif
+                       /* drop this IDL from the DB */
+                       mpp.mp_parent = NULL;
+                       mpp.mp_pi = 0;
+                       mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
+                       leaf = NODEPTR(mpp.mp_page, 0);
+                       mdb_del0(txn, FREE_DBI, 0, &mpp, leaf);
+               }
+       }
        if (txn->mt_env->me_pghead) {
-               ULONG oldest = txn->mt_txnid - 2;
                unsigned int i;
                for (i=0; i<txn->mt_env->me_txns->mt_numreaders; i++) {
                        ULONG mr = txn->mt_env->me_txns->mt_readers[i].mr_txnid;
@@ -432,8 +529,6 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
                                }
                                if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
                                        txn->mt_env->me_pghead = mop->mo_next;
-                                       if (!txn->mt_env->me_pghead)
-                                               txn->mt_env->me_pgtail = NULL;
                                        free(mop);
                                }
                        }
@@ -503,7 +598,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
        MDB_txn *txn;
        int rc, toggle;
 
-       if ((txn = calloc(1, sizeof(*txn))) == NULL) {
+       if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) {
                DPRINTF("calloc: %s", strerror(errno));
                return ENOMEM;
        }
@@ -520,12 +615,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 
                pthread_mutex_lock(&env->me_txns->mt_wmutex);
                env->me_txns->mt_txnid++;
-               txn->mt_free_pgs = malloc(MDB_IDL_UM_SIZEOF);
-               if (txn->mt_free_pgs == NULL) {
-                       free(txn->mt_u.dirty_queue);
-                       free(txn);
-                       return ENOMEM;
-               }
+               txn->mt_free_pgs = env->me_free_pgs;
                txn->mt_free_pgs[0] = 0;
        }
 
@@ -535,21 +625,20 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
                if (!r) {
                        unsigned int i;
                        pthread_mutex_lock(&env->me_txns->mt_mutex);
-                       for (i=0; i<env->me_maxreaders; i++) {
-                               if (env->me_txns->mt_readers[i].mr_pid == 0) {
-                                       env->me_txns->mt_readers[i].mr_pid = getpid();
-                                       env->me_txns->mt_readers[i].mr_tid = pthread_self();
-                                       r = &env->me_txns->mt_readers[i];
-                                       pthread_setspecific(env->me_txkey, r);
-                                       if (i >= env->me_txns->mt_numreaders)
-                                               env->me_txns->mt_numreaders = i+1;
+                       for (i=0; i<env->me_txns->mt_numreaders; i++)
+                               if (env->me_txns->mt_readers[i].mr_pid == 0)
                                        break;
-                               }
-                       }
-                       pthread_mutex_unlock(&env->me_txns->mt_mutex);
                        if (i == env->me_maxreaders) {
+                               pthread_mutex_unlock(&env->me_txns->mti_mutex);
                                return ENOSPC;
                        }
+                       env->me_txns->mt_readers[i].mr_pid = getpid();
+                       env->me_txns->mt_readers[i].mr_tid = pthread_self();
+                       r = &env->me_txns->mt_readers[i];
+                       pthread_setspecific(env->me_txkey, r);
+                       if (i >= env->me_txns->mt_numreaders)
+                               env->me_txns->mt_numreaders = i+1;
+                       pthread_mutex_unlock(&env->me_txns->mt_mutex);
                }
                r->mr_txnid = txn->mt_txnid;
                txn->mt_u.reader = r;
@@ -604,35 +693,26 @@ mdb_txn_abort(MDB_txn *txn)
        if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
                txn->mt_u.reader->mr_txnid = 0;
        } else {
-               /* Discard all dirty pages. Return any re-used pages
-                * to the free list.
-                */
-               MDB_IDL_ZERO(txn->mt_free_pgs);
+               MDB_oldpages *mop;
+               unsigned int i;
+
+               /* Discard all dirty pages. */
                while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
                        dp = STAILQ_FIRST(txn->mt_u.dirty_queue);
                        STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
-                       if (dp->p.mp_pgno <= env->me_meta->mm_last_pg)
-                               mdb_idl_insert(txn->mt_free_pgs, dp->p.mp_pgno);
                        free(dp);
                }
-               /* put back to head of free list */
-               if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
-                       MDB_oldpages *mop;
+               free(txn->mt_u.dirty_queue);
 
-                       mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t));
-                       mop->mo_next = env->me_pghead;
-                       mop->mo_txnid = txn->mt_oldest - 1;
-                       if (!env->me_pghead) {
-                               env->me_pgtail = mop;
-                       }
-                       env->me_pghead = mop;
-                       memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs));
+               while ((mop = txn->mt_env->me_pghead)) {
+                       txn->mt_env->me_pghead = mop->mo_next;
+                       free(mop);
                }
 
-               free(txn->mt_free_pgs);
-               free(txn->mt_u.dirty_queue);
                env->me_txn = NULL;
                env->me_txns->mt_txnid--;
+               for (i=2; i<env->me_numdbs; i++)
+                       env->me_dbxs[i].md_dirty = 0;
                pthread_mutex_unlock(&env->me_txns->mt_wmutex);
        }
 
@@ -680,6 +760,51 @@ mdb_txn_commit(MDB_txn *txn)
        DPRINTF("committing transaction %lu on mdbenv %p, root page %lu",
            txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
 
+       /* should only be one record now */
+       if (env->me_pghead) {
+               MDB_val key, data;
+               MDB_oldpages *mop;
+
+               mop = env->me_pghead;
+               key.mv_size = sizeof(pgno_t);
+               key.mv_data = (char *)&mop->mo_txnid;
+               data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
+               data.mv_data = mop->mo_pages;
+               mdb_put(txn, FREE_DBI, &key, &data, 0);
+               free(env->me_pghead);
+               env->me_pghead = NULL;
+       }
+       /* save to free list */
+       if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
+               MDB_val key, data;
+               MDB_pageparent mpp;
+
+               /* make sure last page of freeDB is touched and on freelist */
+               key.mv_size = MAXKEYSIZE+1;
+               key.mv_data = NULL;
+               mpp.mp_parent = NULL;
+               mpp.mp_pi = 0;
+               mdb_search_page(txn, FREE_DBI, &key, NULL, 1, &mpp);
+
+#if DEBUG > 1
+               {
+                       unsigned int i;
+                       ULONG *idl = txn->mt_free_pgs;
+                       DPRINTF("IDL write txn %lu root %lu num %lu",
+                               txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
+                       for (i=0; i<idl[0]; i++) {
+                               DPRINTF("IDL %lu", idl[i+1]);
+                       }
+               }
+#endif
+               /* write to last page of freeDB */
+               key.mv_size = sizeof(pgno_t);
+               key.mv_data = (char *)&txn->mt_txnid;
+               data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
+               data.mv_data = txn->mt_free_pgs;
+               mdb_put(txn, FREE_DBI, &key, &data, 0);
+       }
+
        /* Update DB root pointers. Their pages have already been
         * touched so this is all in-place and cannot fail.
         */
@@ -687,16 +812,12 @@ mdb_txn_commit(MDB_txn *txn)
                MDB_val data;
                data.mv_size = sizeof(MDB_db);
 
-               for (i = 2; i < env->me_numdbs; i++) {
-                       if (env->me_dbs[env->me_db_toggle][i].md_root != txn->mt_dbs[i].md_root) {
+               for (i = 2; i < txn->mt_numdbs; i++) {
+                       if (txn->mt_dbxs[i].md_dirty) {
                                data.mv_data = &txn->mt_dbs[i];
                                mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0);
                        }
                }
-               for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
-                       data.mv_data = &txn->mt_dbs[i];
-                       mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0);
-               }
        }
 
        /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
@@ -776,11 +897,14 @@ mdb_txn_commit(MDB_txn *txn)
        {
                int toggle = !env->me_db_toggle;
 
-               for (i = 0; i < env->me_numdbs; i++) {
-                       if (env->me_dbs[toggle][i].md_root != txn->mt_dbs[i].md_root)
+               for (i = 2; i < env->me_numdbs; i++) {
+                       if (txn->mt_dbxs[i].md_dirty) {
                                env->me_dbs[toggle][i] = txn->mt_dbs[i];
+                               txn->mt_dbxs[i].md_dirty = 0;
+                       }
                }
                for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
+                       txn->mt_dbxs[i].md_dirty = 0;
                        env->me_dbxs[i] = txn->mt_dbxs[i];
                        env->me_dbs[toggle][i] = txn->mt_dbs[i];
                }
@@ -790,24 +914,7 @@ mdb_txn_commit(MDB_txn *txn)
                free(txn->mt_dbs);
        }
 
-       /* add to tail of free list */
-       if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
-               MDB_oldpages *mop;
-
-               mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t));
-               mop->mo_next = NULL;
-               if (env->me_pghead) {
-                       env->me_pgtail->mo_next = mop;
-               } else {
-                       env->me_pghead = mop;
-               }
-               env->me_pgtail = mop;
-               memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs));
-               mop->mo_txnid = txn->mt_txnid;
-       }
-
        pthread_mutex_unlock(&env->me_txns->mt_wmutex);
-       free(txn->mt_free_pgs);
        free(txn->mt_u.dirty_queue);
        free(txn);
        txn = NULL;
@@ -856,7 +963,7 @@ mdbenv_read_header(MDB_env *env, MDB_meta *meta)
        if (m->mm_version != MDB_VERSION) {
                DPRINTF("database is version %u, expected version %u",
                    m->mm_version, MDB_VERSION);
-               return EINVAL;
+               return MDB_VERSION_MISMATCH;
        }
 
        memcpy(meta, m, sizeof(*m));
@@ -879,6 +986,10 @@ mdbenv_init_meta(MDB_env *env, MDB_meta *meta)
        meta->mm_psize = psize;
        meta->mm_last_pg = 1;
        meta->mm_flags = env->me_flags & 0xffff;
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+       /* freeDB keys are pgno_t's, must compare in int order */
+       meta->mm_flags |= MDB_REVERSEKEY;
+#endif
        meta->mm_dbs[0].md_root = P_INVALID;
        meta->mm_dbs[1].md_root = P_INVALID;
 
@@ -967,7 +1078,7 @@ mdbenv_create(MDB_env **env)
 {
        MDB_env *e;
 
-       e = calloc(1, sizeof(*e));
+       e = calloc(1, sizeof(MDB_env));
        if (!e) return ENOMEM;
 
        e->me_maxreaders = DEFAULT_READERS;
@@ -1164,12 +1275,14 @@ mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
        } else {
                if (env->me_txns->mt_magic != MDB_MAGIC) {
                        DPRINTF("lock region has invalid magic");
-                       errno = EINVAL;
+                       rc = EINVAL;
+                       goto fail;
                }
                if (env->me_txns->mt_version != MDB_VERSION) {
                        DPRINTF("lock region is version %u, expected version %u",
                                env->me_txns->mt_version, MDB_VERSION);
-                       errno = EINVAL;
+                       rc = MDB_VERSION_MISMATCH;
+                       goto fail;
                }
                if (errno != EACCES && errno != EAGAIN) {
                        rc = errno;
@@ -1341,7 +1454,7 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
 
        DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor);
 
-       if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
+       if ((ppage = calloc(1, sizeof(MDB_ppage))) == NULL)
                return NULL;
        ppage->mp_page = mp;
        CURSOR_PUSH(cursor, ppage);
@@ -1349,13 +1462,12 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
 }
 
 static MDB_page *
-mdbenv_get_page(MDB_env *env, pgno_t pgno)
+mdb_get_page(MDB_txn *txn, pgno_t pgno)
 {
        MDB_page *p = NULL;
-       MDB_txn *txn = env->me_txn;
        int found = 0;
 
-       if (txn && !STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
+       if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && !STAILQ_EMPTY(txn->mt_u.dirty_queue)) {
                MDB_dpage *dp;
                STAILQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
                        if (dp->p.mp_pgno == pgno) {
@@ -1366,7 +1478,9 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno)
                }
        }
        if (!found) {
-               p = (MDB_page *)(env->me_map + env->me_psize * pgno);
+               if (pgno > txn->mt_env->me_meta->mm_last_pg)
+                       return NULL;
+               p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
        }
        return p;
 }
@@ -1415,7 +1529,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
                        CURSOR_TOP(cursor)->mp_ki = i;
 
                mpp->mp_parent = mp;
-               if ((mp = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
+               if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL)
                        return MDB_FAIL;
                mpp->mp_pi = i;
                mpp->mp_page = mp;
@@ -1472,23 +1586,23 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 
        if (root == P_INVALID) {                /* Tree is empty. */
                DPRINTF("tree is empty");
-               return ENOENT;
+               return MDB_NOTFOUND;
        }
 
-       if ((mpp->mp_page = mdbenv_get_page(txn->mt_env, root)) == NULL)
+       if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL)
                return MDB_FAIL;
 
        DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
 
        if (modify) {
                /* For sub-databases, update main root first */
-               if (dbi > MAIN_DBI && txn->mt_dbs[dbi].md_root ==
-                       txn->mt_env->me_dbs[txn->mt_env->me_db_toggle][dbi].md_root) {
+               if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) {
                        MDB_pageparent mp2;
                        rc = mdb_search_page(txn, 0, &txn->mt_dbxs[dbi].md_name,
                                NULL, 1, &mp2);
                        if (rc)
                                return rc;
+                       txn->mt_dbxs[dbi].md_dirty = 1;
                }
                if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
                        mpp->mp_parent = NULL;
@@ -1503,7 +1617,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
 }
 
 static int
-mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data)
+mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
 {
        MDB_page        *omp;           /* overflow mpage */
        pgno_t           pgno;
@@ -1518,7 +1632,7 @@ mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data)
         */
        data->mv_size = leaf->mn_dsize;
        memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
-       if ((omp = mdbenv_get_page(env, pgno)) == NULL) {
+       if ((omp = mdb_get_page(txn, pgno)) == NULL) {
                DPRINTF("read overflow page %lu failed", pgno);
                return MDB_FAIL;
        }
@@ -1547,10 +1661,21 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
                return rc;
 
        leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
-       if (leaf && exact)
-               rc = mdb_read_data(txn->mt_env, leaf, data);
-       else {
-               rc = ENOENT;
+       if (leaf && exact) {
+               /* Return first duplicate data item */
+               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                       MDB_xcursor mx;
+
+                       mdb_xcursor_init0(txn, dbi, &mx);
+                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp);
+                       if (rc != MDB_SUCCESS)
+                               return rc;
+                       leaf = NODEPTR(mpp.mp_page, 0);
+               }
+               rc = mdb_read_data(txn, leaf, data);
+       } else {
+               rc = MDB_NOTFOUND;
        }
 
        return rc;
@@ -1566,7 +1691,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right)
 
        top = CURSOR_TOP(cursor);
        if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) {
-               return ENOENT;          /* root has no siblings */
+               return MDB_NOTFOUND;            /* root has no siblings */
        }
 
        DPRINTF("parent page is page %lu, index %u",
@@ -1591,7 +1716,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right)
        assert(IS_BRANCH(parent->mp_page));
 
        indx = NODEPTR(parent->mp_page, parent->mp_ki);
-       if ((mp = mdbenv_get_page(cursor->mc_txn->mt_env, indx->mn_pgno)) == NULL)
+       if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL)
                return MDB_FAIL;
 #if 0
        mp->parent = parent->mp_page;
@@ -1616,18 +1741,27 @@ mdb_set_key(MDB_node *node, MDB_val *key)
 }
 
 static int
-mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
+mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op)
 {
        MDB_ppage       *top;
        MDB_page        *mp;
        MDB_node        *leaf;
+       int rc;
 
        if (cursor->mc_eof) {
-               return ENOENT;
+               return MDB_NOTFOUND;
        }
 
        assert(cursor->mc_initialized);
 
+       if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+               if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
+                       rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
+                       if (op != MDB_NEXT || rc == MDB_SUCCESS)
+                               return rc;
+               }
+       }
+
        top = CURSOR_TOP(cursor);
        mp = top->mp_page;
 
@@ -1637,7 +1771,7 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
                DPRINTF("=====> move to next sibling page");
                if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
                        cursor->mc_eof = 1;
-                       return ENOENT;
+                       return MDB_NOTFOUND;
                }
                top = CURSOR_TOP(cursor);
                mp = top->mp_page;
@@ -1651,15 +1785,82 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        assert(IS_LEAF(mp));
        leaf = NODEPTR(mp, top->mp_ki);
 
-       if (data && mdb_read_data(cursor->mc_txn->mt_env, leaf, data) != MDB_SUCCESS)
-               return MDB_FAIL;
+       if (data) {
+               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
+                       return rc;
+
+               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
+                       if (rc != MDB_SUCCESS)
+                               return rc;
+               }
+       }
+
+       return mdb_set_key(leaf, key);
+}
+
+static int
+mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op)
+{
+       MDB_ppage       *top;
+       MDB_page        *mp;
+       MDB_node        *leaf;
+       int rc;
+
+       assert(cursor->mc_initialized);
+
+       if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+               if (op == MDB_PREV || op == MDB_PREV_DUP) {
+                       rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
+                       if (op != MDB_PREV || rc == MDB_SUCCESS)
+                               return rc;
+               }
+       }
+
+       top = CURSOR_TOP(cursor);
+       mp = top->mp_page;
+
+       DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
+
+       if (top->mp_ki == 0)  {
+               DPRINTF("=====> move to prev sibling page");
+               if (mdb_sibling(cursor, 0) != MDB_SUCCESS) {
+                       return MDB_NOTFOUND;
+               }
+               top = CURSOR_TOP(cursor);
+               mp = top->mp_page;
+               top->mp_ki = NUMKEYS(mp) - 1;
+               DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
+       } else
+               top->mp_ki--;
+
+       cursor->mc_eof = 0;
+
+       DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
+           mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
+
+       assert(IS_LEAF(mp));
+       leaf = NODEPTR(mp, top->mp_ki);
+
+       if (data) {
+               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
+                       return rc;
+
+               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
+                       if (rc != MDB_SUCCESS)
+                               return rc;
+               }
+       }
 
        return mdb_set_key(leaf, key);
 }
 
 static int
 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
-    int *exactp)
+    MDB_cursor_op op, int *exactp)
 {
        int              rc;
        MDB_node        *leaf;
@@ -1670,6 +1871,9 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        assert(key);
        assert(key->mv_size > 0);
 
+       while (CURSOR_TOP(cursor) != NULL)
+               cursor_pop_page(cursor);
+
        rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp);
        if (rc != MDB_SUCCESS)
                return rc;
@@ -1678,8 +1882,8 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        top = CURSOR_TOP(cursor);
        leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki);
        if (exactp != NULL && !*exactp) {
-               /* MDB_CURSOR_EXACT specified and not an exact match. */
-               return ENOENT;
+               /* MDB_SET specified and not an exact match. */
+               return MDB_NOTFOUND;
        }
 
        if (leaf == NULL) {
@@ -1696,8 +1900,30 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        cursor->mc_initialized = 1;
        cursor->mc_eof = 0;
 
-       if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
-               return rc;
+       if (data) {
+               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                       return rc;
+
+               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       if (op == MDB_SET || op == MDB_SET_RANGE) {
+                               rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
+                       } else {
+                               int ex2, *ex2p;
+                               MDB_cursor_op op2;
+                               if (op == MDB_GET_BOTH) {
+                                       ex2p = &ex2;
+                                       op2 = MDB_SET;
+                               } else {
+                                       ex2p = NULL;
+                                       op2 = MDB_SET_RANGE;
+                               }
+                               rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, op2, ex2p);
+                               if (rc != MDB_SUCCESS)
+                                       return rc;
+                       }
+               }
+       }
 
        rc = mdb_set_key(leaf, key);
        if (rc == MDB_SUCCESS) {
@@ -1716,6 +1942,9 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        MDB_pageparent  mpp;
        MDB_node        *leaf;
 
+       while (CURSOR_TOP(cursor) != NULL)
+               cursor_pop_page(cursor);
+
        rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp);
        if (rc != MDB_SUCCESS)
                return rc;
@@ -1725,9 +1954,17 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
        cursor->mc_initialized = 1;
        cursor->mc_eof = 0;
 
-       if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
-               return rc;
+       if (data) {
+               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                       return rc;
 
+               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
+                       if (rc)
+                               return rc;
+               }
+       }
        return mdb_set_key(leaf, key);
 }
 
@@ -1735,10 +1972,14 @@ static int
 mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
 {
        int              rc;
+       MDB_ppage       *top;
        MDB_pageparent  mpp;
        MDB_node        *leaf;
        MDB_val lkey;
 
+       while (CURSOR_TOP(cursor) != NULL)
+               cursor_pop_page(cursor);
+
        lkey.mv_size = MAXKEYSIZE+1;
        lkey.mv_data = NULL;
 
@@ -1749,10 +1990,22 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
 
        leaf = NODEPTR(mpp.mp_page, NUMKEYS(mpp.mp_page)-1);
        cursor->mc_initialized = 1;
-       cursor->mc_eof = 1;
+       cursor->mc_eof = 0;
 
-       if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
-               return rc;
+       top = CURSOR_TOP(cursor);
+       top->mp_ki = NUMKEYS(top->mp_page) - 1;
+
+       if (data) {
+               if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+                       return rc;
+
+               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+                       mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, NODEDATA(leaf));
+                       rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
+                       if (rc)
+                               return rc;
+               }
+       }
 
        return mdb_set_key(leaf, key);
 }
@@ -1767,31 +2020,42 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
        assert(cursor);
 
        switch (op) {
-       case MDB_CURSOR:
-       case MDB_CURSOR_EXACT:
-               while (CURSOR_TOP(cursor) != NULL)
-                       cursor_pop_page(cursor);
+       case MDB_GET_BOTH:
+       case MDB_GET_BOTH_RANGE:
+               if (data == NULL) {
+                       rc = EINVAL;
+                       break;
+               }
+               /* FALLTHRU */
+       case MDB_SET:
+       case MDB_SET_RANGE:
                if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
                        rc = EINVAL;
-               } else if (op == MDB_CURSOR_EXACT)
-                       rc = mdb_cursor_set(cursor, key, data, &exact);
+               } else if (op != MDB_SET_RANGE)
+                       rc = mdb_cursor_set(cursor, key, data, op, NULL);
                else
-                       rc = mdb_cursor_set(cursor, key, data, NULL);
+                       rc = mdb_cursor_set(cursor, key, data, op, &exact);
                break;
        case MDB_NEXT:
+       case MDB_NEXT_DUP:
+       case MDB_NEXT_NODUP:
                if (!cursor->mc_initialized)
                        rc = mdb_cursor_first(cursor, key, data);
                else
-                       rc = mdb_cursor_next(cursor, key, data);
+                       rc = mdb_cursor_next(cursor, key, data, op);
+               break;
+       case MDB_PREV:
+       case MDB_PREV_DUP:
+       case MDB_PREV_NODUP:
+               if (!cursor->mc_initialized || cursor->mc_eof)
+                       rc = mdb_cursor_last(cursor, key, data);
+               else
+                       rc = mdb_cursor_prev(cursor, key, data, op);
                break;
        case MDB_FIRST:
-               while (CURSOR_TOP(cursor) != NULL)
-                       cursor_pop_page(cursor);
                rc = mdb_cursor_first(cursor, key, data);
                break;
        case MDB_LAST:
-               while (CURSOR_TOP(cursor) != NULL)
-                       cursor_pop_page(cursor);
                rc = mdb_cursor_last(cursor, key, data);
                break;
        default:
@@ -1989,18 +2253,82 @@ mdb_del_node(MDB_page *mp, indx_t indx)
        mp->mp_upper += sz;
 }
 
+static void
+mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       MDB_dbi dbn;
+
+       mx->mx_txn = *txn;
+       mx->mx_txn.mt_dbxs = mx->mx_dbxs;
+       mx->mx_txn.mt_dbs = mx->mx_dbs;
+       mx->mx_dbxs[0] = txn->mt_dbxs[0];
+       mx->mx_dbxs[1] = txn->mt_dbxs[1];
+       if (dbi > 1) {
+               mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
+               dbn = 2;
+       } else {
+               dbn = 1;
+       }
+       mx->mx_dbxs[dbn+1].md_parent = dbn;
+       mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
+       mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
+       mx->mx_dbxs[dbn+1].md_dirty = 0;
+       mx->mx_txn.mt_numdbs = dbn+2;
+
+       SLIST_INIT(&mx->mx_cursor.mc_stack);
+       mx->mx_cursor.mc_txn = &mx->mx_txn;
+       mx->mx_cursor.mc_dbi = dbn+1;
+}
+
+static void
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
+{
+       mx->mx_dbs[0] = txn->mt_dbs[0];
+       mx->mx_dbs[1] = txn->mt_dbs[1];
+       if (dbi > 1) {
+               mx->mx_dbs[2] = txn->mt_dbs[dbi];
+               mx->mx_dbs[3] = *db;
+       } else {
+               mx->mx_dbs[2] = *db;
+       }
+}
+
+static void
+mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+       txn->mt_dbs[0] = mx->mx_dbs[0];
+       txn->mt_dbs[1] = mx->mx_dbs[1];
+       txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
+       txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
+       if (dbi > 1) {
+               txn->mt_dbs[dbi] = mx->mx_dbs[2];
+               txn->mt_dbxs[dbi].md_dirty = mx->mx_dbxs[2].md_dirty;
+       }
+}
+
 int
 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 {
        MDB_cursor      *cursor;
+       size_t size = sizeof(MDB_cursor);
 
-       if (txn == NULL || ret == NULL)
+       if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
                return EINVAL;
 
-       if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
+       if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
+               size += sizeof(MDB_xcursor);
+
+       if ((cursor = calloc(1, size)) != NULL) {
                SLIST_INIT(&cursor->mc_stack);
                cursor->mc_dbi = dbi;
                cursor->mc_txn = txn;
+               if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
+                       MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
+                       cursor->mc_xcursor = mx;
+                       mdb_xcursor_init0(txn, dbi, mx);
+               }
+       } else {
+               return ENOMEM;
        }
 
        *ret = cursor;
@@ -2012,10 +2340,14 @@ void
 mdb_cursor_close(MDB_cursor *cursor)
 {
        if (cursor != NULL) {
-               while (!CURSOR_EMPTY(cursor))
+               while(!CURSOR_EMPTY(cursor))
                        cursor_pop_page(cursor);
+               if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+                       mdb_xcursor_fini(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor);
+                       while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor))
+                               cursor_pop_page(&cursor->mc_xcursor->mx_cursor);
+               }
 
-/*             btree_close(cursor->bt); */
                free(cursor);
        }
 }
@@ -2229,7 +2561,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
                        DPRINTF("collapsing root page!");
                        txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
-                       if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbs[dbi].md_root)) == NULL)
+                       if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL)
                                return MDB_FAIL;
                        txn->mt_dbs[dbi].md_depth--;
                        txn->mt_dbs[dbi].md_branch_pages--;
@@ -2255,7 +2587,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                 */
                DPRINTF("reading right neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
-               if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
+               if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
                        return MDB_FAIL;
                npp.mp_pi = mpp->mp_pi + 1;
                si = 0;
@@ -2265,7 +2597,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
                 */
                DPRINTF("reading left neighbor");
                node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
-               if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
+               if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL)
                        return MDB_FAIL;
                npp.mp_pi = mpp->mp_pi - 1;
                si = NUMKEYS(npp.mp_page) - 1;
@@ -2291,9 +2623,37 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
        }
 }
 
+static int
+mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf)
+{
+       int rc;
+
+       /* add overflow pages to free list */
+       if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
+               int i, ovpages;
+               pgno_t pg;
+
+               memcpy(&pg, NODEDATA(leaf), sizeof(pg));
+               ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize);
+               for (i=0; i<ovpages; i++) {
+                       DPRINTF("freed ov page %lu", pg);
+                       mdb_idl_insert(txn->mt_free_pgs, pg);
+                       pg++;
+               }
+       }
+       mdb_del_node(mpp->mp_page, ki);
+       txn->mt_dbs[dbi].md_entries--;
+       rc = mdb_rebalance(txn, dbi, mpp);
+       if (rc != MDB_SUCCESS)
+               txn->mt_flags |= MDB_TXN_ERROR;
+
+       return rc;
+}
+
 int
 mdb_del(MDB_txn *txn, MDB_dbi dbi,
-    MDB_val *key, MDB_val *data)
+    MDB_val *key, MDB_val *data,
+       unsigned int flags)
 {
        int              rc, exact;
        unsigned int     ki;
@@ -2315,36 +2675,74 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
                return EINVAL;
        }
 
+       mpp.mp_parent = NULL;
+       mpp.mp_pi = 0;
        if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS)
                return rc;
 
        leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
        if (leaf == NULL || !exact) {
-               return ENOENT;
+               return MDB_NOTFOUND;
        }
 
-       if (data && (rc = mdb_read_data(txn->mt_env, leaf, data)) != MDB_SUCCESS)
-               return rc;
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               MDB_xcursor mx;
+               MDB_pageparent mp2;
 
-       mdb_del_node(mpp.mp_page, ki);
-       /* add overflow pages to free list */
-       if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
-               int i, ovpages;
-               pgno_t pg;
-
-               memcpy(&pg, NODEDATA(leaf), sizeof(pg));
-               ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize);
-               for (i=0; i<ovpages; i++) {
-                       mdb_idl_insert(txn->mt_free_pgs, pg);
-                       pg++;
+               mdb_xcursor_init0(txn, dbi, &mx);
+               mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+               if (flags == MDB_DEL_DUP) {
+                       rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL, 0);
+                       mdb_xcursor_fini(txn, dbi, &mx);
+                       if (rc != MDB_SUCCESS)
+                               return rc;
+                       /* If sub-DB still has entries, we're done */
+                       if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) {
+                               memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
+                                       sizeof(MDB_db));
+                               return rc;
+                       }
+                       /* otherwise fall thru and delete the sub-DB */
+               } else {
+                       /* add all the child DB's pages to the free list */
+                       rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi,
+                               NULL, &mx.mx_cursor, 0, &mp2);
+                       if (rc == MDB_SUCCESS) {
+                               MDB_ppage *top, *parent;
+                               MDB_node *ni;
+                               unsigned int i;
+
+                               cursor_pop_page(&mx.mx_cursor);
+                               top = CURSOR_TOP(&mx.mx_cursor);
+                               if (top != NULL) {
+                                       parent = SLIST_NEXT(top, mp_entry);
+                                       while (parent != NULL) {
+                                               for (i=0; i<NUMKEYS(top->mp_page); i++) {
+                                                       ni = NODEPTR(top->mp_page, i);
+                                                       mdb_idl_insert(txn->mt_free_pgs, ni->mn_pgno);
+                                               }
+                                               if (parent) {
+                                                       parent->mp_ki++;
+                                                       if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
+                                                               cursor_pop_page(&mx.mx_cursor);
+                                                               top = CURSOR_TOP(&mx.mx_cursor);
+                                                               parent = SLIST_NEXT(top, mp_entry);
+                                                       } else {
+                                                               ni = NODEPTR(parent->mp_page, parent->mp_ki);
+                                                               top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno);
+                                                       }
+                                               }
+                                       }
+                               }
+                               mdb_idl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root);
+                       }
                }
        }
-       txn->mt_dbs[dbi].md_entries--;
-       rc = mdb_rebalance(txn, dbi, &mpp);
-       if (rc != MDB_SUCCESS)
-               txn->mt_flags |= MDB_TXN_ERROR;
 
-       return rc;
+       if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
+               return rc;
+
+       return mdb_del0(txn, dbi, ki, &mpp, leaf);
 }
 
 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
@@ -2512,6 +2910,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        unsigned int     ki;
        MDB_node        *leaf;
        MDB_pageparent  mpp;
+       MDB_val xdata, *rdata;
+       MDB_db dummy;
 
        assert(key != NULL);
        assert(data != NULL);
@@ -2523,10 +2923,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
                return EINVAL;
        }
 
-       if (txn->mt_env->me_txn != txn) {
-               return EINVAL;
-       }
-
        if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
                return EINVAL;
        }
@@ -2534,14 +2930,19 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("==> put key %.*s, size %zu, data size %zu",
                (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
 
+       mpp.mp_parent = NULL;
+       mpp.mp_pi = 0;
        rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
        if (rc == MDB_SUCCESS) {
                leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
                if (leaf && exact) {
-                       if (F_ISSET(flags, MDB_NOOVERWRITE)) {
+                       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                               goto put_sub;
+                       }
+                       if (flags == MDB_NOOVERWRITE) {
                                DPRINTF("duplicate key %.*s",
                                    (int)key->mv_size, (char *)key->mv_data);
-                               return EEXIST;
+                               return MDB_KEYEXIST;
                        }
                        /* same size, just replace it */
                        if (NODEDSZ(leaf) == data->mv_size) {
@@ -2554,7 +2955,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
                        ki = NUMKEYS(mpp.mp_page);
                        DPRINTF("appending key at index %i", ki);
                }
-       } else if (rc == ENOENT) {
+       } else if (rc == MDB_NOTFOUND) {
                MDB_dpage *dp;
                /* new file, just write a root leaf page */
                DPRINTF("allocating new root leaf page");
@@ -2573,6 +2974,20 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
        DPRINTF("there are %u keys, should insert new key at index %i",
                NUMKEYS(mpp.mp_page), ki);
 
+       /* For sorted dups, the data item at this level is a DB record
+        * for a child DB; the actual data elements are stored as keys
+        * in the child DB.
+        */
+       if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+               rdata = &xdata;
+               xdata.mv_size = sizeof(MDB_db);
+               xdata.mv_data = &dummy;
+               memset(&dummy, 0, sizeof(dummy));
+               dummy.md_root = P_INVALID;
+       } else {
+               rdata = data;
+       }
+
        if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
                rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
        } else {
@@ -2582,8 +2997,30 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 
        if (rc != MDB_SUCCESS)
                txn->mt_flags |= MDB_TXN_ERROR;
-       else
+       else {
                txn->mt_dbs[dbi].md_entries++;
+               /* Now store the actual data in the child DB. Note that we're
+                * storing the user data in the keys field, so there are strict
+                * size limits on dupdata. The actual data fields of the child
+                * DB are all zero size.
+                */
+               if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+                       MDB_xcursor mx;
+
+                       leaf = NODEPTR(mpp.mp_page, ki);
+put_sub:
+                       mdb_xcursor_init0(txn, dbi, &mx);
+                       mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+                       xdata.mv_size = 0;
+                       xdata.mv_data = "";
+                       if (flags == MDB_NODUPDATA)
+                               flags = MDB_NOOVERWRITE;
+                       rc = mdb_put(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
+                       mdb_xcursor_fini(txn, dbi, &mx);
+                       memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
+                               sizeof(MDB_db));
+               }
+       }
 
 done:
        return rc;
@@ -2657,7 +3094,7 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
        rc = mdb_get(txn, MAIN_DBI, &key, &data);
 
        /* Create if requested */
-       if (rc == ENOENT && (flags & MDB_CREATE)) {
+       if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) {
                MDB_db dummy;
                data.mv_size = sizeof(MDB_db);
                data.mv_data = &dummy;
@@ -2674,6 +3111,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
                txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL;
                txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL;
                txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
+               txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI;
+               txn->mt_dbxs[txn->mt_numdbs].md_dirty = 0;
                memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
                *dbi = txn->mt_numdbs;
                txn->mt_numdbs++;