/*
   Bacula® - The Network Backup Solution

   Copyright (C) 2001-2011 Free Software Foundation Europe e.V.

   The main author of Bacula is Kern Sibbald, with contributions from
   many others, a complete list can be found in the file AUTHORS.
   This program is Free Software; you can redistribute it and/or
   modify it under the terms of version three of the GNU Affero General Public
   License as published by the Free Software Foundation and included
   in the file LICENSE.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU Affero General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA.

   Bacula® is a registered trademark of Kern Sibbald.
   The licensor of Bacula is the Free Software Foundation Europe
   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
   Switzerland, email:ftf@fsfeurope.org.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 *  multiple readers but only one writer.  Note, however,
 *  that the writer thread is permitted to make multiple
 *  nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *    David R. Butenhof
 */

#define _LOCKMGR_COMPLIANT
#include "bacula.h"
/*
 * Initialize a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
int rwl_init(brwlock_t *rwl, int priority)
{
   int stat;

   rwl->r_active = rwl->w_active = 0;
   rwl->r_wait = rwl->w_wait = 0;
   rwl->priority = priority;
   if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
      pthread_cond_destroy(&rwl->read);
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   rwl->valid = RWLOCK_VALID;
   return 0;
}
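/*
 * Note (our reading of the code, not an authoritative statement of
 * the lock manager's contract): the priority stored above is handed
 * to Bacula's lock manager (the lmgr_pre_lock()/lmgr_do_lock() calls
 * below), which uses lock priorities to check acquisition ordering
 * and flag potential deadlocks.
 */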
/*
 * Destroy a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
int rwl_destroy(brwlock_t *rwl)
{
   int stat, stat1, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   /* If any threads are active, report EBUSY */
   if (rwl->r_active > 0 || rwl->w_active) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }
   /* If any threads are waiting, report EBUSY */
   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }
   rwl->valid = 0;                    /* mark invalid before releasing mutex */
   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
      return stat;
   }
   stat  = pthread_mutex_destroy(&rwl->mutex);
   stat1 = pthread_cond_destroy(&rwl->read);
   stat2 = pthread_cond_destroy(&rwl->write);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
}
/*
 * Handle cleanup when the read lock condition variable
 *  wait is released.
 */
static void rwl_read_release(void *arg)
{
   brwlock_t *rwl = (brwlock_t *)arg;
   rwl->r_wait--;                     /* no longer waiting */
   pthread_mutex_unlock(&rwl->mutex);
}
/*
 * Handle cleanup when the write lock condition variable wait
 *  is released.
 */
static void rwl_write_release(void *arg)
{
   brwlock_t *rwl = (brwlock_t *)arg;
   rwl->w_wait--;                     /* no longer waiting */
   pthread_mutex_unlock(&rwl->mutex);
}
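/*
 * Why the cleanup handlers above exist: pthread_cond_wait() is a
 * cancellation point, and POSIX guarantees the mutex is re-acquired
 * before cancellation cleanup runs.  If a waiter in rwl_readlock()
 * or rwl_writelock() is cancelled, the handler must undo the
 * r_wait/w_wait count and release the mutex, or the lock would be
 * left permanently corrupted.
 */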
/*
 * Lock for read access, wait until locked (or error).
 */
int rwl_readlock(brwlock_t *rwl)
{
   int stat;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      rwl->r_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(rwl_read_release, (void *)rwl);
      while (rwl->w_active) {
         stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
         if (stat != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->r_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->r_active++;                /* we are running */
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for read access, don't wait
 */
int rwl_readtrylock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      stat = EBUSY;                   /* a writer holds the lock */
   } else {
      rwl->r_active++;                /* we are running */
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Unlock read access.
 */
int rwl_readunlock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   rwl->r_active--;                   /* we are done */
   if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
      stat = pthread_cond_broadcast(&rwl->write);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Lock for write access, wait until locked (or error).
 *   Multiple nested write locking is permitted.
 */
int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
{
   int stat;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already the writer: count the nesting */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   lmgr_pre_lock(rwl, rwl->priority, file, line);
   if (rwl->w_active || rwl->r_active > 0) {
      rwl->w_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(rwl_write_release, (void *)rwl);
      while (rwl->w_active || rwl->r_active > 0) {
         if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->w_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->w_active++;                /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_post_lock();
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for write access, don't wait
 */
int rwl_writetrylock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* already the writer: count the nesting */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   if (rwl->w_active || rwl->r_active > 0) {
      stat = EBUSY;                   /* lock is busy */
   } else {
      rwl->w_active = 1;              /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Unlock write lock
 *  Wakes any waiting readers in preference to waiting writers.
 */
int rwl_writeunlock(brwlock_t *rwl)
{
   int stat, stat2;

   if (rwl->valid != RWLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active <= 0) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
   }
   rwl->w_active--;                   /* unwind one level of nesting */
   if (!pthread_equal(pthread_self(), rwl->writer_id)) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
   }
   if (rwl->w_active > 0) {
      stat = 0;                       /* writers still active */
   } else {
      lmgr_do_unlock(rwl);
      /* No more writers, awaken someone */
      if (rwl->r_wait > 0) {          /* if readers waiting */
         stat = pthread_cond_broadcast(&rwl->read);
      } else if (rwl->w_wait > 0) {
         stat = pthread_cond_broadcast(&rwl->write);
      }
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
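/*
 * Illustrative use of the API above -- a minimal sketch added for
 * documentation, not part of Bacula proper (the guard name
 * RWLOCK_USAGE_EXAMPLE and the function are hypothetical).  It shows
 * the behavior that distinguishes brwlock_t from a plain POSIX
 * rwlock: the owning thread may take the write lock recursively and
 * must unlock once per lock call.
 */
#ifdef RWLOCK_USAGE_EXAMPLE
static int rwlock_usage_example()
{
   brwlock_t lock;
   int stat;

   if ((stat = rwl_init(&lock, 0)) != 0) {
      return stat;                    /* errno-style code from init */
   }
   rwl_readlock(&lock);               /* any number of readers may hold this */
   rwl_readunlock(&lock);
   rwl_writelock(&lock);              /* exclusive: blocks readers and writers */
   rwl_writelock(&lock);              /* nested call by the owner returns at once */
   rwl_writeunlock(&lock);            /* one unlock per write lock call */
   rwl_writeunlock(&lock);            /* last unlock wakes any waiters */
   return rwl_destroy(&lock);
}
#endif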
#ifdef TEST_RWLOCK

#define THREADS    300                /* assumed value */
#define DATASIZE    15                /* assumed value */
#define ITERATIONS 1000000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int writes;
   int reads;
   int interval;
} thread_t;

/*
 * Read/write lock and shared data.
 */
typedef struct data_tag {
   brwlock_t lock;
   int data;
   int writes;
} data_t;

static thread_t threads[THREADS];
static data_t data[DATASIZE];
/*
 * Thread start routine that uses read/write locks.
 */
void *thread_routine(void *arg)
{
   thread_t *self = (thread_t *)arg;
   int repeats = 0;
   int iteration;
   int element = 0;
   int status;

   for (iteration=0; iteration < ITERATIONS; iteration++) {
      /*
       * Each "self->interval" iterations, perform an
       *  update operation (write lock instead of read
       *  lock).  With the test below commented out, every
       *  iteration does a nested (double) write lock followed
       *  by a read, exercising write lock recursion.
       */
//    if ((iteration % self->interval) == 0) {
         status = rwl_writelock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;
         status = rwl_writelock(&data[element].lock); /* nested write lock */
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;
         status = rwl_writeunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         status = rwl_writeunlock(&data[element].lock); /* final unlock */
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
//    } else {
         /*
          * Look at the current data element to see whether
          *  the current thread last updated it. Count the
          *  times to report later.
          */
         status = rwl_readlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         self->reads++;
         if (data[element].data == self->thread_num) {
            repeats++;
         }
         status = rwl_readunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
//    }
      element++;
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   if (repeats > 0) {
      Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
         self->thread_num, repeats);
   }
   return NULL;
}
int main (int argc, char *argv[])
{
   int count;
   int data_count;
   int status;
   unsigned int seed = 1;
   int thread_writes = 0;
   int data_writes = 0;

#ifdef USE_THR_SETCONCURRENCY
   /*
    * On Solaris 2.5, 2.6, 7 and 8 threads are not timesliced. To ensure
    * that our threads can run concurrently, we need to
    * increase the concurrency level to THREADS.
    */
   thr_setconcurrency (THREADS);
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].writes = 0;
      status = rwl_init(&data[data_count].lock);
      if (status != 0) {
         berrno be;
         printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count + 1;
      threads[count].writes = 0;
      threads[count].reads = 0;
      threads[count].interval = rand_r(&seed) % 71;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1;
      }
      status = pthread_create (&threads[count].thread_id,
          NULL, thread_routine, (void*)&threads[count]);
      if (status != 0 || (int)threads[count].thread_id == 0) {
         berrno be;
         printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0) {
         berrno be;
         printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
      thread_writes += threads[count].writes;
      printf (_("%02d: interval %d, writes %d, reads %d\n"),
         count, threads[count].interval,
         threads[count].writes, threads[count].reads);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_writes += data[data_count].writes;
      printf (_("data %02d: value %d, %d writes\n"),
         data_count, data[data_count].data, data[data_count].writes);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("Total: %d thread writes, %d data writes\n"),
      thread_writes, data_writes);
   return 0;
}

#endif /* TEST_RWLOCK */
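/*
 * Build note (an assumption, not the official recipe): the tests in
 * this file are normally built from within the Bacula source tree.
 * A standalone compile would look roughly like
 *
 *   g++ -DTEST_RWLOCK -I.. rwlock.c <bacula libs> -lpthread -o rwlock_test
 *
 * with TEST_RW_TRY_LOCK substituted to build the try-lock test below.
 */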
#ifdef TEST_RW_TRY_LOCK
/*
 * Demonstrate use of non-blocking read-write locks.
 *
 * Special notes: On older Solaris systems, call thr_setconcurrency()
 * to allow interleaved thread execution, since threads are not
 * timesliced by default.
 */

#define THREADS       5               /* assumed value */
#define ITERATIONS 1000
#define DATASIZE     15               /* assumed value */

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int updates;
   int r_collisions;
   int w_collisions;
   int interval;
} thread_t;

/*
 * Read-write lock and shared data
 */
typedef struct data_tag {
   brwlock_t lock;
   int data;
   int updates;
} data_t;

thread_t threads[THREADS];
data_t data[DATASIZE];
569 void *thread_routine (void *arg)
571 thread_t *self = (thread_t*)arg;
576 element = 0; /* Current data element */
578 for (iteration = 0; iteration < ITERATIONS; iteration++) {
579 if ((iteration % self->interval) == 0) {
580 status = rwl_writetrylock (&data[element].lock);
582 self->w_collisions++;
583 else if (status == 0) {
584 data[element].data++;
585 data[element].updates++;
587 rwl_writeunlock (&data[element].lock);
589 err_abort (status, _("Try write lock"));
591 status = rwl_readtrylock (&data[element].lock);
593 self->r_collisions++;
594 else if (status != 0) {
595 err_abort (status, _("Try read lock"));
597 if (data[element].data != data[element].updates)
598 printf ("%d: data[%d] %d != %d\n",
599 self->thread_num, element,
600 data[element].data, data[element].updates);
601 rwl_readunlock (&data[element].lock);
606 if (element >= DATASIZE)
609 lmgr_cleanup_thread();
int main (int argc, char *argv[])
{
   int count, data_count;
   unsigned int seed = 1;
   int thread_updates = 0, data_updates = 0;
   int status;

#ifdef USE_THR_SETCONCURRENCY
   /*
    * On Solaris 2.5, 2.6, 7 and 8 threads are not timesliced. To ensure
    * that our threads can run concurrently, we need to
    * increase the concurrency level to THREADS.
    */
   DPRINTF (("Setting concurrency level to %d\n", THREADS));
   thr_setconcurrency (THREADS);
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].updates = 0;
      rwl_init(&data[data_count].lock);
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count;
      threads[count].r_collisions = 0;
      threads[count].w_collisions = 0;
      threads[count].updates = 0;
      threads[count].interval = rand_r (&seed) % ITERATIONS;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1; /* avoid modulo by zero in thread_routine */
      }
      status = pthread_create (&threads[count].thread_id,
          NULL, thread_routine, (void*)&threads[count]);
      if (status != 0) {
         err_abort (status, _("Create thread"));
      }
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0) {
         err_abort (status, _("Join thread"));
      }
      thread_updates += threads[count].updates;
      printf (_("%02d: interval %d, updates %d, "
                "r_collisions %d, w_collisions %d\n"),
         count, threads[count].interval,
         threads[count].updates,
         threads[count].r_collisions, threads[count].w_collisions);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_updates += data[data_count].updates;
      printf (_("data %02d: value %d, %d updates\n"),
         data_count, data[data_count].data, data[data_count].updates);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("%d thread updates, %d data updates\n"),
      thread_updates, data_updates);
   return 0;
}
#endif /* TEST_RW_TRY_LOCK */