X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Flibmdb%2Fmdb.c;h=b485e156fbcc6427cc5224d304925a14cdb97b4b;hb=b070f7bff9603259469b241282f2825e8576ee85;hp=10dd09a4c52ca442efee52dd11965066d0271571;hpb=9f0b00b467e0d1e29ba1290f8b8bd4c77802962a;p=openldap diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 10dd09a4c5..b485e156fb 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -39,9 +39,7 @@ #include #include -#include #include -#include #include #include #include @@ -50,7 +48,6 @@ #include #include #include -#include #include "mdb.h" @@ -74,7 +71,7 @@ typedef ULONG pgno_t; #define MDB_MINKEYS 4 #define MDB_MAGIC 0xBEEFC0DE #define MDB_VERSION 1 -#define MAXKEYSIZE 255 +#define MAXKEYSIZE 511 #define P_INVALID (~0UL) @@ -95,7 +92,11 @@ typedef struct MDB_rxbody { } MDB_rxbody; #ifndef CACHELINE -#define CACHELINE 64 /* most CPUs. Itanium uses 128 */ +# ifdef __APPLE__ +# define CACHELINE 128 /* 64 is too small to contain a mutex */ +# else +# define CACHELINE 64 /* most CPUs. Itanium uses 128 */ +# endif #endif typedef struct MDB_reader { @@ -254,10 +255,10 @@ typedef struct MDB_node { unsigned int mn_flags:4; unsigned int mn_ksize:12; /* key size */ #define F_BIGDATA 0x01 /* data put on overflow page */ +#define F_SUBDATA 0x02 /* data is a sub-database */ char mn_data[1]; } MDB_node; - typedef struct MDB_dbx { MDB_val md_name; MDB_cmp_func *md_cmp; /* user compare function */ @@ -316,6 +317,7 @@ struct MDB_env { MDB_db *me_dbs[2]; MDB_oldpages *me_pghead; pthread_key_t me_txkey; /* thread-key for readers */ + pgno_t me_free_pgs[MDB_IDL_UM_SIZE]; }; #define NODESIZE offsetof(MDB_node, mn_data) @@ -353,6 +355,8 @@ static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, static void mdb_del_node(MDB_page *mp, indx_t indx); static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf); +static int mdb_put0(MDB_txn *txn, MDB_dbi dbi, + MDB_val *key, MDB_val *data, unsigned int flags); static int mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data); static int mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp); @@ -374,16 +378,20 @@ static MDB_ppage *cursor_push_page(MDB_cursor *cursor, static int mdb_set_key(MDB_node *node, MDB_val *key); static int mdb_sibling(MDB_cursor *cursor, int move_right); static int mdb_cursor_next(MDB_cursor *cursor, - MDB_val *key, MDB_val *data); + MDB_val *key, MDB_val *data, MDB_cursor_op op); static int mdb_cursor_prev(MDB_cursor *cursor, - MDB_val *key, MDB_val *data); + MDB_val *key, MDB_val *data, MDB_cursor_op op); static int mdb_cursor_set(MDB_cursor *cursor, - MDB_val *key, MDB_val *data, int *exactp); + MDB_val *key, MDB_val *data, MDB_cursor_op op, int *exactp); static int mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data); static int mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data); +static void mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx); +static void mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node); +static void mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx); + static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); static size_t mdb_branch_size(MDB_env *env, MDB_val *key); @@ -435,7 +443,11 @@ mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) static int _mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2) { - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_REVERSEKEY)) + if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY +#if __BYTE_ORDER == __LITTLE_ENDIAN + |MDB_INTEGERKEY +#endif + )) return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); else return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); @@ -447,8 +459,11 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) { MDB_dpage *dp; pgno_t pgno = P_INVALID; - ULONG oldest = txn->mt_txnid - 2; + ULONG oldest; + + if (txn->mt_txnid > 2) { + oldest = txn->mt_txnid - 2; if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) { /* See if there's anything in the free DB */ MDB_pageparent mpp; @@ -529,6 +544,7 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) } } } + } if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL) return NULL; @@ -593,7 +609,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) MDB_txn *txn; int rc, toggle; - if ((txn = calloc(1, sizeof(*txn))) == NULL) { + if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) { DPRINTF("calloc: %s", strerror(errno)); return ENOMEM; } @@ -610,12 +626,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) pthread_mutex_lock(&env->me_txns->mt_wmutex); env->me_txns->mt_txnid++; - txn->mt_free_pgs = malloc(MDB_IDL_UM_SIZEOF); - if (txn->mt_free_pgs == NULL) { - free(txn->mt_u.dirty_queue); - free(txn); - return ENOMEM; - } + txn->mt_free_pgs = env->me_free_pgs; txn->mt_free_pgs[0] = 0; } @@ -625,21 +636,20 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) if (!r) { unsigned int i; pthread_mutex_lock(&env->me_txns->mt_mutex); - for (i=0; ime_maxreaders; i++) { - if (env->me_txns->mt_readers[i].mr_pid == 0) { - env->me_txns->mt_readers[i].mr_pid = getpid(); - env->me_txns->mt_readers[i].mr_tid = pthread_self(); - r = &env->me_txns->mt_readers[i]; - pthread_setspecific(env->me_txkey, r); - if (i >= env->me_txns->mt_numreaders) - env->me_txns->mt_numreaders = i+1; + for (i=0; ime_txns->mt_numreaders; i++) + if (env->me_txns->mt_readers[i].mr_pid == 0) break; - } - } - pthread_mutex_unlock(&env->me_txns->mt_mutex); if (i == env->me_maxreaders) { + pthread_mutex_unlock(&env->me_txns->mti_mutex); return ENOSPC; } + env->me_txns->mt_readers[i].mr_pid = getpid(); + env->me_txns->mt_readers[i].mr_tid = pthread_self(); + r = &env->me_txns->mt_readers[i]; + pthread_setspecific(env->me_txkey, r); + if (i >= env->me_txns->mt_numreaders) + env->me_txns->mt_numreaders = i+1; + pthread_mutex_unlock(&env->me_txns->mt_mutex); } r->mr_txnid = txn->mt_txnid; txn->mt_u.reader = r; @@ -703,7 +713,6 @@ mdb_txn_abort(MDB_txn *txn) STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); free(dp); } - free(txn->mt_free_pgs); free(txn->mt_u.dirty_queue); while ((mop = txn->mt_env->me_pghead)) { @@ -772,7 +781,7 @@ mdb_txn_commit(MDB_txn *txn) key.mv_data = (char *)&mop->mo_txnid; data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages); data.mv_data = mop->mo_pages; - mdb_put(txn, FREE_DBI, &key, &data, 0); + mdb_put0(txn, FREE_DBI, &key, &data, 0); free(env->me_pghead); env->me_pghead = NULL; } @@ -804,7 +813,7 @@ mdb_txn_commit(MDB_txn *txn) key.mv_data = (char *)&txn->mt_txnid; data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs); data.mv_data = txn->mt_free_pgs; - mdb_put(txn, FREE_DBI, &key, &data, 0); + mdb_put0(txn, FREE_DBI, &key, &data, 0); } /* Update DB root pointers. Their pages have already been @@ -817,7 +826,7 @@ mdb_txn_commit(MDB_txn *txn) for (i = 2; i < txn->mt_numdbs; i++) { if (txn->mt_dbxs[i].md_dirty) { data.mv_data = &txn->mt_dbs[i]; - mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0); + mdb_put0(txn, MAIN_DBI, &txn->mt_dbxs[i].md_name, &data, 0); } } } @@ -917,7 +926,6 @@ mdb_txn_commit(MDB_txn *txn) } pthread_mutex_unlock(&env->me_txns->mt_wmutex); - free(txn->mt_free_pgs); free(txn->mt_u.dirty_queue); free(txn); txn = NULL; @@ -966,7 +974,7 @@ mdbenv_read_header(MDB_env *env, MDB_meta *meta) if (m->mm_version != MDB_VERSION) { DPRINTF("database is version %u, expected version %u", m->mm_version, MDB_VERSION); - return EINVAL; + return MDB_VERSION_MISMATCH; } memcpy(meta, m, sizeof(*m)); @@ -989,10 +997,7 @@ mdbenv_init_meta(MDB_env *env, MDB_meta *meta) meta->mm_psize = psize; meta->mm_last_pg = 1; meta->mm_flags = env->me_flags & 0xffff; -#if __BYTE_ORDER == __LITTLE_ENDIAN - /* freeDB keys are pgno_t's, must compare in int order */ - meta->mm_flags |= MDB_REVERSEKEY; -#endif + meta->mm_flags |= MDB_INTEGERKEY; meta->mm_dbs[0].md_root = P_INVALID; meta->mm_dbs[1].md_root = P_INVALID; @@ -1028,7 +1033,8 @@ mdbenv_write_meta(MDB_txn *txn) assert(txn != NULL); assert(txn->mt_env != NULL); - DPRINTF("writing meta page for root page %lu", txn->mt_dbs[MAIN_DBI].md_root); + DPRINTF("writing meta page %d for root page %lu", + !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE), txn->mt_dbs[MAIN_DBI].md_root); env = txn->mt_env; @@ -1081,7 +1087,7 @@ mdbenv_create(MDB_env **env) { MDB_env *e; - e = calloc(1, sizeof(*e)); + e = calloc(1, sizeof(MDB_env)); if (!e) return ENOMEM; e->me_maxreaders = DEFAULT_READERS; @@ -1104,8 +1110,6 @@ mdbenv_set_mapsize(MDB_env *env, size_t size) int mdbenv_set_maxdbs(MDB_env *env, int dbs) { - if (env->me_map) - return EINVAL; env->me_maxdbs = dbs; return MDB_SUCCESS; } @@ -1126,7 +1130,7 @@ mdbenv_get_maxreaders(MDB_env *env, int *readers) return MDB_SUCCESS; } -int +static int mdbenv_open2(MDB_env *env, unsigned int flags) { int i, newenv = 0; @@ -1278,12 +1282,14 @@ mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) } else { if (env->me_txns->mt_magic != MDB_MAGIC) { DPRINTF("lock region has invalid magic"); - errno = EINVAL; + rc = EINVAL; + goto fail; } if (env->me_txns->mt_version != MDB_VERSION) { DPRINTF("lock region is version %u, expected version %u", env->me_txns->mt_version, MDB_VERSION); - errno = EINVAL; + rc = MDB_VERSION_MISMATCH; + goto fail; } if (errno != EACCES && errno != EAGAIN) { rc = errno; @@ -1298,6 +1304,8 @@ fail: } +#define LOCKNAME "/lock.mdb" +#define DATANAME "/data.mdb" int mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) { @@ -1305,12 +1313,12 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) char *lpath, *dpath; len = strlen(path); - lpath = malloc(len + sizeof("/lock.mdb") + len + sizeof("/data.db")); + lpath = malloc(len + sizeof(LOCKNAME) + len + sizeof(DATANAME)); if (!lpath) return ENOMEM; - dpath = lpath + len + sizeof("/lock.mdb"); - sprintf(lpath, "%s/lock.mdb", path); - sprintf(dpath, "%s/data.mdb", path); + dpath = lpath + len + sizeof(LOCKNAME); + sprintf(lpath, "%s" LOCKNAME, path); + sprintf(dpath, "%s" DATANAME, path); rc = mdbenv_setup_locks(env, lpath, mode, &excl); if (rc) @@ -1350,6 +1358,8 @@ mdbenv_close(MDB_env *env) if (env == NULL) return; + free(env->me_dbs[1]); + free(env->me_dbs[0]); free(env->me_dbxs); free(env->me_path); @@ -1455,7 +1465,7 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp) DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor); - if ((ppage = calloc(1, sizeof(*ppage))) == NULL) + if ((ppage = calloc(1, sizeof(MDB_ppage))) == NULL) return NULL; ppage->mp_page = mp; CURSOR_PUSH(cursor, ppage); @@ -1479,6 +1489,8 @@ mdb_get_page(MDB_txn *txn, pgno_t pgno) } } if (!found) { + if (pgno > txn->mt_env->me_meta->mm_last_pg) + return NULL; p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); } return p; @@ -1585,7 +1597,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, if (root == P_INVALID) { /* Tree is empty. */ DPRINTF("tree is empty"); - return ENOENT; + return MDB_NOTFOUND; } if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL) @@ -1597,7 +1609,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, /* For sub-databases, update main root first */ if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) { MDB_pageparent mp2; - rc = mdb_search_page(txn, 0, &txn->mt_dbxs[dbi].md_name, + rc = mdb_search_page(txn, MAIN_DBI, &txn->mt_dbxs[dbi].md_name, NULL, 1, &mp2); if (rc) return rc; @@ -1652,6 +1664,9 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, assert(data); DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data); + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { return EINVAL; } @@ -1660,10 +1675,21 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, return rc; leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL); - if (leaf && exact) + if (leaf && exact) { + /* Return first duplicate data item */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, leaf); + rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp); + if (rc != MDB_SUCCESS) + return rc; + leaf = NODEPTR(mpp.mp_page, 0); + } rc = mdb_read_data(txn, leaf, data); - else { - rc = ENOENT; + } else { + rc = MDB_NOTFOUND; } return rc; @@ -1679,7 +1705,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) top = CURSOR_TOP(cursor); if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) { - return ENOENT; /* root has no siblings */ + return MDB_NOTFOUND; /* root has no siblings */ } DPRINTF("parent page is page %lu, index %u", @@ -1729,18 +1755,27 @@ mdb_set_key(MDB_node *node, MDB_val *key) } static int -mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) +mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op) { MDB_ppage *top; MDB_page *mp; MDB_node *leaf; + int rc; if (cursor->mc_eof) { - return ENOENT; + return MDB_NOTFOUND; } assert(cursor->mc_initialized); + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (op == MDB_NEXT || op == MDB_NEXT_DUP) { + rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); + if (op != MDB_NEXT || rc == MDB_SUCCESS) + return rc; + } + } + top = CURSOR_TOP(cursor); mp = top->mp_page; @@ -1750,7 +1785,7 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) DPRINTF("=====> move to next sibling page"); if (mdb_sibling(cursor, 1) != MDB_SUCCESS) { cursor->mc_eof = 1; - return ENOENT; + return MDB_NOTFOUND; } top = CURSOR_TOP(cursor); mp = top->mp_page; @@ -1764,21 +1799,39 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); - if (data && mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS) - return MDB_FAIL; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc != MDB_SUCCESS) + return rc; + } + } return mdb_set_key(leaf, key); } static int -mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data) +mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op) { MDB_ppage *top; MDB_page *mp; MDB_node *leaf; + int rc; assert(cursor->mc_initialized); + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (op == MDB_PREV || op == MDB_PREV_DUP) { + rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); + if (op != MDB_PREV || rc == MDB_SUCCESS) + return rc; + } + } + top = CURSOR_TOP(cursor); mp = top->mp_page; @@ -1787,7 +1840,7 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data) if (top->mp_ki == 0) { DPRINTF("=====> move to prev sibling page"); if (mdb_sibling(cursor, 0) != MDB_SUCCESS) { - return ENOENT; + return MDB_NOTFOUND; } top = CURSOR_TOP(cursor); mp = top->mp_page; @@ -1804,15 +1857,24 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data) assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); - if (data && mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS) - return MDB_FAIL; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc != MDB_SUCCESS) + return rc; + } + } return mdb_set_key(leaf, key); } static int mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, - int *exactp) + MDB_cursor_op op, int *exactp) { int rc; MDB_node *leaf; @@ -1823,6 +1885,9 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, assert(key); assert(key->mv_size > 0); + while (CURSOR_TOP(cursor) != NULL) + cursor_pop_page(cursor); + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; @@ -1832,7 +1897,7 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki); if (exactp != NULL && !*exactp) { /* MDB_SET specified and not an exact match. */ - return ENOENT; + return MDB_NOTFOUND; } if (leaf == NULL) { @@ -1849,8 +1914,30 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, cursor->mc_initialized = 1; cursor->mc_eof = 0; - if (data && (rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (op == MDB_SET || op == MDB_SET_RANGE) { + rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); + } else { + int ex2, *ex2p; + MDB_cursor_op op2; + if (op == MDB_GET_BOTH) { + ex2p = &ex2; + op2 = MDB_SET; + } else { + ex2p = NULL; + op2 = MDB_SET_RANGE; + } + rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, op2, ex2p); + if (rc != MDB_SUCCESS) + return rc; + } + } + } rc = mdb_set_key(leaf, key); if (rc == MDB_SUCCESS) { @@ -1869,6 +1956,9 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) MDB_pageparent mpp; MDB_node *leaf; + while (CURSOR_TOP(cursor) != NULL) + cursor_pop_page(cursor); + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; @@ -1878,9 +1968,17 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) cursor->mc_initialized = 1; cursor->mc_eof = 0; - if (data && (rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc) + return rc; + } + } return mdb_set_key(leaf, key); } @@ -1893,6 +1991,9 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) MDB_node *leaf; MDB_val lkey; + while (CURSOR_TOP(cursor) != NULL) + cursor_pop_page(cursor); + lkey.mv_size = MAXKEYSIZE+1; lkey.mv_data = NULL; @@ -1908,8 +2009,17 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) top = CURSOR_TOP(cursor); top->mp_ki = NUMKEYS(top->mp_page) - 1; - if (data && (rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc) + return rc; + } + } return mdb_set_key(leaf, key); } @@ -1924,39 +2034,42 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data, assert(cursor); switch (op) { + case MDB_GET_BOTH: + case MDB_GET_BOTH_RANGE: + if (data == NULL) { + rc = EINVAL; + break; + } + /* FALLTHRU */ case MDB_SET: case MDB_SET_RANGE: - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { rc = EINVAL; - } else if (op == MDB_SET) - rc = mdb_cursor_set(cursor, key, data, &exact); + } else if (op != MDB_SET_RANGE) + rc = mdb_cursor_set(cursor, key, data, op, NULL); else - rc = mdb_cursor_set(cursor, key, data, NULL); + rc = mdb_cursor_set(cursor, key, data, op, &exact); break; case MDB_NEXT: + case MDB_NEXT_DUP: + case MDB_NEXT_NODUP: if (!cursor->mc_initialized) rc = mdb_cursor_first(cursor, key, data); else - rc = mdb_cursor_next(cursor, key, data); + rc = mdb_cursor_next(cursor, key, data, op); break; case MDB_PREV: - if (!cursor->mc_initialized || cursor->mc_eof) { - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); + case MDB_PREV_DUP: + case MDB_PREV_NODUP: + if (!cursor->mc_initialized || cursor->mc_eof) rc = mdb_cursor_last(cursor, key, data); - } else - rc = mdb_cursor_prev(cursor, key, data); + else + rc = mdb_cursor_prev(cursor, key, data, op); break; case MDB_FIRST: - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); rc = mdb_cursor_first(cursor, key, data); break; case MDB_LAST: - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); rc = mdb_cursor_last(cursor, key, data); break; default: @@ -2154,6 +2267,70 @@ mdb_del_node(MDB_page *mp, indx_t indx) mp->mp_upper += sz; } +static void +mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) +{ + MDB_dbi dbn; + + mx->mx_txn = *txn; + mx->mx_txn.mt_dbxs = mx->mx_dbxs; + mx->mx_txn.mt_dbs = mx->mx_dbs; + mx->mx_dbxs[0] = txn->mt_dbxs[0]; + mx->mx_dbxs[1] = txn->mt_dbxs[1]; + if (dbi > 1) { + mx->mx_dbxs[2] = txn->mt_dbxs[dbi]; + dbn = 2; + } else { + dbn = 1; + } + mx->mx_dbxs[dbn+1].md_parent = dbn; + mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp; + mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel; + mx->mx_dbxs[dbn+1].md_dirty = 0; + mx->mx_txn.mt_numdbs = dbn+2; + + SLIST_INIT(&mx->mx_cursor.mc_stack); + mx->mx_cursor.mc_txn = &mx->mx_txn; + mx->mx_cursor.mc_dbi = dbn+1; +} + +static void +mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node) +{ + MDB_db *db = NODEDATA(node); + MDB_dbi dbn; + mx->mx_dbs[0] = txn->mt_dbs[0]; + mx->mx_dbs[1] = txn->mt_dbs[1]; + if (dbi > 1) { + mx->mx_dbs[2] = txn->mt_dbs[dbi]; + dbn = 3; + } else { + dbn = 2; + } + mx->mx_dbs[dbn] = *db; + mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node); + mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize; + mx->mx_txn.mt_next_pgno = txn->mt_next_pgno; + mx->mx_txn.mt_oldest = txn->mt_oldest; + mx->mx_txn.mt_u = txn->mt_u; +} + +static void +mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) +{ + txn->mt_next_pgno = mx->mx_txn.mt_next_pgno; + txn->mt_oldest = mx->mx_txn.mt_oldest; + txn->mt_u = mx->mx_txn.mt_u; + txn->mt_dbs[0] = mx->mx_dbs[0]; + txn->mt_dbs[1] = mx->mx_dbs[1]; + txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty; + txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty; + if (dbi > 1) { + txn->mt_dbs[dbi] = mx->mx_dbs[2]; + txn->mt_dbxs[dbi].md_dirty = mx->mx_dbxs[2].md_dirty; + } +} + int mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) { @@ -2173,18 +2350,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) { MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1); cursor->mc_xcursor = mx; - mx->mx_cursor.mc_txn = &mx->mx_txn; - mx->mx_txn = *txn; - mx->mx_txn.mt_dbxs = mx->mx_dbxs; - mx->mx_txn.mt_dbs = mx->mx_dbs; - mx->mx_dbxs[0] = txn->mt_dbxs[0]; - mx->mx_dbxs[1] = txn->mt_dbxs[1]; - if (dbi > 1) { - mx->mx_dbxs[2] = txn->mt_dbxs[dbi]; - mx->mx_txn.mt_numdbs = 4; - } else { - mx->mx_txn.mt_numdbs = 3; - } + mdb_xcursor_init0(txn, dbi, mx); } } else { return ENOMEM; @@ -2195,14 +2361,35 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) return MDB_SUCCESS; } +/* Return the count of duplicate data items for the current key */ +int +mdb_cursor_count(MDB_cursor *mc, unsigned long *countp) +{ + if (mc == NULL || countp == NULL) + return EINVAL; + + if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT)) + return EINVAL; + + if (!mc->mc_xcursor->mx_cursor.mc_initialized) + return EINVAL; + + *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries; + return MDB_SUCCESS; +} + void mdb_cursor_close(MDB_cursor *cursor) { if (cursor != NULL) { - while (!CURSOR_EMPTY(cursor)) + while(!CURSOR_EMPTY(cursor)) cursor_pop_page(cursor); + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_fini(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor); + while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor)) + cursor_pop_page(&cursor->mc_xcursor->mx_cursor); + } -/* btree_close(cursor->bt); */ free(cursor); } } @@ -2507,7 +2694,8 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_no int mdb_del(MDB_txn *txn, MDB_dbi dbi, - MDB_val *key, MDB_val *data) + MDB_val *key, MDB_val *data, + unsigned int flags) { int rc, exact; unsigned int ki; @@ -2518,7 +2706,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, assert(key != NULL); - if (txn == NULL || dbi >= txn->mt_numdbs) + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) return EINVAL; if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { @@ -2536,7 +2724,61 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf == NULL || !exact) { - return ENOENT; + return MDB_NOTFOUND; + } + + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + MDB_pageparent mp2; + + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, leaf); + if (flags == MDB_DEL_DUP) { + rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL, 0); + mdb_xcursor_fini(txn, dbi, &mx); + if (rc != MDB_SUCCESS) + return rc; + /* If sub-DB still has entries, we're done */ + if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) { + memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi], + sizeof(MDB_db)); + return rc; + } + /* otherwise fall thru and delete the sub-DB */ + } else { + /* add all the child DB's pages to the free list */ + rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, + NULL, &mx.mx_cursor, 0, &mp2); + if (rc == MDB_SUCCESS) { + MDB_ppage *top, *parent; + MDB_node *ni; + unsigned int i; + + cursor_pop_page(&mx.mx_cursor); + top = CURSOR_TOP(&mx.mx_cursor); + if (top != NULL) { + parent = SLIST_NEXT(top, mp_entry); + while (parent != NULL) { + for (i=0; imp_page); i++) { + ni = NODEPTR(top->mp_page, i); + mdb_idl_insert(txn->mt_free_pgs, ni->mn_pgno); + } + if (parent) { + parent->mp_ki++; + if (parent->mp_ki >= NUMKEYS(parent->mp_page)) { + cursor_pop_page(&mx.mx_cursor); + top = CURSOR_TOP(&mx.mx_cursor); + parent = SLIST_NEXT(top, mp_entry); + } else { + ni = NODEPTR(parent->mp_page, parent->mp_ki); + top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno); + } + } + } + } + mdb_idl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root); + } + } } if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS) @@ -2702,32 +2944,16 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, return rc; } -int -mdb_put(MDB_txn *txn, MDB_dbi dbi, +static int +mdb_put0(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, unsigned int flags) { int rc = MDB_SUCCESS, exact; unsigned int ki; MDB_node *leaf; MDB_pageparent mpp; - - assert(key != NULL); - assert(data != NULL); - - if (txn == NULL) - return EINVAL; - - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - return EINVAL; - } - - if (txn->mt_env->me_txn != txn) { - return EINVAL; - } - - if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { - return EINVAL; - } + MDB_val xdata, *rdata; + MDB_db dummy; DPRINTF("==> put key %.*s, size %zu, data size %zu", (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); @@ -2738,10 +2964,13 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (rc == MDB_SUCCESS) { leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { - if (F_ISSET(flags, MDB_NOOVERWRITE)) { + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + goto put_sub; + } + if (flags == MDB_NOOVERWRITE) { DPRINTF("duplicate key %.*s", (int)key->mv_size, (char *)key->mv_data); - return EEXIST; + return MDB_KEYEXIST; } /* same size, just replace it */ if (NODEDSZ(leaf) == data->mv_size) { @@ -2754,7 +2983,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, ki = NUMKEYS(mpp.mp_page); DPRINTF("appending key at index %i", ki); } - } else if (rc == ENOENT) { + } else if (rc == MDB_NOTFOUND) { MDB_dpage *dp; /* new file, just write a root leaf page */ DPRINTF("allocating new root leaf page"); @@ -2773,22 +3002,89 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, DPRINTF("there are %u keys, should insert new key at index %i", NUMKEYS(mpp.mp_page), ki); - if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) { - rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID); + /* For sorted dups, the data item at this level is a DB record + * for a child DB; the actual data elements are stored as keys + * in the child DB. + */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + rdata = &xdata; + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + memset(&dummy, 0, sizeof(dummy)); + dummy.md_root = P_INVALID; + } else { + rdata = data; + } + + if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) { + rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID); } else { /* There is room already in this leaf page. */ - rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, data, 0, 0); + rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, rdata, 0, 0); } if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; - else + else { txn->mt_dbs[dbi].md_entries++; + /* Remember if we just added a subdatabase */ + if (flags & F_SUBDATA) { + leaf = NODEPTR(mpp.mp_page, ki); + leaf->mn_flags |= F_SUBDATA; + } + + /* Now store the actual data in the child DB. Note that we're + * storing the user data in the keys field, so there are strict + * size limits on dupdata. The actual data fields of the child + * DB are all zero size. + */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + + leaf = NODEPTR(mpp.mp_page, ki); +put_sub: + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, leaf); + xdata.mv_size = 0; + xdata.mv_data = ""; + if (flags == MDB_NODUPDATA) + flags = MDB_NOOVERWRITE; + rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags); + mdb_xcursor_fini(txn, dbi, &mx); + memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi], + sizeof(MDB_db)); + } + } + done: return rc; } +int +mdb_put(MDB_txn *txn, MDB_dbi dbi, + MDB_val *key, MDB_val *data, unsigned int flags) +{ + assert(key != NULL); + assert(data != NULL); + + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { + return EINVAL; + } + + if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { + return EINVAL; + } + + if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags) + return EINVAL; + + return mdb_put0(txn, dbi, key, data, flags); +} + int mdbenv_get_flags(MDB_env *env, unsigned int *arg) { @@ -2809,32 +3105,39 @@ mdbenv_get_path(MDB_env *env, const char **arg) return MDB_SUCCESS; } +static int +mdb_stat0(MDB_env *env, MDB_db *db, MDB_stat *arg) +{ + arg->ms_psize = env->me_psize; + arg->ms_depth = db->md_depth; + arg->ms_branch_pages = db->md_branch_pages; + arg->ms_leaf_pages = db->md_leaf_pages; + arg->ms_overflow_pages = db->md_overflow_pages; + arg->ms_entries = db->md_entries; + + return MDB_SUCCESS; +} int mdbenv_stat(MDB_env *env, MDB_stat *arg) { if (env == NULL || arg == NULL) return EINVAL; - arg->ms_psize = env->me_psize; - arg->ms_depth = env->me_meta->mm_dbs[MAIN_DBI].md_depth; - arg->ms_branch_pages = env->me_meta->mm_dbs[MAIN_DBI].md_branch_pages; - arg->ms_leaf_pages = env->me_meta->mm_dbs[MAIN_DBI].md_leaf_pages; - arg->ms_overflow_pages = env->me_meta->mm_dbs[MAIN_DBI].md_overflow_pages; - arg->ms_entries = env->me_meta->mm_dbs[MAIN_DBI].md_entries; - - return MDB_SUCCESS; + return mdb_stat0(env, &env->me_meta->mm_dbs[MAIN_DBI], arg); } int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) { MDB_val key, data; MDB_dbi i; - int rc; + int rc, dirty = 0; size_t len; /* main DB? */ if (!name) { *dbi = MAIN_DBI; + if (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY)) + txn->mt_dbs[MAIN_DBI].md_flags |= (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY)); return MDB_SUCCESS; } @@ -2857,14 +3160,15 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) rc = mdb_get(txn, MAIN_DBI, &key, &data); /* Create if requested */ - if (rc == ENOENT && (flags & MDB_CREATE)) { + if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) { MDB_db dummy; data.mv_size = sizeof(MDB_db); data.mv_data = &dummy; memset(&dummy, 0, sizeof(dummy)); dummy.md_root = P_INVALID; dummy.md_flags = flags & 0xffff; - rc = mdb_put(txn, 0, &key, &data, 0); + rc = mdb_put0(txn, MAIN_DBI, &key, &data, F_SUBDATA); + dirty = 1; } /* OK, got info, add to table */ @@ -2875,7 +3179,7 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL; txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL; txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI; - txn->mt_dbxs[txn->mt_numdbs].md_dirty = 0; + txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty; memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db)); *dbi = txn->mt_numdbs; txn->mt_numdbs++; @@ -2886,17 +3190,10 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg) { - if (txn == NULL || arg == NULL) + if (txn == NULL || arg == NULL || dbi >= txn->mt_numdbs) return EINVAL; - arg->ms_psize = txn->mt_env->me_psize; - arg->ms_depth = txn->mt_dbs[dbi].md_depth; - arg->ms_branch_pages = txn->mt_dbs[dbi].md_branch_pages; - arg->ms_leaf_pages = txn->mt_dbs[dbi].md_leaf_pages; - arg->ms_overflow_pages = txn->mt_dbs[dbi].md_overflow_pages; - arg->ms_entries = txn->mt_dbs[dbi].md_entries; - - return MDB_SUCCESS; + return mdb_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg); } void mdb_close(MDB_txn *txn, MDB_dbi dbi) @@ -2909,3 +3206,30 @@ void mdb_close(MDB_txn *txn, MDB_dbi dbi) txn->mt_dbxs[dbi].md_name.mv_size = 0; free(ptr); } + +int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp) +{ + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + txn->mt_dbxs[dbi].md_cmp = cmp; + return MDB_SUCCESS; +} + +int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp) +{ + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + txn->mt_dbxs[dbi].md_dcmp = cmp; + return MDB_SUCCESS; +} + +int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel) +{ + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + txn->mt_dbxs[dbi].md_rel = rel; + return MDB_SUCCESS; +}