#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif
-#include <sys/types.h>
-#include <sys/stat.h>
#ifdef _WIN32
+#include <malloc.h>
#include <windows.h>
/** getpid() returns int; MinGW defines pid_t but MinGW64 typedefs it
* as int64 which is wrong. MSVC doesn't define it at all, so just
* don't use it.
*/
#define MDB_PID_T int
+#define MDB_THR_T DWORD
+#include <sys/types.h>
+#include <sys/stat.h>
#ifdef __GNUC__
# include <sys/param.h>
#else
# endif
#endif
#else
+#include <sys/types.h>
+#include <sys/stat.h>
#define MDB_PID_T pid_t
+#define MDB_THR_T pthread_t
#include <sys/param.h>
#include <sys/uio.h>
#include <sys/mman.h>
#ifdef _WIN32
#define MDB_USE_HASH 1
#define MDB_PIDLOCK 0
-#define pthread_t DWORD
+#define THREAD_RET DWORD
+#define pthread_t HANDLE
#define pthread_mutex_t HANDLE
+#define pthread_cond_t HANDLE
#define pthread_key_t DWORD
#define pthread_self() GetCurrentThreadId()
#define pthread_key_create(x,y) \
#define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode())
#define pthread_mutex_unlock(x) ReleaseMutex(x)
#define pthread_mutex_lock(x) WaitForSingleObject(x, INFINITE)
+#define pthread_cond_signal(x) SetEvent(*x)
+#define pthread_cond_wait(cond,mutex) SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE)
+#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL)
+#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE)
#define LOCK_MUTEX_R(env) pthread_mutex_lock((env)->me_rmutex)
#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock((env)->me_rmutex)
#define LOCK_MUTEX_W(env) pthread_mutex_lock((env)->me_wmutex)
#endif
#define Z "I"
#else
-
+#define THREAD_RET void *
+#define THREAD_CREATE(thr,start,arg) pthread_create(&thr,NULL,start,arg)
+#define THREAD_FINISH(thr) pthread_join(thr,NULL)
#define Z "z" /**< printf format modifier for size_t */
/** For MDB_LOCK_FORMAT: True if readers take a pid lock in the lockfile */
/** The process ID of the process owning this reader txn. */
MDB_PID_T mrb_pid;
/** The thread ID of the thread owning this txn. */
- pthread_t mrb_tid;
+ MDB_THR_T mrb_tid;
} MDB_rxbody;
/** The actual reader record, with cacheline padding. */
return MDB_BAD_RSLOT;
} else {
MDB_PID_T pid = env->me_pid;
- pthread_t tid = pthread_self();
+ MDB_THR_T tid = pthread_self();
if (!env->me_live_reader) {
rc = mdb_reader_pid(env, Pidset, pid);
int rc;
HANDLE mh;
LONG sizelo, sizehi;
- sizelo = env->me_mapsize & 0xffffffff;
- sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */
+ size_t msize;
+
+ if (flags & MDB_RDONLY) {
+ msize = 0;
+ sizelo = 0;
+ sizehi = 0;
+ } else {
+ msize = env->me_mapsize;
+ sizelo = msize & 0xffffffff;
+ sizehi = msize >> 16 >> 16; /* only needed on Win64 */
+ }
/* Windows won't create mappings for zero length files.
* Just allocate the maxsize right now.
return ErrCode();
env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ?
FILE_MAP_WRITE : FILE_MAP_READ,
- 0, 0, env->me_mapsize, addr);
+ 0, 0, msize, addr);
rc = env->me_map ? 0 : ErrCode();
CloseHandle(mh);
if (rc)
return mdb_cursor_put(&mc, key, data, flags);
}
-#define WBUF (64*1024)
+#ifndef MDB_WBUF
+#define MDB_WBUF (1024*1024)
+#endif
+ /** State needed for a compacting copy. */
typedef struct mdb_copy {
- pthread_mutex_t mc_mutex[2];
+ pthread_mutex_t mc_mutex;
+ pthread_cond_t mc_cond;
char *mc_wbuf[2];
char *mc_over[2];
- void *mc_obuf[2];
- void *mc_free;
MDB_env *mc_env;
MDB_txn *mc_txn;
int mc_wlen[2];
pgno_t mc_next_pgno;
HANDLE mc_fd;
int mc_status;
+ volatile int mc_new;
int mc_toggle;
+
} mdb_copy;
-static void *
+ /** Dedicated writer thread for compacting copy. */
+static THREAD_RET
mdb_env_copythr(void *arg)
{
mdb_copy *my = arg;
char *ptr;
- int wsize;
- int toggle = 0, len, rc;
+ int toggle = 0, wsize, rc;
#ifdef _WIN32
+ DWORD len;
#define DO_WRITE(rc, fd, ptr, w2, len) rc = WriteFile(fd, ptr, w2, &len, NULL)
#else
+ int len;
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
+ pthread_mutex_lock(&my->mc_mutex);
+ my->mc_new = 0;
+ pthread_cond_signal(&my->mc_cond);
for(;;) {
- pthread_mutex_lock(&my->mc_mutex[toggle]);
- if (!my->mc_wlen[toggle]) {
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
+ while (!my->mc_new)
+ pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
+ if (my->mc_new < 0) {
+ my->mc_new = 0;
break;
}
+ my->mc_new = 0;
wsize = my->mc_wlen[toggle];
ptr = my->mc_wbuf[toggle];
again:
}
if (rc) {
my->mc_status = rc;
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
break;
}
/* If there's an overflow page tail, write it too */
my->mc_olen[toggle] = 0;
goto again;
}
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
+ my->mc_wlen[toggle] = 0;
toggle ^= 1;
+ pthread_cond_signal(&my->mc_cond);
}
- return NULL;
+ pthread_cond_signal(&my->mc_cond);
+ pthread_mutex_unlock(&my->mc_mutex);
+ return (THREAD_RET)0;
#undef DO_WRITE
}
+ /** Tell the writer thread there's a buffer ready to write.
+ * Computes the other buffer index (mc_toggle ^ 1), then, holding mc_mutex,
+ * waits on mc_cond while a previously posted buffer is still pending
+ * (mc_new == 1), stores \b st in mc_new, switches mc_toggle to the other
+ * index and signals mc_cond before unlocking.
+ * @param[in] my The copy state shared with the writer thread.
+ * @param[in] st Value to store in mc_new. Callers in this file pass 1 to
+ * hand over a buffer and -1 once at the end of the copy.
+ * @return 0 on success, or the nonzero mc_status if one has been recorded;
+ * in that case nothing is posted and mc_toggle is left unchanged.
+ */
static int
-mdb_env_cthr_toggle(mdb_copy *my)
+mdb_env_cthr_toggle(mdb_copy *my, int st)
{
int toggle = my->mc_toggle ^ 1;
-
- pthread_mutex_unlock(&my->mc_mutex[my->mc_toggle]);
- pthread_mutex_lock(&my->mc_mutex[toggle]);
+ pthread_mutex_lock(&my->mc_mutex);
if (my->mc_status) {
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
+ pthread_mutex_unlock(&my->mc_mutex);
return my->mc_status;
}
- my->mc_wlen[toggle] = 0;
- my->mc_olen[toggle] = 0;
+ while (my->mc_new == 1)
+ pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
+ my->mc_new = st;
my->mc_toggle = toggle;
+ pthread_cond_signal(&my->mc_cond);
+ pthread_mutex_unlock(&my->mc_mutex);
return 0;
}
+ /** Depth-first tree traversal for compacting copy. */
static int
-mdb_env_cwalk(mdb_copy *my, pgno_t pg)
+mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
{
MDB_cursor mc;
MDB_txn *txn = my->mc_txn;
MDB_node *ni;
- MDB_page *mo, *mp;
+ MDB_page *mo, *mp, *leaf;
char *buf, *ptr;
int rc, toggle;
unsigned int i;
+ /* Empty DB, nothing to do */
+ if (*pg == P_INVALID)
+ return MDB_SUCCESS;
+
mc.mc_snum = 1;
mc.mc_top = 0;
mc.mc_txn = txn;
- rc = mdb_page_get(my->mc_txn, pg, &mc.mc_pg[0], NULL);
+ rc = mdb_page_get(my->mc_txn, *pg, &mc.mc_pg[0], NULL);
if (rc)
return rc;
rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST);
return rc;
/* Make cursor pages writable */
- buf = ptr = malloc(my->mc_env->me_psize * mc.mc_top);
+ buf = ptr = malloc(my->mc_env->me_psize * mc.mc_snum);
if (buf == NULL)
return ENOMEM;
ptr += my->mc_env->me_psize;
}
+ /* This is writable space for a leaf page. Usually not needed. */
+ leaf = (MDB_page *)ptr;
+
toggle = my->mc_toggle;
while (mc.mc_snum > 0) {
unsigned n;
n = NUMKEYS(mp);
if (IS_LEAF(mp)) {
- if (!IS_LEAF2(mp)) {
+ if (!IS_LEAF2(mp) && !(flags & F_DUPDATA)) {
for (i=0; i<n; i++) {
ni = NODEPTR(mp, i);
if (ni->mn_flags & F_BIGDATA) {
MDB_page *omp;
pgno_t pg;
+
+ /* Need writable leaf */
+ if (mp != leaf) {
+ mc.mc_pg[mc.mc_top] = leaf;
+ mdb_page_copy(leaf, mp, my->mc_env->me_psize);
+ mp = leaf;
+ ni = NODEPTR(mp, i);
+ }
+
memcpy(&pg, NODEDATA(ni), sizeof(pg));
rc = mdb_page_get(txn, pg, &omp, NULL);
if (rc)
goto done;
- if (my->mc_wlen[toggle] >= WBUF) {
- rc = mdb_env_cthr_toggle(my);
+ if (my->mc_wlen[toggle] >= MDB_WBUF) {
+ rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
- toggle ^= 1;
+ toggle = my->mc_toggle;
}
mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
memcpy(mo, omp, my->mc_env->me_psize);
mo->mp_pgno = my->mc_next_pgno;
my->mc_next_pgno += omp->mp_pages;
my->mc_wlen[toggle] += my->mc_env->me_psize;
- my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
- my->mc_obuf[toggle] = (char *)omp + my->mc_env->me_psize;
- rc = mdb_env_cthr_toggle(my);
- if (rc)
- goto done;
- toggle ^= 1;
+ if (omp->mp_pages > 1) {
+ my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
+ my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize;
+ rc = mdb_env_cthr_toggle(my, 1);
+ if (rc)
+ goto done;
+ toggle = my->mc_toggle;
+ }
+ memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
} else if (ni->mn_flags & F_SUBDATA) {
MDB_db db;
+
+ /* Need writable leaf */
+ if (mp != leaf) {
+ mc.mc_pg[mc.mc_top] = leaf;
+ mdb_page_copy(leaf, mp, my->mc_env->me_psize);
+ mp = leaf;
+ ni = NODEPTR(mp, i);
+ }
+
memcpy(&db, NODEDATA(ni), sizeof(db));
my->mc_toggle = toggle;
- rc = mdb_env_cwalk(my, db.md_root);
+ rc = mdb_env_cwalk(my, &db.md_root, ni->mn_flags & F_DUPDATA);
if (rc)
goto done;
toggle = my->mc_toggle;
+ memcpy(NODEDATA(ni), &db, sizeof(db));
}
}
}
mc.mc_snum++;
mc.mc_ki[mc.mc_top] = 0;
if (IS_BRANCH(mp)) {
+ /* Whenever we advance to a sibling branch page,
+ * we must proceed all the way down to its first leaf.
+ */
mdb_page_copy(mc.mc_pg[mc.mc_top], mp, my->mc_env->me_psize);
goto again;
} else
continue;
}
}
- if (mc.mc_top) {
- ni = NODEPTR(mc.mc_pg[mc.mc_top-1], mc.mc_ki[mc.mc_top-1]);
- SETPGNO(ni, my->mc_next_pgno);
- }
- if (my->mc_wlen[toggle] >= WBUF) {
- rc = mdb_env_cthr_toggle(my);
+ if (my->mc_wlen[toggle] >= MDB_WBUF) {
+ rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
- toggle ^= 1;
+ toggle = my->mc_toggle;
}
mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
mdb_page_copy(mo, mp, my->mc_env->me_psize);
mo->mp_pgno = my->mc_next_pgno++;
my->mc_wlen[toggle] += my->mc_env->me_psize;
- mdb_cursor_pop(&mc);
+ if (mc.mc_top) {
+ /* Update parent if there is one */
+ ni = NODEPTR(mc.mc_pg[mc.mc_top-1], mc.mc_ki[mc.mc_top-1]);
+ SETPGNO(ni, mo->mp_pgno);
+ mdb_cursor_pop(&mc);
+ } else {
+ /* Otherwise we're done */
+ *pg = mo->mp_pgno;
+ break;
+ }
}
done:
free(buf);
return rc;
}
-int
-mdb_env_copyfd2(MDB_env *env, HANDLE fd)
+ /** Copy environment with compaction. */
+static int
+mdb_env_copyfd1(MDB_env *env, HANDLE fd)
{
MDB_meta *mm;
MDB_page *mp;
pthread_t thr;
int rc;
- rc = posix_memalign(&my.mc_free, env->me_psize, WBUF*2);
+#ifdef _WIN32
+ my.mc_mutex = CreateMutex(NULL, FALSE, NULL);
+ my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
+ my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_psize);
+ if (my.mc_wbuf[0] == NULL)
+ return errno;
+#else
+ pthread_mutex_init(&my.mc_mutex, NULL);
+ pthread_cond_init(&my.mc_cond, NULL);
+ rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_psize, MDB_WBUF*2);
if (rc)
return rc;
- my.mc_wbuf[0] = my.mc_free;
- my.mc_wbuf[1] = my.mc_free + WBUF;
- pthread_mutex_init(&my.mc_mutex[0], NULL);
- pthread_mutex_init(&my.mc_mutex[1], NULL);
+#endif
+ my.mc_wbuf[1] = my.mc_wbuf[0] + MDB_WBUF;
my.mc_wlen[0] = 0;
my.mc_wlen[1] = 0;
my.mc_olen[0] = 0;
my.mc_olen[1] = 0;
my.mc_next_pgno = 2;
my.mc_status = 0;
+ my.mc_new = 1;
my.mc_toggle = 0;
my.mc_env = env;
my.mc_fd = fd;
- pthread_mutex_lock(&my.mc_mutex[0]);
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers.
}
}
+ THREAD_CREATE(thr, mdb_env_copythr, &my);
mp = (MDB_page *)my.mc_wbuf[0];
memset(mp, 0, 2*env->me_psize);
mp->mp_pgno = 0;
}
my.mc_wlen[0] = env->me_psize * 2;
my.mc_txn = txn;
- pthread_create(&thr, NULL, mdb_env_copythr, &my);
- rc = mdb_env_cwalk(&my, txn->mt_dbs[1].md_root);
+ pthread_mutex_lock(&my.mc_mutex);
+ while(my.mc_new)
+ pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
+ pthread_mutex_unlock(&my.mc_mutex);
+ rc = mdb_env_cwalk(&my, &txn->mt_dbs[1].md_root, 0);
if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
- rc = mdb_env_cthr_toggle(&my);
- my.mc_wlen[my.mc_toggle] = 0;
- pthread_mutex_unlock(&my.mc_mutex[my.mc_toggle]);
- pthread_join(thr, NULL);
+ rc = mdb_env_cthr_toggle(&my, 1);
+ mdb_env_cthr_toggle(&my, -1);
+ pthread_mutex_lock(&my.mc_mutex);
+ while(my.mc_new)
+ pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
+ pthread_mutex_unlock(&my.mc_mutex);
+ THREAD_FINISH(thr);
leave:
mdb_txn_abort(txn);
- free(my.mc_free);
+#ifdef _WIN32
+ CloseHandle(my.mc_cond);
+ CloseHandle(my.mc_mutex);
+ _aligned_free(my.mc_wbuf[0]);
+#else
+ pthread_cond_destroy(&my.mc_cond);
+ pthread_mutex_destroy(&my.mc_mutex);
+ free(my.mc_wbuf[0]);
+#endif
return rc;
}
-int
-mdb_env_copyfd(MDB_env *env, HANDLE fd)
+ /** Copy environment as-is. */
+static int
+mdb_env_copyfd0(MDB_env *env, HANDLE fd)
{
MDB_txn *txn = NULL;
int rc;
return rc;
}
-static int
-mdb_env_copy0(MDB_env *env, const char *path, int flag)
+/** Copy an environment to the given file handle.
+ * When the MDB_CP_COMPACT bit is set in \b flags the compacting copy
+ * (mdb_env_copyfd1) is used; otherwise the environment is copied as-is
+ * (mdb_env_copyfd0). The selected routine's return value is passed
+ * through unchanged.
+ */
+int
+mdb_env_copyfd2(MDB_env *env, HANDLE fd, unsigned int flags)
+{
+	return (flags & MDB_CP_COMPACT)
+		? mdb_env_copyfd1(env, fd)
+		: mdb_env_copyfd0(env, fd);
+}
+
+/** Copy an environment to the given file handle with no copy flags set.
+ * Equivalent to calling mdb_env_copyfd2() with a flags argument of 0;
+ * its return value is passed through unchanged.
+ */
+int
+mdb_env_copyfd(MDB_env *env, HANDLE fd)
+{
+	const unsigned int no_flags = 0;
+
+	return mdb_env_copyfd2(env, fd, no_flags);
+}
+
+int
+mdb_env_copy2(MDB_env *env, const char *path, unsigned int flags)
{
int rc, len;
char *lpath;
}
#endif
- if (flag)
- rc = mdb_env_copyfd2(env, newfd);
- else
- rc = mdb_env_copyfd(env, newfd);
+ rc = mdb_env_copyfd2(env, newfd, flags);
leave:
if (!(env->me_flags & MDB_NOSUBDIR))
/** Copy an environment to the given path with no copy flags set.
 * Forwards \b env and \b path to the underlying copy routine with a
 * flags argument of 0 and returns that routine's result unchanged.
 */
int
mdb_env_copy(MDB_env *env, const char *path)
{
- return mdb_env_copy0(env, path, 0);
-}
-
-int
-mdb_env_copy2(MDB_env *env, const char *path)
-{
- return mdb_env_copy0(env, path, 1);
+ return mdb_env_copy2(env, path, 0);
}
int
return EINVAL;
toggle = mdb_env_pick_meta(env);
- arg->me_mapaddr = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : 0;
+ arg->me_mapaddr = env->me_metas[toggle]->mm_address;
arg->me_mapsize = env->me_mapsize;
arg->me_maxreaders = env->me_maxreaders;