/*
   Bacula® - The Network Backup Solution

   Copyright (C) 2001-2014 Free Software Foundation Europe e.V.

   The main author of Bacula is Kern Sibbald, with contributions from many
   others, a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   Bacula® is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula Thread Read/Write locking code. It permits
 *  multiple readers but only one writer.  Note, however,
 *  that the writer thread is permitted to make multiple
 *  nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code is adapted from "Programming with POSIX Threads", by
 *  David R. Butenhof.
 */

#define _LOCKMGR_COMPLIANT
#include "bacula.h"
#include "devlock.h"
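
/*
 * Usage sketch: illustrates the locking rules described above, including
 * nested write locks taken by a single thread.  This is a minimal example
 * only; the priority of 0 and the reason/can_take arguments passed to
 * writelock() are arbitrary values chosen for illustration, and the
 * DEVLOCK_USAGE_SKETCH guard is never defined, so the example is not
 * compiled into Bacula.
 */
#ifdef DEVLOCK_USAGE_SKETCH
static void devlock_usage_sketch()
{
   devlock *lock = new_devlock();     /* allocate and zero a new lock */
   lock->init(0);                     /* initialize with (arbitrary) priority 0 */

   lock->readlock();                  /* any number of threads may hold read locks */
   lock->readunlock();

   lock->writelock(0, false);         /* exclusive access for this thread */
   lock->writelock(0, false);         /* nested write lock by the same thread */
   lock->writeunlock();               /* one writeunlock() per writelock() call */
   lock->writeunlock();

   lock->destroy();
   free(lock);
}
#endif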
/*
 * Initialize a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
devlock *new_devlock()
{
   devlock *lock;

   lock = (devlock *)malloc(sizeof(devlock));
   memset(lock, 0, sizeof(devlock));
   return lock;
}
int devlock::init(int init_priority)
{
   int stat;
   devlock *rwl = this;

   rwl->r_active = rwl->w_active = 0;
   rwl->r_wait = rwl->w_wait = 0;
   rwl->priority = init_priority;
   if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
      pthread_cond_destroy(&rwl->read);
      pthread_mutex_destroy(&rwl->mutex);
      return stat;
   }
   rwl->valid = DEVLOCK_VALID;
   return 0;
}
/*
 * Destroy a read/write lock
 *
 *  Returns: 0 on success
 *           errno on failure
 */
int devlock::destroy()
{
   devlock *rwl = this;
   int stat, stat1, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }

   /* If any threads are active, report EBUSY */
   if (rwl->r_active > 0 || rwl->w_active) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   /* If any threads are waiting, report EBUSY */
   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
      pthread_mutex_unlock(&rwl->mutex);
      return EBUSY;
   }

   rwl->valid = 0;
   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
      return stat;
   }
   stat  = pthread_mutex_destroy(&rwl->mutex);
   stat1 = pthread_cond_destroy(&rwl->read);
   stat2 = pthread_cond_destroy(&rwl->write);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
}
/*
 * Handle cleanup when the read lock condition variable
 * wait is released.
 */
static void devlock_read_release(void *arg)
{
   devlock *rwl = (devlock *)arg;
   rwl->read_release();
}

void devlock::read_release()
{
   r_wait--;
   pthread_mutex_unlock(&mutex);
}

/*
 * Handle cleanup when the write lock condition variable wait
 * is released.
 */
static void devlock_write_release(void *arg)
{
   devlock *rwl = (devlock *)arg;
   rwl->write_release();
}

void devlock::write_release()
{
   w_wait--;
   pthread_mutex_unlock(&mutex);
}
/*
 * Lock for read access, wait until locked (or error).
 */
int devlock::readlock()
{
   devlock *rwl = this;
   int stat;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      rwl->r_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(devlock_read_release, (void *)rwl);
      while (rwl->w_active) {
         stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
         if (stat != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->r_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->r_active++;                /* we are running */
   }
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for read access, don't wait
 */
int devlock::readtrylock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active) {
      stat = EBUSY;
   } else {
      rwl->r_active++;                /* we are running */
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Release a read lock.
 */
int devlock::readunlock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   rwl->r_active--;                   /* we are no longer running */
   if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
      stat = pthread_cond_broadcast(&rwl->write);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Lock for write access, wait until locked (or error).
 *   Multiple nested write locking is permitted.
 */
int devlock::writelock(int areason, bool acan_take)
{
   devlock *rwl = this;
   int stat;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* writer already owns the lock -- nested call */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
   if (rwl->w_active || rwl->r_active > 0) {
      rwl->w_wait++;                  /* indicate that we are waiting */
      pthread_cleanup_push(devlock_write_release, (void *)rwl);
      while (rwl->w_active || rwl->r_active > 0) {
         if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
            break;                    /* error, bail out */
         }
      }
      pthread_cleanup_pop(0);
      rwl->w_wait--;                  /* we are no longer waiting */
   }
   if (stat == 0) {
      rwl->w_active++;                /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_post_lock();
   }
   rwl->reason = areason;
   rwl->can_take = acan_take;
   pthread_mutex_unlock(&rwl->mutex);
   return stat;
}
/*
 * Attempt to lock for write access, don't wait
 */
int devlock::writetrylock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
      rwl->w_active++;                /* nested write lock by the owner */
      pthread_mutex_unlock(&rwl->mutex);
      return 0;
   }
   if (rwl->w_active || rwl->r_active > 0) {
      stat = EBUSY;
   } else {
      rwl->w_active = 1;              /* we are running */
      rwl->writer_id = pthread_self(); /* save writer thread's id */
      lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
/*
 * Release a write lock.
 *   Start any waiting readers in preference to waiting writers.
 */
int devlock::writeunlock()
{
   devlock *rwl = this;
   int stat, stat2;

   if (rwl->valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
      return stat;
   }
   if (rwl->w_active <= 0) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
   }
   rwl->w_active--;
   if (!pthread_equal(pthread_self(), rwl->writer_id)) {
      pthread_mutex_unlock(&rwl->mutex);
      Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
   }
   if (rwl->w_active > 0) {
      stat = 0;                       /* writers still active */
   } else {
      lmgr_do_unlock(rwl);
      /* No more writers, awaken someone */
      if (rwl->r_wait > 0) {          /* if readers waiting */
         stat = pthread_cond_broadcast(&rwl->read);
      } else if (rwl->w_wait > 0) {
         stat = pthread_cond_broadcast(&rwl->write);
      }
   }
   stat2 = pthread_mutex_unlock(&rwl->mutex);
   return (stat == 0 ? stat2 : stat);
}
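
/*
 * Record the calling thread as the current holder of the write lock:
 * the existing reason, prev_reason and writer_id are saved in *hold so
 * that return_lock() can restore them, and the new reason is stored.
 */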
int devlock::take_lock(take_lock_t *hold, int areason)
{
   int stat;

   if (valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&mutex)) != 0) {
      return stat;
   }
   hold->reason = reason;
   hold->prev_reason = prev_reason;
   hold->writer_id = writer_id;
   reason = areason;
   writer_id = pthread_self();
   stat = pthread_mutex_unlock(&mutex);
   return stat;
}
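
/*
 * Give back a lock previously taken with take_lock(): the reason fields
 * saved in *hold are restored and any waiting writers are awakened.
 */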
int devlock::return_lock(take_lock_t *hold)
{
   int stat, stat2;

   if (valid != DEVLOCK_VALID) {
      return EINVAL;
   }
   if ((stat = pthread_mutex_lock(&mutex)) != 0) {
      return stat;
   }
   reason = hold->reason;
   prev_reason = hold->prev_reason;
   writer_id = hold->writer_id;
   writer_id = pthread_self();
   stat2 = pthread_mutex_unlock(&mutex);
   if (w_active || w_wait) {
      stat = pthread_cond_broadcast(&write);
   }
   return (stat == 0 ? stat2 : stat);
}
#ifdef TEST_RWLOCK

#define ITERATIONS 1000000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int writes;
   int reads;
   int interval;
} thread_t;

/*
 * Read/write lock and shared data.
 */
typedef struct data_tag {
   brwlock_t lock;                    /* assumed lock type; initialized with rwl_init() below */
   int data;
   int writes;
} data_t;

static thread_t threads[THREADS];
static data_t data[DATASIZE];
/*
 * Thread start routine that uses read/write locks.
 */
void *thread_routine(void *arg)
{
   thread_t *self = (thread_t *)arg;
   int repeats = 0;
   int iteration;
   int element = 0;
   int status;

   for (iteration=0; iteration < ITERATIONS; iteration++) {
      /*
       * Each "self->interval" iterations, perform an
       * update operation (write lock instead of read
       * lock).
       */
//    if ((iteration % self->interval) == 0) {
         status = writelock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;
         status = writelock(&data[element].lock);   /* nested write lock */
         if (status != 0) {
            berrno be;
            printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         data[element].data = self->thread_num;
         data[element].writes++;
         self->writes++;
         status = writeunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         status = writeunlock(&data[element].lock); /* release nested lock */
         if (status != 0) {
            berrno be;
            printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }

         /*
          * Look at the current data element to see whether
          * the current thread last updated it. Count the
          * times to report later.
          */
         status = readlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }
         self->reads++;
         if (data[element].data == self->thread_num)
            repeats++;
         status = readunlock(&data[element].lock);
         if (status != 0) {
            berrno be;
            printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
            exit(1);
         }

      element++;
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   if (repeats > 0) {
      Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
            self->thread_num, repeats);
   }
   return NULL;
}
int main (int argc, char *argv[])
{
   int count;
   int data_count;
   int status;
   unsigned int seed = 1;
   int thread_writes = 0;
   int data_writes = 0;

#ifdef USE_THR_SETCONCURRENCY
   /*
    * On Solaris 2.5, 2.6, 7 and 8 threads are not timesliced. To ensure
    * that our threads can run concurrently, we need to
    * increase the concurrency level to THREADS.
    */
   thr_setconcurrency (THREADS);
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].writes = 0;
      status = rwl_init(&data[data_count].lock);
      if (status != 0) {
         berrno be;
         printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count + 1;
      threads[count].writes = 0;
      threads[count].reads = 0;
      threads[count].interval = rand_r(&seed) % 71;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1;
      }
      status = pthread_create (&threads[count].thread_id,
                               NULL, thread_routine, (void*)&threads[count]);
      if (status != 0 || (int)threads[count].thread_id == 0) {
         berrno be;
         printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0) {
         berrno be;
         printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
         exit(1);
      }
      thread_writes += threads[count].writes;
      printf (_("%02d: interval %d, writes %d, reads %d\n"),
              count, threads[count].interval,
              threads[count].writes, threads[count].reads);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_writes += data[data_count].writes;
      printf (_("data %02d: value %d, %d writes\n"),
              data_count, data[data_count].data, data[data_count].writes);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("Total: %d thread writes, %d data writes\n"),
           thread_writes, data_writes);
   return 0;
}
#endif /* TEST_RWLOCK */
#ifdef TEST_RW_TRY_LOCK

/*
 * Demonstrate use of non-blocking read-write locks.
 *
 * Special notes: On older Solaris systems, call thr_setconcurrency()
 * to allow interleaved thread execution, since threads are not
 * timesliced.
 */

#define ITERATIONS 1000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;
   pthread_t thread_id;
   int r_collisions;
   int w_collisions;
   int updates;
   int interval;
} thread_t;

/*
 * Read-write lock and shared data
 */
typedef struct data_tag {
   brwlock_t lock;                    /* assumed lock type; see rwl_init() below */
   int data;
   int updates;
} data_t;

thread_t threads[THREADS];
data_t data[DATASIZE];
/*
 * Thread start routine that uses read-write locks
 */
void *thread_routine (void *arg)
{
   thread_t *self = (thread_t*)arg;
   int iteration;
   int element;
   int status;

   element = 0;                       /* Current data element */

   for (iteration = 0; iteration < ITERATIONS; iteration++) {
      if ((iteration % self->interval) == 0) {
         status = rwl_writetrylock (&data[element].lock);
         if (status == EBUSY)
            self->w_collisions++;
         else if (status == 0) {
            data[element].data++;
            data[element].updates++;
            self->updates++;
            rwl_writeunlock (&data[element].lock);
         } else
            err_abort (status, _("Try write lock"));
      } else {
         status = rwl_readtrylock (&data[element].lock);
         if (status == EBUSY)
            self->r_collisions++;
         else if (status != 0) {
            err_abort (status, _("Try read lock"));
         } else {
            if (data[element].data != data[element].updates)
               printf ("%d: data[%d] %d != %d\n",
                  self->thread_num, element,
                  data[element].data, data[element].updates);
            rwl_readunlock (&data[element].lock);
         }
      }

      element++;
      if (element >= DATASIZE)
         element = 0;
   }
   lmgr_cleanup_thread();
   return NULL;
}
int main (int argc, char *argv[])
{
   int count, data_count;
   unsigned int seed = 1;
   int thread_updates = 0, data_updates = 0;
   int status;

#ifdef USE_THR_SETCONCURRENCY
   /*
    * On Solaris 2.5, 2.6, 7 and 8 threads are not timesliced. To ensure
    * that our threads can run concurrently, we need to
    * increase the concurrency level to THREADS.
    */
   DPRINTF (("Setting concurrency level to %d\n", THREADS));
   thr_setconcurrency (THREADS);
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].updates = 0;
      rwl_init(&data[data_count].lock);
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count;
      threads[count].r_collisions = 0;
      threads[count].w_collisions = 0;
      threads[count].updates = 0;
      threads[count].interval = rand_r (&seed) % ITERATIONS;
      status = pthread_create (&threads[count].thread_id,
                               NULL, thread_routine, (void*)&threads[count]);
      if (status != 0)
         err_abort (status, _("Create thread"));
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0)
         err_abort (status, _("Join thread"));
      thread_updates += threads[count].updates;
      printf (_("%02d: interval %d, updates %d, "
                "r_collisions %d, w_collisions %d\n"),
              count, threads[count].interval,
              threads[count].updates,
              threads[count].r_collisions, threads[count].w_collisions);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_updates += data[data_count].updates;
      printf (_("data %02d: value %d, %d updates\n"),
              data_count, data[data_count].data, data[data_count].updates);
      rwl_destroy (&data[data_count].lock);
   }

   printf (_("Total: %d thread updates, %d data updates\n"),
           thread_updates, data_updates);
   return 0;
}
#endif /* TEST_RW_TRY_LOCK */