X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Flibmdb%2Fmdb.c;h=30037849858b05ad2b383f2ddaf2e8192228e7d7;hb=a1b4144b8079e0de8b99149a7b4b6a5be3123d54;hp=26712a8455f428d7553791f03005e68b2f178f81;hpb=88a5f35c433bd81658f5c5d72b7ab681fd7a4f67;p=openldap diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 26712a8455..3003784985 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -29,7 +29,6 @@ */ #include #include -#include #include #include #include @@ -60,12 +59,15 @@ typedef ULONG pgno_t; #define DEBUG 1 #endif -#if DEBUG && defined(__GNUC__) -# define DPRINTF(fmt, ...) \ - fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__) +#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__)) +# define DPRINTF (void) /* Vararg macros may be unsupported */ +#elif DEBUG +# define DPRINTF(fmt, ...) /* Requires 2 or more args */ \ + fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, __VA_ARGS__) #else -# define DPRINTF(...) ((void) 0) +# define DPRINTF(fmt, ...) ((void) 0) #endif +#define DPUTS(arg) DPRINTF("%s", arg) #define PAGESIZE 4096 #define MDB_MINKEYS 4 @@ -110,6 +112,7 @@ typedef struct MDB_txbody { pthread_mutex_t mtb_mutex; ULONG mtb_txnid; uint32_t mtb_numreaders; + uint32_t mtb_me_toggle; } MDB_txbody; typedef struct MDB_txninfo { @@ -120,6 +123,7 @@ typedef struct MDB_txninfo { #define mti_mutex mt1.mtb.mtb_mutex #define mti_txnid mt1.mtb.mtb_txnid #define mti_numreaders mt1.mtb.mtb_numreaders +#define mti_me_toggle mt1.mtb.mtb_me_toggle char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)]; } mt1; union { @@ -138,7 +142,7 @@ typedef struct MDB_page { /* represents a page of storage */ #define mp_pgno mp_p.p_pgno union padded { pgno_t p_pgno; /* page number */ - void * p_pad; + void * p_align; /* for IL32P64 */ } mp_p; #define P_BRANCH 0x01 /* branch page */ #define P_LEAF 0x02 /* leaf page */ @@ -203,7 +207,6 @@ typedef struct MDB_meta { /* meta (footer) page content */ } MDB_meta; typedef struct MDB_dhead { /* a dirty page */ - STAILQ_ENTRY(MDB_dpage) md_next; /* queue of dirty pages */ MDB_page *md_parent; unsigned md_pi; /* parent index */ int md_num; @@ -214,8 +217,6 @@ typedef struct MDB_dpage { MDB_page p; } MDB_dpage; -STAILQ_HEAD(dirty_queue, MDB_dpage); /* FIXME: use a sorted data structure */ - typedef struct MDB_oldpages { struct MDB_oldpages *mo_next; ULONG mo_txnid; @@ -232,25 +233,19 @@ static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int pa static int mdb_touch(MDB_txn *txn, MDB_pageparent *mp); typedef struct MDB_ppage { /* ordered list of pages */ - SLIST_ENTRY(MDB_ppage) mp_entry; MDB_page *mp_page; unsigned int mp_ki; /* cursor index on page */ } MDB_ppage; -SLIST_HEAD(page_stack, MDB_ppage); -/* FIXME: tree depth is mostly bounded, we should just - * use a fixed array and avoid malloc/pointer chasing - */ -#define CURSOR_EMPTY(c) SLIST_EMPTY(&(c)->mc_stack) -#define CURSOR_TOP(c) SLIST_FIRST(&(c)->mc_stack) -#define CURSOR_POP(c) SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry) -#define CURSOR_PUSH(c,p) SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry) +#define CURSOR_TOP(c) (&(c)->mc_stack[(c)->mc_snum-1]) +#define CURSOR_PARENT(c) (&(c)->mc_stack[(c)->mc_snum-2]) struct MDB_xcursor; struct MDB_cursor { MDB_txn *mc_txn; - struct page_stack mc_stack; /* stack of parent pages */ + MDB_ppage mc_stack[32]; /* stack of parent pages */ + unsigned int mc_snum; /* number of pushed pages */ MDB_dbi mc_dbi; short mc_initialized; /* 1 if initialized */ short mc_eof; /* 1 if end is reached */ @@ -270,6 +265,7 @@ typedef struct MDB_node { unsigned int mn_ksize:12; /* key size */ #define F_BIGDATA 0x01 /* data put on overflow page */ #define F_SUBDATA 0x02 /* data is a sub-database */ +#define F_DUPDATA 0x04 /* data has duplicates */ char mn_data[1]; } MDB_node; @@ -289,7 +285,7 @@ struct MDB_txn { MDB_env *mt_env; pgno_t *mt_free_pgs; /* this is an IDL */ union { - struct dirty_queue *dirty_queue; /* modified pages */ + MIDL2 *dirty_list; /* modified pages */ MDB_reader *reader; } mt_u; MDB_dbx *mt_dbxs; /* array */ @@ -314,9 +310,9 @@ struct MDB_env { int me_fd; int me_lfd; int me_mfd; /* just for writing the meta pages */ - uint16_t me_flags; - uint16_t me_db_toggle; - unsigned int me_psize; +#define MDB_FATAL_ERROR 0x80000000U + uint32_t me_flags; + uint32_t me_extrapad; /* unused for now */ unsigned int me_maxreaders; unsigned int me_numdbs; unsigned int me_maxdbs; @@ -328,11 +324,15 @@ struct MDB_env { MDB_txn *me_txn; /* current write transaction */ size_t me_mapsize; off_t me_size; /* current file size */ + pgno_t me_maxpg; /* me_mapsize / me_psize */ + unsigned int me_psize; + unsigned int me_db_toggle; MDB_dbx *me_dbxs; /* array */ MDB_db *me_dbs[2]; MDB_oldpages *me_pghead; pthread_key_t me_txkey; /* thread-key for readers */ pgno_t me_free_pgs[MDB_IDL_UM_SIZE]; + MIDL2 me_dirty_list[MDB_IDL_DB_SIZE]; }; #define NODESIZE offsetof(MDB_node, mn_data) @@ -457,23 +457,50 @@ mdb_version(int *maj, int *min, int *pat) return MDB_VERSION_STRING; } -int -mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) +static const char *errstr[] = { + "MDB_KEYEXIST: Key/data pair already exists", + "MDB_NOTFOUND: No matching key/data pair found", + "MDB_PAGE_NOTFOUND: Requested page not found", + "MDB_CORRUPTED: Located page was wrong type", + "MDB_PANIC: Update of meta page failed", + "MDB_VERSION_MISMATCH: Database environment version mismatch" +}; + +char * +mdb_strerror(int err) { - return txn->mt_dbxs[dbi].md_cmp(a, b); + if (!err) + return ("Successful return: 0"); + + if (err >= MDB_KEYEXIST && err <= MDB_VERSION_MISMATCH) + return (char *)errstr[err - MDB_KEYEXIST]; + + return strerror(err); } -static int -_mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2) +int +mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) { + if (txn->mt_dbxs[dbi].md_cmp) + return txn->mt_dbxs[dbi].md_cmp(a, b); + if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY #if __BYTE_ORDER == __LITTLE_ENDIAN |MDB_INTEGERKEY #endif )) - return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); + return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size); else - return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); + return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size); +} + +int +mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) +{ + if (txn->mt_dbxs[dbi].md_dcmp) + return txn->mt_dbxs[dbi].md_dcmp(a, b); + + return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size); } /* Allocate new page(s) for writing */ @@ -483,6 +510,7 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) MDB_dpage *dp; pgno_t pgno = P_INVALID; ULONG oldest; + MIDL2 mid; if (txn->mt_txnid > 2) { @@ -569,18 +597,25 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) } } + if (pgno == P_INVALID) { + /* DB size is maxed out */ + if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg) + return NULL; + } if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL) return NULL; dp->h.md_num = num; dp->h.md_parent = parent; dp->h.md_pi = parent_idx; - STAILQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next); if (pgno == P_INVALID) { dp->p.mp_pgno = txn->mt_next_pgno; txn->mt_next_pgno += num; } else { dp->p.mp_pgno = pgno; } + mid.mid = dp->p.mp_pgno; + mid.mptr = dp; + mdb_midl2_insert(txn->mt_u.dirty_list, &mid); return dp; } @@ -632,6 +667,10 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) MDB_txn *txn; int rc, toggle; + if (env->me_flags & MDB_FATAL_ERROR) { + DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!"); + return MDB_PANIC; + } if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) { DPRINTF("calloc: %s", strerror(errno)); return ENOMEM; @@ -640,17 +679,13 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) if (rdonly) { txn->mt_flags |= MDB_TXN_RDONLY; } else { - txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue)); - if (txn->mt_u.dirty_queue == NULL) { - free(txn); - return ENOMEM; - } - STAILQ_INIT(txn->mt_u.dirty_queue); + txn->mt_u.dirty_list = env->me_dirty_list; + txn->mt_u.dirty_list[0].mid = 0; + txn->mt_free_pgs = env->me_free_pgs; + txn->mt_free_pgs[0] = 0; pthread_mutex_lock(&env->me_txns->mti_wmutex); env->me_txns->mti_txnid++; - txn->mt_free_pgs = env->me_free_pgs; - txn->mt_free_pgs[0] = 0; } txn->mt_txnid = env->me_txns->mti_txnid; @@ -682,6 +717,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) txn->mt_env = env; + toggle = env->me_txns->mti_me_toggle; if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) { mdb_txn_abort(txn); return rc; @@ -712,7 +748,6 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) void mdb_txn_abort(MDB_txn *txn) { - MDB_dpage *dp; MDB_env *env; if (txn == NULL) @@ -731,12 +766,8 @@ mdb_txn_abort(MDB_txn *txn) unsigned int i; /* Discard all dirty pages. */ - while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) { - dp = STAILQ_FIRST(txn->mt_u.dirty_queue); - STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); - free(dp); - } - free(txn->mt_u.dirty_queue); + for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) + free(txn->mt_u.dirty_list[i].mptr); while ((mop = txn->mt_env->me_pghead)) { txn->mt_env->me_pghead = mop->mo_next; @@ -771,24 +802,23 @@ mdb_txn_commit(MDB_txn *txn) env = txn->mt_env; if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - DPRINTF("attempt to commit read-only transaction"); mdb_txn_abort(txn); - return EPERM; + return MDB_SUCCESS; } if (txn != env->me_txn) { - DPRINTF("attempt to commit unknown transaction"); + DPUTS("attempt to commit unknown transaction"); mdb_txn_abort(txn); return EINVAL; } if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) { - DPRINTF("error flag is set, can't commit"); + DPUTS("error flag is set, can't commit"); mdb_txn_abort(txn); return EINVAL; } - if (STAILQ_EMPTY(txn->mt_u.dirty_queue)) + if (!txn->mt_u.dirty_list[0].mid) goto done; DPRINTF("committing transaction %lu on mdbenv %p, root page %lu", @@ -857,11 +887,13 @@ mdb_txn_commit(MDB_txn *txn) /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done. */ next = 0; + i = 1; do { n = 0; done = 1; size = 0; - STAILQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) { + for (; i<=txn->mt_u.dirty_list[0].mid; i++) { + dp = txn->mt_u.dirty_list[i].mptr; if (dp->p.mp_pgno != next) { if (n) { DPRINTF("committing %u dirty pages", n); @@ -869,7 +901,7 @@ mdb_txn_commit(MDB_txn *txn) if (rc != size) { n = errno; if (rc > 0) - DPRINTF("short write, filesystem full?"); + DPUTS("short write, filesystem full?"); else DPRINTF("writev: %s", strerror(errno)); mdb_txn_abort(txn); @@ -902,7 +934,7 @@ mdb_txn_commit(MDB_txn *txn) if (rc != size) { n = errno; if (rc > 0) - DPRINTF("short write, filesystem full?"); + DPUTS("short write, filesystem full?"); else DPRINTF("writev: %s", strerror(errno)); mdb_txn_abort(txn); @@ -913,19 +945,17 @@ mdb_txn_commit(MDB_txn *txn) /* Drop the dirty pages. */ - while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) { - dp = STAILQ_FIRST(txn->mt_u.dirty_queue); - STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); - free(dp); - } + for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) + free(txn->mt_u.dirty_list[i].mptr); if ((n = mdb_env_sync(env, 0)) != 0 || (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) { mdb_txn_abort(txn); return n; } - env->me_txn = NULL; +done: + env->me_txn = NULL; /* update the DB tables */ { int toggle = !env->me_db_toggle; @@ -948,12 +978,7 @@ mdb_txn_commit(MDB_txn *txn) } pthread_mutex_unlock(&env->me_txns->mti_wmutex); - free(txn->mt_u.dirty_queue); free(txn); - txn = NULL; - -done: - mdb_txn_abort(txn); return MDB_SUCCESS; } @@ -989,7 +1014,7 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta) m = METADATA(p); if (m->mm_magic != MDB_MAGIC) { - DPRINTF("meta has invalid magic"); + DPUTS("meta has invalid magic"); return EINVAL; } @@ -1011,7 +1036,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta) int rc; unsigned int psize; - DPRINTF("writing new meta page"); + DPUTS("writing new meta page"); psize = sysconf(_SC_PAGE_SIZE); meta->mm_magic = MDB_MAGIC; @@ -1047,19 +1072,23 @@ static int mdb_env_write_meta(MDB_txn *txn) { MDB_env *env; - MDB_meta meta; + MDB_meta meta, metab; off_t off; - int rc, len; + int rc, len, toggle; char *ptr; assert(txn != NULL); assert(txn->mt_env != NULL); + toggle = !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE); DPRINTF("writing meta page %d for root page %lu", - !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE), txn->mt_dbs[MAIN_DBI].md_root); + toggle, txn->mt_dbs[MAIN_DBI].md_root); env = txn->mt_env; + metab.mm_txnid = env->me_metas[toggle]->mm_txnid; + metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg; + ptr = (char *)&meta; off = offsetof(MDB_meta, mm_dbs[0].md_depth); len = sizeof(MDB_meta) - off; @@ -1070,15 +1099,25 @@ mdb_env_write_meta(MDB_txn *txn) meta.mm_last_pg = txn->mt_next_pgno - 1; meta.mm_txnid = txn->mt_txnid; - if (!F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE)) + if (toggle) off += env->me_psize; off += PAGEHDRSZ; - rc = pwrite(env->me_fd, ptr, len, off); + /* Write to the SYNC fd */ + rc = pwrite(env->me_mfd, ptr, len, off); if (rc != len) { - DPRINTF("write failed, disk error?"); + DPUTS("write failed, disk error?"); + /* On a failure, the pagecache still contains the new data. + * Write some old data back, to prevent it from being used. + * Use the non-SYNC fd; we know it will fail anyway. + */ + meta.mm_last_pg = metab.mm_last_pg; + meta.mm_txnid = metab.mm_txnid; + rc = pwrite(env->me_fd, ptr, len, off); + env->me_flags |= MDB_FATAL_ERROR; return errno; } + txn->mt_env->me_txns->mti_me_toggle = toggle; return MDB_SUCCESS; } @@ -1090,13 +1129,13 @@ mdb_env_read_meta(MDB_env *env, int *which) assert(env != NULL); - if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid) + if (which) + toggle = *which; + else if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid) toggle = 1; if (env->me_meta != env->me_metas[toggle]) env->me_meta = env->me_metas[toggle]; - if (which) - *which = toggle; DPRINTF("Using meta page %d", toggle); @@ -1166,7 +1205,7 @@ mdb_env_open2(MDB_env *env, unsigned int flags) if ((i = mdb_env_read_header(env, &meta)) != 0) { if (i != ENOENT) return i; - DPRINTF("new mdbenv"); + DPUTS("new mdbenv"); newenv = 1; } @@ -1194,7 +1233,9 @@ mdb_env_open2(MDB_env *env, unsigned int flags) } env->me_psize = meta.mm_psize; - p = (MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)env->me_map; + env->me_maxpg = env->me_mapsize / env->me_psize; + + p = (MDB_page *)env->me_map; env->me_metas[0] = METADATA(p); env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize); @@ -1230,6 +1271,8 @@ mdb_env_share_locks(MDB_env *env) struct flock lock_info; env->me_txns->mti_txnid = env->me_meta->mm_txnid; + if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid) + env->me_txns->mti_me_toggle = 1; memset((void *)&lock_info, 0, sizeof(lock_info)); lock_info.l_type = F_RDLCK; @@ -1303,10 +1346,11 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) env->me_txns->mti_magic = MDB_MAGIC; env->me_txns->mti_txnid = 0; env->me_txns->mti_numreaders = 0; + env->me_txns->mti_me_toggle = 0; } else { if (env->me_txns->mti_magic != MDB_MAGIC) { - DPRINTF("lock region has invalid magic"); + DPUTS("lock region has invalid magic"); rc = EINVAL; goto fail; } @@ -1461,10 +1505,7 @@ mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, nodekey.mv_size = node->mn_ksize; nodekey.mv_data = NODEKEY(node); - if (txn->mt_dbxs[dbi].md_cmp) - rc = txn->mt_dbxs[dbi].md_cmp(key, &nodekey); - else - rc = _mdb_cmp(txn, dbi, key, &nodekey); + rc = mdb_cmp(txn, dbi, key, &nodekey); if (IS_LEAF(mp)) DPRINTF("found leaf index %u [%.*s], rc = %i", @@ -1501,12 +1542,12 @@ cursor_pop_page(MDB_cursor *cursor) { MDB_ppage *top; - top = CURSOR_TOP(cursor); - CURSOR_POP(cursor); - - DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor); + if (cursor->mc_snum) { + top = CURSOR_TOP(cursor); + cursor->mc_snum--; - free(top); + DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor); + } } static MDB_ppage * @@ -1516,10 +1557,9 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp) DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor); - if ((ppage = calloc(1, sizeof(MDB_ppage))) == NULL) - return NULL; + ppage = &cursor->mc_stack[cursor->mc_snum++]; ppage->mp_page = mp; - CURSOR_PUSH(cursor, ppage); + ppage->mp_ki = 0; return ppage; } @@ -1529,14 +1569,16 @@ mdb_get_page(MDB_txn *txn, pgno_t pgno) MDB_page *p = NULL; int found = 0; - if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && !STAILQ_EMPTY(txn->mt_u.dirty_queue)) { + if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) { MDB_dpage *dp; - STAILQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) { - if (dp->p.mp_pgno == pgno) { - p = &dp->p; - found = 1; - break; - } + MIDL2 id; + unsigned x; + id.mid = pgno; + x = mdb_midl2_search(txn->mt_u.dirty_list, &id); + if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) { + dp = txn->mt_u.dirty_list[x].mptr; + p = &dp->p; + found = 1; } } if (!found) { @@ -1555,7 +1597,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, int rc; if (cursor && cursor_push_page(cursor, mp) == NULL) - return MDB_FAIL; + return ENOMEM; while (IS_BRANCH(mp)) { unsigned int i = 0; @@ -1592,12 +1634,12 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, mpp->mp_parent = mp; if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL) - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; mpp->mp_pi = i; mpp->mp_page = mp; if (cursor && cursor_push_page(cursor, mp) == NULL) - return MDB_FAIL; + return ENOMEM; if (modify) { MDB_dhead *dh = ((MDB_dhead *)mp)-1; @@ -1614,7 +1656,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, if (!IS_LEAF(mp)) { DPRINTF("internal error, index points to a %02X page!?", mp->mp_flags); - return MDB_FAIL; + return MDB_CORRUPTED; } DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno, @@ -1641,18 +1683,18 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, * committed root page. */ if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) { - DPRINTF("transaction has failed, must abort"); + DPUTS("transaction has failed, must abort"); return EINVAL; } else root = txn->mt_dbs[dbi].md_root; if (root == P_INVALID) { /* Tree is empty. */ - DPRINTF("tree is empty"); + DPUTS("tree is empty"); return MDB_NOTFOUND; } if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL) - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags); @@ -1696,9 +1738,9 @@ mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data) memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); if ((omp = mdb_get_page(txn, pgno)) == NULL) { DPRINTF("read overflow page %lu failed", pgno); - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; } - data->mv_data = omp; + data->mv_data = METADATA(omp); return MDB_SUCCESS; } @@ -1728,7 +1770,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL); if (leaf && exact) { /* Return first duplicate data item */ - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { MDB_xcursor mx; mdb_xcursor_init0(txn, dbi, &mx); @@ -1751,13 +1793,13 @@ mdb_sibling(MDB_cursor *cursor, int move_right) { int rc; MDB_node *indx; - MDB_ppage *parent, *top; + MDB_ppage *parent; MDB_page *mp; - top = CURSOR_TOP(cursor); - if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) { + if (cursor->mc_snum < 2) { return MDB_NOTFOUND; /* root has no siblings */ } + parent = CURSOR_PARENT(cursor); DPRINTF("parent page is page %lu, index %u", parent->mp_page->mp_pgno, parent->mp_ki); @@ -1782,7 +1824,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) indx = NODEPTR(parent->mp_page, parent->mp_ki); if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL) - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; #if 0 mp->parent = parent->mp_page; mp->parent_index = parent->mp_ki; @@ -1819,21 +1861,22 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(cursor->mc_initialized); + top = CURSOR_TOP(cursor); + mp = top->mp_page; + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - if (op == MDB_NEXT || op == MDB_NEXT_DUP) { + leaf = NODEPTR(mp, top->mp_ki); + if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); if (op != MDB_NEXT || rc == MDB_SUCCESS) return rc; } } - top = CURSOR_TOP(cursor); - mp = top->mp_page; - DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor); if (top->mp_ki + 1 >= NUMKEYS(mp)) { - DPRINTF("=====> move to next sibling page"); + DPUTS("=====> move to next sibling page"); if (mdb_sibling(cursor, 1) != MDB_SUCCESS) { cursor->mc_eof = 1; return MDB_NOTFOUND; @@ -1850,12 +1893,14 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + } if (data) { if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) return rc; - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc != MDB_SUCCESS) return rc; @@ -1875,21 +1920,22 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(cursor->mc_initialized); + top = CURSOR_TOP(cursor); + mp = top->mp_page; + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - if (op == MDB_PREV || op == MDB_PREV_DUP) { + leaf = NODEPTR(mp, top->mp_ki); + if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); if (op != MDB_PREV || rc == MDB_SUCCESS) return rc; } } - top = CURSOR_TOP(cursor); - mp = top->mp_page; - DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor); if (top->mp_ki == 0) { - DPRINTF("=====> move to prev sibling page"); + DPUTS("=====> move to prev sibling page"); if (mdb_sibling(cursor, 0) != MDB_SUCCESS) { return MDB_NOTFOUND; } @@ -1908,12 +1954,14 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + } if (data) { if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) return rc; - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc != MDB_SUCCESS) return rc; @@ -1936,8 +1984,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, assert(key); assert(key->mv_size > 0); - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); + cursor->mc_snum = 0; rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp); if (rc != MDB_SUCCESS) @@ -1952,7 +1999,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, } if (leaf == NULL) { - DPRINTF("===> inexact leaf not found, goto sibling"); + DPUTS("===> inexact leaf not found, goto sibling"); if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS) return rc; /* no entries matched */ top = CURSOR_TOP(cursor); @@ -1965,12 +2012,11 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, cursor->mc_initialized = 1; cursor->mc_eof = 0; + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + } if (data) { - if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; - - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (op == MDB_SET || op == MDB_SET_RANGE) { rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); } else { @@ -1987,6 +2033,19 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, if (rc != MDB_SUCCESS) return rc; } + } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) { + MDB_val d2; + if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS) + return rc; + rc = mdb_dcmp(cursor->mc_txn, cursor->mc_dbi, data, &d2); + if (rc) { + if (op == MDB_GET_BOTH || rc > 0) + return MDB_NOTFOUND; + } + + } else { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; } } @@ -2007,8 +2066,7 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) MDB_pageparent mpp; MDB_node *leaf; - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); + cursor->mc_snum = 0; rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp); if (rc != MDB_SUCCESS) @@ -2020,14 +2078,14 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) cursor->mc_eof = 0; if (data) { - if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; - - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc) return rc; + } else { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; } } return mdb_set_key(leaf, key); @@ -2042,8 +2100,7 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) MDB_node *leaf; MDB_val lkey; - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); + cursor->mc_snum = 0; lkey.mv_size = MAXKEYSIZE+1; lkey.mv_data = NULL; @@ -2061,14 +2118,14 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) top->mp_ki = NUMKEYS(top->mp_page) - 1; if (data) { - if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; - - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc) return rc; + } else { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; } } @@ -2087,7 +2144,7 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data, switch (op) { case MDB_GET_BOTH: case MDB_GET_BOTH_RANGE: - if (data == NULL) { + if (data == NULL || cursor->mc_xcursor == NULL) { rc = EINVAL; break; } @@ -2096,7 +2153,7 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data, case MDB_SET_RANGE: if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { rc = EINVAL; - } else if (op != MDB_SET_RANGE) + } else if (op == MDB_SET_RANGE) rc = mdb_cursor_set(cursor, key, data, op, NULL); else rc = mdb_cursor_set(cursor, key, data, op, &exact); @@ -2220,7 +2277,7 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, data->mv_size); node_size += sizeof(pgno_t); if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL) - return MDB_FAIL; + return ENOMEM; DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno); flags |= F_BIGDATA; } else { @@ -2340,7 +2397,7 @@ mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) mx->mx_dbxs[dbn+1].md_dirty = 0; mx->mx_txn.mt_numdbs = dbn+2; - SLIST_INIT(&mx->mx_cursor.mc_stack); + mx->mx_cursor.mc_snum = 0; mx->mx_cursor.mc_txn = &mx->mx_txn; mx->mx_cursor.mc_dbi = dbn+1; } @@ -2395,7 +2452,6 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) size += sizeof(MDB_xcursor); if ((cursor = calloc(1, size)) != NULL) { - SLIST_INIT(&cursor->mc_stack); cursor->mc_dbi = dbi; cursor->mc_txn = txn; if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) { @@ -2416,16 +2472,25 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) int mdb_cursor_count(MDB_cursor *mc, unsigned long *countp) { + MDB_ppage *top; + MDB_node *leaf; + if (mc == NULL || countp == NULL) return EINVAL; if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT)) return EINVAL; - if (!mc->mc_xcursor->mx_cursor.mc_initialized) - return EINVAL; + top = CURSOR_TOP(mc); + leaf = NODEPTR(top->mp_page, top->mp_ki); + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + *countp = 1; + } else { + if (!mc->mc_xcursor->mx_cursor.mc_initialized) + return EINVAL; - *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries; + *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries; + } return MDB_SUCCESS; } @@ -2433,14 +2498,6 @@ void mdb_cursor_close(MDB_cursor *cursor) { if (cursor != NULL) { - while(!CURSOR_EMPTY(cursor)) - cursor_pop_page(cursor); - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_fini(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor); - while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor)) - cursor_pop_page(&cursor->mc_xcursor->mx_cursor); - } - free(cursor); } } @@ -2647,19 +2704,19 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) if (mpp->mp_parent == NULL) { if (NUMKEYS(mpp->mp_page) == 0) { - DPRINTF("tree is completely empty"); + DPUTS("tree is completely empty"); txn->mt_dbs[dbi].md_root = P_INVALID; txn->mt_dbs[dbi].md_depth--; txn->mt_dbs[dbi].md_leaf_pages--; } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) { - DPRINTF("collapsing root page!"); + DPUTS("collapsing root page!"); txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL) - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; txn->mt_dbs[dbi].md_depth--; txn->mt_dbs[dbi].md_branch_pages--; } else - DPRINTF("root page doesn't need rebalancing"); + DPUTS("root page doesn't need rebalancing"); return MDB_SUCCESS; } @@ -2678,20 +2735,20 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) if (mpp->mp_pi == 0) { /* We're the leftmost leaf in our parent. */ - DPRINTF("reading right neighbor"); + DPUTS("reading right neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1); if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL) - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; npp.mp_pi = mpp->mp_pi + 1; si = 0; di = NUMKEYS(mpp->mp_page); } else { /* There is at least one neighbor to the left. */ - DPRINTF("reading left neighbor"); + DPUTS("reading left neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1); if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL) - return MDB_FAIL; + return MDB_PAGE_NOTFOUND; npp.mp_pi = mpp->mp_pi - 1; si = NUMKEYS(npp.mp_page) - 1; di = 0; @@ -2778,7 +2835,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, return MDB_NOTFOUND; } - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { MDB_xcursor mx; MDB_pageparent mp2; @@ -2793,6 +2850,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) { memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi], sizeof(MDB_db)); + txn->mt_dbs[dbi].md_entries--; return rc; } /* otherwise fall thru and delete the sub-DB */ @@ -2806,24 +2864,21 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, unsigned int i; cursor_pop_page(&mx.mx_cursor); - top = CURSOR_TOP(&mx.mx_cursor); - if (top != NULL) { - parent = SLIST_NEXT(top, mp_entry); - while (parent != NULL) { + if (mx.mx_cursor.mc_snum) { + top = CURSOR_TOP(&mx.mx_cursor); + while (mx.mx_cursor.mc_snum > 1) { + parent = CURSOR_PARENT(&mx.mx_cursor); for (i=0; imp_page); i++) { ni = NODEPTR(top->mp_page, i); mdb_midl_insert(txn->mt_free_pgs, ni->mn_pgno); } - if (parent) { - parent->mp_ki++; - if (parent->mp_ki >= NUMKEYS(parent->mp_page)) { - cursor_pop_page(&mx.mx_cursor); - top = CURSOR_TOP(&mx.mx_cursor); - parent = SLIST_NEXT(top, mp_entry); - } else { - ni = NODEPTR(parent->mp_page, parent->mp_ki); - top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno); - } + parent->mp_ki++; + if (parent->mp_ki >= NUMKEYS(parent->mp_page)) { + cursor_pop_page(&mx.mx_cursor); + top = parent; + } else { + ni = NODEPTR(parent->mp_page, parent->mp_ki); + top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno); } } } @@ -2870,7 +2925,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, if (mdp->h.md_parent == NULL) { if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL) - return MDB_FAIL; + return ENOMEM; mdp->h.md_pi = 0; mdp->h.md_parent = &pdp->p; txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno; @@ -2878,23 +2933,23 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, txn->mt_dbs[dbi].md_depth++; /* Add left (implicit) pointer. */ - if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL, - mdp->p.mp_pgno, 0) != MDB_SUCCESS) - return MDB_FAIL; + if ((rc = mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL, + mdp->p.mp_pgno, 0)) != MDB_SUCCESS) + return rc; } else { DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno); } /* Create a right sibling. */ if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL) - return MDB_FAIL; + return ENOMEM; rdp->h.md_parent = mdp->h.md_parent; rdp->h.md_pi = mdp->h.md_pi + 1; DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno); /* Move half of the keys to the right sibling. */ if ((copy = malloc(txn->mt_env->me_psize)) == NULL) - return MDB_FAIL; + return ENOMEM; memcpy(copy, &mdp->p, txn->mt_env->me_psize); memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ); mdp->p.mp_lower = PAGEHDRSZ; @@ -2936,7 +2991,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, } if (rc != MDB_SUCCESS) { free(copy); - return MDB_FAIL; + return rc; } for (i = j = 0; i <= NUMKEYS(copy); j++) { @@ -3003,26 +3058,46 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki; MDB_node *leaf; MDB_pageparent mpp; - MDB_val xdata, *rdata; + MDB_val xdata, *rdata, dkey; MDB_db dummy; + char dbuf[PAGESIZE]; + int do_sub = 0; DPRINTF("==> put key %.*s, size %zu, data size %zu", (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); + dkey.mv_size = 0; mpp.mp_parent = NULL; mpp.mp_pi = 0; rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp); if (rc == MDB_SUCCESS) { leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { - goto put_sub; - } if (flags == MDB_NOOVERWRITE) { DPRINTF("duplicate key %.*s", (int)key->mv_size, (char *)key->mv_data); return MDB_KEYEXIST; } + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + /* Was a single item before, must convert now */ + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + dkey.mv_size = NODEDSZ(leaf); + memcpy(dbuf, NODEDATA(leaf), dkey.mv_size); + memset(&dummy, 0, sizeof(dummy)); + dummy.md_root = P_INVALID; + if (dkey.mv_size == sizeof(MDB_db)) { + memcpy(NODEDATA(leaf), &dummy, sizeof(dummy)); + goto put_sub; + } + mdb_del_node(mpp.mp_page, ki); + do_sub = 1; + rdata = &xdata; + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + goto new_sub; + } + goto put_sub; + } /* same size, just replace it */ if (NODEDSZ(leaf) == data->mv_size) { memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); @@ -3037,13 +3112,14 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, } else if (rc == MDB_NOTFOUND) { MDB_dpage *dp; /* new file, just write a root leaf page */ - DPRINTF("allocating new root leaf page"); + DPUTS("allocating new root leaf page"); if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) { return ENOMEM; } mpp.mp_page = &dp->p; txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno; txn->mt_dbs[dbi].md_depth++; + txn->mt_dbxs[dbi].md_dirty = 1; ki = 0; } else @@ -3053,20 +3129,9 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, DPRINTF("there are %u keys, should insert new key at index %i", NUMKEYS(mpp.mp_page), ki); - /* For sorted dups, the data item at this level is a DB record - * for a child DB; the actual data elements are stored as keys - * in the child DB. - */ - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { - rdata = &xdata; - xdata.mv_size = sizeof(MDB_db); - xdata.mv_data = &dummy; - memset(&dummy, 0, sizeof(dummy)); - dummy.md_root = P_INVALID; - } else { - rdata = data; - } + rdata = data; +new_sub: if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) { rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID); } else { @@ -3077,8 +3142,6 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; else { - txn->mt_dbs[dbi].md_entries++; - /* Remember if we just added a subdatabase */ if (flags & F_SUBDATA) { leaf = NODEPTR(mpp.mp_page, ki); @@ -3090,7 +3153,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, * size limits on dupdata. The actual data fields of the child * DB are all zero size. */ - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + if (do_sub) { MDB_xcursor mx; leaf = NODEPTR(mpp.mp_page, ki); @@ -3101,11 +3164,19 @@ put_sub: xdata.mv_data = ""; if (flags == MDB_NODUPDATA) flags = MDB_NOOVERWRITE; + /* converted, write the original data first */ + if (dkey.mv_size) { + dkey.mv_data = dbuf; + rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags); + if (rc) return rc; + leaf->mn_flags |= F_DUPDATA; + } rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags); mdb_xcursor_fini(txn, dbi, &mx); memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi], sizeof(MDB_db)); } + txn->mt_dbs[dbi].md_entries++; } done: @@ -3136,6 +3207,19 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, return mdb_put0(txn, dbi, key, data, flags); } +int +mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff) +{ +#define CHANGEABLE (MDB_NOSYNC) + if ((flag & CHANGEABLE) != flag) + return EINVAL; + if (onoff) + env->me_flags |= flag; + else + env->me_flags &= ~flag; + return MDB_SUCCESS; +} + int mdb_env_get_flags(MDB_env *env, unsigned int *arg) { @@ -3233,6 +3317,8 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty; memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db)); *dbi = txn->mt_numdbs; + txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs]; + txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs]; txn->mt_numdbs++; }