X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=libraries%2Fliblmdb%2Fmdb.c;h=46e4490c47b352fea77e00b1bde8ed2c0665068f;hb=c3d84bcd06d0017a7fad5f1d7932c590605e6369;hp=e52da918a43c563c1395d6fdef7ae7f4efde2ea4;hpb=30e1c38f80224eea03f29adaedcaef229044eb85;p=openldap diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index e52da918a4..46e4490c47 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -37,10 +37,26 @@ #endif #include #include -#include #ifdef _WIN32 #include +/** getpid() returns int; MinGW defines pid_t but MinGW64 typedefs it + * as int64 which is wrong. MSVC doesn't define it at all, so just + * don't use it. + */ +#define MDB_PID_T int +#ifdef __GNUC__ +# include +#else +# define LITTLE_ENDIAN 1234 +# define BIG_ENDIAN 4321 +# define BYTE_ORDER LITTLE_ENDIAN +# ifndef SSIZE_MAX +# define SSIZE_MAX INT_MAX +# endif +#endif #else +#define MDB_PID_T pid_t +#include #include #include #ifdef HAVE_SYS_FILE_H @@ -324,10 +340,13 @@ static txnid_t mdb_debug_start; (((mc)->mc_flags & C_SUB) ? -(int)(mc)->mc_dbi : (int)(mc)->mc_dbi) /** @} */ - /** A default memory page size. - * The actual size is platform-dependent, but we use this for - * boot-strapping. We probably should not be using this any more. - * The #GET_PAGESIZE() macro is used to get the actual size. + /** @brief The maximum size of a database page. + * + * This is 32k, since it must fit in #MDB_page.#mp_upper. + * + * LMDB will use database pages < OS pages if needed. + * That causes more I/O in write transactions: The OS must + * know (read) the whole page before writing a partial page. * * Note that we don't currently support Huge pages. On Linux, * regular data files cannot use Huge pages, and in general @@ -336,7 +355,7 @@ static txnid_t mdb_debug_start; * pressure from other processes is high. So until OSs have * actual paging support for Huge pages, they're not viable. */ -#define MDB_PAGESIZE 4096 +#define MAX_PAGESIZE 0x8000 /** The minimum number of keys required in a database page. * Setting this to a larger value will place a smaller bound on the @@ -370,7 +389,7 @@ static txnid_t mdb_debug_start; * * We require that keys all fit onto a regular page. This limit * could be raised a bit further if needed; to something just - * under #MDB_PAGESIZE / #MDB_MINKEYS. + * under (page size / #MDB_MINKEYS / 3). * * Note that data items in an #MDB_DUPSORT database are actually keys * of a subDB, so they're also limited to this size. @@ -494,7 +513,7 @@ typedef struct MDB_rxbody { */ txnid_t mrb_txnid; /** The process ID of the process owning this reader txn. */ - pid_t mrb_pid; + MDB_PID_T mrb_pid; /** The thread ID of the thread owning this txn. */ pthread_t mrb_tid; } MDB_rxbody; @@ -813,19 +832,18 @@ typedef struct MDB_meta { txnid_t mm_txnid; /**< txnid that committed this page */ } MDB_meta; - /** Buffer for a stack-allocated dirty page. + /** Buffer for a stack-allocated meta page. * The members define size and alignment, and silence type * aliasing warnings. They are not used directly; that could * mean incorrectly using several union members in parallel. */ -typedef union MDB_pagebuf { - char mb_raw[MDB_PAGESIZE]; +typedef union MDB_metabuf { MDB_page mb_page; struct { char mm_pad[PAGEHDRSZ]; MDB_meta mm_meta; } mb_metabuf; -} MDB_pagebuf; +} MDB_metabuf; /** Auxiliary DB info. * The information here is mostly static/read-only. There is @@ -994,16 +1012,18 @@ struct MDB_env { /** Have liveness lock in reader table */ #define MDB_LIVE_READER 0x08000000U uint32_t me_flags; /**< @ref mdb_env */ - unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */ + unsigned int me_psize; /**< DB page size, inited from me_os_psize */ + unsigned int me_os_psize; /**< OS page size, from #GET_PAGESIZE */ unsigned int me_maxreaders; /**< size of the reader table */ unsigned int me_numreaders; /**< max numreaders set by this env */ MDB_dbi me_numdbs; /**< number of DBs opened */ MDB_dbi me_maxdbs; /**< size of the DB table */ - pid_t me_pid; /**< process ID of this env */ + MDB_PID_T me_pid; /**< process ID of this env */ char *me_path; /**< path to the DB files */ char *me_map; /**< the memory map of the data file */ MDB_txninfo *me_txns; /**< the memory map of the lock file or NULL */ MDB_meta *me_metas[2]; /**< pointers to the two meta pages */ + void *me_pbuf; /**< scratch area for DUPSORT put() */ MDB_txn *me_txn; /**< current write transaction */ size_t me_mapsize; /**< size of the data memory map */ off_t me_size; /**< current file size */ @@ -1202,7 +1222,7 @@ void mdb_page_list(MDB_page *mp) { MDB_node *node; - unsigned int i, nkeys, nsize; + unsigned int i, nkeys, nsize, total = 0; MDB_val key; DKBUF; @@ -1212,18 +1232,23 @@ mdb_page_list(MDB_page *mp) node = NODEPTR(mp, i); key.mv_size = node->mn_ksize; key.mv_data = node->mn_data; - nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t); + nsize = NODESIZE + key.mv_size; if (IS_BRANCH(mp)) { fprintf(stderr, "key %d: page %"Z"u, %s\n", i, NODEPGNO(node), DKEY(&key)); + total += nsize; } else { if (F_ISSET(node->mn_flags, F_BIGDATA)) nsize += sizeof(pgno_t); else nsize += NODEDSZ(node); + total += nsize; + nsize += sizeof(indx_t); fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key)); } + total += (total & 1); } + fprintf(stderr, "Total: %d\n", total); } void @@ -1318,7 +1343,12 @@ mdb_page_malloc(MDB_txn *txn, unsigned num) { MDB_env *env = txn->mt_env; MDB_page *ret = env->me_dpages; - size_t sz = env->me_psize; + size_t psize = env->me_psize, sz = psize, off; + /* For ! #MDB_NOMEMINIT, psize counts how much to init. + * For a single page alloc, we init everything after the page header. + * For multi-page, we init the final page; if the caller needed that + * many pages they will be filling in at least up to the last page. + */ if (num == 1) { if (ret) { VGMEMP_ALLOC(env, ret, sz); @@ -1326,10 +1356,16 @@ mdb_page_malloc(MDB_txn *txn, unsigned num) env->me_dpages = ret->mp_next; return ret; } + psize -= off = PAGEHDRSZ; } else { sz *= num; + off = sz - psize; } if ((ret = malloc(sz)) != NULL) { + if (!(env->me_flags & MDB_NOMEMINIT)) { + memset((char *)ret + off, 0, psize); + ret->mp_pad = 0; + } VGMEMP_ALLOC(env, ret, sz); } return ret; @@ -2078,7 +2114,7 @@ enum Pidlock_op { * lock on the lockfile, set at an offset equal to the pid. */ static int -mdb_reader_pid(MDB_env *env, enum Pidlock_op op, pid_t pid) +mdb_reader_pid(MDB_env *env, enum Pidlock_op op, MDB_PID_T pid) { #if !(MDB_PIDLOCK) /* Currently the same as defined(_WIN32) */ int ret = 0; @@ -2143,7 +2179,7 @@ mdb_txn_renew0(MDB_txn *txn) if (r->mr_pid != env->me_pid || r->mr_txnid != (txnid_t)-1) return MDB_BAD_RSLOT; } else { - pid_t pid = env->me_pid; + MDB_PID_T pid = env->me_pid; pthread_t tid = pthread_self(); if (!(env->me_flags & MDB_LIVE_READER)) { @@ -2482,7 +2518,7 @@ mdb_freelist_save(MDB_txn *txn) int rc, maxfree_1pg = env->me_maxfree_1pg, more = 1; txnid_t pglast = 0, head_id = 0; pgno_t freecnt = 0, *free_pgs, *mop; - ssize_t head_room = 0, total_room = 0, mop_len; + ssize_t head_room = 0, total_room = 0, mop_len, clean_limit; mdb_cursor_init(&mc, txn, FREE_DBI, NULL); @@ -2493,9 +2529,15 @@ mdb_freelist_save(MDB_txn *txn) return rc; } + /* MDB_RESERVE cancels meminit in ovpage malloc (when no WRITEMAP) */ + clean_limit = (env->me_flags & (MDB_NOMEMINIT|MDB_WRITEMAP)) + ? SSIZE_MAX : maxfree_1pg; + for (;;) { /* Come back here after each Put() in case freelist changed */ MDB_val key, data; + pgno_t *pgs; + ssize_t j; /* If using records from freeDB which we have not yet * deleted, delete them and any we reserved for me_pghead. @@ -2579,7 +2621,12 @@ mdb_freelist_save(MDB_txn *txn) rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE); if (rc) return rc; - *(MDB_ID *)data.mv_data = 0; /* IDL is initially empty */ + /* IDL is initially empty, zero out at least the length */ + pgs = (pgno_t *)data.mv_data; + j = head_room > clean_limit ? head_room : 0; + do { + pgs[j] = 0; + } while (--j >= 0); total_room += head_room; } @@ -2970,10 +3017,11 @@ fail: static int mdb_env_read_header(MDB_env *env, MDB_meta *meta) { - MDB_pagebuf pbuf; + MDB_metabuf pbuf; MDB_page *p; MDB_meta *m; int i, rc, off; + enum { Size = sizeof(pbuf) }; /* We don't know the page size yet, so use a minimum value. * Read both meta pages so we can use the latest one. @@ -2985,13 +3033,13 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta) OVERLAPPED ov; memset(&ov, 0, sizeof(ov)); ov.Offset = off; - rc = ReadFile(env->me_fd,&pbuf,MDB_PAGESIZE,&len,&ov) ? (int)len : -1; + rc = ReadFile(env->me_fd, &pbuf, Size, &len, &ov) ? (int)len : -1; if (rc == -1 && ErrCode() == ERROR_HANDLE_EOF) rc = 0; #else - rc = pread(env->me_fd, &pbuf, MDB_PAGESIZE, off); + rc = pread(env->me_fd, &pbuf, Size, off); #endif - if (rc != MDB_PAGESIZE) { + if (rc != Size) { if (rc == 0 && off == 0) return ENOENT; rc = rc < 0 ? (int) ErrCode() : MDB_INVALID; @@ -3122,11 +3170,18 @@ mdb_env_write_meta(MDB_txn *txn) mp->mm_last_pg = txn->mt_next_pgno - 1; mp->mm_txnid = txn->mt_txnid; if (!(env->me_flags & (MDB_NOMETASYNC|MDB_NOSYNC))) { + unsigned meta_size = env->me_psize; rc = (env->me_flags & MDB_MAPASYNC) ? MS_ASYNC : MS_SYNC; ptr = env->me_map; - if (toggle) - ptr += env->me_psize; - if (MDB_MSYNC(ptr, env->me_psize, rc)) { + if (toggle) { +#ifndef _WIN32 /* POSIX msync() requires ptr = start of OS page */ + if (meta_size < env->me_os_psize) + meta_size += meta_size; + else +#endif + ptr += meta_size; + } + if (MDB_MSYNC(ptr, meta_size, rc)) { rc = ErrCode(); goto fail; } @@ -3232,6 +3287,7 @@ mdb_env_create(MDB_env **env) e->me_wmutex = SEM_FAILED; #endif e->me_pid = getpid(); + GET_PAGESIZE(e->me_os_psize); VGMEMP_CREATE(e,0,0); *env = e; return MDB_SUCCESS; @@ -3397,7 +3453,9 @@ mdb_env_open2(MDB_env *env) return i; DPUTS("new mdbenv"); newenv = 1; - GET_PAGESIZE(env->me_psize); + env->me_psize = env->me_os_psize; + if (env->me_psize > MAX_PAGESIZE) + env->me_psize = MAX_PAGESIZE; } else { env->me_psize = meta.mm_psize; } @@ -3508,7 +3566,7 @@ PIMAGE_TLS_CALLBACK mdb_tls_cbp __attribute__((section (".CRT$XLB"))) = mdb_tls_ #pragma comment(linker, "/INCLUDE:_tls_used") #pragma comment(linker, "/INCLUDE:mdb_tls_cbp") #pragma const_seg(".CRT$XLB") -extern const PIMAGE_TLS_CALLBACK mdb_tls_callback; +extern const PIMAGE_TLS_CALLBACK mdb_tls_cbp; const PIMAGE_TLS_CALLBACK mdb_tls_cbp = mdb_tls_callback; #pragma const_seg() #else /* WIN32 */ @@ -3772,7 +3830,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo); if (size < rsize && *excl > 0) { #ifdef _WIN32 - if (SetFilePointer(env->me_lfd, rsize, NULL, FILE_BEGIN) != rsize + if (SetFilePointer(env->me_lfd, rsize, NULL, FILE_BEGIN) != (DWORD)rsize || !SetEndOfFile(env->me_lfd)) goto fail_errno; #else @@ -3928,8 +3986,9 @@ fail: * at runtime. Changing other flags requires closing the * environment and re-opening it with the new flags. */ -#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC) -#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS|MDB_NOLOCK) +#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC|MDB_NOMEMINIT) +#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP| \ + MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD) int mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode) @@ -4042,7 +4101,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode DPRINTF(("opened dbenv %p", (void *) env)); if (excl > 0) { rc = mdb_env_share_locks(env, &excl); + if (rc) + goto leave; } + if (!((flags & MDB_RDONLY) || + (env->me_pbuf = calloc(1, env->me_psize)))) + rc = ENOMEM; } leave: @@ -4066,6 +4130,7 @@ mdb_env_close0(MDB_env *env, int excl) for (i = env->me_maxdbs; --i > MAIN_DBI; ) free(env->me_dbxs[i].md_name.mv_data); + free(env->me_pbuf); free(env->me_dbflags); free(env->me_dbxs); free(env->me_path); @@ -4093,7 +4158,7 @@ mdb_env_close0(MDB_env *env, int excl) if (env->me_fd != INVALID_HANDLE_VALUE) (void) close(env->me_fd); if (env->me_txns) { - pid_t pid = env->me_pid; + MDB_PID_T pid = env->me_pid; /* Clearing readers is done in this function because * me_txkey with its destructor must be disabled first. */ @@ -5420,8 +5485,9 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, rc = EINVAL; } else { MDB_page *mp = mc->mc_pg[mc->mc_top]; - if (!NUMKEYS(mp)) { - mc->mc_ki[mc->mc_top] = 0; + int nkeys = NUMKEYS(mp); + if (!nkeys || mc->mc_ki[mc->mc_top] >= nkeys) { + mc->mc_ki[mc->mc_top] = nkeys; rc = MDB_NOTFOUND; break; } @@ -5602,15 +5668,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, unsigned int flags) { enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */ + MDB_env *env = mc->mc_txn->mt_env; MDB_node *leaf = NULL; MDB_val xdata, *rdata, dkey; - MDB_page *fp; MDB_db dummy; int do_sub = 0, insert = 0; unsigned int mcount = 0, dcount = 0, nospill; size_t nsize; int rc, rc2; - MDB_pagebuf pbuf; char dbuf[MDB_MAXKEYSIZE+1]; unsigned int nflags; DKBUF; @@ -5724,6 +5789,9 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, /* The key already exists */ if (rc == MDB_SUCCESS) { + MDB_page *fp, *mp; + MDB_val olddata; + /* there's only a key anyway, so this is a no-op */ if (IS_LEAF2(mc->mc_pg[mc->mc_top])) { unsigned int ksize = mc->mc_db->md_pad; @@ -5736,19 +5804,23 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return MDB_SUCCESS; } +more: leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + olddata.mv_size = NODEDSZ(leaf); + olddata.mv_data = NODEDATA(leaf); /* DB has dups? */ if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { + mp = fp = xdata.mv_data = env->me_pbuf; + mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + /* Was a single item before, must convert now */ -more: if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { /* Just overwrite the current item */ if (flags == MDB_CURRENT) goto current; - dkey.mv_size = NODEDSZ(leaf); - dkey.mv_data = NODEDATA(leaf); + dkey = olddata; #if UINT_MAX < SIZE_MAX if (mc->mc_dbx->md_dcmp == mdb_cmp_int && dkey.mv_size == sizeof(size_t)) #ifdef MISALIGNED_OK @@ -5771,85 +5843,76 @@ more: /* create a fake page for the dup items */ memcpy(dbuf, dkey.mv_data, dkey.mv_size); dkey.mv_data = dbuf; - fp = (MDB_page *)&pbuf; - fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP; fp->mp_lower = PAGEHDRSZ; - fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size; + xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size; if (mc->mc_db->md_flags & MDB_DUPFIXED) { fp->mp_flags |= P_LEAF2; fp->mp_pad = data->mv_size; - fp->mp_upper += 2 * data->mv_size; /* leave space for 2 more */ + xdata.mv_size += 2 * data->mv_size; /* leave space for 2 more */ } else { - fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE + + xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) + (dkey.mv_size & 1) + (data->mv_size & 1); } - mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); - do_sub = 1; - rdata = &xdata; - xdata.mv_size = fp->mp_upper; - xdata.mv_data = fp; - flags |= F_DUPDATA; - goto new_sub; - } - if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { + fp->mp_upper = xdata.mv_size; + } else if (leaf->mn_flags & F_SUBDATA) { + /* Data is on sub-DB, just store it */ + flags |= F_DUPDATA|F_SUBDATA; + goto put_sub; + } else { /* See if we need to convert from fake page to subDB */ - MDB_page *mp; unsigned int offset; unsigned int i; uint16_t fp_flags; - fp = NODEDATA(leaf); - if (flags == MDB_CURRENT) { -reuse: + fp = olddata.mv_data; + switch (flags) { + default: + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + offset = NODESIZE + sizeof(indx_t) + data->mv_size; + offset += offset & 1; + break; + } + offset = fp->mp_pad; + if (SIZELEFT(fp) < offset) { + offset *= 4; /* space for 4 more */ + break; + } + /* FALLTHRU: Big enough MDB_DUPFIXED sub-page */ + case MDB_CURRENT: fp->mp_flags |= P_DIRTY; - COPY_PGNO(fp->mp_pgno, mc->mc_pg[mc->mc_top]->mp_pgno); + COPY_PGNO(fp->mp_pgno, mp->mp_pgno); mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; flags |= F_DUPDATA; goto put_sub; } - if (mc->mc_db->md_flags & MDB_DUPFIXED) { - offset = fp->mp_pad; - if (SIZELEFT(fp) >= offset) - goto reuse; - offset *= 4; /* space for 4 more */ - } else { - offset = NODESIZE + sizeof(indx_t) + data->mv_size; - } - offset += offset & 1; fp_flags = fp->mp_flags; - if (NODESIZE + sizeof(indx_t) + NODEKSZ(leaf) + NODEDSZ(leaf) + - offset >= mc->mc_txn->mt_env->me_nodemax) { + xdata.mv_size = olddata.mv_size + offset; + if (NODESIZE + sizeof(indx_t) + NODEKSZ(leaf) + xdata.mv_size + >= env->me_nodemax) { /* yes, convert it */ - dummy.md_flags = 0; if (mc->mc_db->md_flags & MDB_DUPFIXED) { dummy.md_pad = fp->mp_pad; dummy.md_flags = MDB_DUPFIXED; if (mc->mc_db->md_flags & MDB_INTEGERDUP) dummy.md_flags |= MDB_INTEGERKEY; + } else { + dummy.md_pad = 0; + dummy.md_flags = 0; } dummy.md_depth = 1; dummy.md_branch_pages = 0; dummy.md_leaf_pages = 1; dummy.md_overflow_pages = 0; dummy.md_entries = NUMKEYS(fp); - rdata = &xdata; xdata.mv_size = sizeof(MDB_db); xdata.mv_data = &dummy; if ((rc = mdb_page_alloc(mc, 1, &mp))) return rc; - offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf); + offset = env->me_psize - olddata.mv_size; flags |= F_DUPDATA|F_SUBDATA; dummy.md_root = mp->mp_pgno; fp_flags &= ~P_SUBP; - } else { - /* no, just grow it */ - rdata = &xdata; - xdata.mv_size = NODEDSZ(leaf) + offset; - xdata.mv_data = &pbuf; - mp = (MDB_page *)&pbuf; - mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; - flags |= F_DUPDATA; } mp->mp_flags = fp_flags | P_DIRTY; mp->mp_pad = fp->mp_pad; @@ -5858,28 +5921,27 @@ reuse: if (IS_LEAF2(fp)) { memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); } else { - nsize = NODEDSZ(leaf) - fp->mp_upper; - memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize); + memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, + olddata.mv_size - fp->mp_upper); for (i=0; imp_ptrs[i] = fp->mp_ptrs[i] + offset; } - mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); - do_sub = 1; - goto new_sub; } - /* data is on sub-DB, just store it */ - flags |= F_DUPDATA|F_SUBDATA; - goto put_sub; + + rdata = &xdata; + flags |= F_DUPDATA; + do_sub = 1; + mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); + goto new_sub; } current: /* overflow page overwrites need special handling */ if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { MDB_page *omp; pgno_t pg; - unsigned psize = mc->mc_txn->mt_env->me_psize; - int level, ovpages, dpages = OVPAGES(data->mv_size, psize); + int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); - memcpy(&pg, NODEDATA(leaf), sizeof(pg)); + memcpy(&pg, olddata.mv_data, sizeof(pg)); if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0) return rc2; ovpages = omp->mp_pages; @@ -5887,7 +5949,7 @@ current: /* Is the ov page large enough? */ if (ovpages >= dpages) { if (!(omp->mp_flags & P_DIRTY) && - (level || (mc->mc_txn->mt_env->me_flags & MDB_WRITEMAP))) + (level || (env->me_flags & MDB_WRITEMAP))) { rc = mdb_page_unspill(mc->mc_txn, omp, &omp); if (rc) @@ -5902,7 +5964,7 @@ current: */ if (level > 1) { /* It is writable only in a parent txn */ - size_t sz = (size_t) psize * ovpages, off; + size_t sz = (size_t) env->me_psize * ovpages, off; MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages); MDB_ID2 id2; if (!np) @@ -5932,15 +5994,15 @@ current: } if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS) return rc2; - } else if (NODEDSZ(leaf) == data->mv_size) { + } else if (data->mv_size == olddata.mv_size) { /* same size, just replace it. Note that we could * also reuse this node if the new data is smaller, * but instead we opt to shrink the node in that case. */ if (F_ISSET(flags, MDB_RESERVE)) - data->mv_data = NODEDATA(leaf); + data->mv_data = olddata.mv_data; else if (data->mv_size) - memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); + memcpy(olddata.mv_data, data->mv_data, data->mv_size); else memcpy(NODEKEY(leaf), key->mv_data, key->mv_size); goto done; @@ -5956,7 +6018,7 @@ current: new_sub: nflags = flags & NODE_ADD_FLAGS; - nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata); + nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata); if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA ) nflags &= ~MDB_APPEND; @@ -6050,7 +6112,6 @@ next_mult: data[1].mv_size = mcount; if (mcount < dcount) { data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); goto more; } } @@ -6069,6 +6130,7 @@ int mdb_cursor_del(MDB_cursor *mc, unsigned int flags) { MDB_node *leaf; + MDB_page *mp; int rc; if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) @@ -6077,6 +6139,9 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) if (!(mc->mc_flags & C_INITIALIZED)) return EINVAL; + if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top])) + return MDB_NOTFOUND; + if (!(flags & MDB_NOSPILL) && (rc = mdb_page_spill(mc, NULL, NULL))) return rc; @@ -6084,9 +6149,10 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) if (rc) return rc; - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + mp = mc->mc_pg[mc->mc_top]; + leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (!IS_LEAF2(mp) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (!(flags & MDB_NODUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); @@ -6101,13 +6167,13 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) } else { MDB_cursor *m2; /* shrink fake page */ - mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + mdb_node_shrink(mp, mc->mc_ki[mc->mc_top]); + leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); /* fix other sub-DB cursors pointed at this fake page */ for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) { if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; - if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top] && + if (m2->mc_pg[mc->mc_top] == mp && m2->mc_ki[mc->mc_top] == mc->mc_ki[mc->mc_top]) m2->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); } @@ -6691,7 +6757,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) MDB_node *node; char *base; size_t len; - int delta, delta0; + int delta, ksize, oksize; indx_t ptr, i, numkeys, indx; DKBUF; @@ -6713,12 +6779,15 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) } #endif - delta0 = delta = key->mv_size - node->mn_ksize; + ksize = key->mv_size; + ksize += (ksize & 1); + oksize = node->mn_ksize; + oksize += (oksize & 1); + delta = ksize - oksize; /* Must be 2-byte aligned. If new key is * shorter by 1, the shift will be skipped. */ - delta += (delta & 1); if (delta) { if (delta > 0 && SIZELEFT(mp) < delta) { pgno_t pgno; @@ -6744,7 +6813,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) } /* But even if no shift was needed, update ksize */ - if (delta0) + if (node->mn_ksize != key->mv_size) node->mn_ksize = key->mv_size; if (key->mv_size) @@ -6785,7 +6854,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) flags = 0; } else { srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]); - assert(!((long)srcnode&1)); + assert(!((size_t)srcnode&1)); srcpg = NODEPGNO(srcnode); flags = srcnode->mn_flags; if (csrc->mc_ki[csrc->mc_top] == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { @@ -7285,7 +7354,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) /* Adjust other cursors pointing to mp */ for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { - if (m2 == mc) + if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; if (!(m2->mc_flags & C_INITIALIZED)) continue; @@ -7536,7 +7605,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno psize = 0; if (newindx <= split_indx || newindx >= nkeys) { i = 0; j = 1; - k = newindx >= nkeys ? nkeys : split_indx+1; + k = newindx >= nkeys ? nkeys : split_indx+2; } else { i = nkeys; j = -1; k = split_indx-1; @@ -7556,18 +7625,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno } psize += psize & 1; } - if (psize > pmax) { + if (psize > pmax || i == k-j) { split_indx = i + (j<0); break; } } - /* special case: when the new node was on the last - * slot we may not have tripped the break inside the loop. - * In all other cases we either hit the break condition, - * or the original split_indx was already safe. - */ - if (newindx >= nkeys && i == k) - split_indx = nkeys-1; } if (split_indx == newindx) { sepkey.mv_size = newkey->mv_size; @@ -7817,6 +7879,16 @@ mdb_env_get_path(MDB_env *env, const char **arg) return MDB_SUCCESS; } +int +mdb_env_get_fd(MDB_env *env, mdb_filehandle_t *arg) +{ + if (!env || !arg) + return EINVAL; + + *arg = env->me_fd; + return MDB_SUCCESS; +} + /** Common code for #mdb_stat() and #mdb_env_stat(). * @param[in] env the environment to operate in. * @param[in] db the #MDB_db record containing the stats to return. @@ -8242,7 +8314,7 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) /** Insert pid into list if not already present. * return -1 if already present. */ -static int mdb_pid_insert(pid_t *ids, pid_t pid) +static int mdb_pid_insert(MDB_PID_T *ids, MDB_PID_T pid) { /* binary search of pid in list */ unsigned base = 0; @@ -8282,7 +8354,7 @@ int mdb_reader_check(MDB_env *env, int *dead) { unsigned int i, j, rdrs; MDB_reader *mr; - pid_t *pids, pid; + MDB_PID_T *pids, pid; int count = 0; if (!env) @@ -8292,7 +8364,7 @@ int mdb_reader_check(MDB_env *env, int *dead) if (!env->me_txns) return MDB_SUCCESS; rdrs = env->me_txns->mti_numreaders; - pids = malloc((rdrs+1) * sizeof(pid_t)); + pids = malloc((rdrs+1) * sizeof(MDB_PID_T)); if (!pids) return ENOMEM; pids[0] = 0;