X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;ds=sidebyside;f=libraries%2Flibmdb%2Fmdb.c;h=26712a8455f428d7553791f03005e68b2f178f81;hb=88a5f35c433bd81658f5c5d72b7ab681fd7a4f67;hp=87d531964f8df99b541a9cc4bf2e9ac00d512fa7;hpb=9b4c6896019100f4095127966bc98e1ea730ec58;p=openldap diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 87d531964f..26712a8455 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -1,3 +1,32 @@ +/* mdb.c - memory-mapped database library */ +/* + * Copyright 2011 Howard Chu, Symas Corp. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * . + * + * This code is derived from btree.c written by Martin Hedenfalk. + * + * Copyright (c) 2009, 2010 Martin Hedenfalk + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ #include #include #include @@ -10,9 +39,7 @@ #include #include -#include #include -#include #include #include #include @@ -27,13 +54,13 @@ #define ULONG unsigned long typedef ULONG pgno_t; -#include "idl.h" +#include "midl.h" #ifndef DEBUG #define DEBUG 1 #endif -#if (DEBUG +0) && defined(__GNUC__) +#if DEBUG && defined(__GNUC__) # define DPRINTF(fmt, ...) \ fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__) #else @@ -44,7 +71,7 @@ typedef ULONG pgno_t; #define MDB_MINKEYS 4 #define MDB_MAGIC 0xBEEFC0DE #define MDB_VERSION 1 -#define MAXKEYSIZE 255 +#define MAXKEYSIZE 511 #define P_INVALID (~0UL) @@ -56,40 +83,51 @@ typedef uint16_t indx_t; #define DEFAULT_MAPSIZE 1048576 /* Lock descriptor stuff */ -#define RXBODY \ - ULONG mr_txnid; \ - pid_t mr_pid; \ - pthread_t mr_tid -typedef struct MDB_rxbody { - RXBODY; -} MDB_rxbody; - #ifndef CACHELINE #define CACHELINE 64 /* most CPUs. Itanium uses 128 */ #endif +typedef struct MDB_rxbody { + ULONG mrb_txnid; + pid_t mrb_pid; + pthread_t mrb_tid; +} MDB_rxbody; + typedef struct MDB_reader { - RXBODY; - /* cache line alignment */ - char pad[CACHELINE-sizeof(MDB_rxbody)]; + union { + MDB_rxbody mrx; +#define mr_txnid mru.mrx.mrb_txnid +#define mr_pid mru.mrx.mrb_pid +#define mr_tid mru.mrx.mrb_tid + /* cache line alignment */ + char pad[(sizeof(MDB_rxbody)+CACHELINE-1) & ~(CACHELINE-1)]; + } mru; } MDB_reader; -#define TXBODY \ - uint32_t mt_magic; \ - uint32_t mt_version; \ - pthread_mutex_t mt_mutex; \ - ULONG mt_txnid; \ - uint32_t mt_numreaders typedef struct MDB_txbody { - TXBODY; + uint32_t mtb_magic; + uint32_t mtb_version; + pthread_mutex_t mtb_mutex; + ULONG mtb_txnid; + uint32_t mtb_numreaders; } MDB_txbody; typedef struct MDB_txninfo { - TXBODY; - char pad[CACHELINE-sizeof(MDB_txbody)]; - pthread_mutex_t mt_wmutex; - char pad2[CACHELINE-sizeof(pthread_mutex_t)]; - MDB_reader mt_readers[1]; + union { + MDB_txbody mtb; +#define mti_magic mt1.mtb.mtb_magic +#define mti_version mt1.mtb.mtb_version +#define mti_mutex mt1.mtb.mtb_mutex +#define mti_txnid mt1.mtb.mtb_txnid +#define mti_numreaders mt1.mtb.mtb_numreaders + char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)]; + } mt1; + union { + pthread_mutex_t mt2_wmutex; +#define mti_wmutex mt2.mt2_wmutex + char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)]; + } mt2; + MDB_reader mti_readers[1]; } MDB_txninfo; /* Common header for all page types. Overflow pages @@ -97,12 +135,17 @@ typedef struct MDB_txninfo { * headers on any page after the first. */ typedef struct MDB_page { /* represents a page of storage */ - pgno_t mp_pgno; /* page number */ +#define mp_pgno mp_p.p_pgno + union padded { + pgno_t p_pgno; /* page number */ + void * p_pad; + } mp_p; #define P_BRANCH 0x01 /* branch page */ #define P_LEAF 0x02 /* leaf page */ #define P_OVERFLOW 0x04 /* overflow page */ #define P_META 0x08 /* meta page */ #define P_DIRTY 0x10 /* dirty page */ +#define P_LEAF2 0x20 /* DB with small, fixed size keys and no data */ uint32_t mp_flags; #define mp_lower mp_pb.pb.pb_lower #define mp_upper mp_pb.pb.pb_upper @@ -113,6 +156,10 @@ typedef struct MDB_page { /* represents a page of storage */ indx_t pb_upper; /* upper bound of free space */ } pb; uint32_t pb_pages; /* number of overflow pages */ + struct { + indx_t pb_ksize; /* on a LEAF2 page */ + indx_t pb_numkeys; + } pb2; } mp_pb; indx_t mp_ptrs[1]; /* dynamic size */ } MDB_page; @@ -121,33 +168,42 @@ typedef struct MDB_page { /* represents a page of storage */ #define NUMKEYS(p) (((p)->mp_lower - PAGEHDRSZ) >> 1) #define SIZELEFT(p) (indx_t)((p)->mp_upper - (p)->mp_lower) -#define PAGEFILL(env, p) (1000L * ((env)->me_meta.mm_psize - PAGEHDRSZ - SIZELEFT(p)) / \ - ((env)->me_meta.mm_psize - PAGEHDRSZ)) +#define PAGEFILL(env, p) (1000L * ((env)->me_psize - PAGEHDRSZ - SIZELEFT(p)) / \ + ((env)->me_psize - PAGEHDRSZ)) #define IS_LEAF(p) F_ISSET((p)->mp_flags, P_LEAF) #define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH) #define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW) #define OVPAGES(size, psize) (PAGEHDRSZ + size + psize - 1) / psize; +typedef struct MDB_db { + uint32_t md_pad; + uint16_t md_flags; + uint16_t md_depth; + ULONG md_branch_pages; + ULONG md_leaf_pages; + ULONG md_overflow_pages; + ULONG md_entries; + pgno_t md_root; +} MDB_db; + +#define FREE_DBI 0 +#define MAIN_DBI 1 + typedef struct MDB_meta { /* meta (footer) page content */ uint32_t mm_magic; uint32_t mm_version; void *mm_address; /* address for fixed mapping */ size_t mm_mapsize; /* size of mmap region */ + MDB_db mm_dbs[2]; /* first is free space, 2nd is main db */ +#define mm_psize mm_dbs[0].md_pad +#define mm_flags mm_dbs[0].md_flags pgno_t mm_last_pg; /* last used page in file */ ULONG mm_txnid; /* txnid that committed this page */ - uint32_t mm_psize; - uint16_t mm_flags; - uint16_t mm_depth; - ULONG mm_branch_pages; - ULONG mm_leaf_pages; - ULONG mm_overflow_pages; - ULONG mm_entries; - pgno_t mm_root; } MDB_meta; typedef struct MDB_dhead { /* a dirty page */ - SIMPLEQ_ENTRY(MDB_dpage) md_next; /* queue of dirty pages */ + STAILQ_ENTRY(MDB_dpage) md_next; /* queue of dirty pages */ MDB_page *md_parent; unsigned md_pi; /* parent index */ int md_num; @@ -158,7 +214,7 @@ typedef struct MDB_dpage { MDB_page p; } MDB_dpage; -SIMPLEQ_HEAD(dirty_queue, MDB_dpage); +STAILQ_HEAD(dirty_queue, MDB_dpage); /* FIXME: use a sorted data structure */ typedef struct MDB_oldpages { struct MDB_oldpages *mo_next; @@ -182,20 +238,25 @@ typedef struct MDB_ppage { /* ordered list of pages */ } MDB_ppage; SLIST_HEAD(page_stack, MDB_ppage); +/* FIXME: tree depth is mostly bounded, we should just + * use a fixed array and avoid malloc/pointer chasing + */ #define CURSOR_EMPTY(c) SLIST_EMPTY(&(c)->mc_stack) #define CURSOR_TOP(c) SLIST_FIRST(&(c)->mc_stack) #define CURSOR_POP(c) SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry) #define CURSOR_PUSH(c,p) SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry) +struct MDB_xcursor; + struct MDB_cursor { MDB_txn *mc_txn; struct page_stack mc_stack; /* stack of parent pages */ MDB_dbi mc_dbi; short mc_initialized; /* 1 if initialized */ short mc_eof; /* 1 if end is reached */ + struct MDB_xcursor *mc_xcursor; }; -#define METAHASHLEN offsetof(MDB_meta, mm_hash) #define METADATA(p) ((void *)((char *)p + PAGEHDRSZ)) typedef struct MDB_node { @@ -208,28 +269,17 @@ typedef struct MDB_node { unsigned int mn_flags:4; unsigned int mn_ksize:12; /* key size */ #define F_BIGDATA 0x01 /* data put on overflow page */ +#define F_SUBDATA 0x02 /* data is a sub-database */ char mn_data[1]; } MDB_node; -typedef struct MDB_db { - uint32_t md_pad; - uint16_t md_flags; - uint16_t md_depth; - ULONG md_branch_pages; - ULONG md_leaf_pages; - ULONG md_overflow_pages; - ULONG md_entries; - pgno_t md_root; -} MDB_db; - typedef struct MDB_dbx { - char *md_name; + MDB_val md_name; MDB_cmp_func *md_cmp; /* user compare function */ MDB_cmp_func *md_dcmp; /* user dupsort function */ MDB_rel_func *md_rel; /* user relocate function */ - MDB_db *md_db; - MDB_page *md_page; - unsigned int md_ki; /* cursor index on parent page */ + MDB_dbi md_parent; + unsigned int md_dirty; } MDB_dbx; struct MDB_txn { @@ -243,32 +293,46 @@ struct MDB_txn { MDB_reader *reader; } mt_u; MDB_dbx *mt_dbxs; /* array */ - MDB_db mt_db0; /* just for write txns */ + MDB_db *mt_dbs; unsigned int mt_numdbs; -#define MDB_TXN_RDONLY 0x01 /* read-only transaction */ -#define MDB_TXN_ERROR 0x02 /* an error has occurred */ +#define MDB_TXN_RDONLY 0x01 /* read-only transaction */ +#define MDB_TXN_ERROR 0x02 /* an error has occurred */ #define MDB_TXN_METOGGLE 0x04 /* used meta page 1 */ - unsigned int mt_flags; + unsigned int mt_flags; }; +/* Context for sorted-dup records */ +typedef struct MDB_xcursor { + MDB_cursor mx_cursor; + MDB_txn mx_txn; + MDB_dbx mx_dbxs[4]; + MDB_db mx_dbs[4]; +} MDB_xcursor; + struct MDB_env { int me_fd; int me_lfd; - uint32_t me_flags; - unsigned int me_maxreaders; + int me_mfd; /* just for writing the meta pages */ + uint16_t me_flags; + uint16_t me_db_toggle; + unsigned int me_psize; + unsigned int me_maxreaders; + unsigned int me_numdbs; + unsigned int me_maxdbs; char *me_path; char *me_map; MDB_txninfo *me_txns; - MDB_meta me_meta; + MDB_meta *me_metas[2]; + MDB_meta *me_meta; MDB_txn *me_txn; /* current write transaction */ size_t me_mapsize; off_t me_size; /* current file size */ - pthread_key_t me_txkey; /* thread-key for readers */ - MDB_oldpages *me_pghead; - MDB_oldpages *me_pgtail; MDB_dbx *me_dbxs; /* array */ - unsigned int me_numdbs; + MDB_db *me_dbs[2]; + MDB_oldpages *me_pghead; + pthread_key_t me_txkey; /* thread-key for readers */ + pgno_t me_free_pgs[MDB_IDL_UM_SIZE]; }; #define NODESIZE offsetof(MDB_node, mn_data) @@ -282,7 +346,6 @@ struct MDB_env { #define NODEDSZ(node) ((node)->mn_dsize) #define MDB_COMMIT_PAGES 64 /* max number of pages to write in one commit */ -#define MDB_MAXCACHE_DEF 1024 /* max number of pages to keep in cache */ static int mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, @@ -293,11 +356,10 @@ static int mdb_search_page(MDB_txn *txn, MDB_cursor *cursor, int modify, MDB_pageparent *mpp); -static int mdbenv_read_header(MDB_env *env); -static int mdb_check_meta_page(MDB_page *p); -static int mdbenv_read_meta(MDB_env *env, int *which); -static int mdbenv_write_meta(MDB_txn *txn); -static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno); +static int mdb_env_read_header(MDB_env *env, MDB_meta *meta); +static int mdb_env_read_meta(MDB_env *env, int *which); +static int mdb_env_write_meta(MDB_txn *txn); +static MDB_page *mdb_get_page(MDB_txn *txn, pgno_t pgno); static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, int *exactp, unsigned int *kip); @@ -305,7 +367,11 @@ static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); static void mdb_del_node(MDB_page *mp, indx_t indx); -static int mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data); +static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, + MDB_pageparent *mpp, MDB_node *leaf); +static int mdb_put0(MDB_txn *txn, MDB_dbi dbi, + MDB_val *key, MDB_val *data, unsigned int flags); +static int mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data); static int mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp); static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key); @@ -326,11 +392,19 @@ static MDB_ppage *cursor_push_page(MDB_cursor *cursor, static int mdb_set_key(MDB_node *node, MDB_val *key); static int mdb_sibling(MDB_cursor *cursor, int move_right); static int mdb_cursor_next(MDB_cursor *cursor, - MDB_val *key, MDB_val *data); + MDB_val *key, MDB_val *data, MDB_cursor_op op); +static int mdb_cursor_prev(MDB_cursor *cursor, + MDB_val *key, MDB_val *data, MDB_cursor_op op); static int mdb_cursor_set(MDB_cursor *cursor, - MDB_val *key, MDB_val *data, int *exactp); + MDB_val *key, MDB_val *data, MDB_cursor_op op, int *exactp); static int mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data); +static int mdb_cursor_last(MDB_cursor *cursor, + MDB_val *key, MDB_val *data); + +static void mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx); +static void mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node); +static void mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx); static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); @@ -374,6 +448,15 @@ memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2) return *p1 - *p2; } +char * +mdb_version(int *maj, int *min, int *pat) +{ + *maj = MDB_VERSION_MAJOR; + *min = MDB_VERSION_MINOR; + *pat = MDB_VERSION_PATCH; + return MDB_VERSION_STRING; +} + int mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) { @@ -383,7 +466,11 @@ mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) static int _mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2) { - if (F_ISSET(txn->mt_dbxs[dbi].md_db->md_flags, MDB_REVERSEKEY)) + if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY +#if __BYTE_ORDER == __LITTLE_ENDIAN + |MDB_INTEGERKEY +#endif + )) return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); else return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); @@ -395,15 +482,64 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) { MDB_dpage *dp; pgno_t pgno = P_INVALID; + ULONG oldest; + + if (txn->mt_txnid > 2) { + + oldest = txn->mt_txnid - 2; + if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) { + /* See if there's anything in the free DB */ + MDB_pageparent mpp; + MDB_node *leaf; + ULONG *kptr; + + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp); + leaf = NODEPTR(mpp.mp_page, 0); + kptr = (ULONG *)NODEKEY(leaf); + /* It's potentially usable, unless there are still + * older readers outstanding. Grab it. + */ + if (oldest > *kptr) { + MDB_oldpages *mop; + MDB_val data; + pgno_t *idl; + + mdb_read_data(txn, leaf, &data); + idl = (ULONG *)data.mv_data; + mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t)); + mop->mo_next = txn->mt_env->me_pghead; + mop->mo_txnid = *kptr; + txn->mt_env->me_pghead = mop; + memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl)); + +#if DEBUG > 1 + { + unsigned int i; + DPRINTF("IDL read txn %lu root %lu num %lu", + mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]); + for (i=0; imt_env->me_pghead) { - ULONG oldest = txn->mt_txnid - 2; unsigned int i; - for (i=0; imt_env->me_txns->mt_numreaders; i++) { - ULONG mr = txn->mt_env->me_txns->mt_readers[i].mr_txnid; + for (i=0; imt_env->me_txns->mti_numreaders; i++) { + ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid; if (!mr) continue; if (mr < oldest) - oldest = txn->mt_env->me_txns->mt_readers[i].mr_txnid; + oldest = txn->mt_env->me_txns->mti_readers[i].mr_txnid; } if (oldest > txn->mt_env->me_pghead->mo_txnid) { MDB_oldpages *mop = txn->mt_env->me_pghead; @@ -426,20 +562,19 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) } if (MDB_IDL_IS_ZERO(mop->mo_pages)) { txn->mt_env->me_pghead = mop->mo_next; - if (!txn->mt_env->me_pghead) - txn->mt_env->me_pgtail = NULL; free(mop); } } } } + } - if ((dp = malloc(txn->mt_env->me_meta.mm_psize * num + sizeof(MDB_dhead))) == NULL) + if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL) return NULL; dp->h.md_num = num; dp->h.md_parent = parent; dp->h.md_pi = parent_idx; - SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next); + STAILQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next); if (pgno == P_INVALID) { dp->p.mp_pgno = txn->mt_next_pgno; txn->mt_next_pgno += num; @@ -465,9 +600,9 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp) if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL) return ENOMEM; DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno); - mdb_idl_insert(txn->mt_free_pgs, mp->mp_pgno); + mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno); pgno = dp->p.mp_pgno; - memcpy(&dp->p, mp, txn->mt_env->me_meta.mm_psize); + memcpy(&dp->p, mp, txn->mt_env->me_psize); mp = &dp->p; mp->mp_pgno = pgno; mp->mp_flags |= P_DIRTY; @@ -481,25 +616,23 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp) } int -mdbenv_sync(MDB_env *env) +mdb_env_sync(MDB_env *env, int force) { int rc = 0; - if (!F_ISSET(env->me_flags, MDB_NOSYNC)) { - if (fsync(env->me_fd)) + if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) { + if (fdatasync(env->me_fd)) rc = errno; } return rc; } -#define DBX_CHUNK 16 /* space for 16 DBs at a time */ - int mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) { MDB_txn *txn; int rc, toggle; - if ((txn = calloc(1, sizeof(*txn))) == NULL) { + if ((txn = calloc(1, sizeof(MDB_txn))) == NULL) { DPRINTF("calloc: %s", strerror(errno)); return ENOMEM; } @@ -512,40 +645,34 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) free(txn); return ENOMEM; } - SIMPLEQ_INIT(txn->mt_u.dirty_queue); + STAILQ_INIT(txn->mt_u.dirty_queue); - pthread_mutex_lock(&env->me_txns->mt_wmutex); - env->me_txns->mt_txnid++; - txn->mt_free_pgs = malloc(MDB_IDL_UM_SIZEOF); - if (txn->mt_free_pgs == NULL) { - free(txn->mt_u.dirty_queue); - free(txn); - return ENOMEM; - } + pthread_mutex_lock(&env->me_txns->mti_wmutex); + env->me_txns->mti_txnid++; + txn->mt_free_pgs = env->me_free_pgs; txn->mt_free_pgs[0] = 0; } - txn->mt_txnid = env->me_txns->mt_txnid; + txn->mt_txnid = env->me_txns->mti_txnid; if (rdonly) { MDB_reader *r = pthread_getspecific(env->me_txkey); if (!r) { unsigned int i; - pthread_mutex_lock(&env->me_txns->mt_mutex); - for (i=0; ime_maxreaders; i++) { - if (env->me_txns->mt_readers[i].mr_pid == 0) { - env->me_txns->mt_readers[i].mr_pid = getpid(); - env->me_txns->mt_readers[i].mr_tid = pthread_self(); - r = &env->me_txns->mt_readers[i]; - pthread_setspecific(env->me_txkey, r); - if (i >= env->me_txns->mt_numreaders) - env->me_txns->mt_numreaders = i+1; + pthread_mutex_lock(&env->me_txns->mti_mutex); + for (i=0; ime_txns->mti_numreaders; i++) + if (env->me_txns->mti_readers[i].mr_pid == 0) break; - } - } - pthread_mutex_unlock(&env->me_txns->mt_mutex); if (i == env->me_maxreaders) { + pthread_mutex_unlock(&env->me_txns->mti_mutex); return ENOSPC; } + env->me_txns->mti_readers[i].mr_pid = getpid(); + env->me_txns->mti_readers[i].mr_tid = pthread_self(); + r = &env->me_txns->mti_readers[i]; + pthread_setspecific(env->me_txkey, r); + if (i >= env->me_txns->mti_numreaders) + env->me_txns->mti_numreaders = i+1; + pthread_mutex_unlock(&env->me_txns->mti_mutex); } r->mr_txnid = txn->mt_txnid; txn->mt_u.reader = r; @@ -555,27 +682,28 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) txn->mt_env = env; - if ((rc = mdbenv_read_meta(env, &toggle)) != MDB_SUCCESS) { + if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) { mdb_txn_abort(txn); return rc; } /* Copy the DB arrays */ txn->mt_numdbs = env->me_numdbs; - rc = (txn->mt_numdbs % DBX_CHUNK) + 1; - txn->mt_dbxs = malloc(rc * DBX_CHUNK * sizeof(MDB_dbx)); - memcpy(txn->mt_dbxs, env->me_dbxs, txn->mt_numdbs * sizeof(MDB_dbx)); + txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */ + txn->mt_dbs = malloc(env->me_maxdbs * sizeof(MDB_db)); + memcpy(txn->mt_dbs, env->me_meta->mm_dbs, 2 * sizeof(MDB_db)); + if (txn->mt_numdbs > 2) + memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2, + (txn->mt_numdbs - 2) * sizeof(MDB_db)); if (!rdonly) { - memcpy(&txn->mt_db0, txn->mt_dbxs[0].md_db, sizeof(txn->mt_db0)); - txn->mt_dbxs[0].md_db = &txn->mt_db0; if (toggle) txn->mt_flags |= MDB_TXN_METOGGLE; - txn->mt_next_pgno = env->me_meta.mm_last_pg+1; + txn->mt_next_pgno = env->me_meta->mm_last_pg+1; } DPRINTF("begin transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbxs[0].md_db->md_root); + txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); *ret = txn; return MDB_SUCCESS; @@ -592,43 +720,34 @@ mdb_txn_abort(MDB_txn *txn) env = txn->mt_env; DPRINTF("abort transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbxs[0].md_db->md_root); + txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); - free(txn->mt_dbxs); + free(txn->mt_dbs); if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { txn->mt_u.reader->mr_txnid = 0; } else { - /* Discard all dirty pages. Return any re-used pages - * to the free list. - */ - MDB_IDL_ZERO(txn->mt_free_pgs); - while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) { - dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue); - SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); - if (dp->p.mp_pgno <= env->me_meta.mm_last_pg) - mdb_idl_insert(txn->mt_free_pgs, dp->p.mp_pgno); + MDB_oldpages *mop; + unsigned int i; + + /* Discard all dirty pages. */ + while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) { + dp = STAILQ_FIRST(txn->mt_u.dirty_queue); + STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); free(dp); } - /* put back to head of free list */ - if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { - MDB_oldpages *mop; + free(txn->mt_u.dirty_queue); - mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t)); - mop->mo_next = env->me_pghead; - mop->mo_txnid = txn->mt_oldest - 1; - if (!env->me_pghead) { - env->me_pgtail = mop; - } - env->me_pghead = mop; - memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs)); + while ((mop = txn->mt_env->me_pghead)) { + txn->mt_env->me_pghead = mop->mo_next; + free(mop); } - free(txn->mt_free_pgs); - free(txn->mt_u.dirty_queue); env->me_txn = NULL; - env->me_txns->mt_txnid--; - pthread_mutex_unlock(&env->me_txns->mt_wmutex); + env->me_txns->mti_txnid--; + for (i=2; ime_numdbs; i++) + env->me_dbxs[i].md_dirty = 0; + pthread_mutex_unlock(&env->me_txns->mti_wmutex); } free(txn); @@ -638,6 +757,7 @@ int mdb_txn_commit(MDB_txn *txn) { int n, done; + unsigned int i; ssize_t rc; off_t size; MDB_dpage *dp; @@ -668,11 +788,71 @@ mdb_txn_commit(MDB_txn *txn) return EINVAL; } - if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) + if (STAILQ_EMPTY(txn->mt_u.dirty_queue)) goto done; DPRINTF("committing transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbxs[0].md_db->md_root); + txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + + /* should only be one record now */ + if (env->me_pghead) { + MDB_val key, data; + MDB_oldpages *mop; + + mop = env->me_pghead; + key.mv_size = sizeof(pgno_t); + key.mv_data = (char *)&mop->mo_txnid; + data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages); + data.mv_data = mop->mo_pages; + mdb_put0(txn, FREE_DBI, &key, &data, 0); + free(env->me_pghead); + env->me_pghead = NULL; + } + /* save to free list */ + if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { + MDB_val key, data; + MDB_pageparent mpp; + + /* make sure last page of freeDB is touched and on freelist */ + key.mv_size = MAXKEYSIZE+1; + key.mv_data = NULL; + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, &key, NULL, 1, &mpp); + +#if DEBUG > 1 + { + unsigned int i; + ULONG *idl = txn->mt_free_pgs; + DPRINTF("IDL write txn %lu root %lu num %lu", + txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]); + for (i=0; imt_txnid; + data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs); + data.mv_data = txn->mt_free_pgs; + mdb_put0(txn, FREE_DBI, &key, &data, 0); + } + + /* Update DB root pointers. Their pages have already been + * touched so this is all in-place and cannot fail. + */ + { + MDB_val data; + data.mv_size = sizeof(MDB_db); + + for (i = 2; i < txn->mt_numdbs; i++) { + if (txn->mt_dbxs[i].md_dirty) { + data.mv_data = &txn->mt_dbs[i]; + mdb_put0(txn, MAIN_DBI, &txn->mt_dbxs[i].md_name, &data, 0); + } + } + } /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done. */ @@ -681,7 +861,7 @@ mdb_txn_commit(MDB_txn *txn) n = 0; done = 1; size = 0; - SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) { + STAILQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) { if (dp->p.mp_pgno != next) { if (n) { DPRINTF("committing %u dirty pages", n); @@ -698,11 +878,11 @@ mdb_txn_commit(MDB_txn *txn) n = 0; size = 0; } - lseek(env->me_fd, dp->p.mp_pgno * env->me_meta.mm_psize, SEEK_SET); + lseek(env->me_fd, dp->p.mp_pgno * env->me_psize, SEEK_SET); next = dp->p.mp_pgno; } DPRINTF("committing page %lu", dp->p.mp_pgno); - iov[n].iov_len = env->me_meta.mm_psize * dp->h.md_num; + iov[n].iov_len = env->me_psize * dp->h.md_num; iov[n].iov_base = &dp->p; size += iov[n].iov_len; next = dp->p.mp_pgno + dp->h.md_num; @@ -733,48 +913,41 @@ mdb_txn_commit(MDB_txn *txn) /* Drop the dirty pages. */ - while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) { - dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue); - SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); + while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) { + dp = STAILQ_FIRST(txn->mt_u.dirty_queue); + STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); free(dp); } - if ((n = mdbenv_sync(env)) != 0 || - (n = mdbenv_write_meta(txn)) != MDB_SUCCESS || - (n = mdbenv_sync(env)) != 0) { + if ((n = mdb_env_sync(env, 0)) != 0 || + (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) { mdb_txn_abort(txn); return n; } env->me_txn = NULL; + /* update the DB tables */ { - MDB_dbx *p1 = env->me_dbxs; + int toggle = !env->me_db_toggle; - txn->mt_dbxs[0].md_db = env->me_dbxs[0].md_db; - env->me_dbxs = txn->mt_dbxs; + for (i = 2; i < env->me_numdbs; i++) { + if (txn->mt_dbxs[i].md_dirty) { + env->me_dbs[toggle][i] = txn->mt_dbs[i]; + txn->mt_dbxs[i].md_dirty = 0; + } + } + for (i = env->me_numdbs; i < txn->mt_numdbs; i++) { + txn->mt_dbxs[i].md_dirty = 0; + env->me_dbxs[i] = txn->mt_dbxs[i]; + env->me_dbs[toggle][i] = txn->mt_dbs[i]; + } + env->me_db_toggle = toggle; env->me_numdbs = txn->mt_numdbs; - free(p1); + free(txn->mt_dbs); } - /* add to tail of free list */ - if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { - MDB_oldpages *mop; - - mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t)); - mop->mo_next = NULL; - if (env->me_pghead) { - env->me_pgtail->mo_next = mop; - } else { - env->me_pghead = mop; - } - env->me_pgtail = mop; - memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs)); - mop->mo_txnid = txn->mt_txnid; - } - - pthread_mutex_unlock(&env->me_txns->mt_wmutex); - free(txn->mt_free_pgs); + pthread_mutex_unlock(&env->me_txns->mti_wmutex); free(txn->mt_u.dirty_queue); free(txn); txn = NULL; @@ -786,7 +959,7 @@ done: } static int -mdbenv_read_header(MDB_env *env) +mdb_env_read_header(MDB_env *env, MDB_meta *meta) { char page[PAGESIZE]; MDB_page *p; @@ -823,45 +996,47 @@ mdbenv_read_header(MDB_env *env) if (m->mm_version != MDB_VERSION) { DPRINTF("database is version %u, expected version %u", m->mm_version, MDB_VERSION); - return EINVAL; + return MDB_VERSION_MISMATCH; } - memcpy(&env->me_meta, m, sizeof(*m)); + memcpy(meta, m, sizeof(*m)); return 0; } static int -mdbenv_init_meta(MDB_env *env) +mdb_env_init_meta(MDB_env *env, MDB_meta *meta) { MDB_page *p, *q; - MDB_meta *meta; + MDB_meta *m; int rc; unsigned int psize; DPRINTF("writing new meta page"); psize = sysconf(_SC_PAGE_SIZE); - env->me_meta.mm_magic = MDB_MAGIC; - env->me_meta.mm_version = MDB_VERSION; - env->me_meta.mm_psize = psize; - env->me_meta.mm_flags = env->me_flags & 0xffff; - env->me_meta.mm_root = P_INVALID; - env->me_meta.mm_last_pg = 1; + meta->mm_magic = MDB_MAGIC; + meta->mm_version = MDB_VERSION; + meta->mm_psize = psize; + meta->mm_last_pg = 1; + meta->mm_flags = env->me_flags & 0xffff; + meta->mm_flags |= MDB_INTEGERKEY; + meta->mm_dbs[0].md_root = P_INVALID; + meta->mm_dbs[1].md_root = P_INVALID; p = calloc(2, psize); p->mp_pgno = 0; p->mp_flags = P_META; - meta = METADATA(p); - memcpy(meta, &env->me_meta, sizeof(*meta)); + m = METADATA(p); + memcpy(m, meta, sizeof(*meta)); q = (MDB_page *)((char *)p + psize); q->mp_pgno = 1; q->mp_flags = P_META; - meta = METADATA(q); - memcpy(meta, &env->me_meta, sizeof(*meta)); + m = METADATA(q); + memcpy(m, meta, sizeof(*meta)); rc = write(env->me_fd, p, psize * 2); free(p); @@ -869,7 +1044,7 @@ mdbenv_init_meta(MDB_env *env) } static int -mdbenv_write_meta(MDB_txn *txn) +mdb_env_write_meta(MDB_txn *txn) { MDB_env *env; MDB_meta meta; @@ -880,32 +1055,26 @@ mdbenv_write_meta(MDB_txn *txn) assert(txn != NULL); assert(txn->mt_env != NULL); - DPRINTF("writing meta page for root page %lu", txn->mt_dbxs[0].md_db->md_root); + DPRINTF("writing meta page %d for root page %lu", + !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE), txn->mt_dbs[MAIN_DBI].md_root); env = txn->mt_env; ptr = (char *)&meta; - off = offsetof(MDB_meta, mm_last_pg); + off = offsetof(MDB_meta, mm_dbs[0].md_depth); len = sizeof(MDB_meta) - off; ptr += off; + meta.mm_dbs[0] = txn->mt_dbs[0]; + meta.mm_dbs[1] = txn->mt_dbs[1]; meta.mm_last_pg = txn->mt_next_pgno - 1; meta.mm_txnid = txn->mt_txnid; - meta.mm_psize = env->me_meta.mm_psize; - meta.mm_flags = env->me_meta.mm_flags; - meta.mm_depth = txn->mt_dbxs[0].md_db->md_depth; - meta.mm_branch_pages = txn->mt_dbxs[0].md_db->md_branch_pages; - meta.mm_leaf_pages = txn->mt_dbxs[0].md_db->md_leaf_pages; - meta.mm_overflow_pages = txn->mt_dbxs[0].md_db->md_overflow_pages; - meta.mm_entries = txn->mt_dbxs[0].md_db->md_entries; - meta.mm_root = txn->mt_dbxs[0].md_db->md_root; if (!F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE)) - off += env->me_meta.mm_psize; + off += env->me_psize; off += PAGEHDRSZ; - lseek(env->me_fd, off, SEEK_SET); - rc = write(env->me_fd, ptr, len); + rc = pwrite(env->me_fd, ptr, len, off); if (rc != len) { DPRINTF("write failed, disk error?"); return errno; @@ -914,49 +1083,20 @@ mdbenv_write_meta(MDB_txn *txn) return MDB_SUCCESS; } -/* Returns true if page p is a valid meta page, false otherwise. - */ static int -mdb_check_meta_page(MDB_page *p) +mdb_env_read_meta(MDB_env *env, int *which) { - if (!F_ISSET(p->mp_flags, P_META)) { - DPRINTF("page %lu not a meta page", p->mp_pgno); - return EINVAL; - } - - return 0; -} - -static int -mdbenv_read_meta(MDB_env *env, int *which) -{ - MDB_page *mp0, *mp1; - MDB_meta *meta[2]; - int toggle = 0, rc; + int toggle = 0; assert(env != NULL); - if ((mp0 = mdbenv_get_page(env, 0)) == NULL || - (mp1 = mdbenv_get_page(env, 1)) == NULL) - return EIO; - - rc = mdb_check_meta_page(mp0); - if (rc) return rc; - - rc = mdb_check_meta_page(mp1); - if (rc) return rc; - - meta[0] = METADATA(mp0); - meta[1] = METADATA(mp1); - - if (meta[0]->mm_txnid < meta[1]->mm_txnid) + if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid) toggle = 1; - if (meta[toggle]->mm_txnid > env->me_meta.mm_txnid) { - memcpy(&env->me_meta, meta[toggle], sizeof(env->me_meta)); - if (which) - *which = toggle; - } + if (env->me_meta != env->me_metas[toggle]) + env->me_meta = env->me_metas[toggle]; + if (which) + *which = toggle; DPRINTF("Using meta page %d", toggle); @@ -964,39 +1104,47 @@ mdbenv_read_meta(MDB_env *env, int *which) } int -mdbenv_create(MDB_env **env) +mdb_env_create(MDB_env **env) { MDB_env *e; - e = calloc(1, sizeof(*e)); + e = calloc(1, sizeof(MDB_env)); if (!e) return ENOMEM; - e->me_meta.mm_mapsize = DEFAULT_MAPSIZE; e->me_maxreaders = DEFAULT_READERS; + e->me_maxdbs = 2; e->me_fd = -1; e->me_lfd = -1; + e->me_mfd = -1; *env = e; return MDB_SUCCESS; } int -mdbenv_set_mapsize(MDB_env *env, size_t size) +mdb_env_set_mapsize(MDB_env *env, size_t size) { if (env->me_map) return EINVAL; - env->me_mapsize = env->me_meta.mm_mapsize = size; + env->me_mapsize = size; return MDB_SUCCESS; } int -mdbenv_set_maxreaders(MDB_env *env, int readers) +mdb_env_set_maxdbs(MDB_env *env, int dbs) +{ + env->me_maxdbs = dbs; + return MDB_SUCCESS; +} + +int +mdb_env_set_maxreaders(MDB_env *env, int readers) { env->me_maxreaders = readers; return MDB_SUCCESS; } int -mdbenv_get_maxreaders(MDB_env *env, int *readers) +mdb_env_get_maxreaders(MDB_env *env, int *readers) { if (!env || !readers) return EINVAL; @@ -1004,59 +1152,69 @@ mdbenv_get_maxreaders(MDB_env *env, int *readers) return MDB_SUCCESS; } -int -mdbenv_open2(MDB_env *env, unsigned int flags) +static int +mdb_env_open2(MDB_env *env, unsigned int flags) { int i, newenv = 0; + MDB_meta meta; + MDB_page *p; env->me_flags = flags; - if ((i = mdbenv_read_header(env)) != 0) { + memset(&meta, 0, sizeof(meta)); + + if ((i = mdb_env_read_header(env, &meta)) != 0) { if (i != ENOENT) return i; DPRINTF("new mdbenv"); newenv = 1; } - if (!env->me_mapsize) - env->me_mapsize = env->me_meta.mm_mapsize; + if (!env->me_mapsize) { + env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize; + } i = MAP_SHARED; - if (env->me_meta.mm_address && (flags & MDB_FIXEDMAP)) + if (meta.mm_address && (flags & MDB_FIXEDMAP)) i |= MAP_FIXED; - env->me_map = mmap(env->me_meta.mm_address, env->me_mapsize, PROT_READ, i, + env->me_map = mmap(meta.mm_address, env->me_mapsize, PROT_READ, i, env->me_fd, 0); if (env->me_map == MAP_FAILED) return errno; if (newenv) { - env->me_meta.mm_mapsize = env->me_mapsize; + meta.mm_mapsize = env->me_mapsize; if (flags & MDB_FIXEDMAP) - env->me_meta.mm_address = env->me_map; - i = mdbenv_init_meta(env); + meta.mm_address = env->me_map; + i = mdb_env_init_meta(env, &meta); if (i != MDB_SUCCESS) { munmap(env->me_map, env->me_mapsize); return i; } } + env->me_psize = meta.mm_psize; + + p = (MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)env->me_map; + env->me_metas[0] = METADATA(p); + env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize); - if ((i = mdbenv_read_meta(env, NULL)) != 0) + if ((i = mdb_env_read_meta(env, NULL)) != 0) return i; DPRINTF("opened database version %u, pagesize %u", - env->me_meta.mm_version, env->me_meta.mm_psize); - DPRINTF("depth: %u", env->me_meta.mm_depth); - DPRINTF("entries: %lu", env->me_meta.mm_entries); - DPRINTF("branch pages: %lu", env->me_meta.mm_branch_pages); - DPRINTF("leaf pages: %lu", env->me_meta.mm_leaf_pages); - DPRINTF("overflow pages: %lu", env->me_meta.mm_overflow_pages); - DPRINTF("root: %lu", env->me_meta.mm_root); + env->me_meta->mm_version, env->me_psize); + DPRINTF("depth: %u", env->me_meta->mm_dbs[MAIN_DBI].md_depth); + DPRINTF("entries: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_entries); + DPRINTF("branch pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_branch_pages); + DPRINTF("leaf pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_leaf_pages); + DPRINTF("overflow pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_overflow_pages); + DPRINTF("root: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_root); return MDB_SUCCESS; } static void -mdbenv_reader_dest(void *ptr) +mdb_env_reader_dest(void *ptr) { MDB_reader *reader = ptr; @@ -1065,12 +1223,13 @@ mdbenv_reader_dest(void *ptr) reader->mr_tid = 0; } +/* downgrade the exclusive lock on the region back to shared */ static void -mdbenv_share_locks(MDB_env *env) +mdb_env_share_locks(MDB_env *env) { struct flock lock_info; - env->me_txns->mt_txnid = env->me_meta.mm_txnid; + env->me_txns->mti_txnid = env->me_meta->mm_txnid; memset((void *)&lock_info, 0, sizeof(lock_info)); lock_info.l_type = F_RDLCK; @@ -1081,7 +1240,7 @@ mdbenv_share_locks(MDB_env *env) } static int -mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) +mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) { int rc; off_t size, rsize; @@ -1134,23 +1293,28 @@ mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) pthread_mutexattr_t mattr; pthread_mutexattr_init(&mattr); - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&env->me_txns->mt_mutex, &mattr); - pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr); - env->me_txns->mt_version = MDB_VERSION; - env->me_txns->mt_magic = MDB_MAGIC; - env->me_txns->mt_txnid = 0; - env->me_txns->mt_numreaders = 0; + rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + if (rc) { + goto fail; + } + pthread_mutex_init(&env->me_txns->mti_mutex, &mattr); + pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr); + env->me_txns->mti_version = MDB_VERSION; + env->me_txns->mti_magic = MDB_MAGIC; + env->me_txns->mti_txnid = 0; + env->me_txns->mti_numreaders = 0; } else { - if (env->me_txns->mt_magic != MDB_MAGIC) { + if (env->me_txns->mti_magic != MDB_MAGIC) { DPRINTF("lock region has invalid magic"); - errno = EINVAL; + rc = EINVAL; + goto fail; } - if (env->me_txns->mt_version != MDB_VERSION) { + if (env->me_txns->mti_version != MDB_VERSION) { DPRINTF("lock region is version %u, expected version %u", - env->me_txns->mt_version, MDB_VERSION); - errno = EINVAL; + env->me_txns->mti_version, MDB_VERSION); + rc = MDB_VERSION_MISMATCH; + goto fail; } if (errno != EACCES && errno != EAGAIN) { rc = errno; @@ -1161,25 +1325,28 @@ mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) fail: close(env->me_lfd); + env->me_lfd = -1; return rc; } +#define LOCKNAME "/lock.mdb" +#define DATANAME "/data.mdb" int -mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) +mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) { int oflags, rc, len, excl; char *lpath, *dpath; len = strlen(path); - lpath = malloc(len + sizeof("/lock.mdb") + len + sizeof("/data.db")); + lpath = malloc(len + sizeof(LOCKNAME) + len + sizeof(DATANAME)); if (!lpath) return ENOMEM; - dpath = lpath + len + sizeof("/lock.mdb"); - sprintf(lpath, "%s/lock.mdb", path); - sprintf(dpath, "%s/data.mdb", path); + dpath = lpath + len + sizeof(LOCKNAME); + sprintf(lpath, "%s" LOCKNAME, path); + sprintf(dpath, "%s" DATANAME, path); - rc = mdbenv_setup_locks(env, lpath, mode, &excl); + rc = mdb_env_setup_locks(env, lpath, mode, &excl); if (rc) goto leave; @@ -1188,44 +1355,71 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) else oflags = O_RDWR | O_CREAT; - if ((env->me_fd = open(dpath, oflags, mode)) == -1) - return errno; + if ((env->me_fd = open(dpath, oflags, mode)) == -1) { + rc = errno; + goto leave; + } + + if ((rc = mdb_env_open2(env, flags)) == MDB_SUCCESS) { + /* synchronous fd for meta writes */ + if (!(flags & (MDB_RDONLY|MDB_NOSYNC))) + oflags |= O_DSYNC; + if ((env->me_mfd = open(dpath, oflags, mode)) == -1) { + rc = errno; + goto leave; + } - if ((rc = mdbenv_open2(env, flags)) != MDB_SUCCESS) { - close(env->me_fd); - env->me_fd = -1; - } else { env->me_path = strdup(path); DPRINTF("opened dbenv %p", (void *) env); - pthread_key_create(&env->me_txkey, mdbenv_reader_dest); + pthread_key_create(&env->me_txkey, mdb_env_reader_dest); if (excl) - mdbenv_share_locks(env); - env->me_dbxs = calloc(DBX_CHUNK, sizeof(MDB_dbx)); - env->me_numdbs = 1; - env->me_dbxs[0].md_db = (MDB_db *)&env->me_meta.mm_psize; + mdb_env_share_locks(env); + env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx)); + env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db)); + env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db)); + env->me_numdbs = 2; } - leave: + if (rc) { + if (env->me_fd >= 0) { + close(env->me_fd); + env->me_fd = -1; + } + if (env->me_lfd >= 0) { + close(env->me_lfd); + env->me_lfd = -1; + } + } free(lpath); return rc; } void -mdbenv_close(MDB_env *env) +mdb_env_close(MDB_env *env) { if (env == NULL) return; + free(env->me_dbs[1]); + free(env->me_dbs[0]); free(env->me_dbxs); free(env->me_path); + pthread_key_delete(env->me_txkey); + if (env->me_map) { munmap(env->me_map, env->me_mapsize); } + close(env->me_mfd); close(env->me_fd); if (env->me_txns) { + pid_t pid = getpid(); size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo); + int i; + for (i=0; ime_txns->mti_numreaders; i++) + if (env->me_txns->mti_readers[i].mr_pid == pid) + env->me_txns->mti_readers[i].mr_pid = 0; munmap(env->me_txns, size); } close(env->me_lfd); @@ -1322,7 +1516,7 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp) DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor); - if ((ppage = calloc(1, sizeof(*ppage))) == NULL) + if ((ppage = calloc(1, sizeof(MDB_ppage))) == NULL) return NULL; ppage->mp_page = mp; CURSOR_PUSH(cursor, ppage); @@ -1330,15 +1524,14 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp) } static MDB_page * -mdbenv_get_page(MDB_env *env, pgno_t pgno) +mdb_get_page(MDB_txn *txn, pgno_t pgno) { MDB_page *p = NULL; - MDB_txn *txn = env->me_txn; int found = 0; - if (txn && !SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) { + if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && !STAILQ_EMPTY(txn->mt_u.dirty_queue)) { MDB_dpage *dp; - SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) { + STAILQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) { if (dp->p.mp_pgno == pgno) { p = &dp->p; found = 1; @@ -1347,7 +1540,9 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno) } } if (!found) { - p = (MDB_page *)(env->me_map + env->me_meta.mm_psize * pgno); + if (pgno > txn->mt_env->me_meta->mm_last_pg) + return NULL; + p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); } return p; } @@ -1372,7 +1567,10 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, if (key == NULL) /* Initialize cursor to first page. */ i = 0; - else { + else if (key->mv_size > MAXKEYSIZE && key->mv_data == NULL) { + /* cursor to last page */ + i = NUMKEYS(mp)-1; + } else { int exact; node = mdb_search_node(txn, dbi, mp, key, &exact, &i); if (node == NULL) @@ -1393,7 +1591,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, CURSOR_TOP(cursor)->mp_ki = i; mpp->mp_parent = mp; - if ((mp = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL) + if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL) return MDB_FAIL; mpp->mp_pi = i; mpp->mp_page = mp; @@ -1446,31 +1644,42 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, DPRINTF("transaction has failed, must abort"); return EINVAL; } else - root = txn->mt_dbxs[dbi].md_db->md_root; + root = txn->mt_dbs[dbi].md_root; if (root == P_INVALID) { /* Tree is empty. */ DPRINTF("tree is empty"); - return ENOENT; + return MDB_NOTFOUND; } - if ((mpp->mp_page = mdbenv_get_page(txn->mt_env, root)) == NULL) + if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL) return MDB_FAIL; DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags); - if (modify && !F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) { - mpp->mp_parent = NULL; - mpp->mp_pi = 0; - if ((rc = mdb_touch(txn, mpp))) - return rc; - txn->mt_dbxs[dbi].md_db->md_root = mpp->mp_page->mp_pgno; + if (modify) { + /* For sub-databases, update main root first */ + if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) { + MDB_pageparent mp2; + rc = mdb_search_page(txn, MAIN_DBI, &txn->mt_dbxs[dbi].md_name, + NULL, 1, &mp2); + if (rc) + return rc; + txn->mt_dbxs[dbi].md_dirty = 1; + } + if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) { + mpp->mp_parent = NULL; + mpp->mp_pi = 0; + if ((rc = mdb_touch(txn, mpp))) + return rc; + txn->mt_dbs[dbi].md_root = mpp->mp_page->mp_pgno; + } } return mdb_search_page_root(txn, dbi, key, cursor, modify, mpp); } static int -mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data) +mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data) { MDB_page *omp; /* overflow mpage */ pgno_t pgno; @@ -1485,7 +1694,7 @@ mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data) */ data->mv_size = leaf->mn_dsize; memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); - if ((omp = mdbenv_get_page(env, pgno)) == NULL) { + if ((omp = mdb_get_page(txn, pgno)) == NULL) { DPRINTF("read overflow page %lu failed", pgno); return MDB_FAIL; } @@ -1506,6 +1715,9 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, assert(data); DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data); + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { return EINVAL; } @@ -1514,10 +1726,21 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, return rc; leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL); - if (leaf && exact) - rc = mdb_read_data(txn->mt_env, leaf, data); - else { - rc = ENOENT; + if (leaf && exact) { + /* Return first duplicate data item */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, leaf); + rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp); + if (rc != MDB_SUCCESS) + return rc; + leaf = NODEPTR(mpp.mp_page, 0); + } + rc = mdb_read_data(txn, leaf, data); + } else { + rc = MDB_NOTFOUND; } return rc; @@ -1533,7 +1756,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) top = CURSOR_TOP(cursor); if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) { - return ENOENT; /* root has no siblings */ + return MDB_NOTFOUND; /* root has no siblings */ } DPRINTF("parent page is page %lu, index %u", @@ -1558,7 +1781,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) assert(IS_BRANCH(parent->mp_page)); indx = NODEPTR(parent->mp_page, parent->mp_ki); - if ((mp = mdbenv_get_page(cursor->mc_txn->mt_env, indx->mn_pgno)) == NULL) + if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL) return MDB_FAIL; #if 0 mp->parent = parent->mp_page; @@ -1583,18 +1806,27 @@ mdb_set_key(MDB_node *node, MDB_val *key) } static int -mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) +mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op) { MDB_ppage *top; MDB_page *mp; MDB_node *leaf; + int rc; if (cursor->mc_eof) { - return ENOENT; + return MDB_NOTFOUND; } assert(cursor->mc_initialized); + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (op == MDB_NEXT || op == MDB_NEXT_DUP) { + rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); + if (op != MDB_NEXT || rc == MDB_SUCCESS) + return rc; + } + } + top = CURSOR_TOP(cursor); mp = top->mp_page; @@ -1604,7 +1836,7 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) DPRINTF("=====> move to next sibling page"); if (mdb_sibling(cursor, 1) != MDB_SUCCESS) { cursor->mc_eof = 1; - return ENOENT; + return MDB_NOTFOUND; } top = CURSOR_TOP(cursor); mp = top->mp_page; @@ -1618,15 +1850,82 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data) assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); - if (data && mdb_read_data(cursor->mc_txn->mt_env, leaf, data) != MDB_SUCCESS) - return MDB_FAIL; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc != MDB_SUCCESS) + return rc; + } + } + + return mdb_set_key(leaf, key); +} + +static int +mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op) +{ + MDB_ppage *top; + MDB_page *mp; + MDB_node *leaf; + int rc; + + assert(cursor->mc_initialized); + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (op == MDB_PREV || op == MDB_PREV_DUP) { + rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); + if (op != MDB_PREV || rc == MDB_SUCCESS) + return rc; + } + } + + top = CURSOR_TOP(cursor); + mp = top->mp_page; + + DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor); + + if (top->mp_ki == 0) { + DPRINTF("=====> move to prev sibling page"); + if (mdb_sibling(cursor, 0) != MDB_SUCCESS) { + return MDB_NOTFOUND; + } + top = CURSOR_TOP(cursor); + mp = top->mp_page; + top->mp_ki = NUMKEYS(mp) - 1; + DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, top->mp_ki); + } else + top->mp_ki--; + + cursor->mc_eof = 0; + + DPRINTF("==> cursor points to page %lu with %u keys, key index %u", + mp->mp_pgno, NUMKEYS(mp), top->mp_ki); + + assert(IS_LEAF(mp)); + leaf = NODEPTR(mp, top->mp_ki); + + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc != MDB_SUCCESS) + return rc; + } + } return mdb_set_key(leaf, key); } static int mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, - int *exactp) + MDB_cursor_op op, int *exactp) { int rc; MDB_node *leaf; @@ -1637,6 +1936,9 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, assert(key); assert(key->mv_size > 0); + while (CURSOR_TOP(cursor) != NULL) + cursor_pop_page(cursor); + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; @@ -1645,8 +1947,8 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, top = CURSOR_TOP(cursor); leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki); if (exactp != NULL && !*exactp) { - /* MDB_CURSOR_EXACT specified and not an exact match. */ - return ENOENT; + /* MDB_SET specified and not an exact match. */ + return MDB_NOTFOUND; } if (leaf == NULL) { @@ -1663,8 +1965,30 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, cursor->mc_initialized = 1; cursor->mc_eof = 0; - if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS) - return rc; + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (op == MDB_SET || op == MDB_SET_RANGE) { + rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); + } else { + int ex2, *ex2p; + MDB_cursor_op op2; + if (op == MDB_GET_BOTH) { + ex2p = &ex2; + op2 = MDB_SET; + } else { + ex2p = NULL; + op2 = MDB_SET_RANGE; + } + rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, op2, ex2p); + if (rc != MDB_SUCCESS) + return rc; + } + } + } rc = mdb_set_key(leaf, key); if (rc == MDB_SUCCESS) { @@ -1683,6 +2007,9 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) MDB_pageparent mpp; MDB_node *leaf; + while (CURSOR_TOP(cursor) != NULL) + cursor_pop_page(cursor); + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; @@ -1692,8 +2019,58 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) cursor->mc_initialized = 1; cursor->mc_eof = 0; - if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS) + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc) + return rc; + } + } + return mdb_set_key(leaf, key); +} + +static int +mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) +{ + int rc; + MDB_ppage *top; + MDB_pageparent mpp; + MDB_node *leaf; + MDB_val lkey; + + while (CURSOR_TOP(cursor) != NULL) + cursor_pop_page(cursor); + + lkey.mv_size = MAXKEYSIZE+1; + lkey.mv_data = NULL; + + rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, &lkey, cursor, 0, &mpp); + if (rc != MDB_SUCCESS) return rc; + assert(IS_LEAF(mpp.mp_page)); + + leaf = NODEPTR(mpp.mp_page, NUMKEYS(mpp.mp_page)-1); + cursor->mc_initialized = 1; + cursor->mc_eof = 0; + + top = CURSOR_TOP(cursor); + top->mp_ki = NUMKEYS(top->mp_page) - 1; + + if (data) { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; + + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); + if (rc) + return rc; + } + } return mdb_set_key(leaf, key); } @@ -1708,28 +2085,44 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data, assert(cursor); switch (op) { - case MDB_CURSOR: - case MDB_CURSOR_EXACT: - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); + case MDB_GET_BOTH: + case MDB_GET_BOTH_RANGE: + if (data == NULL) { + rc = EINVAL; + break; + } + /* FALLTHRU */ + case MDB_SET: + case MDB_SET_RANGE: if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { rc = EINVAL; - } else if (op == MDB_CURSOR_EXACT) - rc = mdb_cursor_set(cursor, key, data, &exact); + } else if (op != MDB_SET_RANGE) + rc = mdb_cursor_set(cursor, key, data, op, NULL); else - rc = mdb_cursor_set(cursor, key, data, NULL); + rc = mdb_cursor_set(cursor, key, data, op, &exact); break; case MDB_NEXT: + case MDB_NEXT_DUP: + case MDB_NEXT_NODUP: if (!cursor->mc_initialized) rc = mdb_cursor_first(cursor, key, data); else - rc = mdb_cursor_next(cursor, key, data); + rc = mdb_cursor_next(cursor, key, data, op); + break; + case MDB_PREV: + case MDB_PREV_DUP: + case MDB_PREV_NODUP: + if (!cursor->mc_initialized || cursor->mc_eof) + rc = mdb_cursor_last(cursor, key, data); + else + rc = mdb_cursor_prev(cursor, key, data, op); break; case MDB_FIRST: - while (CURSOR_TOP(cursor) != NULL) - cursor_pop_page(cursor); rc = mdb_cursor_first(cursor, key, data); break; + case MDB_LAST: + rc = mdb_cursor_last(cursor, key, data); + break; default: DPRINTF("unhandled/unimplemented cursor operation %u", op); rc = EINVAL; @@ -1749,17 +2142,17 @@ mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num) if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL) return NULL; DPRINTF("allocated new mpage %lu, page size %u", - dp->p.mp_pgno, txn->mt_env->me_meta.mm_psize); + dp->p.mp_pgno, txn->mt_env->me_psize); dp->p.mp_flags = flags | P_DIRTY; dp->p.mp_lower = PAGEHDRSZ; - dp->p.mp_upper = txn->mt_env->me_meta.mm_psize; + dp->p.mp_upper = txn->mt_env->me_psize; if (IS_BRANCH(&dp->p)) - txn->mt_dbxs[dbi].md_db->md_branch_pages++; + txn->mt_dbs[dbi].md_branch_pages++; else if (IS_LEAF(&dp->p)) - txn->mt_dbxs[dbi].md_db->md_leaf_pages++; + txn->mt_dbs[dbi].md_leaf_pages++; else if (IS_OVERFLOW(&dp->p)) { - txn->mt_dbxs[dbi].md_db->md_overflow_pages += num; + txn->mt_dbs[dbi].md_overflow_pages += num; dp->p.mp_pages = num; } @@ -1772,7 +2165,7 @@ mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data) size_t sz; sz = LEAFSIZE(key, data); - if (data->mv_size >= env->me_meta.mm_psize / MDB_MINKEYS) { + if (data->mv_size >= env->me_psize / MDB_MINKEYS) { /* put on overflow page */ sz -= data->mv_size - sizeof(pgno_t); } @@ -1786,7 +2179,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key) size_t sz; sz = INDXSIZE(key); - if (sz >= env->me_meta.mm_psize / MDB_MINKEYS) { + if (sz >= env->me_psize / MDB_MINKEYS) { /* put on overflow page */ /* not implemented */ /* sz -= key->size - sizeof(pgno_t); */ @@ -1820,8 +2213,8 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, if (F_ISSET(flags, F_BIGDATA)) { /* Data already on overflow page. */ node_size += sizeof(pgno_t); - } else if (data->mv_size >= txn->mt_env->me_meta.mm_psize / MDB_MINKEYS) { - int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_meta.mm_psize); + } else if (data->mv_size >= txn->mt_env->me_psize / MDB_MINKEYS) { + int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_psize); /* Put data on overflow page. */ DPRINTF("data size is %zu, put on overflow page", data->mv_size); @@ -1925,18 +2318,93 @@ mdb_del_node(MDB_page *mp, indx_t indx) mp->mp_upper += sz; } +static void +mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) +{ + MDB_dbi dbn; + + mx->mx_txn = *txn; + mx->mx_txn.mt_dbxs = mx->mx_dbxs; + mx->mx_txn.mt_dbs = mx->mx_dbs; + mx->mx_dbxs[0] = txn->mt_dbxs[0]; + mx->mx_dbxs[1] = txn->mt_dbxs[1]; + if (dbi > 1) { + mx->mx_dbxs[2] = txn->mt_dbxs[dbi]; + dbn = 2; + } else { + dbn = 1; + } + mx->mx_dbxs[dbn+1].md_parent = dbn; + mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp; + mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel; + mx->mx_dbxs[dbn+1].md_dirty = 0; + mx->mx_txn.mt_numdbs = dbn+2; + + SLIST_INIT(&mx->mx_cursor.mc_stack); + mx->mx_cursor.mc_txn = &mx->mx_txn; + mx->mx_cursor.mc_dbi = dbn+1; +} + +static void +mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node) +{ + MDB_db *db = NODEDATA(node); + MDB_dbi dbn; + mx->mx_dbs[0] = txn->mt_dbs[0]; + mx->mx_dbs[1] = txn->mt_dbs[1]; + if (dbi > 1) { + mx->mx_dbs[2] = txn->mt_dbs[dbi]; + dbn = 3; + } else { + dbn = 2; + } + mx->mx_dbs[dbn] = *db; + mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node); + mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize; + mx->mx_txn.mt_next_pgno = txn->mt_next_pgno; + mx->mx_txn.mt_oldest = txn->mt_oldest; + mx->mx_txn.mt_u = txn->mt_u; +} + +static void +mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) +{ + txn->mt_next_pgno = mx->mx_txn.mt_next_pgno; + txn->mt_oldest = mx->mx_txn.mt_oldest; + txn->mt_u = mx->mx_txn.mt_u; + txn->mt_dbs[0] = mx->mx_dbs[0]; + txn->mt_dbs[1] = mx->mx_dbs[1]; + txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty; + txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty; + if (dbi > 1) { + txn->mt_dbs[dbi] = mx->mx_dbs[2]; + txn->mt_dbxs[dbi].md_dirty = mx->mx_dbxs[2].md_dirty; + } +} + int mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) { MDB_cursor *cursor; + size_t size = sizeof(MDB_cursor); - if (txn == NULL || ret == NULL) + if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs) return EINVAL; - if ((cursor = calloc(1, sizeof(*cursor))) != NULL) { + if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) + size += sizeof(MDB_xcursor); + + if ((cursor = calloc(1, size)) != NULL) { SLIST_INIT(&cursor->mc_stack); cursor->mc_dbi = dbi; cursor->mc_txn = txn; + if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) { + MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1); + cursor->mc_xcursor = mx; + mdb_xcursor_init0(txn, dbi, mx); + } + } else { + return ENOMEM; } *ret = cursor; @@ -1944,14 +2412,35 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) return MDB_SUCCESS; } +/* Return the count of duplicate data items for the current key */ +int +mdb_cursor_count(MDB_cursor *mc, unsigned long *countp) +{ + if (mc == NULL || countp == NULL) + return EINVAL; + + if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT)) + return EINVAL; + + if (!mc->mc_xcursor->mx_cursor.mc_initialized) + return EINVAL; + + *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries; + return MDB_SUCCESS; +} + void mdb_cursor_close(MDB_cursor *cursor) { if (cursor != NULL) { - while (!CURSOR_EMPTY(cursor)) + while(!CURSOR_EMPTY(cursor)) cursor_pop_page(cursor); + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + mdb_xcursor_fini(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor); + while(!CURSOR_EMPTY(&cursor->mc_xcursor->mx_cursor)) + cursor_pop_page(&cursor->mc_xcursor->mx_cursor); + } -/* btree_close(cursor->bt); */ free(cursor); } } @@ -2120,9 +2609,9 @@ mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst) } if (IS_LEAF(src->mp_page)) - txn->mt_dbxs[dbi].md_db->md_leaf_pages--; + txn->mt_dbs[dbi].md_leaf_pages--; else - txn->mt_dbxs[dbi].md_db->md_branch_pages--; + txn->mt_dbs[dbi].md_branch_pages--; mpp.mp_page = src->mp_parent; dh = (MDB_dhead *)src->mp_parent; @@ -2159,16 +2648,16 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) if (mpp->mp_parent == NULL) { if (NUMKEYS(mpp->mp_page) == 0) { DPRINTF("tree is completely empty"); - txn->mt_dbxs[dbi].md_db->md_root = P_INVALID; - txn->mt_dbxs[dbi].md_db->md_depth--; - txn->mt_dbxs[dbi].md_db->md_leaf_pages--; + txn->mt_dbs[dbi].md_root = P_INVALID; + txn->mt_dbs[dbi].md_depth--; + txn->mt_dbs[dbi].md_leaf_pages--; } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) { DPRINTF("collapsing root page!"); - txn->mt_dbxs[dbi].md_db->md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); - if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbxs[dbi].md_db->md_root)) == NULL) + txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); + if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL) return MDB_FAIL; - txn->mt_dbxs[dbi].md_db->md_depth--; - txn->mt_dbxs[dbi].md_db->md_branch_pages--; + txn->mt_dbs[dbi].md_depth--; + txn->mt_dbs[dbi].md_branch_pages--; } else DPRINTF("root page doesn't need rebalancing"); return MDB_SUCCESS; @@ -2191,7 +2680,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) */ DPRINTF("reading right neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1); - if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL) + if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL) return MDB_FAIL; npp.mp_pi = mpp->mp_pi + 1; si = 0; @@ -2201,7 +2690,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) */ DPRINTF("reading left neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1); - if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL) + if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL) return MDB_FAIL; npp.mp_pi = mpp->mp_pi - 1; si = NUMKEYS(npp.mp_page) - 1; @@ -2227,9 +2716,37 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) } } +static int +mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf) +{ + int rc; + + /* add overflow pages to free list */ + if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { + int i, ovpages; + pgno_t pg; + + memcpy(&pg, NODEDATA(leaf), sizeof(pg)); + ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize); + for (i=0; imt_free_pgs, pg); + pg++; + } + } + mdb_del_node(mpp->mp_page, ki); + txn->mt_dbs[dbi].md_entries--; + rc = mdb_rebalance(txn, dbi, mpp); + if (rc != MDB_SUCCESS) + txn->mt_flags |= MDB_TXN_ERROR; + + return rc; +} + int mdb_del(MDB_txn *txn, MDB_dbi dbi, - MDB_val *key, MDB_val *data) + MDB_val *key, MDB_val *data, + unsigned int flags) { int rc, exact; unsigned int ki; @@ -2240,7 +2757,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, assert(key != NULL); - if (txn == NULL || dbi >= txn->mt_numdbs) + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) return EINVAL; if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { @@ -2251,36 +2768,74 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, return EINVAL; } + mpp.mp_parent = NULL; + mpp.mp_pi = 0; if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS) return rc; leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf == NULL || !exact) { - return ENOENT; + return MDB_NOTFOUND; } - if (data && (rc = mdb_read_data(txn->mt_env, leaf, data)) != MDB_SUCCESS) - return rc; - - mdb_del_node(mpp.mp_page, ki); - /* add overflow pages to free list */ - if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { - int i, ovpages; - pgno_t pg; + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + MDB_pageparent mp2; - memcpy(&pg, NODEDATA(leaf), sizeof(pg)); - ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_meta.mm_psize); - for (i=0; imt_free_pgs, pg); - pg++; + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, leaf); + if (flags == MDB_DEL_DUP) { + rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL, 0); + mdb_xcursor_fini(txn, dbi, &mx); + if (rc != MDB_SUCCESS) + return rc; + /* If sub-DB still has entries, we're done */ + if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) { + memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi], + sizeof(MDB_db)); + return rc; + } + /* otherwise fall thru and delete the sub-DB */ + } else { + /* add all the child DB's pages to the free list */ + rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, + NULL, &mx.mx_cursor, 0, &mp2); + if (rc == MDB_SUCCESS) { + MDB_ppage *top, *parent; + MDB_node *ni; + unsigned int i; + + cursor_pop_page(&mx.mx_cursor); + top = CURSOR_TOP(&mx.mx_cursor); + if (top != NULL) { + parent = SLIST_NEXT(top, mp_entry); + while (parent != NULL) { + for (i=0; imp_page); i++) { + ni = NODEPTR(top->mp_page, i); + mdb_midl_insert(txn->mt_free_pgs, ni->mn_pgno); + } + if (parent) { + parent->mp_ki++; + if (parent->mp_ki >= NUMKEYS(parent->mp_page)) { + cursor_pop_page(&mx.mx_cursor); + top = CURSOR_TOP(&mx.mx_cursor); + parent = SLIST_NEXT(top, mp_entry); + } else { + ni = NODEPTR(parent->mp_page, parent->mp_ki); + top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno); + } + } + } + } + mdb_midl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root); + } } } - txn->mt_dbxs[dbi].md_db->md_entries--; - rc = mdb_rebalance(txn, dbi, &mpp); - if (rc != MDB_SUCCESS) - txn->mt_flags |= MDB_TXN_ERROR; - return rc; + if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS) + return rc; + + return mdb_del0(txn, dbi, ki, &mpp, leaf); } /* Split page <*mpp>, and insert in either left or @@ -2318,9 +2873,9 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, return MDB_FAIL; mdp->h.md_pi = 0; mdp->h.md_parent = &pdp->p; - txn->mt_dbxs[dbi].md_db->md_root = pdp->p.mp_pgno; + txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno; DPRINTF("root split! new root = %lu", pdp->p.mp_pgno); - txn->mt_dbxs[dbi].md_db->md_depth++; + txn->mt_dbs[dbi].md_depth++; /* Add left (implicit) pointer. */ if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL, @@ -2338,12 +2893,12 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno); /* Move half of the keys to the right sibling. */ - if ((copy = malloc(txn->mt_env->me_meta.mm_psize)) == NULL) + if ((copy = malloc(txn->mt_env->me_psize)) == NULL) return MDB_FAIL; - memcpy(copy, &mdp->p, txn->mt_env->me_meta.mm_psize); - memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_meta.mm_psize - PAGEHDRSZ); + memcpy(copy, &mdp->p, txn->mt_env->me_psize); + memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ); mdp->p.mp_lower = PAGEHDRSZ; - mdp->p.mp_upper = txn->mt_env->me_meta.mm_psize; + mdp->p.mp_upper = txn->mt_env->me_psize; split_indx = NUMKEYS(copy) / 2 + 1; @@ -2440,44 +2995,38 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, return rc; } -int -mdb_put(MDB_txn *txn, MDB_dbi dbi, +static int +mdb_put0(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, unsigned int flags) { int rc = MDB_SUCCESS, exact; unsigned int ki; MDB_node *leaf; MDB_pageparent mpp; - - assert(key != NULL); - assert(data != NULL); - - if (txn == NULL) - return EINVAL; - - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - return EINVAL; - } - - if (txn->mt_env->me_txn != txn) { - return EINVAL; - } - - if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { - return EINVAL; - } + MDB_val xdata, *rdata; + MDB_db dummy; DPRINTF("==> put key %.*s, size %zu, data size %zu", (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); + mpp.mp_parent = NULL; + mpp.mp_pi = 0; rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp); if (rc == MDB_SUCCESS) { leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { - if (F_ISSET(flags, MDB_NOOVERWRITE)) { + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + goto put_sub; + } + if (flags == MDB_NOOVERWRITE) { DPRINTF("duplicate key %.*s", (int)key->mv_size, (char *)key->mv_data); - return EEXIST; + return MDB_KEYEXIST; + } + /* same size, just replace it */ + if (NODEDSZ(leaf) == data->mv_size) { + memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); + goto done; } mdb_del_node(mpp.mp_page, ki); } @@ -2485,7 +3034,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, ki = NUMKEYS(mpp.mp_page); DPRINTF("appending key at index %i", ki); } - } else if (rc == ENOENT) { + } else if (rc == MDB_NOTFOUND) { MDB_dpage *dp; /* new file, just write a root leaf page */ DPRINTF("allocating new root leaf page"); @@ -2493,8 +3042,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, return ENOMEM; } mpp.mp_page = &dp->p; - txn->mt_dbxs[dbi].md_db->md_root = mpp.mp_page->mp_pgno; - txn->mt_dbxs[dbi].md_db->md_depth++; + txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno; + txn->mt_dbs[dbi].md_depth++; ki = 0; } else @@ -2504,24 +3053,91 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, DPRINTF("there are %u keys, should insert new key at index %i", NUMKEYS(mpp.mp_page), ki); - if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) { - rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID); + /* For sorted dups, the data item at this level is a DB record + * for a child DB; the actual data elements are stored as keys + * in the child DB. + */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + rdata = &xdata; + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + memset(&dummy, 0, sizeof(dummy)); + dummy.md_root = P_INVALID; + } else { + rdata = data; + } + + if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) { + rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID); } else { /* There is room already in this leaf page. */ - rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, data, 0, 0); + rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, rdata, 0, 0); } if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; - else - txn->mt_dbxs[dbi].md_db->md_entries++; + else { + txn->mt_dbs[dbi].md_entries++; + + /* Remember if we just added a subdatabase */ + if (flags & F_SUBDATA) { + leaf = NODEPTR(mpp.mp_page, ki); + leaf->mn_flags |= F_SUBDATA; + } + + /* Now store the actual data in the child DB. Note that we're + * storing the user data in the keys field, so there are strict + * size limits on dupdata. The actual data fields of the child + * DB are all zero size. + */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + + leaf = NODEPTR(mpp.mp_page, ki); +put_sub: + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, leaf); + xdata.mv_size = 0; + xdata.mv_data = ""; + if (flags == MDB_NODUPDATA) + flags = MDB_NOOVERWRITE; + rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags); + mdb_xcursor_fini(txn, dbi, &mx); + memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi], + sizeof(MDB_db)); + } + } done: return rc; } int -mdbenv_get_flags(MDB_env *env, unsigned int *arg) +mdb_put(MDB_txn *txn, MDB_dbi dbi, + MDB_val *key, MDB_val *data, unsigned int flags) +{ + assert(key != NULL); + assert(data != NULL); + + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { + return EINVAL; + } + + if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) { + return EINVAL; + } + + if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags) + return EINVAL; + + return mdb_put0(txn, dbi, key, data, flags); +} + +int +mdb_env_get_flags(MDB_env *env, unsigned int *arg) { if (!env || !arg) return EINVAL; @@ -2531,7 +3147,7 @@ mdbenv_get_flags(MDB_env *env, unsigned int *arg) } int -mdbenv_get_path(MDB_env *env, const char **arg) +mdb_env_get_path(MDB_env *env, const char **arg) { if (!env || !arg) return EINVAL; @@ -2540,77 +3156,82 @@ mdbenv_get_path(MDB_env *env, const char **arg) return MDB_SUCCESS; } +static int +mdb_stat0(MDB_env *env, MDB_db *db, MDB_stat *arg) +{ + arg->ms_psize = env->me_psize; + arg->ms_depth = db->md_depth; + arg->ms_branch_pages = db->md_branch_pages; + arg->ms_leaf_pages = db->md_leaf_pages; + arg->ms_overflow_pages = db->md_overflow_pages; + arg->ms_entries = db->md_entries; + + return MDB_SUCCESS; +} int -mdbenv_stat(MDB_env *env, MDB_stat *arg) +mdb_env_stat(MDB_env *env, MDB_stat *arg) { if (env == NULL || arg == NULL) return EINVAL; - arg->ms_psize = env->me_meta.mm_psize; - arg->ms_depth = env->me_meta.mm_depth; - arg->ms_branch_pages = env->me_meta.mm_branch_pages; - arg->ms_leaf_pages = env->me_meta.mm_leaf_pages; - arg->ms_overflow_pages = env->me_meta.mm_overflow_pages; - arg->ms_entries = env->me_meta.mm_entries; - - return MDB_SUCCESS; + return mdb_stat0(env, &env->me_meta->mm_dbs[MAIN_DBI], arg); } int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) { MDB_val key, data; MDB_dbi i; - int rc; + int rc, dirty = 0; + size_t len; /* main DB? */ if (!name) { - *dbi = 0; + *dbi = MAIN_DBI; + if (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY)) + txn->mt_dbs[MAIN_DBI].md_flags |= (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY)); return MDB_SUCCESS; } /* Is the DB already open? */ - for (i=0; imt_numdbs; i++) { - if (!strcmp(name, txn->mt_dbxs[i].md_name)) { + len = strlen(name); + for (i=2; imt_numdbs; i++) { + if (len == txn->mt_dbxs[i].md_name.mv_size && + !strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) { *dbi = i; return MDB_SUCCESS; } } + if (txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1) + return ENFILE; + /* Find the DB info */ - key.mv_size = strlen(name); + key.mv_size = len; key.mv_data = (void *)name; - rc = mdb_get(txn, 0, &key, &data); + rc = mdb_get(txn, MAIN_DBI, &key, &data); /* Create if requested */ - if (rc == ENOENT && (flags & MDB_CREATE)) { + if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) { MDB_db dummy; data.mv_size = sizeof(MDB_db); data.mv_data = &dummy; memset(&dummy, 0, sizeof(dummy)); dummy.md_root = P_INVALID; dummy.md_flags = flags & 0xffff; - rc = mdb_put(txn, 0, &key, &data, 0); - if (rc == MDB_SUCCESS) - rc = mdb_get(txn, 0, &key, &data); + rc = mdb_put0(txn, MAIN_DBI, &key, &data, F_SUBDATA); + dirty = 1; } /* OK, got info, add to table */ if (rc == MDB_SUCCESS) { - /* Is there a free slot? */ - if ((txn->mt_numdbs & (DBX_CHUNK-1)) == 0) { - MDB_dbx *p1; - int i; - i = txn->mt_numdbs + DBX_CHUNK; - p1 = realloc(txn->mt_dbxs, i * sizeof(MDB_dbx)); - if (p1 == NULL) - return ENOMEM; - txn->mt_dbxs = p1; - } - txn->mt_dbxs[txn->mt_numdbs].md_name = strdup(name); + txn->mt_dbxs[txn->mt_numdbs].md_name.mv_data = strdup(name); + txn->mt_dbxs[txn->mt_numdbs].md_name.mv_size = len; txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL; txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL; txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL; - txn->mt_dbxs[txn->mt_numdbs].md_db = data.mv_data; + txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI; + txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty; + memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db)); *dbi = txn->mt_numdbs; txn->mt_numdbs++; } @@ -2620,23 +3241,46 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg) { - if (txn == NULL || arg == NULL) + if (txn == NULL || arg == NULL || dbi >= txn->mt_numdbs) return EINVAL; - arg->ms_psize = txn->mt_env->me_meta.mm_psize; - arg->ms_depth = txn->mt_dbxs[dbi].md_db->md_depth; - arg->ms_branch_pages = txn->mt_dbxs[dbi].md_db->md_branch_pages; - arg->ms_leaf_pages = txn->mt_dbxs[dbi].md_db->md_leaf_pages; - arg->ms_overflow_pages = txn->mt_dbxs[dbi].md_db->md_overflow_pages; - arg->ms_entries = txn->mt_dbxs[dbi].md_db->md_entries; - - return MDB_SUCCESS; + return mdb_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg); } void mdb_close(MDB_txn *txn, MDB_dbi dbi) { - if (dbi >= txn->mt_numdbs) + char *ptr; + if (dbi <= MAIN_DBI || dbi >= txn->mt_numdbs) return; - free(txn->mt_dbxs[dbi].md_name); - txn->mt_dbxs[dbi].md_name = NULL; + ptr = txn->mt_dbxs[dbi].md_name.mv_data; + txn->mt_dbxs[dbi].md_name.mv_data = NULL; + txn->mt_dbxs[dbi].md_name.mv_size = 0; + free(ptr); +} + +int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp) +{ + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + txn->mt_dbxs[dbi].md_cmp = cmp; + return MDB_SUCCESS; +} + +int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp) +{ + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + txn->mt_dbxs[dbi].md_dcmp = cmp; + return MDB_SUCCESS; +} + +int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel) +{ + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) + return EINVAL; + + txn->mt_dbxs[dbi].md_rel = rel; + return MDB_SUCCESS; }