/*
   Bacula® - The Network Backup Solution

   Copyright (C) 2001-2011 Free Software Foundation Europe e.V.

   The main author of Bacula is Kern Sibbald, with contributions from
   many others, a complete list can be found in the file AUTHORS.
   This program is Free Software; you can redistribute it and/or
   modify it under the terms of version three of the GNU Affero General Public
   License as published by the Free Software Foundation and included
   in the file LICENSE.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU Affero General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA.

   Bacula® is a registered trademark of Kern Sibbald.
   The licensor of Bacula is the Free Software Foundation Europe
   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
   Switzerland, email:ftf@fsfeurope.org.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer.  Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *  David R. Butenhof
 */
41 #define _LOCKMGR_COMPLIANT
45 * Initialize a read/write lock
47 * Returns: 0 on success
50 int rwl_init(brwlock_t *rwl, int priority)
54 rwl->r_active = rwl->w_active = 0;
55 rwl->r_wait = rwl->w_wait = 0;
56 rwl->priority = priority;
57 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
60 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61 pthread_mutex_destroy(&rwl->mutex);
64 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65 pthread_cond_destroy(&rwl->read);
66 pthread_mutex_destroy(&rwl->mutex);
69 rwl->valid = RWLOCK_VALID;
74 * Destroy a read/write lock
76 * Returns: 0 on success
79 int rwl_destroy(brwlock_t *rwl)
81 int stat, stat1, stat2;
83 if (rwl->valid != RWLOCK_VALID) {
86 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
91 * If any threads are active, report EBUSY
93 if (rwl->r_active > 0 || rwl->w_active) {
94 pthread_mutex_unlock(&rwl->mutex);
99 * If any threads are waiting, report EBUSY
101 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102 pthread_mutex_unlock(&rwl->mutex);
107 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
110 stat = pthread_mutex_destroy(&rwl->mutex);
111 stat1 = pthread_cond_destroy(&rwl->read);
112 stat2 = pthread_cond_destroy(&rwl->write);
113 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
116 bool rwl_is_init(brwlock_t *rwl)
118 return (rwl->valid == RWLOCK_VALID);
122 * Handle cleanup when the read lock condition variable
125 static void rwl_read_release(void *arg)
127 brwlock_t *rwl = (brwlock_t *)arg;
130 pthread_mutex_unlock(&rwl->mutex);
134 * Handle cleanup when the write lock condition variable wait
137 static void rwl_write_release(void *arg)
139 brwlock_t *rwl = (brwlock_t *)arg;
142 pthread_mutex_unlock(&rwl->mutex);
146 * Lock for read access, wait until locked (or error).
148 int rwl_readlock(brwlock_t *rwl)
152 if (rwl->valid != RWLOCK_VALID) {
155 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
159 rwl->r_wait++; /* indicate that we are waiting */
160 pthread_cleanup_push(rwl_read_release, (void *)rwl);
161 while (rwl->w_active) {
162 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
164 break; /* error, bail out */
167 pthread_cleanup_pop(0);
168 rwl->r_wait--; /* we are no longer waiting */
171 rwl->r_active++; /* we are running */
173 pthread_mutex_unlock(&rwl->mutex);
178 * Attempt to lock for read access, don't wait
180 int rwl_readtrylock(brwlock_t *rwl)
184 if (rwl->valid != RWLOCK_VALID) {
187 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
193 rwl->r_active++; /* we are running */
195 stat2 = pthread_mutex_unlock(&rwl->mutex);
196 return (stat == 0 ? stat2 : stat);
202 int rwl_readunlock(brwlock_t *rwl)
206 if (rwl->valid != RWLOCK_VALID) {
209 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
213 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
214 stat = pthread_cond_broadcast(&rwl->write);
216 stat2 = pthread_mutex_unlock(&rwl->mutex);
217 return (stat == 0 ? stat2 : stat);
222 * Lock for write access, wait until locked (or error).
223 * Multiple nested write locking is permitted.
225 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
229 if (rwl->valid != RWLOCK_VALID) {
232 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
235 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
237 pthread_mutex_unlock(&rwl->mutex);
240 lmgr_pre_lock(rwl, rwl->priority, file, line);
241 if (rwl->w_active || rwl->r_active > 0) {
242 rwl->w_wait++; /* indicate that we are waiting */
243 pthread_cleanup_push(rwl_write_release, (void *)rwl);
244 while (rwl->w_active || rwl->r_active > 0) {
245 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
247 break; /* error, bail out */
250 pthread_cleanup_pop(0);
251 rwl->w_wait--; /* we are no longer waiting */
254 rwl->w_active++; /* we are running */
255 rwl->writer_id = pthread_self(); /* save writer thread's id */
258 pthread_mutex_unlock(&rwl->mutex);
263 * Attempt to lock for write access, don't wait
265 int rwl_writetrylock(brwlock_t *rwl)
269 if (rwl->valid != RWLOCK_VALID) {
272 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
275 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
277 pthread_mutex_unlock(&rwl->mutex);
280 if (rwl->w_active || rwl->r_active > 0) {
283 rwl->w_active = 1; /* we are running */
284 rwl->writer_id = pthread_self(); /* save writer thread's id */
285 lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
287 stat2 = pthread_mutex_unlock(&rwl->mutex);
288 return (stat == 0 ? stat2 : stat);
293 * Start any waiting writers in preference to waiting readers
295 int rwl_writeunlock(brwlock_t *rwl)
299 if (rwl->valid != RWLOCK_VALID) {
302 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
305 if (rwl->w_active <= 0) {
306 pthread_mutex_unlock(&rwl->mutex);
307 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
310 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
311 pthread_mutex_unlock(&rwl->mutex);
312 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
314 if (rwl->w_active > 0) {
315 stat = 0; /* writers still active */
318 /* No more writers, awaken someone */
319 if (rwl->r_wait > 0) { /* if readers waiting */
320 stat = pthread_cond_broadcast(&rwl->read);
321 } else if (rwl->w_wait > 0) {
322 stat = pthread_cond_broadcast(&rwl->write);
325 stat2 = pthread_mutex_unlock(&rwl->mutex);
326 return (stat == 0 ? stat2 : stat);
333 #define ITERATIONS 1000000
336 * Keep statics for each thread.
338 typedef struct thread_tag {
347 * Read/write lock and shared data.
349 typedef struct data_tag {
355 static thread_t threads[THREADS];
356 static data_t data[DATASIZE];
359 * Thread start routine that uses read/write locks.
361 void *thread_routine(void *arg)
363 thread_t *self = (thread_t *)arg;
369 for (iteration=0; iteration < ITERATIONS; iteration++) {
371 * Each "self->interval" iterations, perform an
372 * update operation (write lock instead of read
375 // if ((iteration % self->interval) == 0) {
376 status = rwl_writelock(&data[element].lock);
379 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
382 data[element].data = self->thread_num;
383 data[element].writes++;
385 status = rwl_writelock(&data[element].lock);
388 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
391 data[element].data = self->thread_num;
392 data[element].writes++;
394 status = rwl_writeunlock(&data[element].lock);
397 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
400 status = rwl_writeunlock(&data[element].lock);
403 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
410 * Look at the current data element to see whether
411 * the current thread last updated it. Count the
412 * times to report later.
414 status = rwl_readlock(&data[element].lock);
417 printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
421 if (data[element].data == self->thread_num)
423 status = rwl_readunlock(&data[element].lock);
426 printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
432 if (element >= DATASIZE) {
437 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
438 self->thread_num, repeats);
443 int main (int argc, char *argv[])
448 unsigned int seed = 1;
449 int thread_writes = 0;
452 #ifdef USE_THR_SETCONCURRENCY
454 * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
455 * that our threads can run concurrently, we need to
456 * increase the concurrency level to THREADS.
458 thr_setconcurrency (THREADS);
462 * Initialize the shared data.
464 for (data_count = 0; data_count < DATASIZE; data_count++) {
465 data[data_count].data = 0;
466 data[data_count].writes = 0;
467 status = rwl_init(&data[data_count].lock);
470 printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
476 * Create THREADS threads to access shared data.
478 for (count = 0; count < THREADS; count++) {
479 threads[count].thread_num = count + 1;
480 threads[count].writes = 0;
481 threads[count].reads = 0;
482 threads[count].interval = rand_r(&seed) % 71;
483 if (threads[count].interval <= 0) {
484 threads[count].interval = 1;
486 status = pthread_create (&threads[count].thread_id,
487 NULL, thread_routine, (void*)&threads[count]);
488 if (status != 0 || (int)threads[count].thread_id == 0) {
490 printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
496 * Wait for all threads to complete, and collect
499 for (count = 0; count < THREADS; count++) {
500 status = pthread_join (threads[count].thread_id, NULL);
503 printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
506 thread_writes += threads[count].writes;
507 printf (_("%02d: interval %d, writes %d, reads %d\n"),
508 count, threads[count].interval,
509 threads[count].writes, threads[count].reads);
513 * Collect statistics for the data.
515 for (data_count = 0; data_count < DATASIZE; data_count++) {
516 data_writes += data[data_count].writes;
517 printf (_("data %02d: value %d, %d writes\n"),
518 data_count, data[data_count].data, data[data_count].writes);
519 rwl_destroy (&data[data_count].lock);
522 printf (_("Total: %d thread writes, %d data writes\n"),
523 thread_writes, data_writes);
529 #ifdef TEST_RW_TRY_LOCK
533 * Demonstrate use of non-blocking read-write locks.
535 * Special notes: On older Solaris system, call thr_setconcurrency()
536 * to allow interleaved thread execution, since threads are not
544 #define ITERATIONS 1000
548 * Keep statistics for each thread.
550 typedef struct thread_tag {
560 * Read-write lock and shared data
562 typedef struct data_tag {
568 thread_t threads[THREADS];
569 data_t data[DATASIZE];
572 * Thread start routine that uses read-write locks
574 void *thread_routine (void *arg)
576 thread_t *self = (thread_t*)arg;
581 element = 0; /* Current data element */
583 for (iteration = 0; iteration < ITERATIONS; iteration++) {
584 if ((iteration % self->interval) == 0) {
585 status = rwl_writetrylock (&data[element].lock);
587 self->w_collisions++;
588 else if (status == 0) {
589 data[element].data++;
590 data[element].updates++;
592 rwl_writeunlock (&data[element].lock);
594 err_abort (status, _("Try write lock"));
596 status = rwl_readtrylock (&data[element].lock);
598 self->r_collisions++;
599 else if (status != 0) {
600 err_abort (status, _("Try read lock"));
602 if (data[element].data != data[element].updates)
603 printf ("%d: data[%d] %d != %d\n",
604 self->thread_num, element,
605 data[element].data, data[element].updates);
606 rwl_readunlock (&data[element].lock);
611 if (element >= DATASIZE)
614 lmgr_cleanup_thread();
/*
 * Test driver for the try-lock test: initializes the shared data,
 * starts THREADS threads running thread_routine(), joins them and
 * prints per-thread and per-element statistics.
 *
 * NOTE(review): this fragment is truncated here -- the final totals
 * printf/return and the closing #endif are not visible in this chunk.
 */
618 int main (int argc, char *argv[])
620 int count, data_count;
621 unsigned int seed = 1;
622 int thread_updates = 0, data_updates = 0;
625 #ifdef USE_THR_SETCONCURRENCY
627 * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
628 * that our threads can run concurrently, we need to
629 * increase the concurrency level to THREADS.
631 DPRINTF (("Setting concurrency level to %d\n", THREADS));
632 thr_setconcurrency (THREADS);
/* Initialize each shared element and its rwlock */
636 * Initialize the shared data.
638 for (data_count = 0; data_count < DATASIZE; data_count++) {
639 data[data_count].data = 0;
640 data[data_count].updates = 0;
641 rwl_init(&data[data_count].lock);
/*
 * NOTE(review): interval = rand_r(&seed) % ITERATIONS below can be 0,
 * which would make (iteration % self->interval) in thread_routine
 * divide by zero -- confirm a floor of 1 is applied somewhere.
 */
645 * Create THREADS threads to access shared data.
647 for (count = 0; count < THREADS; count++) {
648 threads[count].thread_num = count;
649 threads[count].r_collisions = 0;
650 threads[count].w_collisions = 0;
651 threads[count].updates = 0;
652 threads[count].interval = rand_r (&seed) % ITERATIONS;
653 status = pthread_create (&threads[count].thread_id,
654 NULL, thread_routine, (void*)&threads[count]);
656 err_abort (status, _("Create thread"));
/* Join every worker and accumulate its update count */
660 * Wait for all threads to complete, and collect
663 for (count = 0; count < THREADS; count++) {
664 status = pthread_join (threads[count].thread_id, NULL);
666 err_abort (status, _("Join thread"));
667 thread_updates += threads[count].updates;
668 printf (_("%02d: interval %d, updates %d, "
669 "r_collisions %d, w_collisions %d\n"),
670 count, threads[count].interval,
671 threads[count].updates,
672 threads[count].r_collisions, threads[count].w_collisions);
/* Per-element statistics; each lock is destroyed after reporting */
676 * Collect statistics for the data.
678 for (data_count = 0; data_count < DATASIZE; data_count++) {
679 data_updates += data[data_count].updates;
680 printf (_("data %02d: value %d, %d updates\n"),
681 data_count, data[data_count].data, data[data_count].updates);
682 rwl_destroy (&data[data_count].lock);