/*
   Bacula(R) - The Network Backup Solution

   Copyright (C) 2000-2015 Kern Sibbald
   Copyright (C) 2001-2014 Free Software Foundation Europe e.V.

   The original author of Bacula is Kern Sibbald, with contributions
   from many others, a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   This notice must be preserved when any source code is
   conveyed and/or propagated.

   Bacula(R) is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 *  multiple readers but only one writer. Note, however,
 *  that the writer thread is permitted to make multiple
 *  nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code is adapted from "Programming with POSIX Threads", by
 *  David R. Butenhof.
 */
#define LOCKMGR_COMPLIANT
#include "bacula.h"
#include "devlock.h"
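
/*
 * Usage sketch (illustrative only, not part of the build): how a
 *  devlock is intended to be driven, assuming the interface declared
 *  in devlock.h. Any number of readers may hold the lock at once; a
 *  writer excludes everyone else but may re-lock for write from the
 *  same thread.
 *
 *    devlock *lock = new_devlock();
 *    lock->init(0);                  // priority 0 chosen arbitrarily here
 *
 *    lock->readlock();               // shared access
 *    ... read the device state ...
 *    lock->readunlock();
 *
 *    lock->writelock(0, false);      // exclusive; reason 0, not takeable
 *    lock->writelock(0, false);      // nested call from the owner is allowed
 *    ... modify the device state ...
 *    lock->writeunlock();
 *    lock->writeunlock();            // released on the last unlock
 *
 *    lock->destroy();
 *    free(lock);
 */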
/*
 * Initialize a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
devlock *new_devlock()
{
   devlock *lock;

   lock = (devlock *)malloc(sizeof(devlock));
   memset(lock, 0, sizeof(devlock));
   return lock;
}
int devlock::init(int init_priority)
{
   int stat;
   devlock *rwl = this;

   rwl->r_active = rwl->w_active = 0;
   rwl->r_wait = rwl->w_wait = 0;
   rwl->priority = init_priority;
   if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
      pthread_cond_destroy(&rwl->read);
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   rwl->valid = DEVLOCK_VALID;
   return 0;
}
/*
 * Destroy a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
int devlock::destroy()
{
   devlock *rwl = this;
   int stat, stat1, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }

   /*
    * If any threads are active, report EBUSY
    */
   if (rwl->r_active > 0 || rwl->w_active) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   /*
    * If any threads are waiting, report EBUSY
    */
   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   rwl->valid = 0;                    /* mark invalid before freeing */
   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
      return stat;
   }
   stat  = pthread_mutex_destroy(&rwl->mutex);
   stat1 = pthread_cond_destroy(&rwl->read);
   stat2 = pthread_cond_destroy(&rwl->write);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
}
/*
 * Handle cleanup when the read lock condition variable
 *  wait is released.
 */
static void devlock_read_release(void *arg)
{
   devlock *rwl = (devlock *)arg;
   rwl->read_release();
}

void devlock::read_release()
{
   r_wait--;                          /* no longer waiting */
   pthread_mutex_unlock(&mutex);
}

/*
 * Handle cleanup when the write lock condition variable wait
 *  is released.
 */
static void devlock_write_release(void *arg)
{
   devlock *rwl = (devlock *)arg;
   rwl->write_release();
}

void devlock::write_release()
{
   w_wait--;                          /* no longer waiting */
   pthread_mutex_unlock(&mutex);
}
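
/*
 * Why the cleanup handlers above exist: pthread_cond_wait() is a
 *  cancellation point, so a thread canceled while waiting would
 *  otherwise die holding the mutex with its wait count still bumped.
 *  The lock routines below therefore wrap each wait like this
 *  (read-side names shown):
 *
 *    rwl->r_wait++;                       // count ourselves as waiting
 *    pthread_cleanup_push(devlock_read_release, (void *)rwl);
 *    while (rwl->w_active) {
 *       stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
 *       ...
 *    }
 *    pthread_cleanup_pop(0);              // 0: normal path, handler not run
 *    rwl->r_wait--;                       // normal path undoes the count
 *
 *  On cancellation the handler runs instead and performs the same
 *  two steps.
 */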
/*
 * Lock for read access, wait until locked (or error).
 */
int devlock::readlock()
{
   devlock *rwl = this;
   int stat;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      rwl->r_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(devlock_read_release, (void *)rwl);
      while (rwl->w_active) {
         stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
         if (stat != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->r_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->r_active++;                /* we are running */
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for read access, don't wait
 */
int devlock::readtrylock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      stat = EBUSY;                   /* a writer is active */
   } else {
      rwl->r_active++;                /* we are running */
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Unlock read access.
 */
int devlock::readunlock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   rwl->r_active--;                   /* one less active reader */
   if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
      stat = pthread_cond_broadcast(&rwl->write);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Lock for write access, wait until locked (or error).
 *  Multiple nested write locking is permitted.
 */
int devlock::writelock(int areason, bool acan_take)
{
   devlock *rwl = this;
   int stat;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already the owner: nested lock */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
   if (rwl->w_active || rwl->r_active > 0) {
      rwl->w_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(devlock_write_release, (void *)rwl);
      while (rwl->w_active || rwl->r_active > 0) {
         if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->w_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->w_active++;                /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_post_lock();
   }
   rwl->reason = areason;
   rwl->can_take = acan_take;
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
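
/*
 * Nested write locking in practice (illustrative sketch, hypothetical
 *  function names): because writelock() only bumps w_active when the
 *  calling thread already owns the lock, a routine that takes the
 *  write lock may safely call another routine that does the same.
 *
 *    int update_label(devlock *lock)
 *    {
 *       lock->writelock(0, false);   // may be the first or a nested lock
 *       ... rewrite the volume label ...
 *       lock->writeunlock();
 *       return 0;
 *    }
 *
 *    lock->writelock(0, false);
 *    update_label(lock);             // nested: no self-deadlock
 *    lock->writeunlock();            // lock held until the last unlock
 */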
/*
 * Attempt to lock for write access, don't wait
 */
int devlock::writetrylock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already the owner: nested lock */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   if (rwl->w_active || rwl->r_active > 0) {
      stat = EBUSY;                   /* lock is busy */
   } else {
      rwl->w_active = 1;              /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Unlock write access.
 *  Note: waiting readers are awakened in preference to
 *  waiting writers.
 */
int devlock::writeunlock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active <= 0) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
   }
   if (!pthread_equal(pthread_self(), rwl->writer_id)) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
   }
   rwl->w_active--;                   /* release one nested write lock */
   if (rwl->w_active > 0) {
      stat = 0;                       /* writers still active */
   } else {
      /* No more writers, awaken someone */
      if (rwl->r_wait > 0) {          /* if readers waiting */
         stat = pthread_cond_broadcast(&rwl->read);
      } else if (rwl->w_wait > 0) {
         stat = pthread_cond_broadcast(&rwl->write);
      }
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Take the write lock from its current holder, saving the
 *  holder's state in *hold so it can be restored later.
 */
int devlock::take_lock(take_lock_t *hold, int areason)
{
   int stat;

   if (valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&mutex)) != 0) {
      return stat;
   }
   hold->reason = reason;
   hold->prev_reason = prev_reason;
   hold->writer_id = writer_id;
   prev_reason = reason;
   reason = areason;
   writer_id = pthread_self();
   stat = pthread_mutex_unlock(&mutex);
   return stat;
}
/*
 * Return a lock taken with take_lock() to its previous owner.
 */
int devlock::return_lock(take_lock_t *hold)
{
   int stat, stat2;

   if (valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&mutex)) != 0) {
      return stat;
   }
   reason = hold->reason;
   prev_reason = hold->prev_reason;
   writer_id = hold->writer_id;       /* restore the saved owner */
   stat2 = pthread_mutex_unlock(&mutex);
   if (w_active || w_wait) {
      stat = pthread_cond_broadcast(&write);
   }
   return (stat == 0 ? stat2 : stat);
}
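
/*
 * take_lock()/return_lock() usage (illustrative sketch): a thread can
 *  "take" the write lock bookkeeping from the current holder, e.g. to
 *  interrupt a long-running operation, and later hand it back intact.
 *  The can_take flag recorded by writelock() presumably tells callers
 *  whether the holder permits this; take_lock() itself does not block.
 *
 *    take_lock_t hold;
 *    lock->take_lock(&hold, my_reason);  // my_reason is hypothetical
 *    ... do the urgent work under the lock ...
 *    lock->return_lock(&hold);           // restore owner and reasons
 */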
#ifdef TEST_RWLOCK

#define ITERATIONS 1000000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int writes, reads, interval;
} thread_t;

/*
 * Read/write lock and shared data.
 */
typedef struct data_tag {
   devlock lock;                      /* the class defined above */
   int data, writes;
} data_t;

static thread_t threads[THREADS];
static data_t data[DATASIZE];
/*
 * Thread start routine that uses read/write locks.
 */
void *thread_routine(void *arg)
{
   thread_t *self = (thread_t *)arg;
   int repeats = 0;
   int iteration;
   int element = 0;
   int status;

   for (iteration=0; iteration < ITERATIONS; iteration++) {
      /*
       * Each "self->interval" iterations, perform an
       *  update operation (write lock instead of read
       *  lock).
       */
//    if ((iteration % self->interval) == 0) {
         status = data[element].lock.writelock(0, false);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;
         /* Take the same write lock again to exercise nesting */
         status = data[element].lock.writelock(0, false);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;
         status = data[element].lock.writeunlock();
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         status = data[element].lock.writeunlock();
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }

      /*
       * Look at the current data element to see whether
       *  the current thread last updated it. Count the
       *  times to report later.
       */
      status = data[element].lock.readlock();
      if (status != 0) {
         berrno be;
         printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
      self->reads++;
      if (data[element].data == self->thread_num) {
         repeats++;
      }
      status = data[element].lock.readunlock();
      if (status != 0) {
         berrno be;
         printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }

      element++;
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   if (repeats > 0) {
      Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
         self->thread_num, repeats);
   }
   return NULL;
}
int main(int argc, char *argv[])
{
   int count, data_count;
   int status;
   unsigned int seed = 1;
   int thread_writes = 0;
   int data_writes = 0;

#ifdef sun
   /*
    * For Solaris 2.5, 2.6, 7 and 8, threads are not timesliced.
    * To ensure our threads can run concurrently, we specifically
    * set the concurrency level to THREADS.
    */
   thr_setconcurrency(THREADS);       /* Turned on only for Solaris */
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].writes = 0;
      status = data[data_count].lock.init(0);  /* priority 0 for the test */
      if (status != 0) {
         berrno be;
         printf("Init devlock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count + 1;
      threads[count].writes = 0;
      threads[count].reads = 0;
      threads[count].interval = rand_r(&seed) % 71;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1;
      }
      status = pthread_create(&threads[count].thread_id,
                    NULL, thread_routine, (void *)&threads[count]);
      if (status != 0 || (int)threads[count].thread_id == 0) {
         berrno be;
         printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Wait for all threads to complete, and collect
    *  statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join(threads[count].thread_id, NULL);
      if (status != 0) {
         berrno be;
         printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
      thread_writes += threads[count].writes;
      printf(_("%02d: interval %d, writes %d, reads %d\n"),
         count, threads[count].interval,
         threads[count].writes, threads[count].reads);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_writes += data[data_count].writes;
      printf(_("data %02d: value %d, %d writes\n"),
         data_count, data[data_count].data, data[data_count].writes);
      data[data_count].lock.destroy();
   }

   printf(_("Total: %d thread writes, %d data writes\n"),
      thread_writes, data_writes);
   return 0;
}
#endif /* TEST_RWLOCK */
#ifdef TEST_RW_TRY_LOCK
/*
 * Demonstrate use of non-blocking read-write locks.
 *
 *  Special note: on older Solaris systems, call thr_setconcurrency()
 *  to allow interleaved thread execution, since threads are not
 *  timesliced.
 */

#define ITERATIONS 1000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int updates, interval;
   int r_collisions, w_collisions;
} thread_t;

/*
 * Read-write lock and shared data
 */
typedef struct data_tag {
   brwlock_t lock;                    /* lock type assumed from rwlock.h */
   int data;
   int updates;
} data_t;

thread_t threads[THREADS];
data_t data[DATASIZE];
/*
 * Thread start routine that uses read-write locks
 */
void *thread_routine (void *arg)
{
   thread_t *self = (thread_t *)arg;
   int iteration;
   int element;
   int status;

   element = 0;                       /* Current data element */

   for (iteration = 0; iteration < ITERATIONS; iteration++) {
      if ((iteration % self->interval) == 0) {
         status = rwl_writetrylock (&data[element].lock);
         if (status == EBUSY) {
            self->w_collisions++;
         } else if (status == 0) {
            data[element].data++;
            data[element].updates++;
            self->updates++;
            rwl_writeunlock (&data[element].lock);
         } else {
            err_abort (status, _("Try write lock"));
         }
      } else {
         status = rwl_readtrylock (&data[element].lock);
         if (status == EBUSY) {
            self->r_collisions++;
         } else if (status != 0) {
            err_abort (status, _("Try read lock"));
         } else {
            if (data[element].data != data[element].updates) {
               printf ("%d: data[%d] %d != %d\n",
                  self->thread_num, element,
                  data[element].data, data[element].updates);
            }
            rwl_readunlock (&data[element].lock);
         }
      }
      element++;
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   lmgr_cleanup_thread();
   return NULL;
}
int main (int argc, char *argv[])
{
   int count, data_count;
   unsigned int seed = 1;
   int thread_updates = 0, data_updates = 0;
   int status;

#ifdef sun
   /*
    * For Solaris 2.5, 2.6, 7 and 8, threads are not timesliced.
    * To ensure our threads can run concurrently, we specifically
    * set the concurrency level to THREADS.
    */
   DPRINTF (("Setting concurrency level to %d\n", THREADS));
   thr_setconcurrency(THREADS);       /* Turned on only for Solaris */
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].updates = 0;
      rwl_init(&data[data_count].lock);
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count;
      threads[count].r_collisions = 0;
      threads[count].w_collisions = 0;
      threads[count].updates = 0;
      threads[count].interval = rand_r (&seed) % ITERATIONS;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1; /* avoid a modulo by zero in the thread */
      }
      status = pthread_create (&threads[count].thread_id,
                    NULL, thread_routine, (void *)&threads[count]);
      if (status != 0) {
         err_abort (status, _("Create thread"));
      }
   }

   /*
    * Wait for all threads to complete, and collect
    *  statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0) {
         err_abort (status, _("Join thread"));
      }
      thread_updates += threads[count].updates;
      printf (_("%02d: interval %d, updates %d, "
                "r_collisions %d, w_collisions %d\n"),
         count, threads[count].interval,
         threads[count].updates,
         threads[count].r_collisions, threads[count].w_collisions);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_updates += data[data_count].updates;
      printf (_("data %02d: value %d, %d updates\n"),
         data_count, data[data_count].data, data[data_count].updates);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("Total: %d thread updates, %d data updates\n"),
      thread_updates, data_updates);
   return 0;
}
#endif /* TEST_RW_TRY_LOCK */