From 059b357d1addb52bbbbe4fd257ccfd906c3e6445 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Thu, 3 Jul 2014 14:26:14 -0700 Subject: [PATCH] More tweaks to copyfd2 Make sure the writer thread starts and stops when we expect it to. --- libraries/liblmdb/mdb.c | 82 +++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 6997ae9583..609eb9230a 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -177,6 +177,7 @@ #define THREAD_RET DWORD #define pthread_t HANDLE #define pthread_mutex_t HANDLE +#define pthread_cond_t HANDLE #define pthread_key_t DWORD #define pthread_self() GetCurrentThreadId() #define pthread_key_create(x,y) \ @@ -186,6 +187,8 @@ #define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode()) #define pthread_mutex_unlock(x) ReleaseMutex(x) #define pthread_mutex_lock(x) WaitForSingleObject(x, INFINITE) +#define pthread_cond_signal(x) SetEvent(*x) +#define pthread_cond_wait(cond,mutex) SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE) #define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL) #define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE) #define LOCK_MUTEX_R(env) pthread_mutex_lock((env)->me_rmutex) @@ -8034,7 +8037,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, #endif typedef struct mdb_copy { - pthread_mutex_t mc_mutex[2]; + pthread_mutex_t mc_mutex; + pthread_cond_t mc_cond; char *mc_wbuf[2]; char *mc_over[2]; MDB_env *mc_env; @@ -8044,6 +8048,7 @@ typedef struct mdb_copy { pgno_t mc_next_pgno; HANDLE mc_fd; int mc_status; + volatile int mc_new; int mc_toggle; } mdb_copy; @@ -8061,14 +8066,17 @@ mdb_env_copythr(void *arg) #define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0) #endif - pthread_mutex_lock(&my->mc_mutex[toggle^1]); + pthread_mutex_lock(&my->mc_mutex); + my->mc_new = 0; + pthread_cond_signal(&my->mc_cond); for(;;) { - pthread_mutex_lock(&my->mc_mutex[toggle]); - pthread_mutex_unlock(&my->mc_mutex[toggle^1]); - if (!my->mc_wlen[toggle]) { - pthread_mutex_unlock(&my->mc_mutex[toggle]); + while (!my->mc_new) + pthread_cond_wait(&my->mc_cond, &my->mc_mutex); + if (my->mc_new < 0) { + my->mc_new = 0; break; } + my->mc_new = 0; wsize = my->mc_wlen[toggle]; ptr = my->mc_wbuf[toggle]; again: @@ -8089,7 +8097,6 @@ again: } if (rc) { my->mc_status = rc; - pthread_mutex_unlock(&my->mc_mutex[toggle]); break; } /* If there's an overflow page tail, write it too */ @@ -8101,23 +8108,29 @@ again: } my->mc_wlen[toggle] = 0; toggle ^= 1; + pthread_cond_signal(&my->mc_cond); } + pthread_cond_signal(&my->mc_cond); + pthread_mutex_unlock(&my->mc_mutex); return (THREAD_RET)0; #undef DO_WRITE } static int -mdb_env_cthr_toggle(mdb_copy *my) +mdb_env_cthr_toggle(mdb_copy *my, int st) { int toggle = my->mc_toggle ^ 1; - - pthread_mutex_unlock(&my->mc_mutex[my->mc_toggle]); - pthread_mutex_lock(&my->mc_mutex[toggle]); + pthread_mutex_lock(&my->mc_mutex); if (my->mc_status) { - pthread_mutex_unlock(&my->mc_mutex[toggle]); + pthread_mutex_unlock(&my->mc_mutex); return my->mc_status; } + while (my->mc_new == 1) + pthread_cond_wait(&my->mc_cond, &my->mc_mutex); + my->mc_new = st; my->mc_toggle = toggle; + pthread_cond_signal(&my->mc_cond); + pthread_mutex_unlock(&my->mc_mutex); return 0; } @@ -8188,10 +8201,10 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) if (rc) goto done; if (my->mc_wlen[toggle] >= MDB_WBUF) { - rc = mdb_env_cthr_toggle(my); + rc = mdb_env_cthr_toggle(my, 1); if (rc) goto done; - toggle ^= 1; + toggle = my->mc_toggle; } mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]); memcpy(mo, omp, my->mc_env->me_psize); @@ -8201,10 +8214,10 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) if (omp->mp_pages > 1) { my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1); my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize; - rc = mdb_env_cthr_toggle(my); + rc = mdb_env_cthr_toggle(my, 1); if (rc) goto done; - toggle ^= 1; + toggle = my->mc_toggle; } memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t)); } else if (ni->mn_flags & F_SUBDATA) { @@ -8250,10 +8263,10 @@ again: } } if (my->mc_wlen[toggle] >= MDB_WBUF) { - rc = mdb_env_cthr_toggle(my); + rc = mdb_env_cthr_toggle(my, 1); if (rc) goto done; - toggle ^= 1; + toggle = my->mc_toggle; } mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]); mdb_page_copy(mo, mp, my->mc_env->me_psize); @@ -8286,14 +8299,14 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd) int rc; #ifdef _WIN32 - my.mc_mutex[0] = CreateMutex(NULL, FALSE, NULL); - my.mc_mutex[1] = CreateMutex(NULL, FALSE, NULL); + my.mc_mutex = CreateMutex(NULL, FALSE, NULL); + my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL); my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_psize); if (my.mc_wbuf[0] == NULL) return errno; #else - pthread_mutex_init(&my.mc_mutex[0], NULL); - pthread_mutex_init(&my.mc_mutex[1], NULL); + pthread_mutex_init(&my.mc_mutex, NULL); + pthread_cond_init(&my.mc_cond, NULL); rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_psize, MDB_WBUF*2); if (rc) return rc; @@ -8305,11 +8318,10 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd) my.mc_olen[1] = 0; my.mc_next_pgno = 2; my.mc_status = 0; + my.mc_new = 1; my.mc_toggle = 0; my.mc_env = env; my.mc_fd = fd; - pthread_mutex_lock(&my.mc_mutex[0]); - THREAD_CREATE(thr, mdb_env_copythr, &my); /* Do the lock/unlock of the reader mutex before starting the * write txn. Otherwise other read txns could block writers. @@ -8332,6 +8344,7 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd) } } + THREAD_CREATE(thr, mdb_env_copythr, &my); mp = (MDB_page *)my.mc_wbuf[0]; memset(mp, 0, 2*env->me_psize); mp->mp_pgno = 0; @@ -8368,21 +8381,28 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd) } my.mc_wlen[0] = env->me_psize * 2; my.mc_txn = txn; + pthread_mutex_lock(&my.mc_mutex); + while(my.mc_new) + pthread_cond_wait(&my.mc_cond, &my.mc_mutex); + pthread_mutex_unlock(&my.mc_mutex); rc = mdb_env_cwalk(&my, &txn->mt_dbs[1].md_root, 0); if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle]) - rc = mdb_env_cthr_toggle(&my); - my.mc_wlen[my.mc_toggle] = 0; - pthread_mutex_unlock(&my.mc_mutex[my.mc_toggle]); + rc = mdb_env_cthr_toggle(&my, 1); + mdb_env_cthr_toggle(&my, -1); + pthread_mutex_lock(&my.mc_mutex); + while(my.mc_new) + pthread_cond_wait(&my.mc_cond, &my.mc_mutex); + pthread_mutex_unlock(&my.mc_mutex); THREAD_FINISH(thr); leave: mdb_txn_abort(txn); #ifdef _WIN32 - CloseHandle(my.mc_mutex[1]); - CloseHandle(my.mc_mutex[0]); + CloseHandle(my.mc_cond); + CloseHandle(my.mc_mutex); _aligned_free(my.mc_wbuf[0]); #else - pthread_mutex_destroy(&my.mc_mutex[1]); - pthread_mutex_destroy(&my.mc_mutex[0]); + pthread_cond_destroy(&my.mc_cond); + pthread_mutex_destroy(&my.mc_mutex); free(my.mc_wbuf[0]); #endif return rc; -- 2.39.5