/*
   Bacula® - The Network Backup Solution

   Copyright (C) 2001-2014 Free Software Foundation Europe e.V.

   The main author of Bacula is Kern Sibbald, with contributions from many
   others, a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   Bacula® is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer.  Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 * Kern Sibbald, January MMI
 *
 * This code adapted from "Programming with POSIX Threads", by
 * David R. Butenhof
 */
29 #define _LOCKMGR_COMPLIANT
33 * Initialize a read/write lock
35 * Returns: 0 on success
38 int rwl_init(brwlock_t *rwl, int priority)
42 rwl->r_active = rwl->w_active = 0;
43 rwl->r_wait = rwl->w_wait = 0;
44 rwl->priority = priority;
45 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
48 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
49 pthread_mutex_destroy(&rwl->mutex);
52 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
53 pthread_cond_destroy(&rwl->read);
54 pthread_mutex_destroy(&rwl->mutex);
57 rwl->valid = RWLOCK_VALID;
62 * Destroy a read/write lock
64 * Returns: 0 on success
67 int rwl_destroy(brwlock_t *rwl)
69 int stat, stat1, stat2;
71 if (rwl->valid != RWLOCK_VALID) {
74 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
79 * If any threads are active, report EBUSY
81 if (rwl->r_active > 0 || rwl->w_active) {
82 pthread_mutex_unlock(&rwl->mutex);
87 * If any threads are waiting, report EBUSY
89 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
90 pthread_mutex_unlock(&rwl->mutex);
95 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
98 stat = pthread_mutex_destroy(&rwl->mutex);
99 stat1 = pthread_cond_destroy(&rwl->read);
100 stat2 = pthread_cond_destroy(&rwl->write);
101 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
104 bool rwl_is_init(brwlock_t *rwl)
106 return (rwl->valid == RWLOCK_VALID);
110 * Handle cleanup when the read lock condition variable
113 static void rwl_read_release(void *arg)
115 brwlock_t *rwl = (brwlock_t *)arg;
118 pthread_mutex_unlock(&rwl->mutex);
122 * Handle cleanup when the write lock condition variable wait
125 static void rwl_write_release(void *arg)
127 brwlock_t *rwl = (brwlock_t *)arg;
130 pthread_mutex_unlock(&rwl->mutex);
134 * Lock for read access, wait until locked (or error).
136 int rwl_readlock(brwlock_t *rwl)
140 if (rwl->valid != RWLOCK_VALID) {
143 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
147 rwl->r_wait++; /* indicate that we are waiting */
148 pthread_cleanup_push(rwl_read_release, (void *)rwl);
149 while (rwl->w_active) {
150 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
152 break; /* error, bail out */
155 pthread_cleanup_pop(0);
156 rwl->r_wait--; /* we are no longer waiting */
159 rwl->r_active++; /* we are running */
161 pthread_mutex_unlock(&rwl->mutex);
166 * Attempt to lock for read access, don't wait
168 int rwl_readtrylock(brwlock_t *rwl)
172 if (rwl->valid != RWLOCK_VALID) {
175 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
181 rwl->r_active++; /* we are running */
183 stat2 = pthread_mutex_unlock(&rwl->mutex);
184 return (stat == 0 ? stat2 : stat);
190 int rwl_readunlock(brwlock_t *rwl)
194 if (rwl->valid != RWLOCK_VALID) {
197 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
201 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
202 stat = pthread_cond_broadcast(&rwl->write);
204 stat2 = pthread_mutex_unlock(&rwl->mutex);
205 return (stat == 0 ? stat2 : stat);
210 * Lock for write access, wait until locked (or error).
211 * Multiple nested write locking is permitted.
213 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
217 if (rwl->valid != RWLOCK_VALID) {
220 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
223 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
225 pthread_mutex_unlock(&rwl->mutex);
228 lmgr_pre_lock(rwl, rwl->priority, file, line);
229 if (rwl->w_active || rwl->r_active > 0) {
230 rwl->w_wait++; /* indicate that we are waiting */
231 pthread_cleanup_push(rwl_write_release, (void *)rwl);
232 while (rwl->w_active || rwl->r_active > 0) {
233 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
235 break; /* error, bail out */
238 pthread_cleanup_pop(0);
239 rwl->w_wait--; /* we are no longer waiting */
242 rwl->w_active++; /* we are running */
243 rwl->writer_id = pthread_self(); /* save writer thread's id */
246 pthread_mutex_unlock(&rwl->mutex);
251 * Attempt to lock for write access, don't wait
253 int rwl_writetrylock(brwlock_t *rwl)
257 if (rwl->valid != RWLOCK_VALID) {
260 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
263 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
265 pthread_mutex_unlock(&rwl->mutex);
268 if (rwl->w_active || rwl->r_active > 0) {
271 rwl->w_active = 1; /* we are running */
272 rwl->writer_id = pthread_self(); /* save writer thread's id */
273 lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
275 stat2 = pthread_mutex_unlock(&rwl->mutex);
276 return (stat == 0 ? stat2 : stat);
281 * Start any waiting writers in preference to waiting readers
283 int rwl_writeunlock(brwlock_t *rwl)
287 if (rwl->valid != RWLOCK_VALID) {
290 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
293 if (rwl->w_active <= 0) {
294 pthread_mutex_unlock(&rwl->mutex);
295 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
298 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
299 pthread_mutex_unlock(&rwl->mutex);
300 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
302 if (rwl->w_active > 0) {
303 stat = 0; /* writers still active */
306 /* No more writers, awaken someone */
307 if (rwl->r_wait > 0) { /* if readers waiting */
308 stat = pthread_cond_broadcast(&rwl->read);
309 } else if (rwl->w_wait > 0) {
310 stat = pthread_cond_broadcast(&rwl->write);
313 stat2 = pthread_mutex_unlock(&rwl->mutex);
314 return (stat == 0 ? stat2 : stat);
321 #define ITERATIONS 1000000
324 * Keep statics for each thread.
326 typedef struct thread_tag {
335 * Read/write lock and shared data.
337 typedef struct data_tag {
343 static thread_t threads[THREADS];
344 static data_t data[DATASIZE];
347 * Thread start routine that uses read/write locks.
349 void *thread_routine(void *arg)
351 thread_t *self = (thread_t *)arg;
357 for (iteration=0; iteration < ITERATIONS; iteration++) {
359 * Each "self->interval" iterations, perform an
360 * update operation (write lock instead of read
363 // if ((iteration % self->interval) == 0) {
364 status = rwl_writelock(&data[element].lock);
367 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
370 data[element].data = self->thread_num;
371 data[element].writes++;
373 status = rwl_writelock(&data[element].lock);
376 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
379 data[element].data = self->thread_num;
380 data[element].writes++;
382 status = rwl_writeunlock(&data[element].lock);
385 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
388 status = rwl_writeunlock(&data[element].lock);
391 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
398 * Look at the current data element to see whether
399 * the current thread last updated it. Count the
400 * times to report later.
402 status = rwl_readlock(&data[element].lock);
405 printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
409 if (data[element].data == self->thread_num)
411 status = rwl_readunlock(&data[element].lock);
414 printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
420 if (element >= DATASIZE) {
425 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
426 self->thread_num, repeats);
431 int main (int argc, char *argv[])
436 unsigned int seed = 1;
437 int thread_writes = 0;
440 #ifdef USE_THR_SETCONCURRENCY
442 * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
443 * that our threads can run concurrently, we need to
444 * increase the concurrency level to THREADS.
446 thr_setconcurrency (THREADS);
450 * Initialize the shared data.
452 for (data_count = 0; data_count < DATASIZE; data_count++) {
453 data[data_count].data = 0;
454 data[data_count].writes = 0;
455 status = rwl_init(&data[data_count].lock);
458 printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
464 * Create THREADS threads to access shared data.
466 for (count = 0; count < THREADS; count++) {
467 threads[count].thread_num = count + 1;
468 threads[count].writes = 0;
469 threads[count].reads = 0;
470 threads[count].interval = rand_r(&seed) % 71;
471 if (threads[count].interval <= 0) {
472 threads[count].interval = 1;
474 status = pthread_create (&threads[count].thread_id,
475 NULL, thread_routine, (void*)&threads[count]);
476 if (status != 0 || (int)threads[count].thread_id == 0) {
478 printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
484 * Wait for all threads to complete, and collect
487 for (count = 0; count < THREADS; count++) {
488 status = pthread_join (threads[count].thread_id, NULL);
491 printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
494 thread_writes += threads[count].writes;
495 printf (_("%02d: interval %d, writes %d, reads %d\n"),
496 count, threads[count].interval,
497 threads[count].writes, threads[count].reads);
501 * Collect statistics for the data.
503 for (data_count = 0; data_count < DATASIZE; data_count++) {
504 data_writes += data[data_count].writes;
505 printf (_("data %02d: value %d, %d writes\n"),
506 data_count, data[data_count].data, data[data_count].writes);
507 rwl_destroy (&data[data_count].lock);
510 printf (_("Total: %d thread writes, %d data writes\n"),
511 thread_writes, data_writes);
517 #ifdef TEST_RW_TRY_LOCK
521 * Demonstrate use of non-blocking read-write locks.
523 * Special notes: On older Solaris system, call thr_setconcurrency()
524 * to allow interleaved thread execution, since threads are not
532 #define ITERATIONS 1000
536 * Keep statistics for each thread.
538 typedef struct thread_tag {
548 * Read-write lock and shared data
550 typedef struct data_tag {
556 thread_t threads[THREADS];
557 data_t data[DATASIZE];
560 * Thread start routine that uses read-write locks
562 void *thread_routine (void *arg)
564 thread_t *self = (thread_t*)arg;
569 element = 0; /* Current data element */
571 for (iteration = 0; iteration < ITERATIONS; iteration++) {
572 if ((iteration % self->interval) == 0) {
573 status = rwl_writetrylock (&data[element].lock);
575 self->w_collisions++;
576 else if (status == 0) {
577 data[element].data++;
578 data[element].updates++;
580 rwl_writeunlock (&data[element].lock);
582 err_abort (status, _("Try write lock"));
584 status = rwl_readtrylock (&data[element].lock);
586 self->r_collisions++;
587 else if (status != 0) {
588 err_abort (status, _("Try read lock"));
590 if (data[element].data != data[element].updates)
591 printf ("%d: data[%d] %d != %d\n",
592 self->thread_num, element,
593 data[element].data, data[element].updates);
594 rwl_readunlock (&data[element].lock);
599 if (element >= DATASIZE)
602 lmgr_cleanup_thread();
606 int main (int argc, char *argv[])
608 int count, data_count;
609 unsigned int seed = 1;
610 int thread_updates = 0, data_updates = 0;
613 #ifdef USE_THR_SETCONCURRENCY
615 * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
616 * that our threads can run concurrently, we need to
617 * increase the concurrency level to THREADS.
619 DPRINTF (("Setting concurrency level to %d\n", THREADS));
620 thr_setconcurrency (THREADS);
624 * Initialize the shared data.
626 for (data_count = 0; data_count < DATASIZE; data_count++) {
627 data[data_count].data = 0;
628 data[data_count].updates = 0;
629 rwl_init(&data[data_count].lock);
633 * Create THREADS threads to access shared data.
635 for (count = 0; count < THREADS; count++) {
636 threads[count].thread_num = count;
637 threads[count].r_collisions = 0;
638 threads[count].w_collisions = 0;
639 threads[count].updates = 0;
640 threads[count].interval = rand_r (&seed) % ITERATIONS;
641 status = pthread_create (&threads[count].thread_id,
642 NULL, thread_routine, (void*)&threads[count]);
644 err_abort (status, _("Create thread"));
648 * Wait for all threads to complete, and collect
651 for (count = 0; count < THREADS; count++) {
652 status = pthread_join (threads[count].thread_id, NULL);
654 err_abort (status, _("Join thread"));
655 thread_updates += threads[count].updates;
656 printf (_("%02d: interval %d, updates %d, "
657 "r_collisions %d, w_collisions %d\n"),
658 count, threads[count].interval,
659 threads[count].updates,
660 threads[count].r_collisions, threads[count].w_collisions);
664 * Collect statistics for the data.
666 for (data_count = 0; data_count < DATASIZE; data_count++) {
667 data_updates += data[data_count].updates;
668 printf (_("data %02d: value %d, %d updates\n"),
669 data_count, data[data_count].data, data[data_count].updates);
670 rwl_destroy (&data[data_count].lock);