/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer. Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 * Kern Sibbald, January MMI
 *
 * This code adapted from "Programming with POSIX Threads", by
 * David R. Butenhof
 */
/*
   Bacula® - The Network Backup Solution

   Copyright (C) 2001-2006 Free Software Foundation Europe e.V.

   The main author of Bacula is Kern Sibbald, with contributions from
   many others, a complete list can be found in the file AUTHORS.
   This program is Free Software; you can redistribute it and/or
   modify it under the terms of version two of the GNU General Public
   License as published by the Free Software Foundation plus additions
   that are listed in the file LICENSE.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA.

   Bacula® is a registered trademark of John Walker.
   The licensor of Bacula is the Free Software Foundation Europe
   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
   Switzerland, email:ftf@fsfeurope.org.
*/
46 * Initialize a read/write lock
48 * Returns: 0 on success
51 int rwl_init(brwlock_t *rwl)
55 rwl->r_active = rwl->w_active = 0;
56 rwl->r_wait = rwl->w_wait = 0;
57 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
60 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61 pthread_mutex_destroy(&rwl->mutex);
64 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65 pthread_cond_destroy(&rwl->read);
66 pthread_mutex_destroy(&rwl->mutex);
69 rwl->valid = RWLOCK_VALID;
74 * Destroy a read/write lock
76 * Returns: 0 on success
79 int rwl_destroy(brwlock_t *rwl)
81 int stat, stat1, stat2;
83 if (rwl->valid != RWLOCK_VALID) {
86 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
91 * If any threads are active, report EBUSY
93 if (rwl->r_active > 0 || rwl->w_active) {
94 pthread_mutex_unlock(&rwl->mutex);
99 * If any threads are waiting, report EBUSY
101 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102 pthread_mutex_unlock(&rwl->mutex);
107 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
110 stat = pthread_mutex_destroy(&rwl->mutex);
111 stat1 = pthread_cond_destroy(&rwl->read);
112 stat2 = pthread_cond_destroy(&rwl->write);
113 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
117 * Handle cleanup when the read lock condition variable
120 static void rwl_read_release(void *arg)
122 brwlock_t *rwl = (brwlock_t *)arg;
125 pthread_mutex_unlock(&rwl->mutex);
129 * Handle cleanup when the write lock condition variable wait
132 static void rwl_write_release(void *arg)
134 brwlock_t *rwl = (brwlock_t *)arg;
137 pthread_mutex_unlock(&rwl->mutex);
141 * Lock for read access, wait until locked (or error).
143 int rwl_readlock(brwlock_t *rwl)
147 if (rwl->valid != RWLOCK_VALID) {
150 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
154 rwl->r_wait++; /* indicate that we are waiting */
155 pthread_cleanup_push(rwl_read_release, (void *)rwl);
156 while (rwl->w_active) {
157 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
159 break; /* error, bail out */
162 pthread_cleanup_pop(0);
163 rwl->r_wait--; /* we are no longer waiting */
166 rwl->r_active++; /* we are running */
168 pthread_mutex_unlock(&rwl->mutex);
173 * Attempt to lock for read access, don't wait
175 int rwl_readtrylock(brwlock_t *rwl)
179 if (rwl->valid != RWLOCK_VALID) {
182 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
188 rwl->r_active++; /* we are running */
190 stat2 = pthread_mutex_unlock(&rwl->mutex);
191 return (stat == 0 ? stat2 : stat);
197 int rwl_readunlock(brwlock_t *rwl)
201 if (rwl->valid != RWLOCK_VALID) {
204 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
208 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209 stat = pthread_cond_broadcast(&rwl->write);
211 stat2 = pthread_mutex_unlock(&rwl->mutex);
212 return (stat == 0 ? stat2 : stat);
217 * Lock for write access, wait until locked (or error).
218 * Multiple nested write locking is permitted.
220 int rwl_writelock(brwlock_t *rwl)
224 if (rwl->valid != RWLOCK_VALID) {
227 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
230 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
232 pthread_mutex_unlock(&rwl->mutex);
235 if (rwl->w_active || rwl->r_active > 0) {
236 rwl->w_wait++; /* indicate that we are waiting */
237 pthread_cleanup_push(rwl_write_release, (void *)rwl);
238 while (rwl->w_active || rwl->r_active > 0) {
239 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
240 break; /* error, bail out */
243 pthread_cleanup_pop(0);
244 rwl->w_wait--; /* we are no longer waiting */
247 rwl->w_active++; /* we are running */
248 rwl->writer_id = pthread_self(); /* save writer thread's id */
250 pthread_mutex_unlock(&rwl->mutex);
255 * Attempt to lock for write access, don't wait
257 int rwl_writetrylock(brwlock_t *rwl)
261 if (rwl->valid != RWLOCK_VALID) {
264 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
267 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
269 pthread_mutex_unlock(&rwl->mutex);
272 if (rwl->w_active || rwl->r_active > 0) {
275 rwl->w_active = 1; /* we are running */
276 rwl->writer_id = pthread_self(); /* save writer thread's id */
278 stat2 = pthread_mutex_unlock(&rwl->mutex);
279 return (stat == 0 ? stat2 : stat);
284 * Start any waiting writers in preference to waiting readers
286 int rwl_writeunlock(brwlock_t *rwl)
290 if (rwl->valid != RWLOCK_VALID) {
293 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
296 if (rwl->w_active <= 0) {
297 Emsg0(M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
300 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
301 Emsg0(M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
303 if (rwl->w_active > 0) {
304 stat = 0; /* writers still active */
306 /* No more writers, awaken someone */
307 if (rwl->r_wait > 0) { /* if readers waiting */
308 stat = pthread_cond_broadcast(&rwl->read);
309 } else if (rwl->w_wait > 0) {
310 stat = pthread_cond_broadcast(&rwl->write);
313 stat2 = pthread_mutex_unlock(&rwl->mutex);
314 return (stat == 0 ? stat2 : stat);
321 #define ITERATIONS 10000
324 * Keep statics for each thread.
326 typedef struct thread_tag {
335 * Read/write lock and shared data.
337 typedef struct data_tag {
343 thread_t threads[THREADS];
344 data_t data[DATASIZE];
347 * Thread start routine that uses read/write locks.
349 void *thread_routine(void *arg)
351 thread_t *self = (thread_t *)arg;
357 for (iteration=0; iteration < ITERATIONS; iteration++) {
359 * Each "self->interval" iterations, perform an
360 * update operation (write lock instead of read
363 if ((iteration % self->interval) == 0) {
364 status = rwl_writelock(&data[element].lock);
366 Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), strerror(status));
368 data[element].data = self->thread_num;
369 data[element].writes++;
371 status = rwl_writeunlock(&data[element].lock);
373 Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), strerror(status));
377 * Look at the current data element to see whether
378 * the current thread last updated it. Count the
379 * times to report later.
381 status = rwl_readlock(&data[element].lock);
383 Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), strerror(status));
386 if (data[element].data == self->thread_num)
388 status = rwl_readunlock(&data[element].lock);
390 Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), strerror(status));
394 if (element >= DATASIZE) {
399 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
400 self->thread_num, repeats);
405 int main (int argc, char *argv[])
410 unsigned int seed = 1;
411 int thread_writes = 0;
416 * On Solaris 2.5, threads are not timesliced. To ensure
417 * that our threads can run concurrently, we need to
418 * increase the concurrency level to THREADS.
420 thr_setconcurrency (THREADS);
424 * Initialize the shared data.
426 for (data_count = 0; data_count < DATASIZE; data_count++) {
427 data[data_count].data = 0;
428 data[data_count].writes = 0;
429 status = rwl_init (&data[data_count].lock);
431 Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), strerror(status));
436 * Create THREADS threads to access shared data.
438 for (count = 0; count < THREADS; count++) {
439 threads[count].thread_num = count + 1;
440 threads[count].writes = 0;
441 threads[count].reads = 0;
442 threads[count].interval = rand_r (&seed) % 71;
443 status = pthread_create (&threads[count].thread_id,
444 NULL, thread_routine, (void*)&threads[count]);
446 Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), strerror(status));
451 * Wait for all threads to complete, and collect
454 for (count = 0; count < THREADS; count++) {
455 status = pthread_join (threads[count].thread_id, NULL);
457 Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), strerror(status));
459 thread_writes += threads[count].writes;
460 printf (_("%02d: interval %d, writes %d, reads %d\n"),
461 count, threads[count].interval,
462 threads[count].writes, threads[count].reads);
466 * Collect statistics for the data.
468 for (data_count = 0; data_count < DATASIZE; data_count++) {
469 data_writes += data[data_count].writes;
470 printf (_("data %02d: value %d, %d writes\n"),
471 data_count, data[data_count].data, data[data_count].writes);
472 rwl_destroy (&data[data_count].lock);
475 printf (_("Total: %d thread writes, %d data writes\n"),
476 thread_writes, data_writes);
482 #ifdef TEST_RW_TRY_LOCK
486 * Demonstrate use of non-blocking read-write locks.
488 * Special notes: On a Solaris system, call thr_setconcurrency()
489 * to allow interleaved thread execution, since threads are not
497 #define ITERATIONS 1000
501 * Keep statistics for each thread.
503 typedef struct thread_tag {
513 * Read-write lock and shared data
515 typedef struct data_tag {
521 thread_t threads[THREADS];
522 data_t data[DATASIZE];
525 * Thread start routine that uses read-write locks
527 void *thread_routine (void *arg)
529 thread_t *self = (thread_t*)arg;
534 element = 0; /* Current data element */
536 for (iteration = 0; iteration < ITERATIONS; iteration++) {
537 if ((iteration % self->interval) == 0) {
538 status = rwl_writetrylock (&data[element].lock);
540 self->w_collisions++;
541 else if (status == 0) {
542 data[element].data++;
543 data[element].updates++;
545 rwl_writeunlock (&data[element].lock);
547 err_abort (status, _("Try write lock"));
549 status = rwl_readtrylock (&data[element].lock);
551 self->r_collisions++;
552 else if (status != 0) {
553 err_abort (status, _("Try read lock"));
555 if (data[element].data != data[element].updates)
556 printf ("%d: data[%d] %d != %d\n",
557 self->thread_num, element,
558 data[element].data, data[element].updates);
559 rwl_readunlock (&data[element].lock);
564 if (element >= DATASIZE)
570 int main (int argc, char *argv[])
572 int count, data_count;
573 unsigned int seed = 1;
574 int thread_updates = 0, data_updates = 0;
579 * On Solaris 2.5, threads are not timesliced. To ensure
580 * that our threads can run concurrently, we need to
581 * increase the concurrency level to THREADS.
583 DPRINTF (("Setting concurrency level to %d\n", THREADS));
584 thr_setconcurrency (THREADS);
588 * Initialize the shared data.
590 for (data_count = 0; data_count < DATASIZE; data_count++) {
591 data[data_count].data = 0;
592 data[data_count].updates = 0;
593 rwl_init (&data[data_count].lock);
597 * Create THREADS threads to access shared data.
599 for (count = 0; count < THREADS; count++) {
600 threads[count].thread_num = count;
601 threads[count].r_collisions = 0;
602 threads[count].w_collisions = 0;
603 threads[count].updates = 0;
604 threads[count].interval = rand_r (&seed) % ITERATIONS;
605 status = pthread_create (&threads[count].thread_id,
606 NULL, thread_routine, (void*)&threads[count]);
608 err_abort (status, _("Create thread"));
612 * Wait for all threads to complete, and collect
615 for (count = 0; count < THREADS; count++) {
616 status = pthread_join (threads[count].thread_id, NULL);
618 err_abort (status, _("Join thread"));
619 thread_updates += threads[count].updates;
620 printf (_("%02d: interval %d, updates %d, "
621 "r_collisions %d, w_collisions %d\n"),
622 count, threads[count].interval,
623 threads[count].updates,
624 threads[count].r_collisions, threads[count].w_collisions);
628 * Collect statistics for the data.
630 for (data_count = 0; data_count < DATASIZE; data_count++) {
631 data_updates += data[data_count].updates;
632 printf (_("data %02d: value %d, %d updates\n"),
633 data_count, data[data_count].data, data[data_count].updates);
634 rwl_destroy (&data[data_count].lock);