2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Thread Read/Write locking code. It permits
30 * multiple readers but only one writer. Note, however,
31 * that the writer thread is permitted to make multiple
32 * nested write lock calls.
34 * Kern Sibbald, January MMI
38 * This code adapted from "Programming with POSIX Threads", by
43 #define _LOCKMGR_COMPLIANT
47 * Initialize a read/write lock
49 * Returns: 0 on success
/*
 * rwl_init - put a read/write lock into its initial, unlocked state.
 *
 *   rwl  lock object to initialize
 *
 * The header comment above promises 0 on success.
 * NOTE(review): this listing has lost lines (braces, the declaration of
 * stat, every return statement); confirm the error returns against the
 * complete file.
 */
52 int rwl_init(brwlock_t *rwl)
/* Nobody holds the lock and nobody is queued for it. */
56 rwl->r_active = rwl->w_active = 0;
57 rwl->r_wait = rwl->w_wait = 0;
/* The mutex and both condition variables use default attributes (NULL). */
58 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
61 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
/* Undo the mutex so that a failed init leaves nothing behind. */
62 pthread_mutex_destroy(&rwl->mutex);
65 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
/* Undo both earlier objects, newest first. */
66 pthread_cond_destroy(&rwl->read);
67 pthread_mutex_destroy(&rwl->mutex);
/* Set last: the other rwl_* calls test this field before using the lock. */
70 rwl->valid = RWLOCK_VALID;
75 * Destroy a read/write lock
77 * Returns: 0 on success
/*
 * rwl_destroy - tear down a lock that is idle.
 *
 *   rwl  lock object to destroy
 *
 * Returns the first non-zero status among the three destroy calls, or 0
 * when all of them succeed. The existing comments state that EBUSY is
 * reported while threads are active or waiting.
 * NOTE(review): the early return statements and whatever happens between
 * the waiting check and the unlock are missing from this listing.
 */
80 int rwl_destroy(brwlock_t *rwl)
82 int stat, stat1, stat2;
/* Refuse an object that was never initialized (or is already destroyed). */
84 if (rwl->valid != RWLOCK_VALID) {
/* The counters below are only read while holding the mutex. */
87 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
92 * If any threads are active, report EBUSY
94 if (rwl->r_active > 0 || rwl->w_active) {
95 pthread_mutex_unlock(&rwl->mutex);
100 * If any threads are waiting, report EBUSY
102 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
103 pthread_mutex_unlock(&rwl->mutex);
/* The mutex is released before it is destroyed. */
108 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
/* All three objects are destroyed even if an earlier destroy fails. */
111 stat = pthread_mutex_destroy(&rwl->mutex);
112 stat1 = pthread_cond_destroy(&rwl->read);
113 stat2 = pthread_cond_destroy(&rwl->write);
/* Report the earliest failure: mutex, then read, then write. */
114 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
118 * Handle cleanup when the read lock condition variable
/*
 * rwl_read_release - cleanup handler that rwl_readlock registers with
 * pthread_cleanup_push around its condition wait.
 *
 *   arg  the brwlock_t being waited on, passed as void *
 *
 * It releases the lock mutex so that it is not left held by the departing
 * thread.
 * NOTE(review): one source line between the cast and the unlock is missing
 * from this listing; presumably it undoes the r_wait increment - confirm.
 */
121 static void rwl_read_release(void *arg)
123 brwlock_t *rwl = (brwlock_t *)arg;
126 pthread_mutex_unlock(&rwl->mutex);
130 * Handle cleanup when the write lock condition variable wait
/*
 * rwl_write_release - cleanup handler that rwl_writelock registers with
 * pthread_cleanup_push around its condition wait.
 *
 *   arg  the brwlock_t being waited on, passed as void *
 *
 * It releases the lock mutex so that it is not left held by the departing
 * thread.
 * NOTE(review): one source line between the cast and the unlock is missing
 * from this listing; presumably it undoes the w_wait increment - confirm.
 */
133 static void rwl_write_release(void *arg)
135 brwlock_t *rwl = (brwlock_t *)arg;
138 pthread_mutex_unlock(&rwl->mutex);
142 * Lock for read access, wait until locked (or error).
/*
 * rwl_readlock - acquire the lock for shared (read) access, blocking while
 * a writer is active.
 *
 *   rwl  lock to acquire
 *
 * Only w_active is tested, so a reader is not held back by other readers.
 * NOTE(review): the declaration of stat, the guard in front of the wait
 * section, the test in front of r_active++ and the return statements are
 * missing from this listing.
 */
144 int rwl_readlock(brwlock_t *rwl)
148 if (rwl->valid != RWLOCK_VALID) {
151 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
/* r_wait is what rwl_writeunlock looks at when deciding whom to wake. */
155 rwl->r_wait++; /* indicate that we are waiting */
/* The handler releases the mutex if this thread is cancelled in the wait. */
156 pthread_cleanup_push(rwl_read_release, (void *)rwl);
/* Loop: the predicate is rechecked after every wakeup. */
157 while (rwl->w_active) {
158 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
160 break; /* error, bail out */
/* Argument 0: remove the handler without running it. */
163 pthread_cleanup_pop(0);
164 rwl->r_wait--; /* we are no longer waiting */
167 rwl->r_active++; /* we are running */
169 pthread_mutex_unlock(&rwl->mutex);
174 * Attempt to lock for read access, don't wait
/*
 * rwl_readtrylock - try to acquire the lock for shared (read) access
 * without waiting on a condition variable.
 *
 *   rwl  lock to acquire
 *
 * Returns stat if it is non-zero, otherwise the status of the final mutex
 * unlock.
 * NOTE(review): the test that chooses between refusing the lock and
 * incrementing r_active is missing from this listing, as are the
 * declarations and the early returns.
 */
176 int rwl_readtrylock(brwlock_t *rwl)
180 if (rwl->valid != RWLOCK_VALID) {
183 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
189 rwl->r_active++; /* we are running */
/* A failure recorded in stat takes precedence over the unlock status. */
191 stat2 = pthread_mutex_unlock(&rwl->mutex);
192 return (stat == 0 ? stat2 : stat);
/*
 * rwl_readunlock - release a shared (read) hold on the lock.
 *
 *   rwl  lock to release
 *
 * Returns stat if it is non-zero, otherwise the status of the final mutex
 * unlock.
 * NOTE(review): the statement that lowers r_active is missing from this
 * listing, as are the declarations and the early returns.
 */
198 int rwl_readunlock(brwlock_t *rwl)
202 if (rwl->valid != RWLOCK_VALID) {
205 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
/* Writers are woken only once no reader is left. */
209 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
/* Broadcast wakes every waiting writer; each rechecks the lock state. */
210 stat = pthread_cond_broadcast(&rwl->write);
212 stat2 = pthread_mutex_unlock(&rwl->mutex);
213 return (stat == 0 ? stat2 : stat);
218 * Lock for write access, wait until locked (or error).
219 * Multiple nested write locking is permitted.
/*
 * rwl_writelock - acquire the lock for exclusive (write) access, blocking
 * while another writer or any reader is active.
 *
 *   rwl  lock to acquire
 *
 * A thread that already owns the write lock is recognized by comparing
 * writer_id with pthread_self() and is handled without waiting.
 * NOTE(review): the body of that re-entry branch, the statement run when
 * pthread_cond_wait fails, the test in front of w_active++ and the return
 * statements are missing from this listing.
 */
221 int rwl_writelock(brwlock_t *rwl)
225 if (rwl->valid != RWLOCK_VALID) {
228 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
/* Re-entry by the current owner: no wait is needed. */
231 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
233 pthread_mutex_unlock(&rwl->mutex);
/* Lock manager hook, given this call site; its effect is not visible here. */
236 lmgr_pre_lock(rwl, __FILE__, __LINE__);
237 if (rwl->w_active || rwl->r_active > 0) {
/* w_wait is what the unlock functions look at when deciding whom to wake. */
238 rwl->w_wait++; /* indicate that we are waiting */
/* The handler releases the mutex if this thread is cancelled in the wait. */
239 pthread_cleanup_push(rwl_write_release, (void *)rwl);
/* Loop: the predicate is rechecked after every wakeup. */
240 while (rwl->w_active || rwl->r_active > 0) {
241 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
243 break; /* error, bail out */
/* Argument 0: remove the handler without running it. */
246 pthread_cleanup_pop(0);
247 rwl->w_wait--; /* we are no longer waiting */
250 rwl->w_active++; /* we are running */
/* Remembered so that re-entry and unlock can identify the owner. */
251 rwl->writer_id = pthread_self(); /* save writer thread's id */
254 pthread_mutex_unlock(&rwl->mutex);
259 * Attempt to lock for write access, don't wait
/*
 * rwl_writetrylock - try to acquire the lock for exclusive (write) access
 * without waiting on a condition variable.
 *
 *   rwl  lock to acquire
 *
 * Returns stat if it is non-zero, otherwise the status of the final mutex
 * unlock.
 * NOTE(review): the body of the re-entry branch and the statement executed
 * when the lock is busy are missing from this listing, as are the
 * declarations and the early returns.
 */
261 int rwl_writetrylock(brwlock_t *rwl)
265 if (rwl->valid != RWLOCK_VALID) {
268 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
/* Re-entry by the thread that already owns the write lock. */
271 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
273 pthread_mutex_unlock(&rwl->mutex);
/* Busy when a writer or any reader currently holds the lock. */
276 if (rwl->w_active || rwl->r_active > 0) {
279 rwl->w_active = 1; /* we are running */
280 rwl->writer_id = pthread_self(); /* save writer thread's id */
/* Lock manager hook, given this call site; its effect is not visible here. */
281 lmgr_do_lock(rwl, __FILE__, __LINE__);
283 stat2 = pthread_mutex_unlock(&rwl->mutex);
284 return (stat == 0 ? stat2 : stat);
289 * Wake any waiting readers in preference to waiting writers
/*
 * rwl_writeunlock - release one level of exclusive (write) ownership.
 *
 *   rwl  lock to release
 *
 * Aborts through Jmsg0 with M_ABORT when no write lock is held or when the
 * caller is not the recorded owner. While nested holds remain
 * (w_active > 0) nobody is woken. Otherwise waiting readers are woken if
 * there are any, and waiting writers only when no reader is waiting.
 * Returns stat if it is non-zero, otherwise the status of the final mutex
 * unlock.
 * NOTE(review): the statement that lowers w_active and one statement at the
 * start of the wake-up branch are missing from this listing.
 */
291 int rwl_writeunlock(brwlock_t *rwl)
295 if (rwl->valid != RWLOCK_VALID) {
298 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
/* Unbalanced unlock: release the mutex, then abort. */
301 if (rwl->w_active <= 0) {
302 pthread_mutex_unlock(&rwl->mutex);
303 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
/* Only the thread stored in writer_id may unlock. */
306 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
307 pthread_mutex_unlock(&rwl->mutex);
308 Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
/* Nested hold still outstanding: keep the lock, wake nobody. */
310 if (rwl->w_active > 0) {
311 stat = 0; /* writers still active */
314 /* No more writers, awaken someone */
/* Readers are checked first, so they win when both kinds are waiting. */
315 if (rwl->r_wait > 0) { /* if readers waiting */
316 stat = pthread_cond_broadcast(&rwl->read);
317 } else if (rwl->w_wait > 0) {
318 stat = pthread_cond_broadcast(&rwl->write);
321 stat2 = pthread_mutex_unlock(&rwl->mutex);
322 return (stat == 0 ? stat2 : stat);
329 #define ITERATIONS 1000000
332 * Keep statistics for each thread.
334 typedef struct thread_tag {
343 * Read/write lock and shared data.
345 typedef struct data_tag {
/* One bookkeeping record per worker thread; main fills them in. */
351 static thread_t threads[THREADS];
/* Shared elements the workers update; each carries its own rwlock. */
352 static data_t data[DATASIZE];
355 * Thread start routine that uses read/write locks.
/*
 * thread_routine - worker body for the blocking-lock test.
 *
 *   arg  pointer to this worker's thread_t record
 *
 * On each pass it takes the write lock on the current element twice and
 * releases it twice, which exercises nested write locking, and then takes
 * a read lock to compare the element with its own thread number.
 * NOTE(review): the interval test is commented out, so the write section
 * appears to run on every iteration rather than every self->interval
 * iterations as the older comment says. Local declarations, the status
 * checks in front of each printf, the counter updates and the return are
 * missing from this listing.
 */
357 void *thread_routine(void *arg)
359 thread_t *self = (thread_t *)arg;
365 for (iteration=0; iteration < ITERATIONS; iteration++) {
367 * Each "self->interval" iterations, perform an
368 * update operation (write lock instead of read
371 // if ((iteration % self->interval) == 0) {
/* First (outer) write lock on the element. */
372 status = rwl_writelock(&data[element].lock);
375 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
/* Stamp the element with this worker and count the write. */
378 data[element].data = self->thread_num;
379 data[element].writes++;
/* Second (nested) write lock by the same thread on the same element. */
381 status = rwl_writelock(&data[element].lock);
384 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
387 data[element].data = self->thread_num;
388 data[element].writes++;
/* Two unlocks balance the two locks above. */
390 status = rwl_writeunlock(&data[element].lock);
393 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
396 status = rwl_writeunlock(&data[element].lock);
399 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
406 * Look at the current data element to see whether
407 * the current thread last updated it. Count the
408 * times to report later.
410 status = rwl_readlock(&data[element].lock);
413 printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
417 if (data[element].data == self->thread_num)
419 status = rwl_readunlock(&data[element].lock);
422 printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
/* Bounds check on the element index (the reset itself is not shown). */
428 if (element >= DATASIZE) {
433 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
434 self->thread_num, repeats);
/*
 * main - driver for the blocking-lock test.
 *
 * Initializes DATASIZE shared elements, starts THREADS workers running
 * thread_routine, joins them, then prints per-thread and per-element
 * counters and the two totals.
 * NOTE(review): several declarations, the status checks in front of each
 * failure printf and the return are missing from this listing.
 */
439 int main (int argc, char *argv[])
/* Fixed seed: rand_r yields the same interval sequence on every run. */
444 unsigned int seed = 1;
445 int thread_writes = 0;
450 * On Solaris 2.5, threads are not timesliced. To ensure
451 * that our threads can run concurrently, we need to
452 * increase the concurrency level to THREADS.
454 thr_setconcurrency (THREADS);
458 * Initialize the shared data.
460 for (data_count = 0; data_count < DATASIZE; data_count++) {
461 data[data_count].data = 0;
462 data[data_count].writes = 0;
463 status = rwl_init (&data[data_count].lock);
466 printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
472 * Create THREADS threads to access shared data.
474 for (count = 0; count < THREADS; count++) {
/* Thread numbers start at 1; elements are initialized to data 0. */
475 threads[count].thread_num = count + 1;
476 threads[count].writes = 0;
477 threads[count].reads = 0;
478 threads[count].interval = rand_r(&seed) % 71;
/* Clamp so that the interval is never zero. */
479 if (threads[count].interval <= 0) {
480 threads[count].interval = 1;
482 status = pthread_create (&threads[count].thread_id,
483 NULL, thread_routine, (void*)&threads[count]);
/* NOTE(review): pthread_t is opaque; the (int) cast is not portable. */
484 if (status != 0 || (int)threads[count].thread_id == 0) {
486 printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
492 * Wait for all threads to complete, and collect
495 for (count = 0; count < THREADS; count++) {
496 status = pthread_join (threads[count].thread_id, NULL);
499 printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
502 thread_writes += threads[count].writes;
503 printf (_("%02d: interval %d, writes %d, reads %d\n"),
504 count, threads[count].interval,
505 threads[count].writes, threads[count].reads);
509 * Collect statistics for the data.
511 for (data_count = 0; data_count < DATASIZE; data_count++) {
512 data_writes += data[data_count].writes;
513 printf (_("data %02d: value %d, %d writes\n"),
514 data_count, data[data_count].data, data[data_count].writes);
/* The status returned by rwl_destroy is not checked here. */
515 rwl_destroy (&data[data_count].lock);
518 printf (_("Total: %d thread writes, %d data writes\n"),
519 thread_writes, data_writes);
525 #ifdef TEST_RW_TRY_LOCK
529 * Demonstrate use of non-blocking read-write locks.
531 * Special notes: On a Solaris system, call thr_setconcurrency()
532 * to allow interleaved thread execution, since threads are not
540 #define ITERATIONS 1000
544 * Keep statistics for each thread.
546 typedef struct thread_tag {
556 * Read-write lock and shared data
558 typedef struct data_tag {
/* One bookkeeping record per worker thread; main fills them in. */
564 thread_t threads[THREADS];
/* Shared elements the workers update; each carries its own rwlock. */
565 data_t data[DATASIZE];
568 * Thread start routine that uses read-write locks
/*
 * thread_routine - worker body for the non-blocking (try lock) test.
 *
 *   arg  pointer to this worker's thread_t record
 *
 * When iteration is a multiple of self->interval it attempts a write try
 * lock and, on success, bumps the data and updates fields of the element
 * together. The read path attempts a read try lock and reports any element
 * whose data and updates fields differ.
 * NOTE(review): iteration % self->interval divides by zero if interval is
 * 0; the caller must supply a non-zero interval. The conditions that
 * select the collision branches, the else that introduces the read path
 * and the return are missing from this listing.
 */
570 void *thread_routine (void *arg)
572 thread_t *self = (thread_t*)arg;
577 element = 0; /* Current data element */
579 for (iteration = 0; iteration < ITERATIONS; iteration++) {
580 if ((iteration % self->interval) == 0) {
581 status = rwl_writetrylock (&data[element].lock);
/* Refused try lock: count it (the test itself is not shown). */
583 self->w_collisions++;
584 else if (status == 0) {
/* Both fields move together under the write lock. */
585 data[element].data++;
586 data[element].updates++;
588 rwl_writeunlock (&data[element].lock);
590 err_abort (status, _("Try write lock"));
592 status = rwl_readtrylock (&data[element].lock);
594 self->r_collisions++;
595 else if (status != 0) {
596 err_abort (status, _("Try read lock"));
/* Consistency check: a mismatch means a reader saw a partial update. */
598 if (data[element].data != data[element].updates)
599 printf ("%d: data[%d] %d != %d\n",
600 self->thread_num, element,
601 data[element].data, data[element].updates);
602 rwl_readunlock (&data[element].lock);
/* Bounds check on the element index (the reset itself is not shown). */
607 if (element >= DATASIZE)
/* Lock manager per-thread cleanup; its effect is not visible here. */
610 lmgr_cleanup_thread();
/*
 * main - driver for the non-blocking (try lock) test.
 *
 * Initializes DATASIZE shared elements, starts THREADS workers running
 * thread_routine, joins them, then prints per-thread collision counters
 * and per-element update counts.
 * NOTE(review): the declaration of status, the status checks in front of
 * each err_abort and the end of the function are missing from this
 * listing.
 */
614 int main (int argc, char *argv[])
616 int count, data_count;
/* Fixed seed: rand_r yields the same interval sequence on every run. */
617 unsigned int seed = 1;
618 int thread_updates = 0, data_updates = 0;
623 * On Solaris 2.5, threads are not timesliced. To ensure
624 * that our threads can run concurrently, we need to
625 * increase the concurrency level to THREADS.
627 DPRINTF (("Setting concurrency level to %d\n", THREADS));
628 thr_setconcurrency (THREADS);
632 * Initialize the shared data.
634 for (data_count = 0; data_count < DATASIZE; data_count++) {
635 data[data_count].data = 0;
636 data[data_count].updates = 0;
/* The status returned by rwl_init is not checked here. */
637 rwl_init (&data[data_count].lock);
641 * Create THREADS threads to access shared data.
643 for (count = 0; count < THREADS; count++) {
644 threads[count].thread_num = count;
645 threads[count].r_collisions = 0;
646 threads[count].w_collisions = 0;
647 threads[count].updates = 0;
/*
 * NOTE(review): the result can be 0, and thread_routine uses interval as
 * a modulo divisor. Nothing clamps it before the thread is created.
 */
648 threads[count].interval = rand_r (&seed) % ITERATIONS;
649 status = pthread_create (&threads[count].thread_id,
650 NULL, thread_routine, (void*)&threads[count]);
652 err_abort (status, _("Create thread"));
656 * Wait for all threads to complete, and collect
659 for (count = 0; count < THREADS; count++) {
660 status = pthread_join (threads[count].thread_id, NULL);
662 err_abort (status, _("Join thread"));
663 thread_updates += threads[count].updates;
664 printf (_("%02d: interval %d, updates %d, "
665 "r_collisions %d, w_collisions %d\n"),
666 count, threads[count].interval,
667 threads[count].updates,
668 threads[count].r_collisions, threads[count].w_collisions);
672 * Collect statistics for the data.
674 for (data_count = 0; data_count < DATASIZE; data_count++) {
675 data_updates += data[data_count].updates;
676 printf (_("data %02d: value %d, %d updates\n"),
677 data_count, data[data_count].data, data[data_count].updates);
/* The status returned by rwl_destroy is not checked here. */
678 rwl_destroy (&data[data_count].lock);