From 1d8623504790540e481b4c24838f9c48ef58c117 Mon Sep 17 00:00:00 2001 From: Hallvard Furuseth Date: Sat, 25 Jun 2016 07:55:34 +0200 Subject: [PATCH] ITS#8209 MDB_CP_COMPACT: Threading/error handling Handle errors. Fix cond_wait condition so mc_new is the sole control var. Drop specious cond_waits. Do not look at 'mo' while copythr writes it. Don't know if posix_memalign() pointer is defined after failure. Some _aligned_free() doc seems to say arg NULL = user error. --- libraries/liblmdb/mdb.c | 155 ++++++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 71 deletions(-) diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 7d34276566..255e216724 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -291,8 +291,10 @@ typedef HANDLE mdb_mutex_t, mdb_mutexref_t; #define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE) #define pthread_cond_signal(x) SetEvent(*x) #define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0) -#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL) -#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE) +#define THREAD_CREATE(thr,start,arg) \ + (((thr) = CreateThread(NULL, 0, start, arg, 0, NULL)) ? 0 : ErrCode()) +#define THREAD_FINISH(thr) \ + (WaitForSingleObject(thr, INFINITE) ? ErrCode() : 0) #define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE) #define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex) #define mdb_mutex_consistent(mutex) 0 @@ -8818,11 +8820,12 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, #ifndef MDB_WBUF #define MDB_WBUF (1024*1024) #endif +#define MDB_EOF 0x10 /**< #mdb_env_copyfd1() is done reading */ - /** State needed for a compacting copy. */ + /** State needed for a double-buffering compacting copy. */ typedef struct mdb_copy { pthread_mutex_t mc_mutex; - pthread_cond_t mc_cond; + pthread_cond_t mc_cond; /**< Condition variable for #mc_new */ char *mc_wbuf[2]; char *mc_over[2]; MDB_env *mc_env; @@ -8831,10 +8834,12 @@ typedef struct mdb_copy { int mc_olen[2]; pgno_t mc_next_pgno; HANDLE mc_fd; - int mc_status; - volatile int mc_new; - int mc_toggle; - + int mc_toggle; /**< Buffer number in provider */ + int mc_new; /**< (0-2 buffers to write) | (#MDB_EOF at end) */ + /** Error code. Never cleared if set. Both threads can set nonzero + * to fail the copy. Not mutex-protected, LMDB expects atomic int. + */ + volatile int mc_error; } mdb_copy; /** Dedicated writer thread for compacting copy. */ @@ -8853,20 +8858,16 @@ mdb_env_copythr(void *arg) #endif pthread_mutex_lock(&my->mc_mutex); - my->mc_new = 0; - pthread_cond_signal(&my->mc_cond); for(;;) { while (!my->mc_new) pthread_cond_wait(&my->mc_cond, &my->mc_mutex); - if (my->mc_new < 0) { - my->mc_new = 0; + if (my->mc_new == 0 + MDB_EOF) /* 0 buffers, just EOF */ break; - } - my->mc_new = 0; wsize = my->mc_wlen[toggle]; ptr = my->mc_wbuf[toggle]; again: - while (wsize > 0) { + rc = MDB_SUCCESS; + while (wsize > 0 && !my->mc_error) { DO_WRITE(rc, my->mc_fd, ptr, wsize, len); if (!rc) { rc = ErrCode(); @@ -8882,8 +8883,7 @@ again: } } if (rc) { - my->mc_status = rc; - break; + my->mc_error = rc; } /* If there's an overflow page tail, write it too */ if (my->mc_olen[toggle]) { @@ -8894,34 +8894,41 @@ again: } my->mc_wlen[toggle] = 0; toggle ^= 1; + /* Return the empty buffer to provider */ + my->mc_new--; pthread_cond_signal(&my->mc_cond); } - pthread_cond_signal(&my->mc_cond); pthread_mutex_unlock(&my->mc_mutex); return (THREAD_RET)0; #undef DO_WRITE } - /** Tell the writer thread there's a buffer ready to write */ + /** Give buffer and/or #MDB_EOF to writer thread, await unused buffer. + * + * @param[in] my control structure. + * @param[in] adjust (1 to hand off 1 buffer) | (MDB_EOF when ending). + */ static int ESECT -mdb_env_cthr_toggle(mdb_copy *my, int st) +mdb_env_cthr_toggle(mdb_copy *my, int adjust) { - int toggle = my->mc_toggle ^ 1; pthread_mutex_lock(&my->mc_mutex); - if (my->mc_status) { - pthread_mutex_unlock(&my->mc_mutex); - return my->mc_status; - } - while (my->mc_new == 1) - pthread_cond_wait(&my->mc_cond, &my->mc_mutex); - my->mc_new = st; - my->mc_toggle = toggle; + my->mc_new += adjust; pthread_cond_signal(&my->mc_cond); + while (my->mc_new & 2) /* both buffers in use */ + pthread_cond_wait(&my->mc_cond, &my->mc_mutex); pthread_mutex_unlock(&my->mc_mutex); - return 0; + + my->mc_toggle ^= (adjust & 1); + /* Both threads reset mc_wlen, to be safe from threading errors */ + my->mc_wlen[my->mc_toggle] = 0; + return my->mc_error; } - /** Depth-first tree traversal for compacting copy. */ + /** Depth-first tree traversal for compacting copy. + * @param[in] my control structure. + * @param[in,out] pg database root. + * @param[in] flags includes #F_DUPDATA if it is a sorted-duplicate sub-DB. + */ static int ESECT mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) { @@ -8983,6 +8990,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) } memcpy(&pg, NODEDATA(ni), sizeof(pg)); + memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t)); rc = mdb_page_get(&mc, pg, &omp, NULL); if (rc) goto done; @@ -9005,7 +9013,6 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) goto done; toggle = my->mc_toggle; } - memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t)); } else if (ni->mn_flags & F_SUBDATA) { MDB_db db; @@ -9083,47 +9090,55 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd) { MDB_meta *mm; MDB_page *mp; - mdb_copy my; + mdb_copy my = {0}; MDB_txn *txn = NULL; pthread_t thr; - int rc; + int rc = MDB_SUCCESS; #ifdef _WIN32 - my.mc_mutex = CreateMutex(NULL, FALSE, NULL); - my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL); + if (!(my.mc_mutex = CreateMutex(NULL, FALSE, NULL)) || + !(my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL))) { + rc = ErrCode(); + goto done; + } my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_os_psize); - if (my.mc_wbuf[0] == NULL) - return errno; + if (my.mc_wbuf[0] == NULL) { + /* _aligned_malloc() sets errno, but we use Windows error codes */ + rc = ERROR_NOT_ENOUGH_MEMORY; + goto done; + } #else - pthread_mutex_init(&my.mc_mutex, NULL); - pthread_cond_init(&my.mc_cond, NULL); + if ((rc = pthread_mutex_init(&my.mc_mutex, NULL)) != 0) + return rc; + if ((rc = pthread_cond_init(&my.mc_cond, NULL)) != 0) + goto done2; #ifdef HAVE_MEMALIGN my.mc_wbuf[0] = memalign(env->me_os_psize, MDB_WBUF*2); - if (my.mc_wbuf[0] == NULL) - return errno; + if (my.mc_wbuf[0] == NULL) { + rc = errno; + goto done; + } #else - rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_os_psize, MDB_WBUF*2); - if (rc) - return rc; + { + void *p; + if ((rc = posix_memalign(&p, env->me_os_psize, MDB_WBUF*2)) != 0) + goto done; + my.mc_wbuf[0] = p; + } #endif #endif memset(my.mc_wbuf[0], 0, MDB_WBUF*2); my.mc_wbuf[1] = my.mc_wbuf[0] + MDB_WBUF; - my.mc_wlen[0] = 0; - my.mc_wlen[1] = 0; - my.mc_olen[0] = 0; - my.mc_olen[1] = 0; my.mc_next_pgno = NUM_METAS; - my.mc_status = 0; - my.mc_new = 1; - my.mc_toggle = 0; my.mc_env = env; my.mc_fd = fd; - THREAD_CREATE(thr, mdb_env_copythr, &my); + rc = THREAD_CREATE(thr, mdb_env_copythr, &my); + if (rc) + goto done; rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn); if (rc) - return rc; + goto finish; mp = (MDB_page *)my.mc_wbuf[0]; memset(mp, 0, NUM_METAS * env->me_psize); @@ -9149,6 +9164,8 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd) mdb_cursor_init(&mc, txn, FREE_DBI, NULL); while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0) freecount += *(MDB_ID *)data.mv_data; + if (rc != MDB_NOTFOUND) + goto finish; freecount += txn->mt_dbs[FREE_DBI].md_branch_pages + txn->mt_dbs[FREE_DBI].md_leaf_pages + txn->mt_dbs[FREE_DBI].md_overflow_pages; @@ -9165,31 +9182,27 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd) } my.mc_wlen[0] = env->me_psize * NUM_METAS; my.mc_txn = txn; - pthread_mutex_lock(&my.mc_mutex); - while(my.mc_new) - pthread_cond_wait(&my.mc_cond, &my.mc_mutex); - pthread_mutex_unlock(&my.mc_mutex); rc = mdb_env_cwalk(&my, &txn->mt_dbs[MAIN_DBI].md_root, 0); - if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle]) - rc = mdb_env_cthr_toggle(&my, 1); - mdb_env_cthr_toggle(&my, -1); - pthread_mutex_lock(&my.mc_mutex); - while(my.mc_new) - pthread_cond_wait(&my.mc_cond, &my.mc_mutex); - pthread_mutex_unlock(&my.mc_mutex); - THREAD_FINISH(thr); +finish: + if (rc) + my.mc_error = rc; + mdb_env_cthr_toggle(&my, 1 | MDB_EOF); + rc = THREAD_FINISH(thr); mdb_txn_abort(txn); + +done: #ifdef _WIN32 - CloseHandle(my.mc_cond); - CloseHandle(my.mc_mutex); - _aligned_free(my.mc_wbuf[0]); + if (my.mc_wbuf[0]) _aligned_free(my.mc_wbuf[0]); + if (my.mc_cond) CloseHandle(my.mc_cond); + if (my.mc_mutex) CloseHandle(my.mc_mutex); #else + free(my.mc_wbuf[0]); pthread_cond_destroy(&my.mc_cond); +done2: pthread_mutex_destroy(&my.mc_mutex); - free(my.mc_wbuf[0]); #endif - return rc; + return rc ? rc : my.mc_error; } /** Copy environment as-is. */ -- 2.39.5