]> git.sur5r.net Git - openldap/commitdiff
Add locking support
authorHoward Chu <hyc@symas.com>
Tue, 28 Jun 2011 20:46:48 +0000 (13:46 -0700)
committerHoward Chu <hyc@symas.com>
Tue, 28 Jun 2011 20:46:48 +0000 (13:46 -0700)
libraries/libmdb/mdb.c
libraries/libmdb/mdb.h
libraries/libmdb/mtest.c

index c5c6f0ac29f7af294007b241188c81ab6d6f9a78..913de09561069bb3349dd32eb404632fde75028e 100644 (file)
@@ -19,6 +19,7 @@
 #include <string.h>
 #include <time.h>
 #include <unistd.h>
+#include <pthread.h>
 
 #include <openssl/sha.h>
 
 typedef ulong           pgno_t;
 typedef uint16_t        indx_t;
 
+#define DEFAULT_READERS        126
+#define DEFAULT_MAPSIZE        1048576
+
+/* Lock descriptor stuff */
+#define RXBODY \
+       ulong           mr_txnid; \
+       pid_t           mr_pid; \
+       pthread_t       mr_tid
+typedef struct MDB_rxbody {
+       RXBODY;
+} MDB_rxbody;
+
+#ifndef CACHELINE
+#define CACHELINE      64      /* most CPUs. Itanium uses 128 */
+#endif
+
+typedef struct MDB_reader {
+       RXBODY;
+       /* cache line alignment */
+       char pad[CACHELINE-sizeof(MDB_rxbody)];
+} MDB_reader;
+
+#define        TXBODY \
+       uint32_t        mt_magic;       \
+       uint32_t        mt_version;     \
+       pthread_mutex_t mt_mutex;       \
+       ulong           mt_txnid;       \
+       uint32_t        mt_numreaders
+typedef struct MDB_txbody {
+       TXBODY;
+} MDB_txbody;
+
+typedef struct MDB_txninfo {
+       TXBODY;
+       char pad[CACHELINE-sizeof(MDB_txbody)];
+       pthread_mutex_t mt_wmutex;
+       char pad2[CACHELINE-sizeof(pthread_mutex_t)];
+       MDB_reader      mt_readers[1];
+} MDB_txninfo;
+
 /* Common header for all page types. Overflow pages
  * occupy a number of contiguous pages with no
  * headers on any page after the first.
@@ -101,6 +142,7 @@ typedef struct MDB_meta {                   /* meta (footer) page content */
        pgno_t          mm_root;                        /* page number of root page */
        MDB_stat        mm_stat;
        pgno_t          mm_prev_meta;           /* previous meta page number */
+       ulong           mm_txnid;
 #define MDB_TOMBSTONE   0x01           /* file is replaced */
        uint32_t        mm_flags;
 #define mm_revisions mm_stat.ms_revisions
@@ -171,8 +213,12 @@ struct MDB_txn {
        pgno_t          mt_root;                /* current / new root page */
        pgno_t          mt_next_pgno;   /* next unallocated page */
        pgno_t          mt_first_pgno;
+       ulong           mt_txnid;
        MDB_env         *mt_env;        
-       struct dirty_queue      *mt_dirty_queue;        /* modified pages */
+       union {
+               struct dirty_queue      *dirty_queue;   /* modified pages */
+               MDB_reader      *reader;
+       } mt_u;
 #define MDB_TXN_RDONLY          0x01           /* read-only transaction */
 #define MDB_TXN_ERROR           0x02           /* an error has occurred */
        unsigned int             mt_flags;
@@ -199,16 +245,20 @@ struct MDB_db {
 
 struct MDB_env {
        int                     me_fd;
+       int                     me_lfd;
 #define MDB_FIXPADDING          0x01           /* internal */
        uint32_t        me_flags;
+       int                     me_maxreaders;
        char            *me_path;
        char *me_map;
+       MDB_txninfo     *me_txns;
        MDB_head        me_head;
        MDB_db0         me_db;          /* first DB, overlaps with meta */
        MDB_meta        me_meta;
        MDB_txn *me_txn;                /* current write transaction */
        size_t          me_mapsize;
        off_t           me_size;                /* current file size */
+       pthread_key_t   me_txkey;       /* thread-key for readers */
 };
 
 #define NODESIZE        offsetof(MDB_node, mn_data)
@@ -356,7 +406,7 @@ mdb_newpage(MDB_txn *txn, MDB_page *parent, int parent_idx, int num)
        dp->h.md_num = num;
        dp->h.md_parent = parent;
        dp->h.md_pi = parent_idx;
-       SIMPLEQ_INSERT_TAIL(txn->mt_dirty_queue, dp, h.md_next);
+       SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
        dp->p.mp_pgno = txn->mt_next_pgno;
        txn->mt_next_pgno += num;
 
@@ -410,11 +460,6 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
        MDB_txn *txn;
        int rc;
 
-       if (!rdonly && env->me_txn != NULL) {
-               DPRINTF("write transaction already begun");
-               return EBUSY;
-       }
-
        if ((txn = calloc(1, sizeof(*txn))) == NULL) {
                DPRINTF("calloc: %s", strerror(errno));
                return ENOMEM;
@@ -423,23 +468,40 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
        if (rdonly) {
                txn->mt_flags |= MDB_TXN_RDONLY;
        } else {
-               txn->mt_dirty_queue = calloc(1, sizeof(*txn->mt_dirty_queue));
-               if (txn->mt_dirty_queue == NULL) {
+               txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
+               if (txn->mt_u.dirty_queue == NULL) {
                        free(txn);
                        return ENOMEM;
                }
-               SIMPLEQ_INIT(txn->mt_dirty_queue);
+               SIMPLEQ_INIT(txn->mt_u.dirty_queue);
 
-#if 0
-               DPRINTF("taking write lock on txn %p", txn);
-               if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
-                       DPRINTF("flock: %s", strerror(errno));
-                       errno = EBUSY;
-                       free(txn->dirty_queue);
-                       free(txn);
-                       return NULL;
+               pthread_mutex_lock(&env->me_txns->mt_wmutex);
+               env->me_txns->mt_txnid++;
+       }
+       txn->mt_txnid = env->me_txns->mt_txnid;
+       if (rdonly) {
+               MDB_reader *r = pthread_getspecific(env->me_txkey);
+               if (!r) {
+                       int i;
+                       pthread_mutex_lock(&env->me_txns->mt_mutex);
+                       for (i=0; i<env->me_maxreaders; i++) {
+                               if (env->me_txns->mt_readers[i].mr_pid == 0) {
+                                       env->me_txns->mt_readers[i].mr_pid = getpid();
+                                       env->me_txns->mt_readers[i].mr_tid = pthread_self();
+                                       pthread_setspecific(env->me_txkey, &env->me_txns->mt_readers[i]);
+                                       if (i >= env->me_txns->mt_numreaders)
+                                               env->me_txns->mt_numreaders = i+1;
+                                       break;
+                               }
+                       }
+                       pthread_mutex_unlock(&env->me_txns->mt_mutex);
+                       if (i == env->me_maxreaders) {
+                               return ENOSPC;
+                       }
                }
-#endif
+               r->mr_txnid = txn->mt_txnid;
+               txn->mt_u.reader = r;
+       } else {
                env->me_txn = txn;
        }
 
@@ -470,12 +532,14 @@ mdb_txn_abort(MDB_txn *txn)
        env = txn->mt_env;
        DPRINTF("abort transaction on mdbenv %p, root page %lu", env, txn->mt_root);
 
-       if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
+       if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
+               txn->mt_u.reader->mr_txnid = 0;
+       } else {
                /* Discard all dirty pages.
                 */
-               while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
-                       dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
-                       SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
+               while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
+                       dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
+                       SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
                        free(dp);
                }
 
@@ -487,11 +551,11 @@ mdb_txn_abort(MDB_txn *txn)
                            txn->bt->fd, strerror(errno));
                }
 #endif
-               free(txn->mt_dirty_queue);
+               free(txn->mt_u.dirty_queue);
+               env->me_txn = NULL;
+               pthread_mutex_unlock(&env->me_txns->mt_wmutex);
        }
 
-       if (txn == env->me_txn)
-               env->me_txn = NULL;
        free(txn);
 }
 
@@ -528,7 +592,7 @@ mdb_txn_commit(MDB_txn *txn)
                return EINVAL;
        }
 
-       if (SIMPLEQ_EMPTY(txn->mt_dirty_queue))
+       if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
                goto done;
 
        if (F_ISSET(env->me_flags, MDB_FIXPADDING)) {
@@ -552,7 +616,7 @@ mdb_txn_commit(MDB_txn *txn)
        do {
                n = 0;
                done = 1;
-               SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
+               SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
                        DPRINTF("committing page %lu", dp->p.mp_pgno);
                        iov[n].iov_len = env->me_head.mh_psize * dp->h.md_num;
                        iov[n].iov_base = &dp->p;
@@ -581,9 +645,9 @@ mdb_txn_commit(MDB_txn *txn)
 
                /* Drop the dirty pages.
                 */
-               while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
-                       dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
-                       SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
+               while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
+                       dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
+                       SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
                        free(dp);
                        if (--n == 0)
                                break;
@@ -596,8 +660,9 @@ mdb_txn_commit(MDB_txn *txn)
                mdb_txn_abort(txn);
                return n;
        }
-       txn = NULL;
        env->me_txn = NULL;
+       pthread_mutex_unlock(&env->me_txns->mt_wmutex);
+       txn = NULL;
 
 done:
        mdb_txn_abort(txn);
@@ -713,7 +778,7 @@ mdbenv_write_meta(MDB_env *env, pgno_t root, unsigned int flags)
        bcopy(&env->me_meta, meta, sizeof(*meta));
 
        rc = write(env->me_fd, &dp->p, env->me_head.mh_psize);
-       SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_dirty_queue, h.md_next);
+       SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_u.dirty_queue, h.md_next);
        free(dp);
        if (rc != (ssize_t)env->me_head.mh_psize) {
                int err = errno;
@@ -852,12 +917,38 @@ mdbenv_create(MDB_env **env, size_t size)
 
        e->me_head.mh_magic = MDB_MAGIC;
        e->me_head.mh_version = MDB_VERSION;
-       e->me_mapsize = e->me_head.mh_mapsize = size;
+       e->me_head.mh_mapsize = DEFAULT_MAPSIZE;
+       e->me_maxreaders = DEFAULT_READERS;
        e->me_db.md_env = e;
        *env = e;
        return MDB_SUCCESS;
 }
 
+int
+mdbenv_set_mapsize(MDB_env *env, size_t size)
+{
+       if (env->me_map)
+               return EINVAL;
+       env->me_mapsize = env->me_head.mh_mapsize = size;
+       return MDB_SUCCESS;
+}
+
+int
+mdbenv_set_maxreaders(MDB_env *env, int readers)
+{
+       env->me_maxreaders = readers;
+       return MDB_SUCCESS;
+}
+
+int
+mdbenv_get_maxreaders(MDB_env *env, int *readers)
+{
+       if (!env || !readers)
+               return EINVAL;
+       *readers = env->me_maxreaders;
+       return MDB_SUCCESS;
+}
+
 int
 mdbenv_open2(MDB_env *env, unsigned int flags)
 {
@@ -918,10 +1009,54 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
        return MDB_SUCCESS;
 }
 
+static void
+mdbenv_reader_dest(void *ptr)
+{
+       MDB_reader *reader = ptr;
+
+       reader->mr_txnid = 0;
+       reader->mr_pid = 0;
+       reader->mr_tid = 0;
+}
+
 int
 mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
 {
-       int              oflags, rc;
+       int             oflags, rc, len;
+       char    *lpath;
+       off_t   size, rsize;
+
+       len = strlen(path);
+       lpath = malloc(len + sizeof(".lock"));
+       sprintf(lpath, "%s.lock", path);
+       if ((env->me_lfd = open(lpath, O_RDWR | O_CREAT, mode)) == -1)
+               return errno;
+
+       size = lseek(env->me_lfd, 0, SEEK_END);
+       rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+       if (size < rsize) {
+               if (ftruncate(env->me_lfd, rsize) != 0) {
+                       rc = errno;
+                       close(env->me_lfd);
+                       return rc;
+               }
+       } else {
+               rsize = size;
+               size = rsize - sizeof(MDB_txninfo);
+               env->me_maxreaders = size/sizeof(MDB_reader) + 1;
+       }
+       env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
+               env->me_lfd, 0);
+       if (env->me_txns == MAP_FAILED)
+               return errno;
+       if (size == 0) {
+               pthread_mutexattr_t mattr;
+
+               pthread_mutexattr_init(&mattr);
+               pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+               pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
+               pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
+       }
 
        if (F_ISSET(flags, MDB_RDONLY))
                oflags = O_RDONLY;
@@ -939,6 +1074,8 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
                DPRINTF("opened dbenv %p", env);
        }
 
+       pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
+
        return rc;
 }
 
@@ -954,6 +1091,12 @@ mdbenv_close(MDB_env *env)
                munmap(env->me_map, env->me_mapsize);
        }
        close(env->me_fd);
+       if (env->me_txns) {
+               size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
+               munmap(env->me_txns, size);
+       }
+       close(env->me_lfd);
+       free(env);
 }
 
 /* Search for key within a leaf page, using binary search.
@@ -1060,9 +1203,9 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno)
        MDB_txn *txn = env->me_txn;
 
        if (txn && pgno >= txn->mt_first_pgno &&
-               !SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
+               !SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
                MDB_dpage *dp;
-               SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
+               SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
                        if (dp->p.mp_pgno == pgno) {
                                p = &dp->p;
                                break;
index b401bebaa1cd599e23b0f82bd7905a6e9f6aa791..518fba96a30f2d87f3906e2a19c95b63967ee451 100644 (file)
@@ -55,12 +55,15 @@ typedef struct MDB_stat {
        unsigned long   ms_entries;
 } MDB_stat;
 
-int  mdbenv_create(MDB_env **env, size_t size);
+int  mdbenv_create(MDB_env **env);
 int  mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode);
 int  mdbenv_stat(MDB_env *env, MDB_stat **stat);
 void mdbenv_close(MDB_env *env);
 int  mdbenv_get_flags(MDB_env *env, unsigned int *flags);
 int  mdbenv_get_path(MDB_env *env, const char **path);
+int  mdbenv_set_mapsize(MDB_env *env, size_t size);
+int  mdbenv_set_maxreaders(MDB_env *env, int readers);
+int  mdbenv_get_maxreaders(MDB_env *env, int *readers);
 int  mdbenv_sync(MDB_env *env);
 int  mdbenv_compact(MDB_env *env);
 
index b6316d4d5c82e107427b548376915c7649bb3f34..3533638c48c62a890ac9e5e9b228a9adff10fe58 100644 (file)
@@ -25,7 +25,8 @@ int main(int argc,char * argv[])
                        values[i] = random()%1024;
            }
     
-               rc = mdbenv_create(&env, 10485760);
+               rc = mdbenv_create(&env);
+               rc = mdbenv_set_mapsize(env, 10485760);
                rc = mdbenv_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664);
                rc = mdb_txn_begin(env, 0, &txn);
                rc = mdb_open(env, txn, NULL, 0, &db);