unsigned int mn_ksize:12; /* key size */
#define F_BIGDATA 0x01 /* data put on overflow page */
#define F_SUBDATA 0x02 /* data is a sub-database */
+#define F_DUPDATA 0x04 /* data has duplicates */
char mn_data[1];
} MDB_node;
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
if (leaf && exact) {
/* Return first duplicate data item */
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
MDB_xcursor mx;
mdb_xcursor_init0(txn, dbi, &mx);
assert(cursor->mc_initialized);
+ top = CURSOR_TOP(cursor);
+ mp = top->mp_page;
+
if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
+ leaf = NODEPTR(mp, top->mp_ki);
+ if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
if (op != MDB_NEXT || rc == MDB_SUCCESS)
return rc;
}
}
- top = CURSOR_TOP(cursor);
- mp = top->mp_page;
-
DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
if (top->mp_ki + 1 >= NUMKEYS(mp)) {
assert(IS_LEAF(mp));
leaf = NODEPTR(mp, top->mp_ki);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ }
if (data) {
if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
return rc;
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc != MDB_SUCCESS)
return rc;
assert(cursor->mc_initialized);
+ top = CURSOR_TOP(cursor);
+ mp = top->mp_page;
+
if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- if (op == MDB_PREV || op == MDB_PREV_DUP) {
+ leaf = NODEPTR(mp, top->mp_ki);
+ if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
if (op != MDB_PREV || rc == MDB_SUCCESS)
return rc;
}
}
- top = CURSOR_TOP(cursor);
- mp = top->mp_page;
-
DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
if (top->mp_ki == 0) {
assert(IS_LEAF(mp));
leaf = NODEPTR(mp, top->mp_ki);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ }
if (data) {
if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
return rc;
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc != MDB_SUCCESS)
return rc;
cursor->mc_initialized = 1;
cursor->mc_eof = 0;
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ }
if (data) {
- MDB_val d2;
- if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
- return rc;
-
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
- mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (op == MDB_SET || op == MDB_SET_RANGE) {
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
} else {
return rc;
}
} else {
- *data = d2;
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+ return rc;
}
-
}
rc = mdb_set_key(leaf, key);
cursor->mc_eof = 0;
if (data) {
- if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
- return rc;
-
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc)
return rc;
+ } else {
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+ return rc;
}
}
return mdb_set_key(leaf, key);
top->mp_ki = NUMKEYS(top->mp_page) - 1;
if (data) {
- if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
- return rc;
-
- if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
if (rc)
return rc;
+ } else {
+ if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
+ return rc;
}
}
int
mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
{
+ MDB_ppage *top;
+ MDB_node *leaf;
+
if (mc == NULL || countp == NULL)
return EINVAL;
if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
return EINVAL;
- if (!mc->mc_xcursor->mx_cursor.mc_initialized)
- return EINVAL;
+ top = CURSOR_TOP(mc);
+ leaf = NODEPTR(top->mp_page, top->mp_ki);
+ if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ *countp = 1;
+ } else {
+ if (!mc->mc_xcursor->mx_cursor.mc_initialized)
+ return EINVAL;
- *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+ *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
+ }
return MDB_SUCCESS;
}
return MDB_NOTFOUND;
}
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
MDB_xcursor mx;
MDB_pageparent mp2;
unsigned int ki;
MDB_node *leaf;
MDB_pageparent mpp;
- MDB_val xdata, *rdata;
+ MDB_val xdata, *rdata, dkey;
MDB_db dummy;
+ char dbuf[PAGESIZE];
+ int do_sub = 0;
DPRINTF("==> put key %.*s, size %zu, data size %zu",
(int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
+ dkey.mv_size = 0;
mpp.mp_parent = NULL;
mpp.mp_pi = 0;
rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
if (rc == MDB_SUCCESS) {
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
if (leaf && exact) {
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
- goto put_sub;
- }
if (flags == MDB_NOOVERWRITE) {
DPRINTF("duplicate key %.*s",
(int)key->mv_size, (char *)key->mv_data);
return MDB_KEYEXIST;
}
+ if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ /* Was a single item before, must convert now */
+ if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
+ dkey.mv_size = NODEDSZ(leaf);
+ memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
+ memset(&dummy, 0, sizeof(dummy));
+ dummy.md_root = P_INVALID;
+ if (dkey.mv_size == sizeof(MDB_db)) {
+ memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
+ goto put_sub;
+ }
+ mdb_del_node(mpp.mp_page, ki);
+ do_sub = 1;
+ rdata = &xdata;
+ xdata.mv_size = sizeof(MDB_db);
+ xdata.mv_data = &dummy;
+ goto new_sub;
+ }
+ goto put_sub;
+ }
/* same size, just replace it */
if (NODEDSZ(leaf) == data->mv_size) {
memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
DPRINTF("there are %u keys, should insert new key at index %i",
NUMKEYS(mpp.mp_page), ki);
- /* For sorted dups, the data item at this level is a DB record
- * for a child DB; the actual data elements are stored as keys
- * in the child DB.
- */
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
- rdata = &xdata;
- xdata.mv_size = sizeof(MDB_db);
- xdata.mv_data = &dummy;
- memset(&dummy, 0, sizeof(dummy));
- dummy.md_root = P_INVALID;
- } else {
- rdata = data;
- }
+ rdata = data;
+new_sub:
if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) {
rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
} else {
* size limits on dupdata. The actual data fields of the child
* DB are all zero size.
*/
- if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
+ if (do_sub) {
MDB_xcursor mx;
leaf = NODEPTR(mpp.mp_page, ki);
xdata.mv_data = "";
if (flags == MDB_NODUPDATA)
flags = MDB_NOOVERWRITE;
+ /* converted, write the original data first */
+ if (dkey.mv_size) {
+ dkey.mv_data = dbuf;
+ rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
+ if (rc) return rc;
+ leaf->mn_flags |= F_DUPDATA;
+ }
rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
mdb_xcursor_fini(txn, dbi, &mx);
memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],