2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Thread Read/Write locking code. It permits
30 * multiple readers but only one writer. Note, however,
31 * that the writer thread is permitted to make multiple
32 * nested write lock calls.
34 * Kern Sibbald, January MMI
38 * This code adapted from "Programming with POSIX Threads", by
43 #define _LOCKMGR_COMPLIANT
47 * Initialize a read/write lock
49 * Returns: 0 on success
/*
 * rwl_init -- initialize a brwlock_t read/write lock.
 *
 *   rwl      lock object to initialize
 *   priority priority recorded for the lock manager (used later by
 *            lmgr_pre_lock() in rwl_writelock_p)
 *
 * Zeroes the active/waiting counters, then creates the mutex and the
 * two condition variables ("read" wakes waiting readers, "write" wakes
 * waiting writers).  If a later init step fails, the objects created
 * so far are destroyed before the pthread error code is propagated.
 * On success the lock is stamped RWLOCK_VALID so the other rwl_*()
 * entry points can sanity-check it.
 *
 * NOTE(review): this extract is missing several lines (opening brace,
 * declaration of 'stat', the early error-return statements and the
 * final 'return 0') -- confirm against the complete source file.
 */
52 int rwl_init(brwlock_t *rwl, int priority)
56    rwl->r_active = rwl->w_active = 0;   /* no readers or writer active */
57    rwl->r_wait = rwl->w_wait = 0;       /* nobody waiting yet */
58    rwl->priority = priority;
59    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
62    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
63       pthread_mutex_destroy(&rwl->mutex);   /* undo partial init */
66    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
67       pthread_cond_destroy(&rwl->read);     /* undo partial init */
68       pthread_mutex_destroy(&rwl->mutex);
71    rwl->valid = RWLOCK_VALID;           /* mark lock usable */
76 * Destroy a read/write lock
78 * Returns: 0 on success
/*
 * rwl_destroy -- destroy a read/write lock.
 *
 * Refuses to destroy the lock while any thread is active in it or
 * waiting on it (the visible checks unlock and bail out in those
 * cases; the bailed-out return values are in lines missing from this
 * extract -- presumably EBUSY, per the comments).  Otherwise destroys
 * the mutex and both condition variables and returns the first
 * nonzero status among the three destroy calls.
 *
 * NOTE(review): extract is missing the opening brace, the
 * invalid-lock and mutex-lock-failure returns, and any code that
 * clears rwl->valid -- confirm against the complete source file.
 */
81 int rwl_destroy(brwlock_t *rwl)
83    int stat, stat1, stat2;
85    if (rwl->valid != RWLOCK_VALID) {
88    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
93    * If any threads are active, report EBUSY
95    if (rwl->r_active > 0 || rwl->w_active) {
96       pthread_mutex_unlock(&rwl->mutex);
101    * If any threads are waiting, report EBUSY
103    if (rwl->r_wait > 0 || rwl->w_wait > 0) {
104       pthread_mutex_unlock(&rwl->mutex);
109    if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
112    stat = pthread_mutex_destroy(&rwl->mutex);
113    stat1 = pthread_cond_destroy(&rwl->read);
114    stat2 = pthread_cond_destroy(&rwl->write);
115    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));   /* first failure wins */
119 * Handle cleanup when the read lock condition variable
/*
 * rwl_read_release -- cancellation cleanup handler for rwl_readlock.
 *
 * Installed via pthread_cleanup_push() while a reader waits on the
 * "read" condition variable; if the waiting thread is cancelled, this
 * releases the lock's mutex so other threads are not blocked forever.
 *
 * NOTE(review): a line is missing between the cast and the unlock in
 * this extract -- presumably it decrements rwl->r_wait; confirm
 * against the complete source file.
 */
122 static void rwl_read_release(void *arg)
124    brwlock_t *rwl = (brwlock_t *)arg;
127    pthread_mutex_unlock(&rwl->mutex);
131 * Handle cleanup when the write lock condition variable wait
/*
 * rwl_write_release -- cancellation cleanup handler for the write-lock
 * wait in rwl_writelock_p.  Mirrors rwl_read_release: releases the
 * lock's mutex if the thread is cancelled while waiting on the
 * "write" condition variable.
 *
 * NOTE(review): a line is missing between the cast and the unlock in
 * this extract -- presumably it decrements rwl->w_wait; confirm
 * against the complete source file.
 */
134 static void rwl_write_release(void *arg)
136    brwlock_t *rwl = (brwlock_t *)arg;
139    pthread_mutex_unlock(&rwl->mutex);
143 * Lock for read access, wait until locked (or error).
/*
 * rwl_readlock -- acquire the lock for read access, blocking until it
 * is available (or a pthread error occurs).
 *
 * Readers wait only while a writer is active (rwl->w_active); multiple
 * readers may hold the lock concurrently.  The cleanup handler pushed
 * around the condition wait keeps the mutex from being orphaned if the
 * waiting thread is cancelled.
 *
 * Returns 0 on success, otherwise the pthread error code.
 *
 * NOTE(review): extract is missing the opening brace, the 'stat'
 * declaration, the invalid-lock/lock-failure returns, the body of the
 * error-break branch, and the final 'return stat' -- confirm against
 * the complete source file.
 */
145 int rwl_readlock(brwlock_t *rwl)
149    if (rwl->valid != RWLOCK_VALID) {
152    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
156    rwl->r_wait++;                  /* indicate that we are waiting */
157    pthread_cleanup_push(rwl_read_release, (void *)rwl);
158    while (rwl->w_active) {         /* readers wait only on an active writer */
159       stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
161          break;                    /* error, bail out */
164    pthread_cleanup_pop(0);         /* pop without executing the handler */
165    rwl->r_wait--;                  /* we are no longer waiting */
168       rwl->r_active++;             /* we are running */
170    pthread_mutex_unlock(&rwl->mutex);
175 * Attempt to lock for read access, don't wait
/*
 * rwl_readtrylock -- attempt to acquire the lock for read access
 * without blocking.
 *
 * Returns 0 on success, otherwise an error/busy status.  The visible
 * code increments r_active when the lock is granted and propagates
 * any mutex-unlock failure only when the trylock itself succeeded.
 *
 * NOTE(review): extract is missing the opening brace, the 'stat'/
 * 'stat2' declarations, the early returns, and the branch that sets
 * 'stat' to EBUSY when a writer is active -- confirm against the
 * complete source file.
 */
177 int rwl_readtrylock(brwlock_t *rwl)
181    if (rwl->valid != RWLOCK_VALID) {
184    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
190       rwl->r_active++;             /* we are running */
192    stat2 = pthread_mutex_unlock(&rwl->mutex);
193    return (stat == 0 ? stat2 : stat);   /* trylock status takes precedence */
/*
 * rwl_readunlock -- release a read lock.
 *
 * When the last active reader leaves and writers are queued, it
 * broadcasts on the "write" condition variable so a waiting writer
 * can proceed (writers are preferred over new readers here).
 *
 * Returns 0 on success, otherwise the pthread error code.
 *
 * NOTE(review): extract is missing the opening brace, declarations,
 * the early returns, and the line that decrements rwl->r_active
 * before the broadcast check -- confirm against the complete source.
 */
199 int rwl_readunlock(brwlock_t *rwl)
203    if (rwl->valid != RWLOCK_VALID) {
206    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
210    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
211       stat = pthread_cond_broadcast(&rwl->write);
213    stat2 = pthread_mutex_unlock(&rwl->mutex);
214    return (stat == 0 ? stat2 : stat);
219 * Lock for write access, wait until locked (or error).
220 * Multiple nested write locking is permitted.
/*
 * rwl_writelock_p -- acquire the lock for write access, blocking until
 * it is available (or a pthread error occurs).
 *
 *   rwl        the lock
 *   file, line caller location, forwarded to the lock manager
 *              (lmgr_pre_lock) for deadlock diagnostics
 *
 * Nested write locking by the owning thread is permitted: if the
 * caller already holds the write lock (w_active set and writer_id
 * matches pthread_self()), the visible branch unlocks the mutex and
 * returns early -- the recursion-count increment is in lines missing
 * from this extract.  Otherwise the thread waits until no writer and
 * no readers are active, then records itself as the writer.
 *
 * Returns 0 on success, otherwise the pthread error code.
 *
 * NOTE(review): extract is missing the opening brace, declarations,
 * early returns, the nested-lock increment, and the final
 * 'return stat' -- confirm against the complete source file.
 */
222 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
226    if (rwl->valid != RWLOCK_VALID) {
229    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
232    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
234       pthread_mutex_unlock(&rwl->mutex);   /* nested lock by owner: no wait */
237    lmgr_pre_lock(rwl, rwl->priority, file, line);   /* deadlock-detector hook */
238    if (rwl->w_active || rwl->r_active > 0) {
239       rwl->w_wait++;               /* indicate that we are waiting */
240       pthread_cleanup_push(rwl_write_release, (void *)rwl);
241       while (rwl->w_active || rwl->r_active > 0) {
242          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
244             break;                 /* error, bail out */
247       pthread_cleanup_pop(0);      /* pop without executing the handler */
248       rwl->w_wait--;               /* we are no longer waiting */
251    rwl->w_active++;                /* we are running */
252    rwl->writer_id = pthread_self();   /* save writer thread's id */
255    pthread_mutex_unlock(&rwl->mutex);
260 * Attempt to lock for write access, don't wait
/*
 * rwl_writetrylock -- attempt to acquire the write lock without
 * blocking.
 *
 * Like rwl_writelock_p, a nested attempt by the thread that already
 * owns the write lock takes the early-return branch (the recursion
 * bookkeeping is in lines missing from this extract).  If the lock is
 * free it sets w_active = 1, records the owner, and notifies the lock
 * manager with this function's own file/line (lmgr_do_lock), unlike
 * the blocking path which receives the caller's location.
 *
 * Returns 0 on success, otherwise an error/busy status.
 *
 * NOTE(review): extract is missing the opening brace, declarations,
 * early returns, and the EBUSY branch body for a held lock -- confirm
 * against the complete source file.
 */
262 int rwl_writetrylock(brwlock_t *rwl)
266    if (rwl->valid != RWLOCK_VALID) {
269    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
272    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
274       pthread_mutex_unlock(&rwl->mutex);   /* nested lock by owner */
277    if (rwl->w_active || rwl->r_active > 0) {
280       rwl->w_active = 1;           /* we are running */
281       rwl->writer_id = pthread_self();     /* save writer thread's id */
282       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
284    stat2 = pthread_mutex_unlock(&rwl->mutex);
285    return (stat == 0 ? stat2 : stat);
290 * Start any waiting writers in preference to waiting readers
/*
 * rwl_writeunlock -- release one level of write lock.
 *
 * Aborts the process (Jmsg0 M_ABORT) on misuse: unlocking more times
 * than locked, or unlocking from a thread other than the recorded
 * owner.  If nesting remains after this release (w_active still > 0,
 * which implies a decrement in a line missing from this extract), it
 * simply returns.  Otherwise waiting readers are woken in preference
 * to waiting writers -- note this ordering is the opposite of
 * rwl_readunlock, which prefers writers.
 *
 * Returns 0 on success, otherwise the pthread error code.
 *
 * NOTE(review): extract is missing the opening brace, declarations,
 * the early returns, the w_active decrement, and the end of the
 * still-nested branch -- confirm against the complete source file.
 */
292 int rwl_writeunlock(brwlock_t *rwl)
296    if (rwl->valid != RWLOCK_VALID) {
299    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
302    if (rwl->w_active <= 0) {
303       pthread_mutex_unlock(&rwl->mutex);
304       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
307    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
308       pthread_mutex_unlock(&rwl->mutex);
309       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
311    if (rwl->w_active > 0) {
312       stat = 0;                    /* writers still active */
315    /* No more writers, awaken someone */
316    if (rwl->r_wait > 0) {          /* if readers waiting */
317       stat = pthread_cond_broadcast(&rwl->read);
318    } else if (rwl->w_wait > 0) {
319       stat = pthread_cond_broadcast(&rwl->write);
322    stat2 = pthread_mutex_unlock(&rwl->mutex);
323    return (stat == 0 ? stat2 : stat);
330 #define ITERATIONS 1000000
333 * Keep statics for each thread.
335 typedef struct thread_tag {
344 * Read/write lock and shared data.
346 typedef struct data_tag {
352 static thread_t threads[THREADS];
353 static data_t data[DATASIZE];
356 * Thread start routine that uses read/write locks.
/*
 * thread_routine -- worker for the first (blocking-lock) test program.
 *
 * Each thread loops ITERATIONS times over the shared data[] array.
 * The visible code takes the write lock TWICE in a row (testing the
 * nested write-lock support) before performing two updates and two
 * matching unlocks, then takes a read lock to check whether this
 * thread was the last writer of the element, counting "unchanged"
 * observations in 'repeats' for the final report.
 *
 * NOTE(review): this extract is missing the opening brace, local
 * declarations ('element', 'status', 'repeats', 'be'), the
 * interval-based read/write selection logic (the modulo test at
 * original line 372 is commented out here), the element-advance and
 * wrap code, and the trailing 'return'/closing brace -- confirm
 * against the complete source file.
 */
358 void *thread_routine(void *arg)
360    thread_t *self = (thread_t *)arg;
366    for (iteration=0; iteration < ITERATIONS; iteration++) {
368        * Each "self->interval" iterations, perform an
369        * update operation (write lock instead of read
372 //      if ((iteration % self->interval) == 0) {
373          status = rwl_writelock(&data[element].lock);
376             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
379          data[element].data = self->thread_num;
380          data[element].writes++;
382          status = rwl_writelock(&data[element].lock);   /* nested write lock */
385             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
388          data[element].data = self->thread_num;
389          data[element].writes++;
391          status = rwl_writeunlock(&data[element].lock);
394             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
397          status = rwl_writeunlock(&data[element].lock);  /* release outer level */
400             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
407           * Look at the current data element to see whether
408           * the current thread last updated it. Count the
409           * times to report later.
411          status = rwl_readlock(&data[element].lock);
414             printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
418          if (data[element].data == self->thread_num)
420          status = rwl_readunlock(&data[element].lock);
423             printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
429       if (element >= DATASIZE) {
434    Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
435          self->thread_num, repeats);
/*
 * main -- driver for the first (blocking-lock) rwlock test.
 *
 * Initializes DATASIZE shared elements with rwl_init, spawns THREADS
 * worker threads (each with a pseudo-random update interval from
 * rand_r with a fixed seed, clamped to at least 1), joins them, and
 * prints per-thread and per-element statistics.  The total of thread
 * writes should match the total of data writes if the locking is
 * correct.
 *
 * NOTE(review): extract is missing the opening brace, most local
 * declarations ('count', 'data_count', 'status', 'data_writes',
 * 'be'), several error-exit statements, and the trailing
 * 'return 0'/closing brace -- confirm against the complete source.
 * Also note rwl_init is called here with one argument while the
 * definition above takes (rwl, priority) -- presumably a default
 * argument or wrapper macro exists in the missing header; verify.
 */
440 int main (int argc, char *argv[])
445    unsigned int seed = 1;          /* fixed seed: reproducible intervals */
446    int thread_writes = 0;
449 #ifdef USE_THR_SETCONCURRENCY
451     * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
452     * that our threads can run concurrently, we need to
453     * increase the concurrency level to THREADS.
455    thr_setconcurrency (THREADS);
459     * Initialize the shared data.
461    for (data_count = 0; data_count < DATASIZE; data_count++) {
462       data[data_count].data = 0;
463       data[data_count].writes = 0;
464       status = rwl_init(&data[data_count].lock);
467          printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
473     * Create THREADS threads to access shared data.
475    for (count = 0; count < THREADS; count++) {
476       threads[count].thread_num = count + 1;
477       threads[count].writes = 0;
478       threads[count].reads = 0;
479       threads[count].interval = rand_r(&seed) % 71;
480       if (threads[count].interval <= 0) {
481          threads[count].interval = 1;    /* avoid modulo-by-zero in workers */
483       status = pthread_create (&threads[count].thread_id,
484           NULL, thread_routine, (void*)&threads[count]);
485       if (status != 0 || (int)threads[count].thread_id == 0) {
487          printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
493     * Wait for all threads to complete, and collect
496    for (count = 0; count < THREADS; count++) {
497       status = pthread_join (threads[count].thread_id, NULL);
500          printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
503       thread_writes += threads[count].writes;
504       printf (_("%02d: interval %d, writes %d, reads %d\n"),
505           count, threads[count].interval,
506           threads[count].writes, threads[count].reads);
510     * Collect statistics for the data.
512    for (data_count = 0; data_count < DATASIZE; data_count++) {
513       data_writes += data[data_count].writes;
514       printf (_("data %02d: value %d, %d writes\n"),
515           data_count, data[data_count].data, data[data_count].writes);
516       rwl_destroy (&data[data_count].lock);
519    printf (_("Total: %d thread writes, %d data writes\n"),
520        thread_writes, data_writes);
526 #ifdef TEST_RW_TRY_LOCK
530 * Demonstrate use of non-blocking read-write locks.
532 * Special notes: On older Solaris system, call thr_setconcurrency()
533 * to allow interleaved thread execution, since threads are not
541 #define ITERATIONS 1000
545 * Keep statistics for each thread.
547 typedef struct thread_tag {
557 * Read-write lock and shared data
559 typedef struct data_tag {
565 thread_t threads[THREADS];
566 data_t data[DATASIZE];
569 * Thread start routine that uses read-write locks
/*
 * thread_routine -- worker for the second (try-lock) test program,
 * compiled under TEST_RW_TRY_LOCK.
 *
 * Each thread loops ITERATIONS times.  On every "interval"-th
 * iteration it attempts rwl_writetrylock: a busy status counts as a
 * write collision, success updates the element's data and update
 * counters before unlocking, and any other status aborts via
 * err_abort.  All other iterations attempt rwl_readtrylock, counting
 * busy results as read collisions and verifying under the read lock
 * that data and updates agree.  Calls lmgr_cleanup_thread() on exit
 * to release per-thread lock-manager state.
 *
 * NOTE(review): extract is missing the opening brace, local
 * declarations ('status', 'iteration', 'element'), the busy-status
 * comparison lines before the collision counters (presumably tests
 * against EBUSY), the element-advance code, and the trailing
 * 'return'/closing brace -- confirm against the complete source.
 */
571 void *thread_routine (void *arg)
573    thread_t *self = (thread_t*)arg;
578    element = 0;                    /* Current data element */
580    for (iteration = 0; iteration < ITERATIONS; iteration++) {
581       if ((iteration % self->interval) == 0) {
582          status = rwl_writetrylock (&data[element].lock);
584             self->w_collisions++;  /* lock was busy: count, don't block */
585          else if (status == 0) {
586             data[element].data++;
587             data[element].updates++;
589             rwl_writeunlock (&data[element].lock);
591             err_abort (status, _("Try write lock"));
593          status = rwl_readtrylock (&data[element].lock);
595             self->r_collisions++;  /* lock was busy: count, don't block */
596          else if (status != 0) {
597             err_abort (status, _("Try read lock"));
599             if (data[element].data != data[element].updates)
600                printf ("%d: data[%d] %d != %d\n",
601                    self->thread_num, element,
602                    data[element].data, data[element].updates);
603             rwl_readunlock (&data[element].lock);
608       if (element >= DATASIZE)
611    lmgr_cleanup_thread();
615 int main (int argc, char *argv[])
617 int count, data_count;
618 unsigned int seed = 1;
619 int thread_updates = 0, data_updates = 0;
622 #ifdef USE_THR_SETCONCURRENCY
624 * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
625 * that our threads can run concurrently, we need to
626 * increase the concurrency level to THREADS.
628 DPRINTF (("Setting concurrency level to %d\n", THREADS));
629 thr_setconcurrency (THREADS);
633 * Initialize the shared data.
635 for (data_count = 0; data_count < DATASIZE; data_count++) {
636 data[data_count].data = 0;
637 data[data_count].updates = 0;
638 rwl_init(&data[data_count].lock);
642 * Create THREADS threads to access shared data.
644 for (count = 0; count < THREADS; count++) {
645 threads[count].thread_num = count;
646 threads[count].r_collisions = 0;
647 threads[count].w_collisions = 0;
648 threads[count].updates = 0;
649 threads[count].interval = rand_r (&seed) % ITERATIONS;
650 status = pthread_create (&threads[count].thread_id,
651 NULL, thread_routine, (void*)&threads[count]);
653 err_abort (status, _("Create thread"));
657 * Wait for all threads to complete, and collect
660 for (count = 0; count < THREADS; count++) {
661 status = pthread_join (threads[count].thread_id, NULL);
663 err_abort (status, _("Join thread"));
664 thread_updates += threads[count].updates;
665 printf (_("%02d: interval %d, updates %d, "
666 "r_collisions %d, w_collisions %d\n"),
667 count, threads[count].interval,
668 threads[count].updates,
669 threads[count].r_collisions, threads[count].w_collisions);
673 * Collect statistics for the data.
675 for (data_count = 0; data_count < DATASIZE; data_count++) {
676 data_updates += data[data_count].updates;
677 printf (_("data %02d: value %d, %d updates\n"),
678 data_count, data[data_count].data, data[data_count].updates);
679 rwl_destroy (&data[data_count].lock);