From f89d0cc4b73168ea9b98a6bb623dd683296d053a Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Wed, 3 Aug 2011 01:41:54 -0700 Subject: [PATCH] More subDBs, compiles now --- libraries/libmdb/mdb.c | 457 +++++++++++++++++++----------------- libraries/libmdb/mdb.h | 2 +- libraries/libmdb/mdb_stat.c | 15 +- libraries/libmdb/mtest.c | 18 +- 4 files changed, 265 insertions(+), 227 deletions(-) diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 8a1f3518f6..08e02d91a5 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -215,7 +215,6 @@ typedef struct MDB_dbx { char *md_name; MDB_cmp_func *md_cmp; /* user compare function */ MDB_rel_func *md_rel; /* user relocate function */ - MDB_db *md_parent; /* parent tree */ } MDB_dbx; typedef struct MDB_db { @@ -283,12 +282,12 @@ struct MDB_env { #define MDB_COMMIT_PAGES 64 /* max number of pages to write in one commit */ #define MDB_MAXCACHE_DEF 1024 /* max number of pages to keep in cache */ -static int mdb_search_page_root(MDB_db *db, - MDB_val *key, +static int mdb_search_page_root(MDB_txn *txn, + MDB_dbi dbi, MDB_val *key, MDB_cursor *cursor, int modify, MDB_pageparent *mpp); -static int mdb_search_page(MDB_db *db, - MDB_txn *txn, MDB_val *key, +static int mdb_search_page(MDB_txn *txn, + MDB_dbi dbi, MDB_val *key, MDB_cursor *cursor, int modify, MDB_pageparent *mpp); @@ -298,35 +297,31 @@ static int mdbenv_read_meta(MDB_env *env, int *which); static int mdbenv_write_meta(MDB_txn *txn); static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno); -static MDB_node *mdb_search_node(MDB_db *db, MDB_page *mp, +static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, int *exactp, unsigned int *kip); -static int mdb_add_node(MDB_db *bt, MDB_page *mp, +static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); -static void mdb_del_node(MDB_db *bt, MDB_page *mp, - indx_t indx); -static int mdb_read_data(MDB_db *bt, MDB_page *mp, - MDB_node *leaf, MDB_val *data); - -static int mdb_rebalance(MDB_db *bt, MDB_pageparent *mp); -static int mdb_update_key(MDB_db *bt, MDB_page *mp, - indx_t indx, MDB_val *key); -static int mdb_move_node(MDB_db *bt, +static void mdb_del_node(MDB_page *mp, indx_t indx); +static int mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data); + +static int mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp); +static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key); +static int mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx, MDB_pageparent *dst, indx_t dstindx); -static int mdb_merge(MDB_db *bt, MDB_pageparent *src, +static int mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst); -static int mdb_split(MDB_db *bt, MDB_page **mpp, +static int mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno); -static MDB_dpage *mdb_new_page(MDB_db *db, uint32_t flags, int num); +static MDB_dpage *mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num); static void cursor_pop_page(MDB_cursor *cursor); static MDB_ppage *cursor_push_page(MDB_cursor *cursor, MDB_page *mp); -static int mdb_set_key(MDB_db *bt, MDB_page *mp, - MDB_node *node, MDB_val *key); +static int mdb_set_key(MDB_node *node, MDB_val *key); static int mdb_sibling(MDB_cursor *cursor, int move_right); static int mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data); @@ -335,9 +330,9 @@ static int mdb_cursor_set(MDB_cursor *cursor, static int mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data); -static size_t mdb_leaf_size(MDB_db *bt, MDB_val *key, +static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); -static size_t mdb_branch_size(MDB_db *bt, MDB_val *key); +static size_t mdb_branch_size(MDB_env *env, MDB_val *key); static int memncmp(const void *s1, size_t n1, const void *s2, size_t n2); @@ -378,15 +373,15 @@ memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2) } int -mdb_cmp(MDB_db *db, const MDB_val *a, const MDB_val *b) +mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) { - return db->md_cmp(a, b); + return txn->mt_dbxs[dbi].md_cmp(a, b); } static int -_mdb_cmp(MDB_db *db, const MDB_val *key1, const MDB_val *key2) +_mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2) { - if (F_ISSET(db->md_flags, MDB_REVERSEKEY)) + if (F_ISSET(txn->mt_dbs[dbi]->md_flags, MDB_REVERSEKEY)) return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); else return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); @@ -492,6 +487,8 @@ mdbenv_sync(MDB_env *env) return rc; } +#define DBX_CHUNK 16 /* space for 16 DBs at a time */ + int mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) { @@ -523,6 +520,14 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) } txn->mt_free_pgs[0] = 0; } + /* Copy the DB arrays */ + txn->mt_numdbs = env->me_numdbs; + rc = (txn->mt_numdbs % DBX_CHUNK) + 1; + txn->mt_dbxs = malloc(rc * DBX_CHUNK * sizeof(MDB_dbx)); + txn->mt_dbs = malloc(rc * DBX_CHUNK * sizeof(MDB_db *)); + memcpy(txn->mt_dbxs, env->me_dbxs, txn->mt_numdbs * sizeof(MDB_dbx)); + memcpy(txn->mt_dbs, env->me_dbs, txn->mt_numdbs * sizeof(MDB_db *)); + txn->mt_txnid = env->me_txns->mt_txnid; if (rdonly) { MDB_reader *r = pthread_getspecific(env->me_txkey); @@ -865,11 +870,11 @@ mdbenv_write_meta(MDB_txn *txn) len = sizeof(MDB_meta) - off; ptr += off; - meta.mm_depth = env->me_db.md_depth; - meta.mm_branch_pages = env->me_db.md_branch_pages; - meta.mm_leaf_pages = env->me_db.md_leaf_pages; - meta.mm_overflow_pages = env->me_db.md_overflow_pages; - meta.mm_entries = env->me_db.md_entries; + meta.mm_depth = txn->mt_dbs[0]->md_depth; + meta.mm_branch_pages = txn->mt_dbs[0]->md_branch_pages; + meta.mm_leaf_pages = txn->mt_dbs[0]->md_leaf_pages; + meta.mm_overflow_pages = txn->mt_dbs[0]->md_overflow_pages; + meta.mm_entries = txn->mt_dbs[0]->md_entries; meta.mm_root = txn->mt_root; meta.mm_last_pg = txn->mt_next_pgno - 1; meta.mm_txnid = txn->mt_txnid; @@ -947,7 +952,6 @@ mdbenv_create(MDB_env **env) e->me_meta.mm_mapsize = DEFAULT_MAPSIZE; e->me_maxreaders = DEFAULT_READERS; - e->me_db.md_env = e; e->me_fd = -1; e->me_lfd = -1; *env = e; @@ -1175,6 +1179,10 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) pthread_key_create(&env->me_txkey, mdbenv_reader_dest); if (excl) mdbenv_share_locks(env); + env->me_dbxs = calloc(DBX_CHUNK, sizeof(MDB_dbx)); + env->me_dbs = calloc(DBX_CHUNK, sizeof(MDB_db *)); + env->me_numdbs = 1; + env->me_dbs[0] = (MDB_db *)&env->me_meta.mm_psize; } @@ -1211,7 +1219,7 @@ mdbenv_close(MDB_env *env) * If no entry larger or equal to the key is found, returns NULL. */ static MDB_node * -mdb_search_node(MDB_db *bt, MDB_page *mp, MDB_val *key, +mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, int *exactp, unsigned int *kip) { unsigned int i = 0; @@ -1227,7 +1235,7 @@ mdb_search_node(MDB_db *bt, MDB_page *mp, MDB_val *key, assert(NUMKEYS(mp) > 0); - bzero(&nodekey, sizeof(nodekey)); + memset(&nodekey, 0, sizeof(nodekey)); low = IS_LEAF(mp) ? 0 : 1; high = NUMKEYS(mp) - 1; @@ -1238,10 +1246,10 @@ mdb_search_node(MDB_db *bt, MDB_page *mp, MDB_val *key, nodekey.mv_size = node->mn_ksize; nodekey.mv_data = NODEKEY(node); - if (bt->md_cmp) - rc = bt->md_cmp(key, &nodekey); + if (txn->mt_dbxs[dbi].md_cmp) + rc = txn->mt_dbxs[dbi].md_cmp(key, &nodekey); else - rc = _mdb_cmp(bt, key, &nodekey); + rc = _mdb_cmp(txn, dbi, key, &nodekey); if (IS_LEAF(mp)) DPRINTF("found leaf index %u [%.*s], rc = %i", @@ -1324,7 +1332,7 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno) } static int -mdb_search_page_root(MDB_db *bt, MDB_val *key, +mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_cursor *cursor, int modify, MDB_pageparent *mpp) { MDB_page *mp = mpp->mp_page; @@ -1345,7 +1353,7 @@ mdb_search_page_root(MDB_db *bt, MDB_val *key, i = 0; else { int exact; - node = mdb_search_node(bt, mp, key, &exact, &i); + node = mdb_search_node(txn, dbi, mp, key, &exact, &i); if (node == NULL) i = NUMKEYS(mp) - 1; else if (!exact) { @@ -1364,7 +1372,7 @@ mdb_search_page_root(MDB_db *bt, MDB_val *key, CURSOR_TOP(cursor)->mp_ki = i; mpp->mp_parent = mp; - if ((mp = mdbenv_get_page(bt->md_env, NODEPGNO(node))) == NULL) + if ((mp = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL) return MDB_FAIL; mpp->mp_pi = i; mpp->mp_page = mp; @@ -1374,7 +1382,7 @@ mdb_search_page_root(MDB_db *bt, MDB_val *key, if (modify) { MDB_dhead *dh = ((MDB_dhead *)mp)-1; - if ((rc = mdb_touch(bt->md_env->me_txn, mpp)) != 0) + if ((rc = mdb_touch(txn, mpp)) != 0) return rc; dh = ((MDB_dhead *)mpp->mp_page)-1; dh->md_parent = mpp->mp_parent; @@ -1403,26 +1411,17 @@ mdb_search_page_root(MDB_db *bt, MDB_val *key, * If modify is true, visited pages are updated with new page numbers. */ static int -mdb_search_page(MDB_db *db, MDB_txn *txn, MDB_val *key, +mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_cursor *cursor, int modify, MDB_pageparent *mpp) { int rc; pgno_t root; - /* Can't modify pages outside a transaction. */ - if (txn == NULL && modify) { - return EINVAL; - } - /* Choose which root page to start with. If a transaction is given * use the root page from the transaction, otherwise read the last * committed root page. */ - if (txn == NULL) { - if ((rc = mdbenv_read_meta(db->md_env, NULL)) != MDB_SUCCESS) - return rc; - root = db->md_env->me_meta.mm_root; - } else if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) { + if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) { DPRINTF("transaction has failed, must abort"); return EINVAL; } else @@ -1433,7 +1432,7 @@ mdb_search_page(MDB_db *db, MDB_txn *txn, MDB_val *key, return ENOENT; } - if ((mpp->mp_page = mdbenv_get_page(db->md_env, root)) == NULL) + if ((mpp->mp_page = mdbenv_get_page(txn->mt_env, root)) == NULL) return MDB_FAIL; DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags); @@ -1446,20 +1445,15 @@ mdb_search_page(MDB_db *db, MDB_txn *txn, MDB_val *key, txn->mt_root = mpp->mp_page->mp_pgno; } - return mdb_search_page_root(db, key, cursor, modify, mpp); + return mdb_search_page_root(txn, dbi, key, cursor, modify, mpp); } static int -mdb_read_data(MDB_db *db, MDB_page *mp, MDB_node *leaf, - MDB_val *data) +mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data) { MDB_page *omp; /* overflow mpage */ - size_t max; pgno_t pgno; - bzero(data, sizeof(*data)); - max = db->md_env->me_meta.mm_psize - PAGEHDRSZ; - if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) { data->mv_size = leaf->mn_dsize; data->mv_data = NODEDATA(leaf); @@ -1469,8 +1463,8 @@ mdb_read_data(MDB_db *db, MDB_page *mp, MDB_node *leaf, /* Read overflow data. */ data->mv_size = leaf->mn_dsize; - bcopy(NODEDATA(leaf), &pgno, sizeof(pgno)); - if ((omp = mdbenv_get_page(db->md_env, pgno)) == NULL) { + memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); + if ((omp = mdbenv_get_page(env, pgno)) == NULL) { DPRINTF("read overflow page %lu failed", pgno); return MDB_FAIL; } @@ -1480,7 +1474,7 @@ mdb_read_data(MDB_db *db, MDB_page *mp, MDB_node *leaf, } int -mdb_get(MDB_db *db, MDB_txn *txn, +mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data) { int rc, exact; @@ -1491,23 +1485,16 @@ mdb_get(MDB_db *db, MDB_txn *txn, assert(data); DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data); - if (db == NULL) - return EINVAL; - - if (txn != NULL && db->md_env != txn->mt_env) { - return EINVAL; - } - if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { return EINVAL; } - if ((rc = mdb_search_page(db, txn, key, NULL, 0, &mpp)) != MDB_SUCCESS) + if ((rc = mdb_search_page(txn, dbi, key, NULL, 0, &mpp)) != MDB_SUCCESS) return rc; - leaf = mdb_search_node(db, mpp.mp_page, key, &exact, NULL); + leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL); if (leaf && exact) - rc = mdb_read_data(db, mpp.mp_page, leaf, data); + rc = mdb_read_data(txn->mt_env, leaf, data); else { rc = ENOENT; } @@ -1550,7 +1537,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) assert(IS_BRANCH(parent->mp_page)); indx = NODEPTR(parent->mp_page, parent->mp_ki); - if ((mp = mdbenv_get_page(cursor->mc_db->md_env, indx->mn_pgno)) == NULL) + if ((mp = mdbenv_get_page(cursor->mc_txn->mt_env, indx->mn_pgno)) == NULL) return MDB_FAIL; #if 0 mp->parent = parent->mp_page; @@ -1563,8 +1550,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) } static int -mdb_set_key(MDB_db *bt, MDB_page *mp, MDB_node *node, - MDB_val *key) +mdb_set_key(MDB_node *node, MDB_val *key) { if (key == NULL) return 0; @@ -1611,10 +1597,10 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); - if (data && mdb_read_data(cursor->mc_db, mp, leaf, data) != MDB_SUCCESS) + if (data && mdb_read_data(cursor->mc_txn->mt_env, leaf, data) != MDB_SUCCESS) return MDB_FAIL; - return mdb_set_key(cursor->mc_db, mp, leaf, key); + return mdb_set_key(leaf, key); } static int @@ -1630,13 +1616,13 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, assert(key); assert(key->mv_size > 0); - rc = mdb_search_page(cursor->mc_db, cursor->mc_txn, key, cursor, 0, &mpp); + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; assert(IS_LEAF(mpp.mp_page)); top = CURSOR_TOP(cursor); - leaf = mdb_search_node(cursor->mc_db, mpp.mp_page, key, exactp, &top->mp_ki); + leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki); if (exactp != NULL && !*exactp) { /* MDB_CURSOR_EXACT specified and not an exact match. */ return ENOENT; @@ -1656,10 +1642,10 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, cursor->mc_initialized = 1; cursor->mc_eof = 0; - if (data && (rc = mdb_read_data(cursor->mc_db, mpp.mp_page, leaf, data)) != MDB_SUCCESS) + if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS) return rc; - rc = mdb_set_key(cursor->mc_db, mpp.mp_page, leaf, key); + rc = mdb_set_key(leaf, key); if (rc == MDB_SUCCESS) { DPRINTF("==> cursor placed on key %.*s", (int)key->mv_size, (char *)key->mv_data); @@ -1676,7 +1662,7 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) MDB_pageparent mpp; MDB_node *leaf; - rc = mdb_search_page(cursor->mc_db, cursor->mc_txn, NULL, cursor, 0, &mpp); + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; assert(IS_LEAF(mpp.mp_page)); @@ -1685,10 +1671,10 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) cursor->mc_initialized = 1; cursor->mc_eof = 0; - if (data && (rc = mdb_read_data(cursor->mc_db, mpp.mp_page, leaf, data)) != MDB_SUCCESS) + if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS) return rc; - return mdb_set_key(cursor->mc_db, mpp.mp_page, leaf, key); + return mdb_set_key(leaf, key); } int @@ -1735,28 +1721,24 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data, /* Allocate a page and initialize it */ static MDB_dpage * -mdb_new_page(MDB_db *db, uint32_t flags, int num) +mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num) { MDB_dpage *dp; - assert(db != NULL); - assert(db->md_env != NULL); - assert(db->md_env->me_txn != NULL); - - if ((dp = mdb_alloc_page(db->md_env->me_txn, NULL, 0, num)) == NULL) + if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL) return NULL; DPRINTF("allocated new mpage %lu, page size %u", - dp->p.mp_pgno, db->md_env->me_meta.mm_psize); + dp->p.mp_pgno, txn->mt_env->me_meta.mm_psize); dp->p.mp_flags = flags | P_DIRTY; dp->p.mp_lower = PAGEHDRSZ; - dp->p.mp_upper = db->md_env->me_meta.mm_psize; + dp->p.mp_upper = txn->mt_env->me_meta.mm_psize; if (IS_BRANCH(&dp->p)) - db->md_branch_pages++; + txn->mt_dbs[dbi]->md_branch_pages++; else if (IS_LEAF(&dp->p)) - db->md_leaf_pages++; + txn->mt_dbs[dbi]->md_leaf_pages++; else if (IS_OVERFLOW(&dp->p)) { - db->md_overflow_pages += num; + txn->mt_dbs[dbi]->md_overflow_pages += num; dp->p.mp_pages = num; } @@ -1764,12 +1746,12 @@ mdb_new_page(MDB_db *db, uint32_t flags, int num) } static size_t -mdb_leaf_size(MDB_db *db, MDB_val *key, MDB_val *data) +mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data) { size_t sz; sz = LEAFSIZE(key, data); - if (data->mv_size >= db->md_env->me_meta.mm_psize / MDB_MINKEYS) { + if (data->mv_size >= env->me_meta.mm_psize / MDB_MINKEYS) { /* put on overflow page */ sz -= data->mv_size - sizeof(pgno_t); } @@ -1778,12 +1760,12 @@ mdb_leaf_size(MDB_db *db, MDB_val *key, MDB_val *data) } static size_t -mdb_branch_size(MDB_db *db, MDB_val *key) +mdb_branch_size(MDB_env *env, MDB_val *key) { size_t sz; sz = INDXSIZE(key); - if (sz >= db->md_env->me_meta.mm_psize / MDB_MINKEYS) { + if (sz >= env->me_meta.mm_psize / MDB_MINKEYS) { /* put on overflow page */ /* not implemented */ /* sz -= key->size - sizeof(pgno_t); */ @@ -1793,7 +1775,7 @@ mdb_branch_size(MDB_db *db, MDB_val *key) } static int -mdb_add_node(MDB_db *db, MDB_page *mp, indx_t indx, +mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags) { unsigned int i; @@ -1817,13 +1799,13 @@ mdb_add_node(MDB_db *db, MDB_page *mp, indx_t indx, if (F_ISSET(flags, F_BIGDATA)) { /* Data already on overflow page. */ node_size += sizeof(pgno_t); - } else if (data->mv_size >= db->md_env->me_meta.mm_psize / MDB_MINKEYS) { - int ovpages = OVPAGES(data->mv_size, db->md_env->me_meta.mm_psize); + } else if (data->mv_size >= txn->mt_env->me_meta.mm_psize / MDB_MINKEYS) { + int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_meta.mm_psize); /* Put data on overflow page. */ DPRINTF("data size is %zu, put on overflow page", data->mv_size); node_size += sizeof(pgno_t); - if ((ofp = mdb_new_page(db, P_OVERFLOW, ovpages)) == NULL) + if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL) return MDB_FAIL; DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno); flags |= F_BIGDATA; @@ -1884,7 +1866,7 @@ mdb_add_node(MDB_db *db, MDB_page *mp, indx_t indx, } static void -mdb_del_node(MDB_db *db, MDB_page *mp, indx_t indx) +mdb_del_node(MDB_page *mp, indx_t indx) { unsigned int sz; indx_t i, j, numkeys, ptr; @@ -1923,20 +1905,16 @@ mdb_del_node(MDB_db *db, MDB_page *mp, indx_t indx) } int -mdb_cursor_open(MDB_db *db, MDB_txn *txn, MDB_cursor **ret) +mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) { MDB_cursor *cursor; - if (db == NULL || ret == NULL) - return EINVAL; - - if (txn != NULL && db->md_env != txn->mt_env) { + if (txn == NULL || ret == NULL) return EINVAL; - } if ((cursor = calloc(1, sizeof(*cursor))) != NULL) { SLIST_INIT(&cursor->mc_stack); - cursor->mc_db = db; + cursor->mc_dbi = dbi; cursor->mc_txn = txn; } @@ -1958,8 +1936,7 @@ mdb_cursor_close(MDB_cursor *cursor) } static int -mdb_update_key(MDB_db *db, MDB_page *mp, indx_t indx, - MDB_val *key) +mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key) { indx_t ptr, i, numkeys; int delta; @@ -2005,7 +1982,7 @@ mdb_update_key(MDB_db *db, MDB_page *mp, indx_t indx, /* Move a node from src to dst. */ static int -mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx, +mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx, MDB_pageparent *dst, indx_t dstindx) { int rc; @@ -2021,8 +1998,8 @@ mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx, dstindx, dst->mp_page->mp_pgno); /* Mark src and dst as dirty. */ - if ((rc = mdb_touch(bt->md_env->me_txn, src)) || - (rc = mdb_touch(bt->md_env->me_txn, dst))) + if ((rc = mdb_touch(txn, src)) || + (rc = mdb_touch(txn, dst))) return rc;; /* Add the node to the destination page. @@ -2031,21 +2008,21 @@ mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx, key.mv_data = NODEKEY(srcnode); data.mv_size = NODEDSZ(srcnode); data.mv_data = NODEDATA(srcnode); - rc = mdb_add_node(bt, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode), + rc = mdb_add_node(txn, dbi, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode), srcnode->mn_flags); if (rc != MDB_SUCCESS) return rc; /* Delete the node from the source page. */ - mdb_del_node(bt, src->mp_page, srcindx); + mdb_del_node(src->mp_page, srcindx); /* Update the parent separators. */ if (srcindx == 0 && src->mp_pi != 0) { DPRINTF("update separator for source page %lu to [%.*s]", src->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data); - if ((rc = mdb_update_key(bt, src->mp_parent, src->mp_pi, + if ((rc = mdb_update_key(src->mp_parent, src->mp_pi, &key)) != MDB_SUCCESS) return rc; } @@ -2053,13 +2030,13 @@ mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx, if (srcindx == 0 && IS_BRANCH(src->mp_page)) { MDB_val nullkey; nullkey.mv_size = 0; - assert(mdb_update_key(bt, src->mp_page, 0, &nullkey) == MDB_SUCCESS); + assert(mdb_update_key(src->mp_page, 0, &nullkey) == MDB_SUCCESS); } if (dstindx == 0 && dst->mp_pi != 0) { DPRINTF("update separator for destination page %lu to [%.*s]", dst->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data); - if ((rc = mdb_update_key(bt, dst->mp_parent, dst->mp_pi, + if ((rc = mdb_update_key(dst->mp_parent, dst->mp_pi, &key)) != MDB_SUCCESS) return rc; } @@ -2067,14 +2044,14 @@ mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx, if (dstindx == 0 && IS_BRANCH(dst->mp_page)) { MDB_val nullkey; nullkey.mv_size = 0; - assert(mdb_update_key(bt, dst->mp_page, 0, &nullkey) == MDB_SUCCESS); + assert(mdb_update_key(dst->mp_page, 0, &nullkey) == MDB_SUCCESS); } return MDB_SUCCESS; } static int -mdb_merge(MDB_db *bt, MDB_pageparent *src, MDB_pageparent *dst) +mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst) { int rc; indx_t i; @@ -2085,13 +2062,13 @@ mdb_merge(MDB_db *bt, MDB_pageparent *src, MDB_pageparent *dst) DPRINTF("merging page %lu and %lu", src->mp_page->mp_pgno, dst->mp_page->mp_pgno); + assert(txn != NULL); assert(src->mp_parent); /* can't merge root page */ assert(dst->mp_parent); - assert(bt->md_env->me_txn != NULL); /* Mark src and dst as dirty. */ - if ((rc = mdb_touch(bt->md_env->me_txn, src)) || - (rc = mdb_touch(bt->md_env->me_txn, dst))) + if ((rc = mdb_touch(txn, src)) || + (rc = mdb_touch(txn, dst))) return rc; /* Move all nodes from src to dst. @@ -2103,28 +2080,28 @@ mdb_merge(MDB_db *bt, MDB_pageparent *src, MDB_pageparent *dst) key.mv_data = NODEKEY(srcnode); data.mv_size = NODEDSZ(srcnode); data.mv_data = NODEDATA(srcnode); - rc = mdb_add_node(bt, dst->mp_page, NUMKEYS(dst->mp_page), &key, + rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key, &data, NODEPGNO(srcnode), srcnode->mn_flags); if (rc != MDB_SUCCESS) return rc; } DPRINTF("dst page %lu now has %u keys (%.1f%% filled)", - dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(bt->md_env, dst->mp_page) / 10); + dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(txn->mt_env, dst->mp_page) / 10); /* Unlink the src page from parent. */ - mdb_del_node(bt, src->mp_parent, src->mp_pi); + mdb_del_node(src->mp_parent, src->mp_pi); if (src->mp_pi == 0) { key.mv_size = 0; - if ((rc = mdb_update_key(bt, src->mp_parent, 0, &key)) != MDB_SUCCESS) + if ((rc = mdb_update_key(src->mp_parent, 0, &key)) != MDB_SUCCESS) return rc; } if (IS_LEAF(src->mp_page)) - bt->md_leaf_pages--; + txn->mt_dbs[dbi]->md_leaf_pages--; else - bt->md_branch_pages--; + txn->mt_dbs[dbi]->md_branch_pages--; mpp.mp_page = src->mp_parent; dh = (MDB_dhead *)src->mp_parent; @@ -2132,28 +2109,27 @@ mdb_merge(MDB_db *bt, MDB_pageparent *src, MDB_pageparent *dst) mpp.mp_parent = dh->md_parent; mpp.mp_pi = dh->md_pi; - return mdb_rebalance(bt, &mpp); + return mdb_rebalance(txn, dbi, &mpp); } #define FILL_THRESHOLD 250 static int -mdb_rebalance(MDB_db *db, MDB_pageparent *mpp) +mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) { MDB_node *node; MDB_page *root; MDB_pageparent npp; indx_t si = 0, di = 0; - assert(db != NULL); - assert(db->md_env->me_txn != NULL); + assert(txn != NULL); assert(mpp != NULL); DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)", IS_LEAF(mpp->mp_page) ? "leaf" : "branch", - mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(db->md_env, mpp->mp_page) / 10); + mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(txn->mt_env, mpp->mp_page) / 10); - if (PAGEFILL(db->md_env, mpp->mp_page) >= FILL_THRESHOLD) { + if (PAGEFILL(txn->mt_env, mpp->mp_page) >= FILL_THRESHOLD) { DPRINTF("no need to rebalance page %lu, above fill threshold", mpp->mp_page->mp_pgno); return MDB_SUCCESS; @@ -2162,16 +2138,16 @@ mdb_rebalance(MDB_db *db, MDB_pageparent *mpp) if (mpp->mp_parent == NULL) { if (NUMKEYS(mpp->mp_page) == 0) { DPRINTF("tree is completely empty"); - db->md_root = P_INVALID; - db->md_depth--; - db->md_leaf_pages--; + txn->mt_dbs[dbi]->md_root = P_INVALID; + txn->mt_dbs[dbi]->md_depth--; + txn->mt_dbs[dbi]->md_leaf_pages--; } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) { DPRINTF("collapsing root page!"); - db->md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); - if ((root = mdbenv_get_page(db->md_env, db->md_root)) == NULL) + txn->mt_dbs[dbi]->md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); + if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbs[dbi]->md_root)) == NULL) return MDB_FAIL; - db->md_depth--; - db->md_branch_pages--; + txn->mt_dbs[dbi]->md_depth--; + txn->mt_dbs[dbi]->md_branch_pages--; } else DPRINTF("root page doesn't need rebalancing"); return MDB_SUCCESS; @@ -2194,7 +2170,7 @@ mdb_rebalance(MDB_db *db, MDB_pageparent *mpp) */ DPRINTF("reading right neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1); - if ((npp.mp_page = mdbenv_get_page(db->md_env, NODEPGNO(node))) == NULL) + if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL) return MDB_FAIL; npp.mp_pi = mpp->mp_pi + 1; si = 0; @@ -2204,7 +2180,7 @@ mdb_rebalance(MDB_db *db, MDB_pageparent *mpp) */ DPRINTF("reading left neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1); - if ((npp.mp_page = mdbenv_get_page(db->md_env, NODEPGNO(node))) == NULL) + if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL) return MDB_FAIL; npp.mp_pi = mpp->mp_pi - 1; si = NUMKEYS(npp.mp_page) - 1; @@ -2213,25 +2189,25 @@ mdb_rebalance(MDB_db *db, MDB_pageparent *mpp) npp.mp_parent = mpp->mp_parent; DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)", - npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(db->md_env, npp.mp_page) / 10); + npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(txn->mt_env, npp.mp_page) / 10); /* If the neighbor page is above threshold and has at least two * keys, move one key from it. * * Otherwise we should try to merge them. */ - if (PAGEFILL(db->md_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2) - return mdb_move_node(db, &npp, si, mpp, di); + if (PAGEFILL(txn->mt_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2) + return mdb_move_node(txn, dbi, &npp, si, mpp, di); else { /* FIXME: if (has_enough_room()) */ if (mpp->mp_pi == 0) - return mdb_merge(db, &npp, mpp); + return mdb_merge(txn, dbi, &npp, mpp); else - return mdb_merge(db, mpp, &npp); + return mdb_merge(txn, dbi, mpp, &npp); } } int -mdb_del(MDB_db *bt, MDB_txn *txn, +mdb_del(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data) { int rc, exact; @@ -2243,47 +2219,43 @@ mdb_del(MDB_db *bt, MDB_txn *txn, assert(key != NULL); - if (bt == NULL || txn == NULL) + if (txn == NULL || dbi >= txn->mt_numdbs) return EINVAL; if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { return EINVAL; } - if (bt->md_env->me_txn != txn) { - return EINVAL; - } - if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { return EINVAL; } - if ((rc = mdb_search_page(bt, txn, key, NULL, 1, &mpp)) != MDB_SUCCESS) + if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS) return rc; - leaf = mdb_search_node(bt, mpp.mp_page, key, &exact, &ki); + leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf == NULL || !exact) { return ENOENT; } - if (data && (rc = mdb_read_data(bt, NULL, leaf, data)) != MDB_SUCCESS) + if (data && (rc = mdb_read_data(txn->mt_env, leaf, data)) != MDB_SUCCESS) return rc; - mdb_del_node(bt, mpp.mp_page, ki); + mdb_del_node(mpp.mp_page, ki); /* add overflow pages to free list */ if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { int i, ovpages; pgno_t pg; bcopy(NODEDATA(leaf), &pg, sizeof(pg)); - ovpages = OVPAGES(NODEDSZ(leaf), bt->md_env->me_meta.mm_psize); + ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_meta.mm_psize); for (i=0; imt_free_pgs, pg); pg++; } } - bt->md_entries--; - rc = mdb_rebalance(bt, &mpp); + txn->mt_dbs[dbi]->md_entries--; + rc = mdb_rebalance(txn, dbi, &mpp); if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; @@ -2296,7 +2268,7 @@ mdb_del(MDB_db *bt, MDB_txn *txn, * refer to a node in the new right sibling page. */ static int -mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, +mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno) { uint8_t flags; @@ -2310,8 +2282,7 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, MDB_dpage *mdp, *rdp, *pdp; MDB_dhead *dh; - assert(bt != NULL); - assert(bt->md_env != NULL); + assert(txn != NULL); dh = ((MDB_dhead *)*mpp) - 1; mdp = (MDB_dpage *)dh; @@ -2322,16 +2293,18 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, (int)newkey->mv_size, (char *)newkey->mv_data, *newindxp); if (mdp->h.md_parent == NULL) { - if ((pdp = mdb_new_page(bt, P_BRANCH, 1)) == NULL) + if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL) return MDB_FAIL; mdp->h.md_pi = 0; mdp->h.md_parent = &pdp->p; - bt->md_env->me_txn->mt_root = pdp->p.mp_pgno; + txn->mt_dbs[dbi]->md_root = pdp->p.mp_pgno; + if (!dbi) + txn->mt_root = pdp->p.mp_pgno; DPRINTF("root split! new root = %lu", pdp->p.mp_pgno); - bt->md_depth++; + txn->mt_dbs[dbi]->md_depth++; /* Add left (implicit) pointer. */ - if (mdb_add_node(bt, &pdp->p, 0, NULL, NULL, + if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL, mdp->p.mp_pgno, 0) != MDB_SUCCESS) return MDB_FAIL; } else { @@ -2339,25 +2312,25 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, } /* Create a right sibling. */ - if ((rdp = mdb_new_page(bt, mdp->p.mp_flags, 1)) == NULL) + if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL) return MDB_FAIL; rdp->h.md_parent = mdp->h.md_parent; rdp->h.md_pi = mdp->h.md_pi + 1; DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno); /* Move half of the keys to the right sibling. */ - if ((copy = malloc(bt->md_env->me_meta.mm_psize)) == NULL) + if ((copy = malloc(txn->mt_env->me_meta.mm_psize)) == NULL) return MDB_FAIL; - bcopy(&mdp->p, copy, bt->md_env->me_meta.mm_psize); - bzero(&mdp->p.mp_ptrs, bt->md_env->me_meta.mm_psize - PAGEHDRSZ); + memcpy(copy, &mdp->p, txn->mt_env->me_meta.mm_psize); + memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_meta.mm_psize - PAGEHDRSZ); mdp->p.mp_lower = PAGEHDRSZ; - mdp->p.mp_upper = bt->md_env->me_meta.mm_psize; + mdp->p.mp_upper = txn->mt_env->me_meta.mm_psize; split_indx = NUMKEYS(copy) / 2 + 1; /* First find the separating key between the split pages. */ - bzero(&sepkey, sizeof(sepkey)); + memset(&sepkey, 0, sizeof(sepkey)); if (newindx == split_indx) { sepkey.mv_size = newkey->mv_size; sepkey.mv_data = newkey->mv_data; @@ -2371,8 +2344,8 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, /* Copy separator key to the parent. */ - if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(bt, &sepkey)) { - rc = mdb_split(bt, &rdp->h.md_parent, &rdp->h.md_pi, + if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(txn->mt_env, &sepkey)) { + rc = mdb_split(txn, dbi, &rdp->h.md_parent, &rdp->h.md_pi, &sepkey, NULL, rdp->p.mp_pgno); /* Right page might now have changed parent. @@ -2384,7 +2357,7 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, mdp->h.md_pi = rdp->h.md_pi - 1; } } else { - rc = mdb_add_node(bt, rdp->h.md_parent, rdp->h.md_pi, + rc = mdb_add_node(txn, dbi, rdp->h.md_parent, rdp->h.md_pi, &sepkey, NULL, rdp->p.mp_pgno, 0); } if (rc != MDB_SUCCESS) { @@ -2441,7 +2414,7 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, rkey.mv_size = 0; } - rc = mdb_add_node(bt, &pdp->p, j, &rkey, &rdata, pgno,flags); + rc = mdb_add_node(txn, dbi, &pdp->p, j, &rkey, &rdata, pgno,flags); } free(copy); @@ -2449,7 +2422,7 @@ mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp, } int -mdb_put(MDB_db *bt, MDB_txn *txn, +mdb_put(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, unsigned int flags) { int rc = MDB_SUCCESS, exact; @@ -2460,14 +2433,14 @@ mdb_put(MDB_db *bt, MDB_txn *txn, assert(key != NULL); assert(data != NULL); - if (bt == NULL || txn == NULL) + if (txn == NULL) return EINVAL; if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { return EINVAL; } - if (bt->md_env->me_txn != txn) { + if (txn->mt_env->me_txn != txn) { return EINVAL; } @@ -2478,16 +2451,16 @@ mdb_put(MDB_db *bt, MDB_txn *txn, DPRINTF("==> put key %.*s, size %zu, data size %zu", (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); - rc = mdb_search_page(bt, txn, key, NULL, 1, &mpp); + rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp); if (rc == MDB_SUCCESS) { - leaf = mdb_search_node(bt, mpp.mp_page, key, &exact, &ki); + leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { if (F_ISSET(flags, MDB_NOOVERWRITE)) { DPRINTF("duplicate key %.*s", (int)key->mv_size, (char *)key->mv_data); return EEXIST; } - mdb_del_node(bt, mpp.mp_page, ki); + mdb_del_node(mpp.mp_page, ki); } if (leaf == NULL) { /* append if not found */ ki = NUMKEYS(mpp.mp_page); @@ -2497,12 +2470,14 @@ mdb_put(MDB_db *bt, MDB_txn *txn, MDB_dpage *dp; /* new file, just write a root leaf page */ DPRINTF("allocating new root leaf page"); - if ((dp = mdb_new_page(bt, P_LEAF, 1)) == NULL) { + if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) { return ENOMEM; } mpp.mp_page = &dp->p; - txn->mt_root = mpp.mp_page->mp_pgno; - bt->md_depth++; + txn->mt_dbs[dbi]->md_root = mpp.mp_page->mp_pgno; + if (!dbi) + txn->mt_root = mpp.mp_page->mp_pgno; + txn->mt_dbs[dbi]->md_depth++; ki = 0; } else @@ -2512,17 +2487,17 @@ mdb_put(MDB_db *bt, MDB_txn *txn, DPRINTF("there are %u keys, should insert new key at index %i", NUMKEYS(mpp.mp_page), ki); - if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(bt, key, data)) { - rc = mdb_split(bt, &mpp.mp_page, &ki, key, data, P_INVALID); + if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) { + rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID); } else { /* There is room already in this leaf page. */ - rc = mdb_add_node(bt, mpp.mp_page, ki, key, data, 0, 0); + rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, data, 0, 0); } if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; else - bt->md_entries++; + txn->mt_dbs[dbi]->md_entries++; done: return rc; @@ -2566,36 +2541,88 @@ mdbenv_stat(MDB_env *env, MDB_stat *arg) int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) { + MDB_val key, data; + MDB_dbi i; + int rc; + + /* main DB? */ if (!name) { *dbi = 0; return MDB_SUCCESS; } - return EINVAL; + + /* Is the DB already open? */ + for (i=0; imt_numdbs; i++) { + if (!strcmp(name, txn->mt_dbxs[i].md_name)) { + *dbi = i; + return MDB_SUCCESS; + } + } + + /* Find the DB info */ + key.mv_size = strlen(name); + key.mv_data = (void *)name; + rc = mdb_get(txn, 0, &key, &data); + + /* Create if requested */ + if (rc == ENOENT && (flags & MDB_CREATE)) { + MDB_db dummy; + data.mv_size = sizeof(MDB_db); + data.mv_data = &dummy; + memset(&dummy, 0, sizeof(dummy)); + dummy.md_root = P_INVALID; + rc = mdb_put(txn, 0, &key, &data, 0); + if (rc == MDB_SUCCESS) + rc = mdb_get(txn, 0, &key, &data); + } + + /* OK, got info, add to table */ + if (rc == MDB_SUCCESS) { + /* Is there a free slot? */ + if ((txn->mt_numdbs & (DBX_CHUNK-1)) == 0) { + MDB_dbx *p1; + MDB_db **p2; + int i; + i = txn->mt_numdbs + DBX_CHUNK; + p1 = realloc(txn->mt_dbxs, i * sizeof(MDB_dbx)); + if (p1 == NULL) + return ENOMEM; + txn->mt_dbxs = p1; + p2 = realloc(txn->mt_dbs, i * sizeof(MDB_db *)); + if (p2 == NULL) + return ENOMEM; + txn->mt_dbs = p2; + } + txn->mt_dbxs[txn->mt_numdbs].md_name = strdup(name); + txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL; + txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL; + txn->mt_dbs[txn->mt_numdbs] = data.mv_data; + *dbi = txn->mt_numdbs; + txn->mt_numdbs++; + } + + return rc; } -int mdb_stat(MDB_db *db, MDB_stat *arg) +int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg) { - if (db == NULL || arg == NULL) + if (txn == NULL || arg == NULL) return EINVAL; - arg->ms_psize = db->md_env->me_meta.mm_psize; - if (!db->md_parent) { - arg->ms_depth = db->md_env->me_meta.mm_depth; - arg->ms_branch_pages = db->md_env->me_meta.mm_branch_pages; - arg->ms_leaf_pages = db->md_env->me_meta.mm_leaf_pages; - arg->ms_overflow_pages = db->md_env->me_meta.mm_overflow_pages; - arg->ms_entries = db->md_env->me_meta.mm_entries; - } else { - arg->ms_depth = db->md_depth; - arg->ms_branch_pages = db->md_branch_pages; - arg->ms_leaf_pages = db->md_leaf_pages; - arg->ms_overflow_pages = db->md_overflow_pages; - arg->ms_entries = db->md_entries; - } + arg->ms_psize = txn->mt_env->me_meta.mm_psize; + arg->ms_depth = txn->mt_dbs[dbi]->md_depth; + arg->ms_branch_pages = txn->mt_dbs[dbi]->md_branch_pages; + arg->ms_leaf_pages = txn->mt_dbs[dbi]->md_leaf_pages; + arg->ms_overflow_pages = txn->mt_dbs[dbi]->md_overflow_pages; + arg->ms_entries = txn->mt_dbs[dbi]->md_entries; return MDB_SUCCESS; } -void mdb_close(MDB_db *db) +void mdb_close(MDB_env *env, MDB_dbi dbi) { + if (dbi >= env->me_numdbs) + return; + free(env->me_dbxs[dbi].md_name); + env->me_dbxs[dbi].md_name = NULL; } diff --git a/libraries/libmdb/mdb.h b/libraries/libmdb/mdb.h index 14ee901c18..da182393c0 100644 --- a/libraries/libmdb/mdb.h +++ b/libraries/libmdb/mdb.h @@ -71,7 +71,7 @@ void mdb_txn_abort(MDB_txn *txn); int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi); int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *stat); -void mdb_close(MDB_txn *txn, MDB_dbi dbi); +void mdb_close(MDB_env *env, MDB_dbi dbi); int mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data); int mdb_put(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, diff --git a/libraries/libmdb/mdb_stat.c b/libraries/libmdb/mdb_stat.c index 9663b12594..4e41f233d3 100644 --- a/libraries/libmdb/mdb_stat.c +++ b/libraries/libmdb/mdb_stat.c @@ -7,7 +7,8 @@ int main(int argc,char * argv[]) { int rc; MDB_env *env; - MDB_db *db; + MDB_txn *txn; + MDB_dbi dbi; MDB_stat mst; char *envname = argv[1]; char *subname = NULL; @@ -21,20 +22,26 @@ int main(int argc,char * argv[]) printf("mdbenv_open failed, error %d\n", rc); exit(1); } - rc = mdb_open(env, NULL, NULL, 0, &db); + rc = mdb_txn_begin(env, 1, &txn); + if (rc) { + printf("mdb_txn_begin failed, error %d\n", rc); + exit(1); + } + rc = mdb_open(txn, subname, 0, &dbi); if (rc) { printf("mdb_open failed, error %d\n", rc); exit(1); } - rc = mdb_stat(db, &mst); + rc = mdb_stat(txn, dbi, &mst); printf("Page size: %u\n", mst.ms_psize); printf("Tree depth: %u\n", mst.ms_depth); printf("Branch pages: %lu\n", mst.ms_branch_pages); printf("Leaf pages: %lu\n", mst.ms_leaf_pages); printf("Overflow pages: %lu\n", mst.ms_overflow_pages); printf("Entries: %lu\n", mst.ms_entries); - mdb_close(db); + mdb_txn_abort(txn); + mdb_close(env, dbi); mdbenv_close(env); return 0; diff --git a/libraries/libmdb/mtest.c b/libraries/libmdb/mtest.c index 525d227fa4..e631f8d94d 100644 --- a/libraries/libmdb/mtest.c +++ b/libraries/libmdb/mtest.c @@ -8,7 +8,7 @@ int main(int argc,char * argv[]) { int i = 0, j = 0, rc; MDB_env *env; - MDB_db *db; + MDB_dbi dbi; MDB_val key, data; MDB_txn *txn; MDB_stat mst; @@ -30,7 +30,7 @@ int main(int argc,char * argv[]) rc = mdbenv_set_mapsize(env, 10485760); rc = mdbenv_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664); rc = mdb_txn_begin(env, 0, &txn); - rc = mdb_open(env, txn, NULL, 0, &db); + rc = mdb_open(txn, NULL, 0, &dbi); key.mv_size = sizeof(int); key.mv_data = sval; @@ -40,20 +40,22 @@ int main(int argc,char * argv[]) printf("Adding %d values\n", count); for (i=0;i