#include <string.h>
#include <time.h>
#include <unistd.h>
+#include <pthread.h>
#include <openssl/sha.h>
typedef ulong pgno_t;
typedef uint16_t indx_t;
+#define DEFAULT_READERS 126
+#define DEFAULT_MAPSIZE 1048576
+
+/* Lock descriptor stuff */
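+/* Per-reader bookkeeping: the txnid being read, plus the pid and
+ * thread that own the slot (mr_pid == 0 marks a free slot).
+ */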
+#define RXBODY \
+ ulong mr_txnid; \
+ pid_t mr_pid; \
+ pthread_t mr_tid
+typedef struct MDB_rxbody {
+ RXBODY;
+} MDB_rxbody;
+
+#ifndef CACHELINE
+#define CACHELINE 64 /* most CPUs. Itanium uses 128 */
+#endif
+
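+/* A reader slot, padded out to a full cache line so that slots
+ * updated by different threads do not share a line (false sharing).
+ */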
+typedef struct MDB_reader {
+ RXBODY;
+ /* cache line alignment */
+ char pad[CACHELINE-sizeof(MDB_rxbody)];
+} MDB_reader;
+
+#define TXBODY \
+ uint32_t mt_magic; \
+ uint32_t mt_version; \
+ pthread_mutex_t mt_mutex; \
+ ulong mt_txnid; \
+ uint32_t mt_numreaders
+typedef struct MDB_txbody {
+ TXBODY;
+} MDB_txbody;
+
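+/* Layout of the shared lock region: the transaction header and the
+ * writer mutex, each padded to a cache line, followed by the array
+ * of reader slots.
+ */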
+typedef struct MDB_txninfo {
+ TXBODY;
+ char pad[CACHELINE-sizeof(MDB_txbody)];
+ pthread_mutex_t mt_wmutex;
+ char pad2[CACHELINE-sizeof(pthread_mutex_t)];
+ MDB_reader mt_readers[1];
+} MDB_txninfo;
+
/* Common header for all page types. Overflow pages
* occupy a number of contiguous pages with no
* headers on any page after the first.
pgno_t mm_root; /* page number of root page */
MDB_stat mm_stat;
pgno_t mm_prev_meta; /* previous meta page number */
+ ulong mm_txnid;
#define MDB_TOMBSTONE 0x01 /* file is replaced */
uint32_t mm_flags;
#define mm_revisions mm_stat.ms_revisions
pgno_t mt_root; /* current / new root page */
pgno_t mt_next_pgno; /* next unallocated page */
pgno_t mt_first_pgno;
+ ulong mt_txnid;
MDB_env *mt_env;
- struct dirty_queue *mt_dirty_queue; /* modified pages */
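+	/* A write txn tracks its dirty pages here; a read-only txn
+	 * tracks the reader slot it occupies.
+	 */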
+ union {
+ struct dirty_queue *dirty_queue; /* modified pages */
+ MDB_reader *reader;
+ } mt_u;
#define MDB_TXN_RDONLY 0x01 /* read-only transaction */
#define MDB_TXN_ERROR 0x02 /* an error has occurred */
unsigned int mt_flags;
struct MDB_env {
int me_fd;
+ int me_lfd;
#define MDB_FIXPADDING 0x01 /* internal */
uint32_t me_flags;
+ int me_maxreaders;
char *me_path;
char *me_map;
+ MDB_txninfo *me_txns;
MDB_head me_head;
MDB_db0 me_db; /* first DB, overlaps with meta */
MDB_meta me_meta;
MDB_txn *me_txn; /* current write transaction */
size_t me_mapsize;
off_t me_size; /* current file size */
+ pthread_key_t me_txkey; /* thread-key for readers */
};
#define NODESIZE offsetof(MDB_node, mn_data)
dp->h.md_num = num;
dp->h.md_parent = parent;
dp->h.md_pi = parent_idx;
- SIMPLEQ_INSERT_TAIL(txn->mt_dirty_queue, dp, h.md_next);
+ SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
dp->p.mp_pgno = txn->mt_next_pgno;
txn->mt_next_pgno += num;
MDB_txn *txn;
int rc;
- if (!rdonly && env->me_txn != NULL) {
- DPRINTF("write transaction already begun");
- return EBUSY;
- }
-
if ((txn = calloc(1, sizeof(*txn))) == NULL) {
DPRINTF("calloc: %s", strerror(errno));
return ENOMEM;
if (rdonly) {
txn->mt_flags |= MDB_TXN_RDONLY;
} else {
- txn->mt_dirty_queue = calloc(1, sizeof(*txn->mt_dirty_queue));
- if (txn->mt_dirty_queue == NULL) {
+ txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
+ if (txn->mt_u.dirty_queue == NULL) {
free(txn);
return ENOMEM;
}
- SIMPLEQ_INIT(txn->mt_dirty_queue);
+ SIMPLEQ_INIT(txn->mt_u.dirty_queue);
-#if 0
- DPRINTF("taking write lock on txn %p", txn);
- if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
- DPRINTF("flock: %s", strerror(errno));
- errno = EBUSY;
- free(txn->dirty_queue);
- free(txn);
- return NULL;
+ pthread_mutex_lock(&env->me_txns->mt_wmutex);
+ env->me_txns->mt_txnid++;
+ }
+ txn->mt_txnid = env->me_txns->mt_txnid;
+ if (rdonly) {
+ MDB_reader *r = pthread_getspecific(env->me_txkey);
+ if (!r) {
+ int i;
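+			/* This thread has no reader slot yet: scan the shared
+			 * table for a free entry (mr_pid == 0) under the
+			 * reader mutex.
+			 */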
+ pthread_mutex_lock(&env->me_txns->mt_mutex);
+ for (i=0; i<env->me_maxreaders; i++) {
+ if (env->me_txns->mt_readers[i].mr_pid == 0) {
+ env->me_txns->mt_readers[i].mr_pid = getpid();
+ env->me_txns->mt_readers[i].mr_tid = pthread_self();
+					r = &env->me_txns->mt_readers[i];
+					pthread_setspecific(env->me_txkey, r);
+ if (i >= env->me_txns->mt_numreaders)
+ env->me_txns->mt_numreaders = i+1;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&env->me_txns->mt_mutex);
+			if (i == env->me_maxreaders) {
+				free(txn);
+				return ENOSPC;
+			}
}
-#endif
+ r->mr_txnid = txn->mt_txnid;
+ txn->mt_u.reader = r;
+ } else {
env->me_txn = txn;
}
env = txn->mt_env;
DPRINTF("abort transaction on mdbenv %p, root page %lu", env, txn->mt_root);
- if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
+ if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
+ txn->mt_u.reader->mr_txnid = 0;
+ } else {
/* Discard all dirty pages.
*/
- while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
- dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
- SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
+ while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
+ dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
+ SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
free(dp);
}
txn->bt->fd, strerror(errno));
}
#endif
- free(txn->mt_dirty_queue);
+ free(txn->mt_u.dirty_queue);
+ env->me_txn = NULL;
+ pthread_mutex_unlock(&env->me_txns->mt_wmutex);
}
- if (txn == env->me_txn)
- env->me_txn = NULL;
free(txn);
}
return EINVAL;
}
- if (SIMPLEQ_EMPTY(txn->mt_dirty_queue))
+ if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
goto done;
if (F_ISSET(env->me_flags, MDB_FIXPADDING)) {
do {
n = 0;
done = 1;
- SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
+ SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
DPRINTF("committing page %lu", dp->p.mp_pgno);
iov[n].iov_len = env->me_head.mh_psize * dp->h.md_num;
iov[n].iov_base = &dp->p;
/* Drop the dirty pages.
*/
- while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
- dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
- SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
+ while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
+ dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
+ SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
free(dp);
if (--n == 0)
break;
mdb_txn_abort(txn);
return n;
}
- txn = NULL;
env->me_txn = NULL;
+ pthread_mutex_unlock(&env->me_txns->mt_wmutex);
+ txn = NULL;
done:
mdb_txn_abort(txn);
bcopy(&env->me_meta, meta, sizeof(*meta));
rc = write(env->me_fd, &dp->p, env->me_head.mh_psize);
- SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_dirty_queue, h.md_next);
+ SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_u.dirty_queue, h.md_next);
free(dp);
if (rc != (ssize_t)env->me_head.mh_psize) {
int err = errno;
e->me_head.mh_magic = MDB_MAGIC;
e->me_head.mh_version = MDB_VERSION;
- e->me_mapsize = e->me_head.mh_mapsize = size;
+ e->me_head.mh_mapsize = DEFAULT_MAPSIZE;
+ e->me_maxreaders = DEFAULT_READERS;
e->me_db.md_env = e;
*env = e;
return MDB_SUCCESS;
}
+int
+mdbenv_set_mapsize(MDB_env *env, size_t size)
+{
+ if (env->me_map)
+ return EINVAL;
+ env->me_mapsize = env->me_head.mh_mapsize = size;
+ return MDB_SUCCESS;
+}
+
+int
+mdbenv_set_maxreaders(MDB_env *env, int readers)
+{
+	if (env->me_txns)
+		return EINVAL;
+	env->me_maxreaders = readers;
+	return MDB_SUCCESS;
+}
+
+int
+mdbenv_get_maxreaders(MDB_env *env, int *readers)
+{
+ if (!env || !readers)
+ return EINVAL;
+ *readers = env->me_maxreaders;
+ return MDB_SUCCESS;
+}
+
int
mdbenv_open2(MDB_env *env, unsigned int flags)
{
return MDB_SUCCESS;
}
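+/* Thread-specific-data destructor: when a reader thread exits,
+ * clear its slot so another thread can reuse it.
+ */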
+static void
+mdbenv_reader_dest(void *ptr)
+{
+ MDB_reader *reader = ptr;
+
+ reader->mr_txnid = 0;
+ reader->mr_pid = 0;
+ reader->mr_tid = 0;
+}
+
int
mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
{
- int oflags, rc;
+ int oflags, rc, len;
+ char *lpath;
+ off_t size, rsize;
+
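+	/* The reader lock table lives in a separate file next to the
+	 * data file, named "<path>.lock".
+	 */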
+ len = strlen(path);
+	lpath = malloc(len + sizeof(".lock"));
+	if (lpath == NULL)
+		return ENOMEM;
+	sprintf(lpath, "%s.lock", path);
+	env->me_lfd = open(lpath, O_RDWR | O_CREAT, mode);
+	free(lpath);
+	if (env->me_lfd == -1)
+		return errno;
+
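+	/* Size the lock region for me_maxreaders reader slots and grow
+	 * the file if it is smaller than that; otherwise adopt the
+	 * reader count implied by the existing file size.
+	 */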
+ size = lseek(env->me_lfd, 0, SEEK_END);
+ rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+ if (size < rsize) {
+ if (ftruncate(env->me_lfd, rsize) != 0) {
+ rc = errno;
+ close(env->me_lfd);
+ return rc;
+ }
+ } else {
+ rsize = size;
+ size = rsize - sizeof(MDB_txninfo);
+ env->me_maxreaders = size/sizeof(MDB_reader) + 1;
+ }
+ env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
+ env->me_lfd, 0);
+	if (env->me_txns == MAP_FAILED) {
+		rc = errno;
+		close(env->me_lfd);
+		return rc;
+	}
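+	/* A zero-length lock file means we just created it, so this
+	 * process initializes the process-shared mutexes.
+	 */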
+ if (size == 0) {
+ pthread_mutexattr_t mattr;
+
+ pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
+		pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
+		pthread_mutexattr_destroy(&mattr);
+ }
if (F_ISSET(flags, MDB_RDONLY))
oflags = O_RDONLY;
DPRINTF("opened dbenv %p", env);
}
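+	/* Per-thread key used to remember which reader slot, if any,
+	 * the calling thread owns; the destructor releases the slot.
+	 */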
+ pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
+
return rc;
}
munmap(env->me_map, env->me_mapsize);
}
close(env->me_fd);
+ if (env->me_txns) {
+ size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+ munmap(env->me_txns, size);
+ }
+ close(env->me_lfd);
+ free(env);
}
/* Search for key within a leaf page, using binary search.
MDB_txn *txn = env->me_txn;
if (txn && pgno >= txn->mt_first_pgno &&
- !SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
+ !SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
MDB_dpage *dp;
- SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
+ SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
if (dp->p.mp_pgno == pgno) {
p = &dp->p;
break;