2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
/*
 * Bacula Thread Read/Write locking code. It permits
 *  multiple readers but only one writer.  Note, however,
 *  that the writer thread is permitted to make multiple
 *  nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code is adapted from "Programming with POSIX Threads", by
 *  David R. Butenhof
 */
46 * Initialize a read/write lock
48 * Returns: 0 on success
51 int rwl_init(brwlock_t *rwl)
55 rwl->r_active = rwl->w_active = 0;
56 rwl->r_wait = rwl->w_wait = 0;
57 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
60 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61 pthread_mutex_destroy(&rwl->mutex);
64 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65 pthread_cond_destroy(&rwl->read);
66 pthread_mutex_destroy(&rwl->mutex);
69 rwl->valid = RWLOCK_VALID;
74 * Destroy a read/write lock
76 * Returns: 0 on success
79 int rwl_destroy(brwlock_t *rwl)
81 int stat, stat1, stat2;
83 if (rwl->valid != RWLOCK_VALID) {
86 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
91 * If any threads are active, report EBUSY
93 if (rwl->r_active > 0 || rwl->w_active) {
94 pthread_mutex_unlock(&rwl->mutex);
99 * If any threads are waiting, report EBUSY
101 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102 pthread_mutex_unlock(&rwl->mutex);
107 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
110 stat = pthread_mutex_destroy(&rwl->mutex);
111 stat1 = pthread_cond_destroy(&rwl->read);
112 stat2 = pthread_cond_destroy(&rwl->write);
113 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
117 * Handle cleanup when the read lock condition variable
120 static void rwl_read_release(void *arg)
122 brwlock_t *rwl = (brwlock_t *)arg;
125 pthread_mutex_unlock(&rwl->mutex);
129 * Handle cleanup when the write lock condition variable wait
132 static void rwl_write_release(void *arg)
134 brwlock_t *rwl = (brwlock_t *)arg;
137 pthread_mutex_unlock(&rwl->mutex);
141 * Lock for read access, wait until locked (or error).
143 int rwl_readlock(brwlock_t *rwl)
147 if (rwl->valid != RWLOCK_VALID) {
150 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
154 rwl->r_wait++; /* indicate that we are waiting */
155 pthread_cleanup_push(rwl_read_release, (void *)rwl);
156 while (rwl->w_active) {
157 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
159 break; /* error, bail out */
162 pthread_cleanup_pop(0);
163 rwl->r_wait--; /* we are no longer waiting */
166 rwl->r_active++; /* we are running */
168 pthread_mutex_unlock(&rwl->mutex);
173 * Attempt to lock for read access, don't wait
175 int rwl_readtrylock(brwlock_t *rwl)
179 if (rwl->valid != RWLOCK_VALID) {
182 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
188 rwl->r_active++; /* we are running */
190 stat2 = pthread_mutex_unlock(&rwl->mutex);
191 return (stat == 0 ? stat2 : stat);
197 int rwl_readunlock(brwlock_t *rwl)
201 if (rwl->valid != RWLOCK_VALID) {
204 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
208 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209 stat = pthread_cond_broadcast(&rwl->write);
211 stat2 = pthread_mutex_unlock(&rwl->mutex);
212 return (stat == 0 ? stat2 : stat);
217 * Lock for write access, wait until locked (or error).
218 * Multiple nested write locking is permitted.
220 int rwl_writelock(brwlock_t *rwl)
224 if (rwl->valid != RWLOCK_VALID) {
227 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
230 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
232 pthread_mutex_unlock(&rwl->mutex);
235 if (rwl->w_active || rwl->r_active > 0) {
236 rwl->w_wait++; /* indicate that we are waiting */
237 pthread_cleanup_push(rwl_write_release, (void *)rwl);
238 while (rwl->w_active || rwl->r_active > 0) {
239 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
240 break; /* error, bail out */
243 pthread_cleanup_pop(0);
244 rwl->w_wait--; /* we are no longer waiting */
247 rwl->w_active++; /* we are running */
248 rwl->writer_id = pthread_self(); /* save writer thread's id */
250 pthread_mutex_unlock(&rwl->mutex);
255 * Attempt to lock for write access, don't wait
257 int rwl_writetrylock(brwlock_t *rwl)
261 if (rwl->valid != RWLOCK_VALID) {
264 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
267 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
269 pthread_mutex_unlock(&rwl->mutex);
272 if (rwl->w_active || rwl->r_active > 0) {
275 rwl->w_active = 1; /* we are running */
276 rwl->writer_id = pthread_self(); /* save writer thread's id */
278 stat2 = pthread_mutex_unlock(&rwl->mutex);
279 return (stat == 0 ? stat2 : stat);
284 * Start any waiting writers in preference to waiting readers
286 int rwl_writeunlock(brwlock_t *rwl)
290 if (rwl->valid != RWLOCK_VALID) {
293 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
296 if (rwl->w_active <= 0) {
297 pthread_mutex_unlock(&rwl->mutex);
298 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
301 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
302 pthread_mutex_unlock(&rwl->mutex);
303 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
305 if (rwl->w_active > 0) {
306 stat = 0; /* writers still active */
308 /* No more writers, awaken someone */
309 if (rwl->r_wait > 0) { /* if readers waiting */
310 stat = pthread_cond_broadcast(&rwl->read);
311 } else if (rwl->w_wait > 0) {
312 stat = pthread_cond_broadcast(&rwl->write);
315 stat2 = pthread_mutex_unlock(&rwl->mutex);
316 return (stat == 0 ? stat2 : stat);
323 #define ITERATIONS 1000000
326 * Keep statics for each thread.
328 typedef struct thread_tag {
337 * Read/write lock and shared data.
339 typedef struct data_tag {
345 static thread_t threads[THREADS];
346 static data_t data[DATASIZE];
/*
 * thread_routine() -- worker thread of the read/write lock stress test.
 *
 * arg points to this thread's thread_t entry.  For ITERATIONS passes the
 * visible code write-locks data[element] twice in a row (exercising the
 * nested write lock), stores self->thread_num in the element and bumps
 * its write counter after each acquisition, then releases the lock
 * twice.  Every failing lock call prints the error text.
 *
 * A read-locked section that compares data[element].data with
 * self->thread_num follows the write section, and element is checked
 * against DATASIZE at the end of each pass.  After the loop the number
 * of "repeats" is reported through Pmsg2().
 *
 * NOTE(review): the "iteration % self->interval" selector is commented
 * out, so whether the read section is still compiled or reached cannot
 * be determined from the lines visible here.
 * NOTE(review): this listing carries stray leading numbers and is
 * missing short lines (braces, declarations, error exits); restore them
 * from the upstream source before building.
 */
349 * Thread start routine that uses read/write locks.
351 void *thread_routine(void *arg)
353 thread_t *self = (thread_t *)arg;
359 for (iteration=0; iteration < ITERATIONS; iteration++) {
361 * Each "self->interval" iterations, perform an
362 * update operation (write lock instead of read
365 // if ((iteration % self->interval) == 0) {
/* First (outer) write lock on the current element */
366 status = rwl_writelock(&data[element].lock);
369 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
372 data[element].data = self->thread_num;
373 data[element].writes++;
/* Second (nested) write lock taken by the same thread */
375 status = rwl_writelock(&data[element].lock);
378 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
381 data[element].data = self->thread_num;
382 data[element].writes++;
/* Two unlocks to balance the two nested locks above */
384 status = rwl_writeunlock(&data[element].lock);
387 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
390 status = rwl_writeunlock(&data[element].lock);
393 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
400 * Look at the current data element to see whether
401 * the current thread last updated it. Count the
402 * times to report later.
404 status = rwl_readlock(&data[element].lock);
407 printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
411 if (data[element].data == self->thread_num)
413 status = rwl_readunlock(&data[element].lock);
416 printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
422 if (element >= DATASIZE) {
427 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
428 self->thread_num, repeats);
/*
 * main() -- driver of the read/write lock stress test.
 *
 * Steps visible in the code:
 *   1. zero every data[] element and initialize its lock with rwl_init();
 *   2. fill in THREADS thread_t records (thread_num = count + 1, interval
 *      = rand_r() % 71, raised to 1 when not positive) and start one
 *      thread_routine() thread per record;
 *   3. join every thread, add up the per-thread write counts and print
 *      each thread's interval/writes/reads;
 *   4. add up and print the per-element write counts, destroying each
 *      lock with rwl_destroy();
 *   5. print the two totals.
 *
 * NOTE(review): data_writes, count, data_count and status are used but
 * their declarations are not among the lines visible here.
 * NOTE(review): this listing carries stray leading numbers and is
 * missing short lines (braces, declarations, error exits); restore them
 * from the upstream source before building.
 */
433 int main (int argc, char *argv[])
438 unsigned int seed = 1;
439 int thread_writes = 0;
444 * On Solaris 2.5, threads are not timesliced. To ensure
445 * that our threads can run concurrently, we need to
446 * increase the concurrency level to THREADS.
448 thr_setconcurrency (THREADS);
452 * Initialize the shared data.
454 for (data_count = 0; data_count < DATASIZE; data_count++) {
455 data[data_count].data = 0;
456 data[data_count].writes = 0;
457 status = rwl_init (&data[data_count].lock);
460 printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
466 * Create THREADS threads to access shared data.
468 for (count = 0; count < THREADS; count++) {
469 threads[count].thread_num = count + 1;
470 threads[count].writes = 0;
471 threads[count].reads = 0;
472 threads[count].interval = rand_r(&seed) % 71;
473 if (threads[count].interval <= 0) {
474 threads[count].interval = 1;
476 status = pthread_create (&threads[count].thread_id,
477 NULL, thread_routine, (void*)&threads[count]);
/*
 * NOTE(review): pthread_t is an opaque type in POSIX; casting it to int
 * and comparing with 0 is not portable.  The pthread_create() status
 * alone is the documented success indicator.
 */
478 if (status != 0 || (int)threads[count].thread_id == 0) {
480 printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
486 * Wait for all threads to complete, and collect
489 for (count = 0; count < THREADS; count++) {
490 status = pthread_join (threads[count].thread_id, NULL);
493 printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
496 thread_writes += threads[count].writes;
497 printf (_("%02d: interval %d, writes %d, reads %d\n"),
498 count, threads[count].interval,
499 threads[count].writes, threads[count].reads);
503 * Collect statistics for the data.
505 for (data_count = 0; data_count < DATASIZE; data_count++) {
506 data_writes += data[data_count].writes;
507 printf (_("data %02d: value %d, %d writes\n"),
508 data_count, data[data_count].data, data[data_count].writes);
/* The rwl_destroy() result is not checked here */
509 rwl_destroy (&data[data_count].lock);
512 printf (_("Total: %d thread writes, %d data writes\n"),
513 thread_writes, data_writes);
519 #ifdef TEST_RW_TRY_LOCK
523 * Demonstrate use of non-blocking read-write locks.
525 * Special notes: On a Solaris system, call thr_setconcurrency()
526 * to allow interleaved thread execution, since threads are not
534 #define ITERATIONS 1000
538 * Keep statistics for each thread.
540 typedef struct thread_tag {
550 * Read-write lock and shared data
552 typedef struct data_tag {
558 thread_t threads[THREADS];
559 data_t data[DATASIZE];
/*
 * thread_routine() -- worker thread of the try-lock test.
 *
 * arg points to this thread's thread_t entry.  On passes where
 * iteration % self->interval is 0 the thread tries a non-blocking write
 * lock on data[element]; on success it increments both the element's
 * data and updates fields and unlocks, and one outcome is counted in
 * self->w_collisions.  Other visible code tries a non-blocking read
 * lock, counts one outcome in self->r_collisions, prints a message if
 * data and updates disagree, and unlocks.  Unexpected status values go
 * to err_abort().
 *
 * NOTE(review): "iteration % self->interval" is undefined when interval
 * is 0 -- confirm the caller never stores 0 there.
 * NOTE(review): this listing carries stray leading numbers and is
 * missing short lines (braces, declarations, the conditions that select
 * the collision branches); restore them from the upstream source before
 * building.
 */
562 * Thread start routine that uses read-write locks
564 void *thread_routine (void *arg)
566 thread_t *self = (thread_t*)arg;
571 element = 0; /* Current data element */
573 for (iteration = 0; iteration < ITERATIONS; iteration++) {
574 if ((iteration % self->interval) == 0) {
575 status = rwl_writetrylock (&data[element].lock);
577 self->w_collisions++;
578 else if (status == 0) {
579 data[element].data++;
580 data[element].updates++;
582 rwl_writeunlock (&data[element].lock);
584 err_abort (status, _("Try write lock"));
586 status = rwl_readtrylock (&data[element].lock);
588 self->r_collisions++;
589 else if (status != 0) {
590 err_abort (status, _("Try read lock"));
/* data and updates are incremented together under the write lock above */
592 if (data[element].data != data[element].updates)
593 printf ("%d: data[%d] %d != %d\n",
594 self->thread_num, element,
595 data[element].data, data[element].updates);
596 rwl_readunlock (&data[element].lock);
601 if (element >= DATASIZE)
607 int main (int argc, char *argv[])
609 int count, data_count;
610 unsigned int seed = 1;
611 int thread_updates = 0, data_updates = 0;
616 * On Solaris 2.5, threads are not timesliced. To ensure
617 * that our threads can run concurrently, we need to
618 * increase the concurrency level to THREADS.
620 DPRINTF (("Setting concurrency level to %d\n", THREADS));
621 thr_setconcurrency (THREADS);
625 * Initialize the shared data.
627 for (data_count = 0; data_count < DATASIZE; data_count++) {
628 data[data_count].data = 0;
629 data[data_count].updates = 0;
630 rwl_init (&data[data_count].lock);
634 * Create THREADS threads to access shared data.
636 for (count = 0; count < THREADS; count++) {
637 threads[count].thread_num = count;
638 threads[count].r_collisions = 0;
639 threads[count].w_collisions = 0;
640 threads[count].updates = 0;
641 threads[count].interval = rand_r (&seed) % ITERATIONS;
642 status = pthread_create (&threads[count].thread_id,
643 NULL, thread_routine, (void*)&threads[count]);
645 err_abort (status, _("Create thread"));
649 * Wait for all threads to complete, and collect
652 for (count = 0; count < THREADS; count++) {
653 status = pthread_join (threads[count].thread_id, NULL);
655 err_abort (status, _("Join thread"));
656 thread_updates += threads[count].updates;
657 printf (_("%02d: interval %d, updates %d, "
658 "r_collisions %d, w_collisions %d\n"),
659 count, threads[count].interval,
660 threads[count].updates,
661 threads[count].r_collisions, threads[count].w_collisions);
665 * Collect statistics for the data.
667 for (data_count = 0; data_count < DATASIZE; data_count++) {
668 data_updates += data[data_count].updates;
669 printf (_("data %02d: value %d, %d updates\n"),
670 data_count, data[data_count].data, data[data_count].updates);
671 rwl_destroy (&data[data_count].lock);