From f367441b6966914d1985f241647e1229b4b6c8e8 Mon Sep 17 00:00:00 2001
From: Howard Chu
Date: Tue, 28 Jun 2011 13:46:48 -0700
Subject: [PATCH] Add locking support

---
 libraries/libmdb/mdb.c   | 217 ++++++++++++++++++++++++++++++++-------
 libraries/libmdb/mdb.h   |   5 +-
 libraries/libmdb/mtest.c |   3 +-
 3 files changed, 186 insertions(+), 39 deletions(-)

diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c
index c5c6f0ac29..913de09561 100644
--- a/libraries/libmdb/mdb.c
+++ b/libraries/libmdb/mdb.c
@@ -19,6 +19,7 @@
 #include 
 #include 
 #include 
+#include <pthread.h>
 #include 
@@ -47,6 +48,46 @@
 typedef ulong pgno_t;
 typedef uint16_t indx_t;
 
+#define DEFAULT_READERS 126
+#define DEFAULT_MAPSIZE 1048576
+
+/* Lock descriptor stuff */
+#define RXBODY \
+	ulong mr_txnid; \
+	pid_t mr_pid; \
+	pthread_t mr_tid
+typedef struct MDB_rxbody {
+	RXBODY;
+} MDB_rxbody;
+
+#ifndef CACHELINE
+#define CACHELINE 64	/* most CPUs. Itanium uses 128 */
+#endif
+
+typedef struct MDB_reader {
+	RXBODY;
+	/* cache line alignment */
+	char pad[CACHELINE-sizeof(MDB_rxbody)];
+} MDB_reader;
+
+#define TXBODY \
+	uint32_t mt_magic; \
+	uint32_t mt_version; \
+	pthread_mutex_t mt_mutex; \
+	ulong mt_txnid; \
+	uint32_t mt_numreaders
+typedef struct MDB_txbody {
+	TXBODY;
+} MDB_txbody;
+
+typedef struct MDB_txninfo {
+	TXBODY;
+	char pad[CACHELINE-sizeof(MDB_txbody)];
+	pthread_mutex_t mt_wmutex;
+	char pad2[CACHELINE-sizeof(pthread_mutex_t)];
+	MDB_reader mt_readers[1];
+} MDB_txninfo;
+
 /* Common header for all page types. Overflow pages
  * occupy a number of contiguous pages with no
  * headers on any page after the first.
@@ -101,6 +142,7 @@ typedef struct MDB_meta {	/* meta (footer) page content */
 	pgno_t mm_root;		/* page number of root page */
 	MDB_stat mm_stat;
 	pgno_t mm_prev_meta;	/* previous meta page number */
+	ulong mm_txnid;
 #define MDB_TOMBSTONE	 0x01	/* file is replaced */
 	uint32_t mm_flags;
 #define mm_revisions	mm_stat.ms_revisions
@@ -171,8 +213,12 @@ struct MDB_txn {
 	pgno_t mt_root;		/* current / new root page */
 	pgno_t mt_next_pgno;	/* next unallocated page */
 	pgno_t mt_first_pgno;
+	ulong mt_txnid;
 	MDB_env *mt_env;
-	struct dirty_queue *mt_dirty_queue;	/* modified pages */
+	union {
+		struct dirty_queue *dirty_queue;	/* modified pages */
+		MDB_reader *reader;
+	} mt_u;
 #define MDB_TXN_RDONLY	 0x01	/* read-only transaction */
 #define MDB_TXN_ERROR	 0x02	/* an error has occurred */
 	unsigned int mt_flags;
@@ -199,16 +245,20 @@ struct MDB_db {
 
 struct MDB_env {
 	int me_fd;
+	int me_lfd;
 #define MDB_FIXPADDING	 0x01	/* internal */
 	uint32_t me_flags;
+	int me_maxreaders;
 	char *me_path;
 	char *me_map;
+	MDB_txninfo *me_txns;
 	MDB_head me_head;
 	MDB_db0 me_db;		/* first DB, overlaps with meta */
 	MDB_meta me_meta;
 	MDB_txn *me_txn;	/* current write transaction */
 	size_t me_mapsize;
 	off_t me_size;		/* current file size */
+	pthread_key_t me_txkey;	/* thread-key for readers */
 };
 
 #define NODESIZE	 offsetof(MDB_node, mn_data)
@@ -356,7 +406,7 @@ mdb_newpage(MDB_txn *txn, MDB_page *parent, int parent_idx, int num)
 	dp->h.md_num = num;
 	dp->h.md_parent = parent;
 	dp->h.md_pi = parent_idx;
-	SIMPLEQ_INSERT_TAIL(txn->mt_dirty_queue, dp, h.md_next);
+	SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
 	dp->p.mp_pgno = txn->mt_next_pgno;
 	txn->mt_next_pgno += num;
 
@@ -410,11 +460,6 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 	MDB_txn	*txn;
 	int rc;
 
-	if (!rdonly && env->me_txn != NULL) {
-		DPRINTF("write transaction already begun");
-		return EBUSY;
-	}
-
 	if ((txn = calloc(1, sizeof(*txn))) == NULL) {
 		DPRINTF("calloc: %s", strerror(errno));
 		return ENOMEM;
@@ -423,23 +468,40 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
 	if (rdonly) {
 		txn->mt_flags |= MDB_TXN_RDONLY;
 	} else {
-		txn->mt_dirty_queue = calloc(1, sizeof(*txn->mt_dirty_queue));
-		if (txn->mt_dirty_queue == NULL) {
+		txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
+		if (txn->mt_u.dirty_queue == NULL) {
 			free(txn);
 			return ENOMEM;
 		}
-		SIMPLEQ_INIT(txn->mt_dirty_queue);
+		SIMPLEQ_INIT(txn->mt_u.dirty_queue);
 
-#if 0
-		DPRINTF("taking write lock on txn %p", txn);
-		if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
-			DPRINTF("flock: %s", strerror(errno));
-			errno = EBUSY;
-			free(txn->dirty_queue);
-			free(txn);
-			return NULL;
+		pthread_mutex_lock(&env->me_txns->mt_wmutex);
+		env->me_txns->mt_txnid++;
+	}
+	txn->mt_txnid = env->me_txns->mt_txnid;
+	if (rdonly) {
+		MDB_reader *r = pthread_getspecific(env->me_txkey);
+		if (!r) {
+			int i;
+			pthread_mutex_lock(&env->me_txns->mt_mutex);
+			for (i=0; i<env->me_maxreaders; i++) {
+				if (env->me_txns->mt_readers[i].mr_pid == 0) {
+					env->me_txns->mt_readers[i].mr_pid = getpid();
+					env->me_txns->mt_readers[i].mr_tid = pthread_self();
+					pthread_setspecific(env->me_txkey, &env->me_txns->mt_readers[i]);
+					if (i >= env->me_txns->mt_numreaders)
+						env->me_txns->mt_numreaders = i+1;
+					break;
+				}
+			}
+			pthread_mutex_unlock(&env->me_txns->mt_mutex);
+			if (i == env->me_maxreaders) {
+				return ENOSPC;
+			}
 		}
-#endif
+		r->mr_txnid = txn->mt_txnid;
+		txn->mt_u.reader = r;
+	} else {
 		env->me_txn = txn;
 	}
 
@@ -470,12 +532,14 @@ mdb_txn_abort(MDB_txn *txn)
 	env = txn->mt_env;
 	DPRINTF("abort transaction on mdbenv %p, root page %lu", env, txn->mt_root);
 
-	if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
+	if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
+		txn->mt_u.reader->mr_txnid = 0;
+	} else {
 		/* Discard all dirty pages. */
-		while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
-			dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
-			SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
+		while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
+			dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
+			SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
 			free(dp);
 		}
 
@@ -487,11 +551,11 @@ mdb_txn_abort(MDB_txn *txn)
 			txn->bt->fd, strerror(errno));
 		}
 #endif
-		free(txn->mt_dirty_queue);
+		free(txn->mt_u.dirty_queue);
+		env->me_txn = NULL;
+		pthread_mutex_unlock(&env->me_txns->mt_wmutex);
 	}
 
-	if (txn == env->me_txn)
-		env->me_txn = NULL;
 	free(txn);
 }
 
@@ -528,7 +592,7 @@ mdb_txn_commit(MDB_txn *txn)
 		return EINVAL;
 	}
 
-	if (SIMPLEQ_EMPTY(txn->mt_dirty_queue))
+	if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
 		goto done;
 
 	if (F_ISSET(env->me_flags, MDB_FIXPADDING)) {
@@ -552,7 +616,7 @@ mdb_txn_commit(MDB_txn *txn)
 	do {
 		n = 0;
 		done = 1;
-		SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
+		SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
 			DPRINTF("committing page %lu", dp->p.mp_pgno);
 			iov[n].iov_len = env->me_head.mh_psize * dp->h.md_num;
 			iov[n].iov_base = &dp->p;
@@ -581,9 +645,9 @@ mdb_txn_commit(MDB_txn *txn)
 
 		/* Drop the dirty pages.
 		 */
-		while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
-			dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
-			SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
+		while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
+			dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
+			SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
 			free(dp);
 			if (--n == 0)
 				break;
@@ -596,8 +660,9 @@ mdb_txn_commit(MDB_txn *txn)
 		mdb_txn_abort(txn);
 		return n;
 	}
-	txn = NULL;
 	env->me_txn = NULL;
+	pthread_mutex_unlock(&env->me_txns->mt_wmutex);
+	txn = NULL;
 
 done:
 	mdb_txn_abort(txn);
@@ -713,7 +778,7 @@ mdbenv_write_meta(MDB_env *env, pgno_t root, unsigned int flags)
 	bcopy(&env->me_meta, meta, sizeof(*meta));
 
 	rc = write(env->me_fd, &dp->p, env->me_head.mh_psize);
-	SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_dirty_queue, h.md_next);
+	SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_u.dirty_queue, h.md_next);
 	free(dp);
 	if (rc != (ssize_t)env->me_head.mh_psize) {
 		int err = errno;
@@ -852,12 +917,38 @@ mdbenv_create(MDB_env **env, size_t size)
 
 	e->me_head.mh_magic = MDB_MAGIC;
 	e->me_head.mh_version = MDB_VERSION;
-	e->me_mapsize = e->me_head.mh_mapsize = size;
+	e->me_head.mh_mapsize = DEFAULT_MAPSIZE;
+	e->me_maxreaders = DEFAULT_READERS;
 	e->me_db.md_env = e;
 	*env = e;
 	return MDB_SUCCESS;
 }
 
+int
+mdbenv_set_mapsize(MDB_env *env, size_t size)
+{
+	if (env->me_map)
+		return EINVAL;
+	env->me_mapsize = env->me_head.mh_mapsize = size;
+	return MDB_SUCCESS;
+}
+
+int
+mdbenv_set_maxreaders(MDB_env *env, int readers)
+{
+	env->me_maxreaders = readers;
+	return MDB_SUCCESS;
+}
+
+int
+mdbenv_get_maxreaders(MDB_env *env, int *readers)
+{
+	if (!env || !readers)
+		return EINVAL;
+	*readers = env->me_maxreaders;
+	return MDB_SUCCESS;
+}
+
 int
 mdbenv_open2(MDB_env *env, unsigned int flags)
 {
@@ -918,10 +1009,54 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
 	return MDB_SUCCESS;
 }
 
+static void
+mdbenv_reader_dest(void *ptr)
+{
+	MDB_reader *reader = ptr;
+
+	reader->mr_txnid = 0;
+	reader->mr_pid = 0;
+	reader->mr_tid = 0;
+}
+
 int
 mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
 {
-	int oflags, rc;
+	int oflags, rc, len;
+	char *lpath;
+	off_t size, rsize;
+
+	len = strlen(path);
+	lpath = malloc(len + sizeof(".lock"));
+	sprintf(lpath, "%s.lock", path);
+	if ((env->me_lfd = open(lpath, O_RDWR | O_CREAT, mode)) == -1)
+		return errno;
+
+	size = lseek(env->me_lfd, 0, SEEK_END);
+	rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+	if (size < rsize) {
+		if (ftruncate(env->me_lfd, rsize) != 0) {
+			rc = errno;
+			close(env->me_lfd);
+			return rc;
+		}
+	} else {
+		rsize = size;
+		size = rsize - sizeof(MDB_txninfo);
+		env->me_maxreaders = size/sizeof(MDB_reader) + 1;
+	}
+	env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
+		env->me_lfd, 0);
+	if (env->me_txns == MAP_FAILED)
+		return errno;
+	if (size == 0) {
+		pthread_mutexattr_t mattr;
+
+		pthread_mutexattr_init(&mattr);
+		pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+		pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
+		pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
+	}
 
 	if (F_ISSET(flags, MDB_RDONLY))
 		oflags = O_RDONLY;
@@ -939,6 +1074,8 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
 		DPRINTF("opened dbenv %p", env);
 	}
 
+	pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
+
 	return rc;
 }
 
@@ -954,6 +1091,12 @@ mdbenv_close(MDB_env *env)
 		munmap(env->me_map, env->me_mapsize);
 	}
 	close(env->me_fd);
+	if (env->me_txns) {
+		size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+		munmap(env->me_txns, size);
+	}
+	close(env->me_lfd);
+	free(env);
 }
 
 /* Search for key within a leaf page, using binary search.
@@ -1060,9 +1203,9 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno)
 	MDB_txn *txn = env->me_txn;
 
 	if (txn && pgno >= txn->mt_first_pgno &&
-		!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
+		!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
 		MDB_dpage *dp;
-		SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
+		SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
 			if (dp->p.mp_pgno == pgno) {
 				p = &dp->p;
 				break;
diff --git a/libraries/libmdb/mdb.h b/libraries/libmdb/mdb.h
index b401bebaa1..518fba96a3 100644
--- a/libraries/libmdb/mdb.h
+++ b/libraries/libmdb/mdb.h
@@ -55,12 +55,15 @@ typedef struct MDB_stat {
 	unsigned long ms_entries;
 } MDB_stat;
 
-int mdbenv_create(MDB_env **env, size_t size);
+int mdbenv_create(MDB_env **env);
 int mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode);
 int mdbenv_stat(MDB_env *env, MDB_stat **stat);
 void mdbenv_close(MDB_env *env);
 int mdbenv_get_flags(MDB_env *env, unsigned int *flags);
 int mdbenv_get_path(MDB_env *env, const char **path);
+int mdbenv_set_mapsize(MDB_env *env, size_t size);
+int mdbenv_set_maxreaders(MDB_env *env, int readers);
+int mdbenv_get_maxreaders(MDB_env *env, int *readers);
 int mdbenv_sync(MDB_env *env);
 int mdbenv_compact(MDB_env *env);
 
diff --git a/libraries/libmdb/mtest.c b/libraries/libmdb/mtest.c
index b6316d4d5c..3533638c48 100644
--- a/libraries/libmdb/mtest.c
+++ b/libraries/libmdb/mtest.c
@@ -25,7 +25,8 @@ int main(int argc,char * argv[])
 		values[i] = random()%1024;
 	}
 
-	rc = mdbenv_create(&env, 10485760);
+	rc = mdbenv_create(&env);
+	rc = mdbenv_set_mapsize(env, 10485760);
 	rc = mdbenv_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664);
 	rc = mdb_txn_begin(env, 0, &txn);
 	rc = mdb_open(env, txn, NULL, 0, &db);
-- 
2.39.5
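
Usage sketch (not part of the patch): a minimal, hypothetical caller illustrating the environment-setup sequence this patch introduces. mdbenv_create() no longer takes a size; the map size and reader limit are now set with mdbenv_set_mapsize() and mdbenv_set_maxreaders() before mdbenv_open(), and a read-only mdb_txn_begin() binds a per-thread reader slot in the shared lock table. The path, open flags, and map size are copied from mtest.c; error handling is omitted.

#include <sys/types.h>
#include "mdb.h"

int main(void)
{
	MDB_env *env;
	MDB_txn *txn;
	int rc, readers;

	rc = mdbenv_create(&env);               /* size argument removed by this patch */
	rc = mdbenv_set_maxreaders(env, 126);   /* optional; DEFAULT_READERS otherwise */
	rc = mdbenv_set_mapsize(env, 10485760); /* replaces the old create-time size */
	rc = mdbenv_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664);
	rc = mdbenv_get_maxreaders(env, &readers);

	rc = mdb_txn_begin(env, 1, &txn);       /* rdonly=1 claims a reader slot for this thread */
	mdb_txn_abort(txn);                     /* clears the slot's mr_txnid */

	mdbenv_close(env);                      /* unmaps the lock table, closes the .lock fd */
	return rc;
}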