#define MDB_MINKEYS 4
#define MDB_MAGIC 0xBEEFC0DE
#define MDB_VERSION 1
-#define MAXKEYSIZE 255
+#define MAXKEYSIZE 511
#define P_INVALID (~0UL)
mp->mp_upper += sz;
}
+static void
+mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+ MDB_dbi dbn;
+
+ mx->mx_cursor.mc_txn = &mx->mx_txn;
+ mx->mx_txn = *txn;
+ mx->mx_txn.mt_dbxs = mx->mx_dbxs;
+ mx->mx_txn.mt_dbs = mx->mx_dbs;
+ mx->mx_dbxs[0] = txn->mt_dbxs[0];
+ mx->mx_dbxs[1] = txn->mt_dbxs[1];
+ if (dbi > 1) {
+ mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
+ dbn = 2;
+ } else {
+ dbn = 1;
+ }
+ mx->mx_dbxs[dbn+1].md_parent = dbn;
+ mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
+ mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
+ mx->mx_dbxs[dbn+1].md_dirty = 0;
+ mx->mx_txn.mt_numdbs = dbn+2;
+}
+
+static void
+mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
+{
+ mx->mx_dbs[0] = txn->mt_dbs[0];
+ mx->mx_dbs[1] = txn->mt_dbs[1];
+ if (dbi > 1) {
+ mx->mx_dbs[2] = txn->mt_dbs[dbi];
+ mx->mx_dbs[3] = *db;
+ } else {
+ mx->mx_dbs[2] = *db;
+ }
+}
+
+static void
+mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
+{
+ txn->mt_dbs[0] = mx->mx_dbs[0];
+ txn->mt_dbs[1] = mx->mx_dbs[1];
+ txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
+ txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
+ if (dbi > 1) {
+ txn->mt_dbs[dbi] = mx->mx_dbs[2];
+ txn->mt_dbxs[2].md_dirty = mx->mx_dbxs[2].md_dirty;
+ }
+}
+
int
mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
{
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
cursor->mc_xcursor = mx;
- mx->mx_cursor.mc_txn = &mx->mx_txn;
- mx->mx_txn = *txn;
- mx->mx_txn.mt_dbxs = mx->mx_dbxs;
- mx->mx_txn.mt_dbs = mx->mx_dbs;
- mx->mx_dbxs[0] = txn->mt_dbxs[0];
- mx->mx_dbxs[1] = txn->mt_dbxs[1];
- if (dbi > 1) {
- mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
- mx->mx_txn.mt_numdbs = 4;
- } else {
- mx->mx_txn.mt_numdbs = 3;
- }
+ mdb_xcursor_init0(txn, dbi, mx);
}
} else {
return ENOMEM;
if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
return rc;
+ if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ /* add all the child DB's pages to the free list */
+ }
+
return mdb_del0(txn, dbi, ki, &mpp, leaf);
}
unsigned int ki;
MDB_node *leaf;
MDB_pageparent mpp;
+ MDB_val xdata, *rdata;
+ MDB_db dummy;
assert(key != NULL);
assert(data != NULL);
if (rc == MDB_SUCCESS) {
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
if (leaf && exact) {
+ if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ goto put_sub;
+ }
if (F_ISSET(flags, MDB_NOOVERWRITE)) {
DPRINTF("duplicate key %.*s",
(int)key->mv_size, (char *)key->mv_data);
DPRINTF("there are %u keys, should insert new key at index %i",
NUMKEYS(mpp.mp_page), ki);
+ /* For sorted dups, the data item at this level is a DB record
+ * for a child DB; the actual data elements are stored as keys
+ * in the child DB.
+ */
+ if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ rdata = &xdata;
+ xdata.mv_size = sizeof(MDB_db);
+ xdata.mv_data = &dummy;
+ memset(&dummy, 0, sizeof(dummy));
+ dummy.md_root = P_INVALID;
+ } else {
+ rdata = data;
+ }
+
if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
} else {
if (rc != MDB_SUCCESS)
txn->mt_flags |= MDB_TXN_ERROR;
- else
+ else {
txn->mt_dbs[dbi].md_entries++;
+ /* Now store the actual data in the child DB. Note that we're
+ * storing the user data in the keys field, so there are strict
+ * size limits on dupdata. The actual data fields of the child
+ * DB are all zero size.
+ */
+ if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ MDB_xcursor mx;
+
+ leaf = NODEPTR(mpp.mp_page, ki);
+put_sub:
+ mdb_xcursor_init0(txn, dbi, &mx);
+ mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
+ xdata.mv_size = 0;
+ xdata.mv_data = "";
+ rc = mdb_put(&mx.mx_txn, mx.mx_txn.mt_numdbs-1, data, &xdata, flags);
+ mdb_xcursor_fini(txn, dbi, &mx);
+ }
+ }
done:
return rc;