/*
   Bacula(R) - The Network Backup Solution

   Copyright (C) 2000-2016 Kern Sibbald

   The original author of Bacula is Kern Sibbald, with contributions
   from many others, a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   This notice must be preserved when any source code is
   conveyed and/or propagated.

   Bacula(R) is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 *  multiple readers but only one writer. Note, however,
 *  that the writer thread is permitted to make multiple
 *  nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *    David R. Butenhof
 */
#define LOCKMGR_COMPLIANT
#include "bacula.h"
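
/*
 * Illustrative sketch (not part of the Bacula sources; compiled only if the
 * hypothetical RWLOCK_USAGE_EXAMPLE macro is defined): how a piece of shared
 * state is typically protected by this lock -- any number of threads may hold
 * it for read at the same time, but a writer has exclusive access.  The
 * example_* names are invented for the sketch.
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static brwlock_t example_lock;        /* protects example_value */
static int example_value = 0;

static int example_get(void)          /* reader: many may run concurrently */
{
   int val;
   rwl_readlock(&example_lock);
   val = example_value;
   rwl_readunlock(&example_lock);
   return val;
}

static void example_set(int val)      /* writer: runs alone */
{
   rwl_writelock(&example_lock);
   example_value = val;
   rwl_writeunlock(&example_lock);
}
#endif /* RWLOCK_USAGE_EXAMPLE */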
/*
 * Initialize a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
int rwl_init(brwlock_t *rwl, int priority)
{
   int stat;

   rwl->r_active = rwl->w_active = 0;
   rwl->r_wait = rwl->w_wait = 0;
   rwl->priority = priority;
   if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
      pthread_cond_destroy(&rwl->read);
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   rwl->valid = RWLOCK_VALID;
   return 0;
}
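
/*
 * Minimal initialization sketch (illustrative only; example names are
 * invented): every lock must be initialized before use and the return code
 * checked, since rwl_init() propagates any pthread error.  The tests below
 * call rwl_init() with a single argument and so rely on the default priority.
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static int example_setup(brwlock_t *lk)
{
   int stat = rwl_init(lk, 0);        /* explicit priority of 0 */
   if (stat != 0) {
      berrno be;
      printf("rwl_init failed. ERR=%s\n", be.bstrerror(stat));
   }
   return stat;
}
#endif /* RWLOCK_USAGE_EXAMPLE */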
/*
 * Destroy a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
int rwl_destroy(brwlock_t *rwl)
{
   int stat, stat1, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }

   /* If any threads are active, report EBUSY */
   if (rwl->r_active > 0 || rwl->w_active) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   /* If any threads are waiting, report EBUSY */
   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   rwl->valid = 0;                    /* mark invalid before tearing it down */
   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
      return stat;
   }
   stat  = pthread_mutex_destroy(&rwl->mutex);
   stat1 = pthread_cond_destroy(&rwl->read);
   stat2 = pthread_cond_destroy(&rwl->write);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
}
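
/*
 * Teardown sketch (illustrative only; example names are invented):
 * rwl_destroy() refuses to tear down a lock that is still in use, so a
 * caller should treat EBUSY as "try again once the readers and writers
 * have finished".
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static void example_teardown(brwlock_t *lk)
{
   int stat = rwl_destroy(lk);
   if (stat == EBUSY) {
      printf("Lock still busy (active or waiting threads); not destroyed.\n");
   } else if (stat != 0) {
      berrno be;
      printf("rwl_destroy failed. ERR=%s\n", be.bstrerror(stat));
   }
}
#endif /* RWLOCK_USAGE_EXAMPLE */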
/*
 * Handle cleanup when the read lock condition variable
 * wait is released.
 */
static void rwl_read_release(void *arg)
{
   brwlock_t *rwl = (brwlock_t *)arg;

   rwl->r_wait--;
   pthread_mutex_unlock(&rwl->mutex);
}

/*
 * Handle cleanup when the write lock condition variable wait
 * is released.
 */
static void rwl_write_release(void *arg)
{
   brwlock_t *rwl = (brwlock_t *)arg;

   rwl->w_wait--;
   pthread_mutex_unlock(&rwl->mutex);
}
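
/*
 * Why these handlers exist (a generic sketch, not Bacula code): the lock
 * routines below install them with pthread_cleanup_push() around
 * pthread_cond_wait(), which is a cancellation point.  If a waiting thread
 * is cancelled there, the handler undoes the r_wait/w_wait accounting and
 * releases the mutex that pthread_cond_wait() re-acquired before the
 * cleanup handlers run.  The ex_* names are invented for the sketch.
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static pthread_mutex_t ex_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ex_cond  = PTHREAD_COND_INITIALIZER;
static int ex_waiting = 0;
static bool ex_ready = false;

static void ex_release(void *arg)
{
   (void)arg;
   ex_waiting--;                      /* undo the "waiting" count */
   pthread_mutex_unlock(&ex_mutex);   /* drop the mutex held by the waiter */
}

static void *ex_waiter(void *arg)
{
   (void)arg;
   pthread_mutex_lock(&ex_mutex);
   ex_waiting++;
   pthread_cleanup_push(ex_release, NULL);
   while (!ex_ready) {
      pthread_cond_wait(&ex_cond, &ex_mutex);  /* may be cancelled here */
   }
   pthread_cleanup_pop(0);            /* 0: do not run the handler on the normal path */
   ex_waiting--;
   pthread_mutex_unlock(&ex_mutex);
   return NULL;
}

static void ex_signal(void)           /* wakes the waiter on the normal path */
{
   pthread_mutex_lock(&ex_mutex);
   ex_ready = true;
   pthread_cond_broadcast(&ex_cond);
   pthread_mutex_unlock(&ex_mutex);
}
#endif /* RWLOCK_USAGE_EXAMPLE */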
/*
 * Lock for read access, wait until locked (or error).
 */
int rwl_readlock(brwlock_t *rwl)
{
   int stat;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      rwl->r_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(rwl_read_release, (void *)rwl);
      while (rwl->w_active) {
         stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
         if (stat != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->r_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->r_active++;                /* we are running */
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for read access, don't wait
 */
int rwl_readtrylock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      stat = EBUSY;                   /* a writer holds the lock */
   } else {
      rwl->r_active++;                /* we are running */
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
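
/*
 * Try-lock sketch (illustrative only; example names are invented): unlike
 * rwl_readlock(), the try variant never blocks; EBUSY simply means a writer
 * currently holds the lock, so the caller can do other work and retry later.
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static bool example_try_read(brwlock_t *lk, int *out, const int *shared_val)
{
   int stat = rwl_readtrylock(lk);
   if (stat == EBUSY) {
      return false;                   /* writer active; caller retries later */
   }
   if (stat != 0) {
      berrno be;
      printf("rwl_readtrylock failed. ERR=%s\n", be.bstrerror(stat));
      return false;
   }
   *out = *shared_val;                /* read under shared (read) ownership */
   rwl_readunlock(lk);
   return true;
}
#endif /* RWLOCK_USAGE_EXAMPLE */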
/*
 * Unlock a read lock.
 */
int rwl_readunlock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   rwl->r_active--;                   /* one less active reader */
   if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
      stat = pthread_cond_broadcast(&rwl->write);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Lock for write access, wait until locked (or error).
 *   Multiple nested write locking is permitted.
 */
int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
{
   int stat;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already the writer, just count the nesting */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   lmgr_pre_lock(rwl, rwl->priority, file, line);
   if (rwl->w_active || rwl->r_active > 0) {
      rwl->w_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(rwl_write_release, (void *)rwl);
      while (rwl->w_active || rwl->r_active > 0) {
         if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->w_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->w_active++;                /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_post_lock();
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
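
/*
 * Nested write-lock sketch (illustrative only; example names are invented):
 * because the owner's thread id is remembered, the same thread may call
 * rwl_writelock() again (for instance from a helper routine) without
 * deadlocking; each lock call must be matched by one rwl_writeunlock().
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static int ex_counter = 0;

static void example_bump(brwlock_t *lk)   /* helper that locks on its own */
{
   rwl_writelock(lk);
   ex_counter++;
   rwl_writeunlock(lk);
}

static void example_update(brwlock_t *lk)
{
   rwl_writelock(lk);                 /* outer write lock */
   example_bump(lk);                  /* nested write lock by the same thread: allowed */
   ex_counter++;
   rwl_writeunlock(lk);               /* released for others only after the outer unlock */
}
#endif /* RWLOCK_USAGE_EXAMPLE */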
/*
 * Attempt to lock for write access, don't wait
 */
int rwl_writetrylock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already the writer, just count the nesting */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   if (rwl->w_active || rwl->r_active > 0) {
      stat = EBUSY;                   /* somebody else holds the lock */
   } else {
      rwl->w_active = 1;              /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
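
/*
 * Write try-lock sketch (illustrative only; example names are invented):
 * EBUSY means other readers or another writer are active; a 0 return means
 * the lock is now write-held (or the calling thread already held it and the
 * nesting count was simply increased) and must be released with
 * rwl_writeunlock().
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static bool example_try_update(brwlock_t *lk, int *shared_val, int newval)
{
   int stat = rwl_writetrylock(lk);
   if (stat == EBUSY) {
      return false;                   /* contended; caller may retry */
   }
   if (stat != 0) {
      berrno be;
      printf("rwl_writetrylock failed. ERR=%s\n", be.bstrerror(stat));
      return false;
   }
   *shared_val = newval;              /* exclusive (write) ownership */
   rwl_writeunlock(lk);
   return true;
}
#endif /* RWLOCK_USAGE_EXAMPLE */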
/*
 * Unlock a write lock.
 *  Wake any waiting readers in preference to waiting writers.
 */
int rwl_writeunlock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active <= 0) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
   }
   rwl->w_active--;                   /* undo one level of write nesting */
   if (!pthread_equal(pthread_self(), rwl->writer_id)) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
   }
   if (rwl->w_active > 0) {
      stat = 0;                       /* writers still active */
   } else {
      lmgr_do_unlock(rwl);
      /* No more writers, awaken someone */
      if (rwl->r_wait > 0) {          /* if readers waiting */
         stat = pthread_cond_broadcast(&rwl->read);
      } else if (rwl->w_wait > 0) {
         stat = pthread_cond_broadcast(&rwl->write);
      }
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
bool is_rwl_valid(brwlock_t *rwl)
{
   return (rwl->valid == RWLOCK_VALID);
}
#ifdef TEST_RWLOCK

#define ITERATIONS 1000000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int writes;
   int reads;
   int interval;
} thread_t;

/*
 * Read/write lock and shared data.
 */
typedef struct data_tag {
   brwlock_t lock;
   int data;
   int writes;
} data_t;

static thread_t threads[THREADS];
static data_t data[DATASIZE];
/*
 * Thread start routine that uses read/write locks.
 */
void *thread_routine(void *arg)
{
   thread_t *self = (thread_t *)arg;
   int iteration;
   int element = 0;
   int status;
   int repeats = 0;

   for (iteration=0; iteration < ITERATIONS; iteration++) {
      /*
       * Each "self->interval" iterations, perform an
       * update operation (write lock instead of read
       * lock).
       */
//      if ((iteration % self->interval) == 0) {
         status = rwl_writelock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         /* Take the same write lock again to exercise nested write locking */
         status = rwl_writelock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         status = rwl_writeunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         status = rwl_writeunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
#ifdef xxx
      } else {
         /*
          * Look at the current data element to see whether
          * the current thread last updated it. Count the
          * times to report later.
          */
         status = rwl_readlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         if (data[element].data == self->thread_num)
            repeats++;
         status = rwl_readunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
      }
#endif
      element++;
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   if (repeats > 0) {
      Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
         self->thread_num, repeats);
   }
   return NULL;
}
int main (int argc, char *argv[])
{
   int count;
   int data_count;
   int status;
   unsigned int seed = 1;
   int thread_writes = 0;
   int data_writes = 0;

   /*
    * For Solaris 2.5, 2.6, 7 and 8, threads are not timesliced.
    * Ensure our threads can run concurrently.
    */
   thr_setconcurrency(THREADS);       /* Only implemented on Solaris */

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].writes = 0;
      status = rwl_init(&data[data_count].lock);
      if (status != 0) {
         berrno be;
         printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count + 1;
      threads[count].writes = 0;
      threads[count].reads = 0;
      threads[count].interval = rand_r(&seed) % 71;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1;
      }
      status = pthread_create (&threads[count].thread_id,
         NULL, thread_routine, (void*)&threads[count]);
      if (status != 0 || (int)threads[count].thread_id == 0) {
         berrno be;
         printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0) {
         berrno be;
         printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
      thread_writes += threads[count].writes;
      printf (_("%02d: interval %d, writes %d, reads %d\n"),
         count, threads[count].interval,
         threads[count].writes, threads[count].reads);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_writes += data[data_count].writes;
      printf (_("data %02d: value %d, %d writes\n"),
         data_count, data[data_count].data, data[data_count].writes);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("Total: %d thread writes, %d data writes\n"),
      thread_writes, data_writes);
   return 0;
}

#endif /* TEST_RWLOCK */
#ifdef TEST_RW_TRY_LOCK
/*
 * Demonstrate use of non-blocking read-write locks.
 *
 * Special notes: On older Solaris systems, call thr_setconcurrency()
 * to allow interleaved thread execution, since threads are not
 * timesliced.
 */

#define ITERATIONS 1000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int r_collisions;
   int w_collisions;
   int updates;
   int interval;
} thread_t;

/*
 * Read-write lock and shared data
 */
typedef struct data_tag {
   brwlock_t lock;
   int data;
   int updates;
} data_t;

thread_t threads[THREADS];
data_t data[DATASIZE];
/*
 * Thread start routine that uses read-write locks
 */
void *thread_routine (void *arg)
{
   thread_t *self = (thread_t*)arg;
   int iteration;
   int element;
   int status;
   lmgr_init_thread();
   element = 0;                       /* Current data element */

   for (iteration = 0; iteration < ITERATIONS; iteration++) {
      if ((iteration % self->interval) == 0) {
         status = rwl_writetrylock (&data[element].lock);
         if (status == EBUSY)
            self->w_collisions++;
         else if (status == 0) {
            data[element].data++;
            data[element].updates++;
            self->updates++;
            rwl_writeunlock (&data[element].lock);
         } else
            err_abort (status, _("Try write lock"));
      } else {
         status = rwl_readtrylock (&data[element].lock);
         if (status == EBUSY)
            self->r_collisions++;
         else if (status != 0) {
            err_abort (status, _("Try read lock"));
         } else {
            if (data[element].data != data[element].updates)
               printf ("%d: data[%d] %d != %d\n",
                  self->thread_num, element,
                  data[element].data, data[element].updates);
            rwl_readunlock (&data[element].lock);
         }
      }
      element++;
      if (element >= DATASIZE)
         element = 0;
   }
   lmgr_cleanup_thread();
   return NULL;
}
int main (int argc, char *argv[])
{
   int count, data_count;
   unsigned int seed = 1;
   int thread_updates = 0, data_updates = 0;
   int status;

   /*
    * For Solaris 2.5, 2.6, 7 and 8, threads are not timesliced.
    * Ensure our threads can run concurrently.
    */
   DPRINTF (("Setting concurrency level to %d\n", THREADS));
   thr_setconcurrency(THREADS);       /* Only implemented on Solaris */

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].updates = 0;
      rwl_init(&data[data_count].lock);
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count;
      threads[count].r_collisions = 0;
      threads[count].w_collisions = 0;
      threads[count].updates = 0;
      threads[count].interval = rand_r (&seed) % ITERATIONS;
      if (threads[count].interval == 0) {
         threads[count].interval = 1; /* avoid modulo by zero in thread_routine */
      }
      status = pthread_create (&threads[count].thread_id,
         NULL, thread_routine, (void*)&threads[count]);
      if (status != 0)
         err_abort (status, _("Create thread"));
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0)
         err_abort (status, _("Join thread"));
      thread_updates += threads[count].updates;
      printf (_("%02d: interval %d, updates %d, "
                "r_collisions %d, w_collisions %d\n"),
         count, threads[count].interval,
         threads[count].updates,
         threads[count].r_collisions, threads[count].w_collisions);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_updates += data[data_count].updates;
      printf (_("data %02d: value %d, %d updates\n"),
         data_count, data[data_count].data, data[data_count].updates);
      rwl_destroy (&data[data_count].lock);