2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Thread Read/Write locking code. It permits
30 * multiple readers but only one writer. Note, however,
31 * that the writer thread is permitted to make multiple
32 * nested write lock calls.
34 * Kern Sibbald, January MMI
36 * This code adapted from "Programming with POSIX Threads", by
41 #define _LOCKMGR_COMPLIANT
46 * Initialize a read/write lock
48 * Returns: 0 on success
52 devlock *new_devlock()
55 lock = (devlock *)malloc(sizeof (devlock));
56 memset(lock, 0, sizeof(devlock));
60 int devlock::init(int priority)
65 rwl->r_active = rwl->w_active = 0;
66 rwl->r_wait = rwl->w_wait = 0;
67 rwl->priority = priority;
68 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
71 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
72 pthread_mutex_destroy(&rwl->mutex);
75 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
76 pthread_cond_destroy(&rwl->read);
77 pthread_mutex_destroy(&rwl->mutex);
80 rwl->valid = DEVLOCK_VALID;
85 * Destroy a read/write lock
87 * Returns: 0 on success
90 int devlock::destroy()
93 int stat, stat1, stat2;
95 if (rwl->valid != DEVLOCK_VALID) {
98 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
103 * If any threads are active, report EBUSY
105 if (rwl->r_active > 0 || rwl->w_active) {
106 pthread_mutex_unlock(&rwl->mutex);
111 * If any threads are waiting, report EBUSY
113 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
114 pthread_mutex_unlock(&rwl->mutex);
119 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
122 stat = pthread_mutex_destroy(&rwl->mutex);
123 stat1 = pthread_cond_destroy(&rwl->read);
124 stat2 = pthread_cond_destroy(&rwl->write);
125 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
129 * Handle cleanup when the read lock condition variable
132 static void devlock_read_release(void *arg)
134 devlock *rwl = (devlock *)arg;
138 void devlock::read_release()
141 pthread_mutex_unlock(&mutex);
145 * Handle cleanup when the write lock condition variable wait
148 static void devlock_write_release(void *arg)
150 devlock *rwl = (devlock *)arg;
151 rwl->write_release();
154 void devlock::write_release()
157 pthread_mutex_unlock(&mutex);
161 * Lock for read access, wait until locked (or error).
163 int devlock::readlock()
168 if (rwl->valid != DEVLOCK_VALID) {
171 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
175 rwl->r_wait++; /* indicate that we are waiting */
176 pthread_cleanup_push(devlock_read_release, (void *)rwl);
177 while (rwl->w_active) {
178 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
180 break; /* error, bail out */
183 pthread_cleanup_pop(0);
184 rwl->r_wait--; /* we are no longer waiting */
187 rwl->r_active++; /* we are running */
189 pthread_mutex_unlock(&rwl->mutex);
194 * Attempt to lock for read access, don't wait
196 int devlock::readtrylock()
201 if (rwl->valid != DEVLOCK_VALID) {
204 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
210 rwl->r_active++; /* we are running */
212 stat2 = pthread_mutex_unlock(&rwl->mutex);
213 return (stat == 0 ? stat2 : stat);
219 int devlock::readunlock()
224 if (rwl->valid != DEVLOCK_VALID) {
227 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
231 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
232 stat = pthread_cond_broadcast(&rwl->write);
234 stat2 = pthread_mutex_unlock(&rwl->mutex);
235 return (stat == 0 ? stat2 : stat);
240 * Lock for write access, wait until locked (or error).
241 * Multiple nested write locking is permitted.
243 int devlock::writelock(int areason, bool acan_take)
248 if (rwl->valid != DEVLOCK_VALID) {
251 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
254 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
256 pthread_mutex_unlock(&rwl->mutex);
259 lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
260 if (rwl->w_active || rwl->r_active > 0) {
261 rwl->w_wait++; /* indicate that we are waiting */
262 pthread_cleanup_push(devlock_write_release, (void *)rwl);
263 while (rwl->w_active || rwl->r_active > 0) {
264 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
266 break; /* error, bail out */
269 pthread_cleanup_pop(0);
270 rwl->w_wait--; /* we are no longer waiting */
273 rwl->w_active++; /* we are running */
274 rwl->writer_id = pthread_self(); /* save writer thread's id */
277 rwl->reason = areason;
278 rwl->can_take = acan_take;
279 pthread_mutex_unlock(&rwl->mutex);
284 * Attempt to lock for write access, don't wait
286 int devlock::writetrylock()
291 if (rwl->valid != DEVLOCK_VALID) {
294 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
297 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
299 pthread_mutex_unlock(&rwl->mutex);
302 if (rwl->w_active || rwl->r_active > 0) {
305 rwl->w_active = 1; /* we are running */
306 rwl->writer_id = pthread_self(); /* save writer thread's id */
307 lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
309 stat2 = pthread_mutex_unlock(&rwl->mutex);
310 return (stat == 0 ? stat2 : stat);
315 * Start any waiting writers in preference to waiting readers
317 int devlock::writeunlock()
322 if (rwl->valid != DEVLOCK_VALID) {
325 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
328 if (rwl->w_active <= 0) {
329 pthread_mutex_unlock(&rwl->mutex);
330 Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
333 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
334 pthread_mutex_unlock(&rwl->mutex);
335 Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
337 if (rwl->w_active > 0) {
338 stat = 0; /* writers still active */
341 /* No more writers, awaken someone */
342 if (rwl->r_wait > 0) { /* if readers waiting */
343 stat = pthread_cond_broadcast(&rwl->read);
344 } else if (rwl->w_wait > 0) {
345 stat = pthread_cond_broadcast(&rwl->write);
348 stat2 = pthread_mutex_unlock(&rwl->mutex);
349 return (stat == 0 ? stat2 : stat);
352 int devlock::take_lock(take_lock_t *hold, int areason)
356 if (valid != DEVLOCK_VALID) {
359 if ((stat = pthread_mutex_lock(&mutex)) != 0) {
362 hold->reason = reason;
363 hold->prev_reason = prev_reason;
364 hold->writer_id = writer_id;
366 writer_id = pthread_self();
367 stat = pthread_mutex_unlock(&mutex);
371 int devlock::return_lock(take_lock_t *hold)
375 if (valid != DEVLOCK_VALID) {
378 if ((stat = pthread_mutex_lock(&mutex)) != 0) {
381 reason = hold->reason;
382 prev_reason = hold->prev_reason;
383 writer_id = hold->writer_id;
384 writer_id = pthread_self();
385 stat2 = pthread_mutex_unlock(&mutex);
386 if (w_active || w_wait) {
387 stat = pthread_cond_broadcast(&write);
389 return (stat == 0 ? stat2 : stat);
397 #define ITERATIONS 1000000
400 * Keep statics for each thread.
402 typedef struct thread_tag {
411 * Read/write lock and shared data.
413 typedef struct data_tag {
419 static thread_t threads[THREADS];
420 static data_t data[DATASIZE];
423 * Thread start routine that uses read/write locks.
425 void *thread_routine(void *arg)
427 thread_t *self = (thread_t *)arg;
433 for (iteration=0; iteration < ITERATIONS; iteration++) {
435 * Each "self->interval" iterations, perform an
436 * update operation (write lock instead of read
439 // if ((iteration % self->interval) == 0) {
440 status = writelock(&data[element].lock);
443 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
446 data[element].data = self->thread_num;
447 data[element].writes++;
449 status = writelock(&data[element].lock);
452 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
455 data[element].data = self->thread_num;
456 data[element].writes++;
458 status = writeunlock(&data[element].lock);
461 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
464 status = writeunlock(&data[element].lock);
467 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
474 * Look at the current data element to see whether
475 * the current thread last updated it. Count the
476 * times to report later.
478 status = readlock(&data[element].lock);
481 printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
485 if (data[element].data == self->thread_num)
487 status = readunlock(&data[element].lock);
490 printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
496 if (element >= DATASIZE) {
501 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
502 self->thread_num, repeats);
507 int main (int argc, char *argv[])
512 unsigned int seed = 1;
513 int thread_writes = 0;
518 * On Solaris 2.5, threads are not timesliced. To ensure
519 * that our threads can run concurrently, we need to
520 * increase the concurrency level to THREADS.
522 thr_setconcurrency (THREADS);
526 * Initialize the shared data.
528 for (data_count = 0; data_count < DATASIZE; data_count++) {
529 data[data_count].data = 0;
530 data[data_count].writes = 0;
531 status = rwl_init(&data[data_count].lock);
534 printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
540 * Create THREADS threads to access shared data.
542 for (count = 0; count < THREADS; count++) {
543 threads[count].thread_num = count + 1;
544 threads[count].writes = 0;
545 threads[count].reads = 0;
546 threads[count].interval = rand_r(&seed) % 71;
547 if (threads[count].interval <= 0) {
548 threads[count].interval = 1;
550 status = pthread_create (&threads[count].thread_id,
551 NULL, thread_routine, (void*)&threads[count]);
552 if (status != 0 || (int)threads[count].thread_id == 0) {
554 printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
560 * Wait for all threads to complete, and collect
563 for (count = 0; count < THREADS; count++) {
564 status = pthread_join (threads[count].thread_id, NULL);
567 printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
570 thread_writes += threads[count].writes;
571 printf (_("%02d: interval %d, writes %d, reads %d\n"),
572 count, threads[count].interval,
573 threads[count].writes, threads[count].reads);
577 * Collect statistics for the data.
579 for (data_count = 0; data_count < DATASIZE; data_count++) {
580 data_writes += data[data_count].writes;
581 printf (_("data %02d: value %d, %d writes\n"),
582 data_count, data[data_count].data, data[data_count].writes);
583 rwl_destroy (&data[data_count].lock);
586 printf (_("Total: %d thread writes, %d data writes\n"),
587 thread_writes, data_writes);
593 #ifdef TEST_RW_TRY_LOCK
597 * Demonstrate use of non-blocking read-write locks.
599 * Special notes: On a Solaris system, call thr_setconcurrency()
600 * to allow interleaved thread execution, since threads are not
608 #define ITERATIONS 1000
612 * Keep statistics for each thread.
614 typedef struct thread_tag {
624 * Read-write lock and shared data
626 typedef struct data_tag {
632 thread_t threads[THREADS];
633 data_t data[DATASIZE];
636 * Thread start routine that uses read-write locks
638 void *thread_routine (void *arg)
640 thread_t *self = (thread_t*)arg;
645 element = 0; /* Current data element */
647 for (iteration = 0; iteration < ITERATIONS; iteration++) {
648 if ((iteration % self->interval) == 0) {
649 status = rwl_writetrylock (&data[element].lock);
651 self->w_collisions++;
652 else if (status == 0) {
653 data[element].data++;
654 data[element].updates++;
656 rwl_writeunlock (&data[element].lock);
658 err_abort (status, _("Try write lock"));
660 status = rwl_readtrylock (&data[element].lock);
662 self->r_collisions++;
663 else if (status != 0) {
664 err_abort (status, _("Try read lock"));
666 if (data[element].data != data[element].updates)
667 printf ("%d: data[%d] %d != %d\n",
668 self->thread_num, element,
669 data[element].data, data[element].updates);
670 rwl_readunlock (&data[element].lock);
675 if (element >= DATASIZE)
678 lmgr_cleanup_thread();
682 int main (int argc, char *argv[])
684 int count, data_count;
685 unsigned int seed = 1;
686 int thread_updates = 0, data_updates = 0;
691 * On Solaris 2.5, threads are not timesliced. To ensure
692 * that our threads can run concurrently, we need to
693 * increase the concurrency level to THREADS.
695 DPRINTF (("Setting concurrency level to %d\n", THREADS));
696 thr_setconcurrency (THREADS);
700 * Initialize the shared data.
702 for (data_count = 0; data_count < DATASIZE; data_count++) {
703 data[data_count].data = 0;
704 data[data_count].updates = 0;
705 rwl_init(&data[data_count].lock);
709 * Create THREADS threads to access shared data.
711 for (count = 0; count < THREADS; count++) {
712 threads[count].thread_num = count;
713 threads[count].r_collisions = 0;
714 threads[count].w_collisions = 0;
715 threads[count].updates = 0;
716 threads[count].interval = rand_r (&seed) % ITERATIONS;
717 status = pthread_create (&threads[count].thread_id,
718 NULL, thread_routine, (void*)&threads[count]);
720 err_abort (status, _("Create thread"));
724 * Wait for all threads to complete, and collect
727 for (count = 0; count < THREADS; count++) {
728 status = pthread_join (threads[count].thread_id, NULL);
730 err_abort (status, _("Join thread"));
731 thread_updates += threads[count].updates;
732 printf (_("%02d: interval %d, updates %d, "
733 "r_collisions %d, w_collisions %d\n"),
734 count, threads[count].interval,
735 threads[count].updates,
736 threads[count].r_collisions, threads[count].w_collisions);
740 * Collect statistics for the data.
742 for (data_count = 0; data_count < DATASIZE; data_count++) {
743 data_updates += data[data_count].updates;
744 printf (_("data %02d: value %d, %d updates\n"),
745 data_count, data[data_count].data, data[data_count].updates);
746 rwl_destroy (&data[data_count].lock);