#define THREAD_RET DWORD
#define pthread_t HANDLE
#define pthread_mutex_t HANDLE
+#define pthread_cond_t HANDLE
#define pthread_key_t DWORD
#define pthread_self() GetCurrentThreadId()
#define pthread_key_create(x,y) \
#define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode())
#define pthread_mutex_unlock(x) ReleaseMutex(x)
#define pthread_mutex_lock(x) WaitForSingleObject(x, INFINITE)
+#define pthread_cond_signal(x) SetEvent(*x)
+#define pthread_cond_wait(cond,mutex) SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE)
#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL)
#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE)
#define LOCK_MUTEX_R(env) pthread_mutex_lock((env)->me_rmutex)
#endif
typedef struct mdb_copy {
- pthread_mutex_t mc_mutex[2];
+ pthread_mutex_t mc_mutex;
+ pthread_cond_t mc_cond;
char *mc_wbuf[2];
char *mc_over[2];
MDB_env *mc_env;
pgno_t mc_next_pgno;
HANDLE mc_fd;
int mc_status;
+ volatile int mc_new;
int mc_toggle;
} mdb_copy;
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
- pthread_mutex_lock(&my->mc_mutex[toggle^1]);
+ pthread_mutex_lock(&my->mc_mutex);
+ my->mc_new = 0;
+ pthread_cond_signal(&my->mc_cond);
for(;;) {
- pthread_mutex_lock(&my->mc_mutex[toggle]);
- pthread_mutex_unlock(&my->mc_mutex[toggle^1]);
- if (!my->mc_wlen[toggle]) {
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
+ while (!my->mc_new)
+ pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
+ if (my->mc_new < 0) {
+ my->mc_new = 0;
break;
}
+ my->mc_new = 0;
wsize = my->mc_wlen[toggle];
ptr = my->mc_wbuf[toggle];
again:
}
if (rc) {
my->mc_status = rc;
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
break;
}
/* If there's an overflow page tail, write it too */
}
my->mc_wlen[toggle] = 0;
toggle ^= 1;
+ pthread_cond_signal(&my->mc_cond);
}
+ pthread_cond_signal(&my->mc_cond);
+ pthread_mutex_unlock(&my->mc_mutex);
return (THREAD_RET)0;
#undef DO_WRITE
}
static int
-mdb_env_cthr_toggle(mdb_copy *my)
+mdb_env_cthr_toggle(mdb_copy *my, int st)
{
int toggle = my->mc_toggle ^ 1;
-
- pthread_mutex_unlock(&my->mc_mutex[my->mc_toggle]);
- pthread_mutex_lock(&my->mc_mutex[toggle]);
+ pthread_mutex_lock(&my->mc_mutex);
if (my->mc_status) {
- pthread_mutex_unlock(&my->mc_mutex[toggle]);
+ pthread_mutex_unlock(&my->mc_mutex);
return my->mc_status;
}
+ while (my->mc_new == 1)
+ pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
+ my->mc_new = st;
my->mc_toggle = toggle;
+ pthread_cond_signal(&my->mc_cond);
+ pthread_mutex_unlock(&my->mc_mutex);
return 0;
}
if (rc)
goto done;
if (my->mc_wlen[toggle] >= MDB_WBUF) {
- rc = mdb_env_cthr_toggle(my);
+ rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
- toggle ^= 1;
+ toggle = my->mc_toggle;
}
mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
memcpy(mo, omp, my->mc_env->me_psize);
if (omp->mp_pages > 1) {
my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize;
- rc = mdb_env_cthr_toggle(my);
+ rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
- toggle ^= 1;
+ toggle = my->mc_toggle;
}
memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
} else if (ni->mn_flags & F_SUBDATA) {
}
}
if (my->mc_wlen[toggle] >= MDB_WBUF) {
- rc = mdb_env_cthr_toggle(my);
+ rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
- toggle ^= 1;
+ toggle = my->mc_toggle;
}
mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
mdb_page_copy(mo, mp, my->mc_env->me_psize);
int rc;
#ifdef _WIN32
- my.mc_mutex[0] = CreateMutex(NULL, FALSE, NULL);
- my.mc_mutex[1] = CreateMutex(NULL, FALSE, NULL);
+ my.mc_mutex = CreateMutex(NULL, FALSE, NULL);
+ my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_psize);
if (my.mc_wbuf[0] == NULL)
return errno;
#else
- pthread_mutex_init(&my.mc_mutex[0], NULL);
- pthread_mutex_init(&my.mc_mutex[1], NULL);
+ pthread_mutex_init(&my.mc_mutex, NULL);
+ pthread_cond_init(&my.mc_cond, NULL);
rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_psize, MDB_WBUF*2);
if (rc)
return rc;
my.mc_olen[1] = 0;
my.mc_next_pgno = 2;
my.mc_status = 0;
+ my.mc_new = 1;
my.mc_toggle = 0;
my.mc_env = env;
my.mc_fd = fd;
- pthread_mutex_lock(&my.mc_mutex[0]);
- THREAD_CREATE(thr, mdb_env_copythr, &my);
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers.
}
}
+ THREAD_CREATE(thr, mdb_env_copythr, &my);
mp = (MDB_page *)my.mc_wbuf[0];
memset(mp, 0, 2*env->me_psize);
mp->mp_pgno = 0;
}
my.mc_wlen[0] = env->me_psize * 2;
my.mc_txn = txn;
+ pthread_mutex_lock(&my.mc_mutex);
+ while(my.mc_new)
+ pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
+ pthread_mutex_unlock(&my.mc_mutex);
rc = mdb_env_cwalk(&my, &txn->mt_dbs[1].md_root, 0);
if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
- rc = mdb_env_cthr_toggle(&my);
- my.mc_wlen[my.mc_toggle] = 0;
- pthread_mutex_unlock(&my.mc_mutex[my.mc_toggle]);
+ rc = mdb_env_cthr_toggle(&my, 1);
+ mdb_env_cthr_toggle(&my, -1);
+ pthread_mutex_lock(&my.mc_mutex);
+ while(my.mc_new)
+ pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
+ pthread_mutex_unlock(&my.mc_mutex);
THREAD_FINISH(thr);
leave:
mdb_txn_abort(txn);
#ifdef _WIN32
- CloseHandle(my.mc_mutex[1]);
- CloseHandle(my.mc_mutex[0]);
+ CloseHandle(my.mc_cond);
+ CloseHandle(my.mc_mutex);
_aligned_free(my.mc_wbuf[0]);
#else
- pthread_mutex_destroy(&my.mc_mutex[1]);
- pthread_mutex_destroy(&my.mc_mutex[0]);
+ pthread_cond_destroy(&my.mc_cond);
+ pthread_mutex_destroy(&my.mc_mutex);
free(my.mc_wbuf[0]);
#endif
return rc;