/*
   Bacula(R) - The Network Backup Solution

   Copyright (C) 2000-2016 Kern Sibbald

   The original author of Bacula is Kern Sibbald, with contributions
   from many others; a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   This notice must be preserved when any source code is
   conveyed and/or propagated.

   Bacula(R) is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer. Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 * Kern Sibbald, January MMI
 *
 * This code adapted from "Programming with POSIX Threads", by
 * David R. Butenhof
 */
#define LOCKMGR_COMPLIANT
#include "bacula.h"
#include "devlock.h"
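
/*
 * Usage sketch (illustrative only, not compiled): basic reader and
 * writer use of a devlock.  The priority passed to init() and the
 * reason/can_take arguments to writelock() are placeholder values,
 * not prescribed by this file.
 *
 *    devlock *lock = new_devlock();
 *    lock->init(0);                  // 0: assumed priority
 *
 *    // reader thread: many readers may hold the lock at once
 *    lock->readlock();
 *    ... examine shared device state ...
 *    lock->readunlock();
 *
 *    // writer thread: exclusive access, may nest writelock() calls
 *    lock->writelock(0, false);      // 0/false: assumed reason/can_take
 *    ... modify shared device state ...
 *    lock->writeunlock();
 */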
/*
 * Initialize a read/write lock
 *
 * Returns: 0 on success
 *          errno on failure
 */
devlock *new_devlock()
{
   devlock *lock;

   lock = (devlock *)malloc(sizeof (devlock));
   memset(lock, 0, sizeof(devlock));
   return lock;
}
int devlock::init(int init_priority)
{
   int stat;
   devlock *rwl = this;

   rwl->r_active = rwl->w_active = 0;
   rwl->r_wait = rwl->w_wait = 0;
   rwl->priority = init_priority;
   if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
      pthread_cond_destroy(&rwl->read);
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   rwl->valid = DEVLOCK_VALID;
   return 0;
}
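
/*
 * Note on the backout pattern above (sketch, assuming the caller
 * allocates the devlock itself): init() destroys whatever it had
 * already created when a later step fails, so a caller only needs
 * to check the single return value:
 *
 *    devlock *lock = new_devlock();
 *    if (lock->init(0) != 0) {       // 0: assumed priority
 *       free(lock);                  // nothing else to clean up
 *    }
 */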
/*
 * Destroy a read/write lock
 *
 * Returns: 0 on success
 *          errno on failure
 */
int devlock::destroy()
{
   devlock *rwl = this;
   int stat, stat1, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }

   /*
    * If any threads are active, report EBUSY
    */
   if (rwl->r_active > 0 || rwl->w_active) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   /*
    * If any threads are waiting, report EBUSY
    */
   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   rwl->valid = 0;                    /* mark invalid before destroying */
   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
      return stat;
   }
   stat  = pthread_mutex_destroy(&rwl->mutex);
   stat1 = pthread_cond_destroy(&rwl->read);
   stat2 = pthread_cond_destroy(&rwl->write);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
}
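
/*
 * Illustrative sketch (not compiled here): destroy() refuses to tear
 * down a lock that is still in use, so callers should expect EBUSY.
 * The retry loop below is an assumed usage pattern, not part of this
 * API.
 *
 *    int stat;
 *    while ((stat = lock->destroy()) == EBUSY) {
 *       bmicrosleep(1, 0);           // wait for readers/writers to drain
 *    }
 *    if (stat != 0) {
 *       ... report the errno-style status ...
 *    }
 */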
/*
 * Handle cleanup when the read lock condition variable
 * wait is released.
 */
static void devlock_read_release(void *arg)
{
   devlock *rwl = (devlock *)arg;
   rwl->read_release();
}

void devlock::read_release()
{
   r_wait--;
   pthread_mutex_unlock(&mutex);
}

/*
 * Handle cleanup when the write lock condition variable wait
 * is released.
 */
static void devlock_write_release(void *arg)
{
   devlock *rwl = (devlock *)arg;
   rwl->write_release();
}

void devlock::write_release()
{
   w_wait--;
   pthread_mutex_unlock(&mutex);
}
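
/*
 * Why these handlers exist (assuming standard POSIX cancellation
 * semantics): pthread_cond_wait() is a cancellation point, and a
 * canceled waiter wakes up holding the mutex.  readlock()/writelock()
 * therefore wrap their waits like this, so a cancel still fixes the
 * waiter count and releases the mutex:
 *
 *    rwl->r_wait++;
 *    pthread_cleanup_push(devlock_read_release, (void *)rwl);
 *    while (rwl->w_active) {
 *       stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
 *       ...
 *    }
 *    pthread_cleanup_pop(0);         // 0: handler runs only on cancel
 *    rwl->r_wait--;
 */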
/*
 * Lock for read access, wait until locked (or error).
 */
int devlock::readlock()
{
   devlock *rwl = this;
   int stat;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      rwl->r_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(devlock_read_release, (void *)rwl);
      while (rwl->w_active) {
         stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
         if (stat != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->r_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->r_active++;                /* we are running */
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for read access, don't wait
 */
int devlock::readtrylock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      stat = EBUSY;
   } else {
      rwl->r_active++;                /* we are running */
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
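
/*
 * readtrylock() sketch (illustrative, not compiled here): EBUSY is the
 * expected "collision" result and should not be treated as an error:
 *
 *    int stat = lock->readtrylock();
 *    if (stat == 0) {
 *       ... read shared state ...
 *       lock->readunlock();
 *    } else if (stat != EBUSY) {
 *       ... report real error ...
 *    }                               // EBUSY: writer active, try later
 */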
/*
 * Unlock read access.
 */
int devlock::readunlock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   rwl->r_active--;                   /* one less reader running */
   if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
      stat = pthread_cond_broadcast(&rwl->write);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
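
/*
 * Hand-off sketch: only the last reader out wakes the writers.  With
 * two active readers and one waiting writer:
 *
 *    reader A: readunlock()          // r_active 2 -> 1, no broadcast
 *    reader B: readunlock()          // r_active 1 -> 0, w_wait > 0,
 *                                    // so broadcast &write
 *
 * The awakened writer re-checks its predicate (w_active/r_active)
 * before proceeding.
 */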
/*
 * Lock for write access, wait until locked (or error).
 * Multiple nested write locking is permitted.
 */
int devlock::writelock(int areason, bool acan_take)
{
   devlock *rwl = this;
   int stat;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already owner: nest the lock */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
   if (rwl->w_active || rwl->r_active > 0) {
      rwl->w_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(devlock_write_release, (void *)rwl);
      while (rwl->w_active || rwl->r_active > 0) {
         if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->w_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->w_active++;                /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_post_lock();
   }
   rwl->reason = areason;
   rwl->can_take = acan_take;
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
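
/*
 * Nested write locking sketch (illustrative, not compiled here): the
 * pthread_equal() fast path above lets the owning thread re-lock
 * without deadlocking; every writelock() must still be paired with a
 * writeunlock().  The 0/false arguments are assumed placeholders.
 *
 *    lock->writelock(0, false);      // acquires,  w_active == 1
 *    lock->writelock(0, false);      // nests,     w_active == 2
 *    lock->writeunlock();            // w_active == 1, still owned
 *    lock->writeunlock();            // w_active == 0, lock released
 */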
/*
 * Attempt to lock for write access, don't wait
 */
int devlock::writetrylock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already owner: nest the lock */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   if (rwl->w_active || rwl->r_active > 0) {
      stat = EBUSY;
   } else {
      rwl->w_active = 1;              /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
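
/*
 * writetrylock() sketch (illustrative, not compiled here): an
 * opportunistic update that falls back to the blocking call when the
 * lock is busy:
 *
 *    if (lock->writetrylock() == EBUSY) {
 *       lock->writelock(0, false);   // 0/false: assumed placeholders
 *    }
 *    ... update shared state ...
 *    lock->writeunlock();
 */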
/*
 * Unlock write access.  The lock is only fully released when the
 * last nested write lock is unlocked; any waiting readers are then
 * awakened in preference to waiting writers.
 */
int devlock::writeunlock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active <= 0) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
   }
   rwl->w_active--;                   /* one less nested write lock */
   if (!pthread_equal(pthread_self(), rwl->writer_id)) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
   }
   if (rwl->w_active > 0) {
      stat = 0;                       /* writers still active */
   } else {
      lmgr_do_unlock(rwl);
      /* No more writers, awaken someone */
      if (rwl->r_wait > 0) {          /* if readers waiting */
         stat = pthread_cond_broadcast(&rwl->read);
      } else if (rwl->w_wait > 0) {
         stat = pthread_cond_broadcast(&rwl->write);
      }
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
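
/*
 * Wake-order sketch: when the final nested write lock is released,
 * waiting readers win over waiting writers.  With two waiting readers
 * and one waiting writer:
 *
 *    writer: writeunlock()           // w_active 1 -> 0
 *                                    // r_wait > 0: broadcast &read,
 *                                    // both readers run concurrently;
 *                                    // the writer waits until the
 *                                    // last reader unlocks
 */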
/*
 * Take ownership of the write lock from the current holder,
 * saving enough state to return it later.
 */
int devlock::take_lock(take_lock_t *hold, int areason)
{
   int stat;

   if (valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&mutex)) != 0) {
      return stat;
   }
   hold->reason = reason;
   hold->prev_reason = prev_reason;
   hold->writer_id = writer_id;
   reason = areason;                  /* record why we took the lock */
   writer_id = pthread_self();        /* we are now the owner */
   stat = pthread_mutex_unlock(&mutex);
   return stat;
}
/*
 * Return the lock to the state saved by take_lock().
 */
int devlock::return_lock(take_lock_t *hold)
{
   int stat, stat2;

   if (valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&mutex)) != 0) {
      return stat;
   }
   reason = hold->reason;
   prev_reason = hold->prev_reason;
   writer_id = hold->writer_id;       /* restore the saved owner */
   stat2 = pthread_mutex_unlock(&mutex);
   if (w_active || w_wait) {
      stat = pthread_cond_broadcast(&write);
   }
   return (stat == 0 ? stat2 : stat);
}
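
/*
 * take_lock()/return_lock() usage sketch (illustrative, not compiled
 * here): a thread that must temporarily steal the device lock saves
 * the holder's state in a take_lock_t and restores it afterwards.
 * The reason code (0 below) is an assumed placeholder value.
 *
 *    take_lock_t hold;
 *
 *    lock->take_lock(&hold, 0);      // save owner/reason, become owner
 *    ... privileged operation on the device ...
 *    lock->return_lock(&hold);       // restore owner/reason, wake waiters
 */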
#ifdef TEST_RWLOCK

#define THREADS    5                  /* thread count: assumed value */
#define DATASIZE   15                 /* data elements: assumed value */
#define ITERATIONS 1000000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int writes;
   int reads;
   int interval;
} thread_t;

/*
 * Read/write lock and shared data.
 */
typedef struct data_tag {
   brwlock_t lock;
   int data;
   int writes;
} data_t;

static thread_t threads[THREADS];
static data_t data[DATASIZE];
/*
 * Thread start routine that uses read/write locks.
 */
void *thread_routine(void *arg)
{
   thread_t *self = (thread_t *)arg;
   int repeats = 0;
   int iteration;
   int element = 0;
   int status;

   lmgr_init_thread();
   for (iteration=0; iteration < ITERATIONS; iteration++) {
      /*
       * Each "self->interval" iterations, perform an
       * update operation (write lock instead of read
       * lock).
       */
//    if ((iteration % self->interval) == 0) {
         status = rwl_writelock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;

         /* Lock again to exercise nested write locking */
         status = rwl_writelock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;

         status = rwl_writeunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         status = rwl_writeunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }

//    } else {
         /*
          * Look at the current data element to see whether
          * the current thread last updated it. Count the
          * times to report later.
          */
         status = rwl_readlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         self->reads++;
         if (data[element].data == self->thread_num) {
            repeats++;
         }
         status = rwl_readunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
//    }
      element++;
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   lmgr_cleanup_thread();
   Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
      self->thread_num, repeats);
   return NULL;
}
int main (int argc, char *argv[])
{
   int count;
   int data_count;
   int status;
   unsigned int seed = 1;
   int thread_writes = 0;
   int data_writes = 0;

   /*
    * On Solaris 2.5, 2.6, 7, and 8, threads are not timesliced.
    * To ensure our threads can run concurrently, we specifically
    * set the concurrency level to THREADS.
    */
   thr_setconcurrency(THREADS);       /* Turned on only for Solaris */

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].writes = 0;
      status = rwl_init(&data[data_count].lock);
      if (status != 0) {
         berrno be;
         printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count + 1;
      threads[count].writes = 0;
      threads[count].reads = 0;
      threads[count].interval = rand_r(&seed) % 71;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1;
      }
      status = pthread_create(&threads[count].thread_id,
         NULL, thread_routine, (void *)&threads[count]);
      if (status != 0 || (int)threads[count].thread_id == 0) {
         berrno be;
         printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join(threads[count].thread_id, NULL);
      if (status != 0) {
         berrno be;
         printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
      thread_writes += threads[count].writes;
      printf(_("%02d: interval %d, writes %d, reads %d\n"),
         count, threads[count].interval,
         threads[count].writes, threads[count].reads);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_writes += data[data_count].writes;
      printf(_("data %02d: value %d, %d writes\n"),
         data_count, data[data_count].data, data[data_count].writes);
      rwl_destroy(&data[data_count].lock);
   }

   printf(_("Total: %d thread writes, %d data writes\n"),
      thread_writes, data_writes);
   return 0;
}

#endif /* TEST_RWLOCK */
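
/*
 * Build note (an assumption, not from the original file): the test
 * mains above and below are compiled only when TEST_RWLOCK or
 * TEST_RW_TRY_LOCK is defined, e.g. by adding -DTEST_RWLOCK to the
 * compile flags and linking against the Bacula lib that provides
 * berrno, Pmsg2 and the rwl_* routines.  Exact flags are not
 * specified here.
 */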
#ifdef TEST_RW_TRY_LOCK
/*
 * Demonstrate use of non-blocking read-write locks.
 *
 * On older Solaris systems, call thr_setconcurrency()
 * to allow interleaved thread execution, since threads are not
 * timesliced.
 */

#define THREADS    5                  /* thread count: assumed value */
#define DATASIZE   15                 /* data elements: assumed value */
#define ITERATIONS 1000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int r_collisions;
   int w_collisions;
   int updates;
   int interval;
} thread_t;

/*
 * Read-write lock and shared data
 */
typedef struct data_tag {
   brwlock_t lock;
   int data;
   int updates;
} data_t;

thread_t threads[THREADS];
data_t data[DATASIZE];
/*
 * Thread start routine that uses read-write locks
 */
void *thread_routine (void *arg)
{
   thread_t *self = (thread_t*)arg;
   int iteration;
   int element;
   int status;

   lmgr_init_thread();
   element = 0;                       /* Current data element */

   for (iteration = 0; iteration < ITERATIONS; iteration++) {
      if ((iteration % self->interval) == 0) {
         status = rwl_writetrylock (&data[element].lock);
         if (status == EBUSY)
            self->w_collisions++;
         else if (status == 0) {
            data[element].data++;
            data[element].updates++;
            self->updates++;
            rwl_writeunlock (&data[element].lock);
         } else
            err_abort (status, _("Try write lock"));
      } else {
         status = rwl_readtrylock (&data[element].lock);
         if (status == EBUSY)
            self->r_collisions++;
         else if (status != 0) {
            err_abort (status, _("Try read lock"));
         } else {
            if (data[element].data != data[element].updates)
               printf ("%d: data[%d] %d != %d\n",
                  self->thread_num, element,
                  data[element].data, data[element].updates);
            rwl_readunlock (&data[element].lock);
         }
      }
      element++;
      if (element >= DATASIZE)
         element = 0;
   }
   lmgr_cleanup_thread();
   return NULL;
}
int main (int argc, char *argv[])
{
   int count, data_count;
   unsigned int seed = 1;
   int thread_updates = 0, data_updates = 0;
   int status;

   /*
    * On Solaris 2.5, 2.6, 7, and 8, threads are not timesliced.
    * To ensure our threads can run concurrently, we specifically
    * set the concurrency level to THREADS.
    */
   DPRINTF (("Setting concurrency level to %d\n", THREADS));
   thr_setconcurrency(THREADS);       /* Turned on only for Solaris */

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].updates = 0;
      rwl_init(&data[data_count].lock);
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count;
      threads[count].r_collisions = 0;
      threads[count].w_collisions = 0;
      threads[count].updates = 0;
      threads[count].interval = rand_r (&seed) % ITERATIONS;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1; /* avoid modulo by zero */
      }
      status = pthread_create (&threads[count].thread_id,
         NULL, thread_routine, (void*)&threads[count]);
      if (status != 0)
         err_abort (status, _("Create thread"));
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0)
         err_abort (status, _("Join thread"));
      thread_updates += threads[count].updates;
      printf (_("%02d: interval %d, updates %d, "
              "r_collisions %d, w_collisions %d\n"),
         count, threads[count].interval,
         threads[count].updates,
         threads[count].r_collisions, threads[count].w_collisions);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_updates += data[data_count].updates;
      printf (_("data %02d: value %d, %d updates\n"),
         data_count, data[data_count].data, data[data_count].updates);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("Total: %d thread updates, %d data updates\n"),
      thread_updates, data_updates);
   return 0;
}

#endif /* TEST_RW_TRY_LOCK */