2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Thread Read/Write locking code. It permits
30 * multiple readers but only one writer. Note, however,
31 * that the writer thread is permitted to make multiple
32 * nested write lock calls.
34 * Kern Sibbald, January MMI
36 * This code adapted from "Programming with POSIX Threads", by
41 #define _LOCKMGR_COMPLIANT
46 * Initialize a read/write lock
48 * Returns: 0 on success
52 devlock *new_devlock()
55 lock = (devlock *)malloc(sizeof (devlock));
56 memset(lock, 0, sizeof(devlock));
60 int devlock::init(int priority)
65 rwl->r_active = rwl->w_active = 0;
66 rwl->r_wait = rwl->w_wait = 0;
67 rwl->priority = priority;
68 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
71 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
72 pthread_mutex_destroy(&rwl->mutex);
75 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
76 pthread_cond_destroy(&rwl->read);
77 pthread_mutex_destroy(&rwl->mutex);
80 rwl->valid = DEVLOCK_VALID;
85 * Destroy a read/write lock
87 * Returns: 0 on success
90 int devlock::destroy()
93 int stat, stat1, stat2;
95 if (rwl->valid != DEVLOCK_VALID) {
98 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
103 * If any threads are active, report EBUSY
105 if (rwl->r_active > 0 || rwl->w_active) {
106 pthread_mutex_unlock(&rwl->mutex);
111 * If any threads are waiting, report EBUSY
113 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
114 pthread_mutex_unlock(&rwl->mutex);
119 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
122 stat = pthread_mutex_destroy(&rwl->mutex);
123 stat1 = pthread_cond_destroy(&rwl->read);
124 stat2 = pthread_cond_destroy(&rwl->write);
125 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
129 * Handle cleanup when the read lock condition variable
132 static void devlock_read_release(void *arg)
134 devlock *rwl = (devlock *)arg;
138 void devlock::read_release()
141 pthread_mutex_unlock(&mutex);
145 * Handle cleanup when the write lock condition variable wait
148 static void devlock_write_release(void *arg)
150 devlock *rwl = (devlock *)arg;
151 rwl->write_release();
154 void devlock::write_release()
157 pthread_mutex_unlock(&mutex);
161 * Lock for read access, wait until locked (or error).
163 int devlock::readlock()
168 if (rwl->valid != DEVLOCK_VALID) {
171 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
175 rwl->r_wait++; /* indicate that we are waiting */
176 pthread_cleanup_push(devlock_read_release, (void *)rwl);
177 while (rwl->w_active) {
178 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
180 break; /* error, bail out */
183 pthread_cleanup_pop(0);
184 rwl->r_wait--; /* we are no longer waiting */
187 rwl->r_active++; /* we are running */
189 pthread_mutex_unlock(&rwl->mutex);
194 * Attempt to lock for read access, don't wait
196 int devlock::readtrylock()
201 if (rwl->valid != DEVLOCK_VALID) {
204 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
210 rwl->r_active++; /* we are running */
212 stat2 = pthread_mutex_unlock(&rwl->mutex);
213 return (stat == 0 ? stat2 : stat);
219 int devlock::readunlock()
224 if (rwl->valid != DEVLOCK_VALID) {
227 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
231 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
232 stat = pthread_cond_broadcast(&rwl->write);
234 stat2 = pthread_mutex_unlock(&rwl->mutex);
235 return (stat == 0 ? stat2 : stat);
240 * Lock for write access, wait until locked (or error).
241 * Multiple nested write locking is permitted.
243 int devlock::writelock(int areason, bool acan_steal)
248 if (rwl->valid != DEVLOCK_VALID) {
251 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
254 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
256 pthread_mutex_unlock(&rwl->mutex);
259 lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
260 if (rwl->w_active || rwl->r_active > 0) {
261 rwl->w_wait++; /* indicate that we are waiting */
262 pthread_cleanup_push(devlock_write_release, (void *)rwl);
263 while (rwl->w_active || rwl->r_active > 0) {
264 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
266 break; /* error, bail out */
269 pthread_cleanup_pop(0);
270 rwl->w_wait--; /* we are no longer waiting */
273 rwl->w_active++; /* we are running */
274 rwl->writer_id = pthread_self(); /* save writer thread's id */
277 rwl->reason = areason;
278 rwl->can_steal = acan_steal;
279 pthread_mutex_unlock(&rwl->mutex);
284 * Attempt to lock for write access, don't wait
286 int devlock::writetrylock()
291 if (rwl->valid != DEVLOCK_VALID) {
294 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
297 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
299 pthread_mutex_unlock(&rwl->mutex);
302 if (rwl->w_active || rwl->r_active > 0) {
305 rwl->w_active = 1; /* we are running */
306 rwl->writer_id = pthread_self(); /* save writer thread's id */
307 lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
309 stat2 = pthread_mutex_unlock(&rwl->mutex);
310 return (stat == 0 ? stat2 : stat);
315 * Start any waiting writers in preference to waiting readers
317 int devlock::writeunlock()
322 if (rwl->valid != DEVLOCK_VALID) {
325 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
328 if (rwl->w_active <= 0) {
329 pthread_mutex_unlock(&rwl->mutex);
330 Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
333 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
334 pthread_mutex_unlock(&rwl->mutex);
335 Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
337 if (rwl->w_active > 0) {
338 stat = 0; /* writers still active */
341 /* No more writers, awaken someone */
342 if (rwl->r_wait > 0) { /* if readers waiting */
343 stat = pthread_cond_broadcast(&rwl->read);
344 } else if (rwl->w_wait > 0) {
345 stat = pthread_cond_broadcast(&rwl->write);
348 stat2 = pthread_mutex_unlock(&rwl->mutex);
349 return (stat == 0 ? stat2 : stat);
356 #define ITERATIONS 1000000
359 * Keep statics for each thread.
361 typedef struct thread_tag {
370 * Read/write lock and shared data.
372 typedef struct data_tag {
378 static thread_t threads[THREADS];
379 static data_t data[DATASIZE];
382 * Thread start routine that uses read/write locks.
384 void *thread_routine(void *arg)
386 thread_t *self = (thread_t *)arg;
392 for (iteration=0; iteration < ITERATIONS; iteration++) {
394 * Each "self->interval" iterations, perform an
395 * update operation (write lock instead of read
398 // if ((iteration % self->interval) == 0) {
399 status = writelock(&data[element].lock);
402 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
405 data[element].data = self->thread_num;
406 data[element].writes++;
408 status = writelock(&data[element].lock);
411 printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
414 data[element].data = self->thread_num;
415 data[element].writes++;
417 status = writeunlock(&data[element].lock);
420 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
423 status = writeunlock(&data[element].lock);
426 printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
433 * Look at the current data element to see whether
434 * the current thread last updated it. Count the
435 * times to report later.
437 status = readlock(&data[element].lock);
440 printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
444 if (data[element].data == self->thread_num)
446 status = readunlock(&data[element].lock);
449 printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
455 if (element >= DATASIZE) {
460 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
461 self->thread_num, repeats);
466 int main (int argc, char *argv[])
471 unsigned int seed = 1;
472 int thread_writes = 0;
477 * On Solaris 2.5, threads are not timesliced. To ensure
478 * that our threads can run concurrently, we need to
479 * increase the concurrency level to THREADS.
481 thr_setconcurrency (THREADS);
485 * Initialize the shared data.
487 for (data_count = 0; data_count < DATASIZE; data_count++) {
488 data[data_count].data = 0;
489 data[data_count].writes = 0;
490 status = rwl_init(&data[data_count].lock);
493 printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
499 * Create THREADS threads to access shared data.
501 for (count = 0; count < THREADS; count++) {
502 threads[count].thread_num = count + 1;
503 threads[count].writes = 0;
504 threads[count].reads = 0;
505 threads[count].interval = rand_r(&seed) % 71;
506 if (threads[count].interval <= 0) {
507 threads[count].interval = 1;
509 status = pthread_create (&threads[count].thread_id,
510 NULL, thread_routine, (void*)&threads[count]);
511 if (status != 0 || (int)threads[count].thread_id == 0) {
513 printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
519 * Wait for all threads to complete, and collect
522 for (count = 0; count < THREADS; count++) {
523 status = pthread_join (threads[count].thread_id, NULL);
526 printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
529 thread_writes += threads[count].writes;
530 printf (_("%02d: interval %d, writes %d, reads %d\n"),
531 count, threads[count].interval,
532 threads[count].writes, threads[count].reads);
536 * Collect statistics for the data.
538 for (data_count = 0; data_count < DATASIZE; data_count++) {
539 data_writes += data[data_count].writes;
540 printf (_("data %02d: value %d, %d writes\n"),
541 data_count, data[data_count].data, data[data_count].writes);
542 rwl_destroy (&data[data_count].lock);
545 printf (_("Total: %d thread writes, %d data writes\n"),
546 thread_writes, data_writes);
552 #ifdef TEST_RW_TRY_LOCK
556 * Demonstrate use of non-blocking read-write locks.
558 * Special notes: On a Solaris system, call thr_setconcurrency()
559 * to allow interleaved thread execution, since threads are not
567 #define ITERATIONS 1000
571 * Keep statistics for each thread.
573 typedef struct thread_tag {
583 * Read-write lock and shared data
585 typedef struct data_tag {
591 thread_t threads[THREADS];
592 data_t data[DATASIZE];
595 * Thread start routine that uses read-write locks
597 void *thread_routine (void *arg)
599 thread_t *self = (thread_t*)arg;
604 element = 0; /* Current data element */
606 for (iteration = 0; iteration < ITERATIONS; iteration++) {
607 if ((iteration % self->interval) == 0) {
608 status = rwl_writetrylock (&data[element].lock);
610 self->w_collisions++;
611 else if (status == 0) {
612 data[element].data++;
613 data[element].updates++;
615 rwl_writeunlock (&data[element].lock);
617 err_abort (status, _("Try write lock"));
619 status = rwl_readtrylock (&data[element].lock);
621 self->r_collisions++;
622 else if (status != 0) {
623 err_abort (status, _("Try read lock"));
625 if (data[element].data != data[element].updates)
626 printf ("%d: data[%d] %d != %d\n",
627 self->thread_num, element,
628 data[element].data, data[element].updates);
629 rwl_readunlock (&data[element].lock);
634 if (element >= DATASIZE)
637 lmgr_cleanup_thread();
641 int main (int argc, char *argv[])
643 int count, data_count;
644 unsigned int seed = 1;
645 int thread_updates = 0, data_updates = 0;
650 * On Solaris 2.5, threads are not timesliced. To ensure
651 * that our threads can run concurrently, we need to
652 * increase the concurrency level to THREADS.
654 DPRINTF (("Setting concurrency level to %d\n", THREADS));
655 thr_setconcurrency (THREADS);
659 * Initialize the shared data.
661 for (data_count = 0; data_count < DATASIZE; data_count++) {
662 data[data_count].data = 0;
663 data[data_count].updates = 0;
664 rwl_init(&data[data_count].lock);
668 * Create THREADS threads to access shared data.
670 for (count = 0; count < THREADS; count++) {
671 threads[count].thread_num = count;
672 threads[count].r_collisions = 0;
673 threads[count].w_collisions = 0;
674 threads[count].updates = 0;
675 threads[count].interval = rand_r (&seed) % ITERATIONS;
676 status = pthread_create (&threads[count].thread_id,
677 NULL, thread_routine, (void*)&threads[count]);
679 err_abort (status, _("Create thread"));
683 * Wait for all threads to complete, and collect
686 for (count = 0; count < THREADS; count++) {
687 status = pthread_join (threads[count].thread_id, NULL);
689 err_abort (status, _("Join thread"));
690 thread_updates += threads[count].updates;
691 printf (_("%02d: interval %d, updates %d, "
692 "r_collisions %d, w_collisions %d\n"),
693 count, threads[count].interval,
694 threads[count].updates,
695 threads[count].r_collisions, threads[count].w_collisions);
699 * Collect statistics for the data.
701 for (data_count = 0; data_count < DATASIZE; data_count++) {
702 data_updates += data[data_count].updates;
703 printf (_("data %02d: value %d, %d updates\n"),
704 data_count, data[data_count].data, data[data_count].updates);
705 rwl_destroy (&data[data_count].lock);