From d69d2ce2307e9d0e2167843935c386b05a869c31 Mon Sep 17 00:00:00 2001 From: Hallvard Furuseth Date: Wed, 11 Dec 2013 11:57:13 +0100 Subject: [PATCH] Raise safe max MDB_MAXKEYSIZE. Use a sub-DB for DUPSORT item #1/#2 per key if needed: Not a sub- page too big for a node, nor an overflow page (which not all DUPSORT code checks for). Move "insert" code, to avoid non-loop goto upwards. (This is the commit which needs the change to xdata.mv_size in commit 9d6e4a916367e85ffdf37b1cb7b5cdb7ac0e15b5 "page sizes".) --- libraries/liblmdb/mdb.c | 62 ++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index ecf29bfa46..6e0fbf0614 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -385,8 +385,7 @@ static txnid_t mdb_debug_start; /** @brief The maximum size of a key we can write to a database. * * We require that keys all fit onto a regular page. This limit - * could be raised a bit further if needed; to something just - * under (page size / #MDB_MINKEYS / 3). + * can be raised to page size / #MDB_MINKEYS - (64-bit ? 66 : 44). * * Note that data items in an #MDB_DUPSORT database are actually keys * of a subDB, so they're also limited to this size. @@ -5666,9 +5665,11 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */ MDB_env *env = mc->mc_txn->mt_env; MDB_node *leaf = NULL; - MDB_val xdata, *rdata, dkey; + MDB_page *fp, *mp; + uint16_t fp_flags; + MDB_val xdata, *rdata, dkey, olddata; MDB_db dummy; - int do_sub = 0, insert = 0; + int do_sub = 0, insert; unsigned int mcount = 0, dcount = 0, nospill; size_t nsize; int rc, rc2; @@ -5782,11 +5783,21 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return rc2; } - /* The key already exists */ - if (rc == MDB_SUCCESS) { - MDB_page *fp, *mp; - MDB_val olddata; - + insert = rc; + if (insert) { + /* The key does not exist */ + DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); + if ((mc->mc_db->md_flags & MDB_DUPSORT) && + LEAFSIZE(key, data) > env->me_nodemax) + { + /* Too big for a node, insert in sub-DB */ + fp_flags = P_LEAF|P_DIRTY; + fp = env->me_pbuf; + fp->mp_pad = data->mv_size; /* used if MDB_DUPFIXED */ + fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ; + goto prep_subDB; + } + } else { /* there's only a key anyway, so this is a no-op */ if (IS_LEAF2(mc->mc_pg[mc->mc_top])) { unsigned int ksize = mc->mc_db->md_pad; @@ -5806,6 +5817,12 @@ more: /* DB has dups? */ if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { + /* Prepare (sub-)page/sub-DB to accept the new item, + * if needed. fp: old sub-page or a header faking + * it. mp: new (sub-)page. offset: growth in page + * size. xdata: node data with new page or DB. + */ + unsigned i, offset = 0; mp = fp = xdata.mv_data = env->me_pbuf; mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; @@ -5851,16 +5868,13 @@ more: (dkey.mv_size & 1) + (data->mv_size & 1); } fp->mp_upper = xdata.mv_size; + olddata.mv_size = fp->mp_upper; /* pretend olddata is fp */ } else if (leaf->mn_flags & F_SUBDATA) { /* Data is on sub-DB, just store it */ flags |= F_DUPDATA|F_SUBDATA; goto put_sub; } else { - /* See if we need to convert from fake page to subDB */ - unsigned int offset; - unsigned int i; - uint16_t fp_flags; - + /* Data is on sub-page */ fp = olddata.mv_data; switch (flags) { default: @@ -5882,11 +5896,16 @@ more: flags |= F_DUPDATA; goto put_sub; } - fp_flags = fp->mp_flags; xdata.mv_size = olddata.mv_size + offset; - if (NODESIZE+NODEKSZ(leaf)+xdata.mv_size > env->me_nodemax) { - /* yes, convert it */ + } + + fp_flags = fp->mp_flags; + if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) { + /* Too big for a sub-page, convert to sub-DB */ + fp_flags &= ~P_SUBP; +prep_subDB: if (mc->mc_db->md_flags & MDB_DUPFIXED) { + fp_flags |= P_LEAF2; dummy.md_pad = fp->mp_pad; dummy.md_flags = MDB_DUPFIXED; if (mc->mc_db->md_flags & MDB_INTEGERDUP) @@ -5907,13 +5926,13 @@ more: offset = env->me_psize - olddata.mv_size; flags |= F_DUPDATA|F_SUBDATA; dummy.md_root = mp->mp_pgno; - fp_flags &= ~P_SUBP; - } + } + if (mp != fp) { mp->mp_flags = fp_flags | P_DIRTY; mp->mp_pad = fp->mp_pad; mp->mp_lower = fp->mp_lower; mp->mp_upper = fp->mp_upper + offset; - if (IS_LEAF2(fp)) { + if (fp_flags & P_LEAF2) { memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); } else { memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, @@ -6004,9 +6023,6 @@ current: } mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); mc->mc_db->md_entries--; - } else { - DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); - insert = 1; } rdata = data; -- 2.39.2