/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer. Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 * Kern Sibbald, January MMI
 *
 * This code adapted from "Programming with POSIX Threads", by
 * David R. Butenhof
 */
/*
   Copyright (C) 2001-2006 Kern Sibbald

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   version 2 as amended with additional clauses defined in the
   file LICENSE in the main source directory.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
   the file LICENSE for additional details.
 */
33 * Initialize a read/write lock
35 * Returns: 0 on success
38 int rwl_init(brwlock_t *rwl)
42 rwl->r_active = rwl->w_active = 0;
43 rwl->r_wait = rwl->w_wait = 0;
44 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
47 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
48 pthread_mutex_destroy(&rwl->mutex);
51 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
52 pthread_cond_destroy(&rwl->read);
53 pthread_mutex_destroy(&rwl->mutex);
56 rwl->valid = RWLOCK_VALID;
61 * Destroy a read/write lock
63 * Returns: 0 on success
66 int rwl_destroy(brwlock_t *rwl)
68 int stat, stat1, stat2;
70 if (rwl->valid != RWLOCK_VALID) {
73 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
78 * If any threads are active, report EBUSY
80 if (rwl->r_active > 0 || rwl->w_active) {
81 pthread_mutex_unlock(&rwl->mutex);
86 * If any threads are waiting, report EBUSY
88 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
89 pthread_mutex_unlock(&rwl->mutex);
94 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
97 stat = pthread_mutex_destroy(&rwl->mutex);
98 stat1 = pthread_cond_destroy(&rwl->read);
99 stat2 = pthread_cond_destroy(&rwl->write);
100 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
104 * Handle cleanup when the read lock condition variable
107 static void rwl_read_release(void *arg)
109 brwlock_t *rwl = (brwlock_t *)arg;
112 pthread_mutex_unlock(&rwl->mutex);
116 * Handle cleanup when the write lock condition variable wait
119 static void rwl_write_release(void *arg)
121 brwlock_t *rwl = (brwlock_t *)arg;
124 pthread_mutex_unlock(&rwl->mutex);
128 * Lock for read access, wait until locked (or error).
130 int rwl_readlock(brwlock_t *rwl)
134 if (rwl->valid != RWLOCK_VALID) {
137 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
141 rwl->r_wait++; /* indicate that we are waiting */
142 pthread_cleanup_push(rwl_read_release, (void *)rwl);
143 while (rwl->w_active) {
144 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
146 break; /* error, bail out */
149 pthread_cleanup_pop(0);
150 rwl->r_wait--; /* we are no longer waiting */
153 rwl->r_active++; /* we are running */
155 pthread_mutex_unlock(&rwl->mutex);
160 * Attempt to lock for read access, don't wait
162 int rwl_readtrylock(brwlock_t *rwl)
166 if (rwl->valid != RWLOCK_VALID) {
169 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
175 rwl->r_active++; /* we are running */
177 stat2 = pthread_mutex_unlock(&rwl->mutex);
178 return (stat == 0 ? stat2 : stat);
184 int rwl_readunlock(brwlock_t *rwl)
188 if (rwl->valid != RWLOCK_VALID) {
191 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
195 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
196 stat = pthread_cond_broadcast(&rwl->write);
198 stat2 = pthread_mutex_unlock(&rwl->mutex);
199 return (stat == 0 ? stat2 : stat);
204 * Lock for write access, wait until locked (or error).
205 * Multiple nested write locking is permitted.
207 int rwl_writelock(brwlock_t *rwl)
211 if (rwl->valid != RWLOCK_VALID) {
214 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
217 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
219 pthread_mutex_unlock(&rwl->mutex);
222 if (rwl->w_active || rwl->r_active > 0) {
223 rwl->w_wait++; /* indicate that we are waiting */
224 pthread_cleanup_push(rwl_write_release, (void *)rwl);
225 while (rwl->w_active || rwl->r_active > 0) {
226 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
227 break; /* error, bail out */
230 pthread_cleanup_pop(0);
231 rwl->w_wait--; /* we are no longer waiting */
234 rwl->w_active++; /* we are running */
235 rwl->writer_id = pthread_self(); /* save writer thread's id */
237 pthread_mutex_unlock(&rwl->mutex);
242 * Attempt to lock for write access, don't wait
244 int rwl_writetrylock(brwlock_t *rwl)
248 if (rwl->valid != RWLOCK_VALID) {
251 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
254 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
256 pthread_mutex_unlock(&rwl->mutex);
259 if (rwl->w_active || rwl->r_active > 0) {
262 rwl->w_active = 1; /* we are running */
263 rwl->writer_id = pthread_self(); /* save writer thread's id */
265 stat2 = pthread_mutex_unlock(&rwl->mutex);
266 return (stat == 0 ? stat2 : stat);
271 * Start any waiting writers in preference to waiting readers
273 int rwl_writeunlock(brwlock_t *rwl)
277 if (rwl->valid != RWLOCK_VALID) {
280 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
283 if (rwl->w_active <= 0) {
284 Emsg0(M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
287 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
288 Emsg0(M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
290 if (rwl->w_active > 0) {
291 stat = 0; /* writers still active */
293 /* No more writers, awaken someone */
294 if (rwl->r_wait > 0) { /* if readers waiting */
295 stat = pthread_cond_broadcast(&rwl->read);
296 } else if (rwl->w_wait > 0) {
297 stat = pthread_cond_broadcast(&rwl->write);
300 stat2 = pthread_mutex_unlock(&rwl->mutex);
301 return (stat == 0 ? stat2 : stat);
308 #define ITERATIONS 10000
311 * Keep statics for each thread.
313 typedef struct thread_tag {
322 * Read/write lock and shared data.
324 typedef struct data_tag {
330 thread_t threads[THREADS];
331 data_t data[DATASIZE];
334 * Thread start routine that uses read/write locks.
336 void *thread_routine(void *arg)
338 thread_t *self = (thread_t *)arg;
344 for (iteration=0; iteration < ITERATIONS; iteration++) {
346 * Each "self->interval" iterations, perform an
347 * update operation (write lock instead of read
350 if ((iteration % self->interval) == 0) {
351 status = rwl_writelock(&data[element].lock);
353 Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), strerror(status));
355 data[element].data = self->thread_num;
356 data[element].writes++;
358 status = rwl_writeunlock(&data[element].lock);
360 Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), strerror(status));
364 * Look at the current data element to see whether
365 * the current thread last updated it. Count the
366 * times to report later.
368 status = rwl_readlock(&data[element].lock);
370 Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), strerror(status));
373 if (data[element].data == self->thread_num)
375 status = rwl_readunlock(&data[element].lock);
377 Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), strerror(status));
381 if (element >= DATASIZE) {
386 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
387 self->thread_num, repeats);
392 int main (int argc, char *argv[])
397 unsigned int seed = 1;
398 int thread_writes = 0;
403 * On Solaris 2.5, threads are not timesliced. To ensure
404 * that our threads can run concurrently, we need to
405 * increase the concurrency level to THREADS.
407 thr_setconcurrency (THREADS);
411 * Initialize the shared data.
413 for (data_count = 0; data_count < DATASIZE; data_count++) {
414 data[data_count].data = 0;
415 data[data_count].writes = 0;
416 status = rwl_init (&data[data_count].lock);
418 Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), strerror(status));
423 * Create THREADS threads to access shared data.
425 for (count = 0; count < THREADS; count++) {
426 threads[count].thread_num = count + 1;
427 threads[count].writes = 0;
428 threads[count].reads = 0;
429 threads[count].interval = rand_r (&seed) % 71;
430 status = pthread_create (&threads[count].thread_id,
431 NULL, thread_routine, (void*)&threads[count]);
433 Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), strerror(status));
438 * Wait for all threads to complete, and collect
441 for (count = 0; count < THREADS; count++) {
442 status = pthread_join (threads[count].thread_id, NULL);
444 Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), strerror(status));
446 thread_writes += threads[count].writes;
447 printf (_("%02d: interval %d, writes %d, reads %d\n"),
448 count, threads[count].interval,
449 threads[count].writes, threads[count].reads);
453 * Collect statistics for the data.
455 for (data_count = 0; data_count < DATASIZE; data_count++) {
456 data_writes += data[data_count].writes;
457 printf (_("data %02d: value %d, %d writes\n"),
458 data_count, data[data_count].data, data[data_count].writes);
459 rwl_destroy (&data[data_count].lock);
462 printf (_("Total: %d thread writes, %d data writes\n"),
463 thread_writes, data_writes);
469 #ifdef TEST_RW_TRY_LOCK
473 * Demonstrate use of non-blocking read-write locks.
475 * Special notes: On a Solaris system, call thr_setconcurrency()
476 * to allow interleaved thread execution, since threads are not
484 #define ITERATIONS 1000
488 * Keep statistics for each thread.
490 typedef struct thread_tag {
500 * Read-write lock and shared data
502 typedef struct data_tag {
508 thread_t threads[THREADS];
509 data_t data[DATASIZE];
512 * Thread start routine that uses read-write locks
514 void *thread_routine (void *arg)
516 thread_t *self = (thread_t*)arg;
521 element = 0; /* Current data element */
523 for (iteration = 0; iteration < ITERATIONS; iteration++) {
524 if ((iteration % self->interval) == 0) {
525 status = rwl_writetrylock (&data[element].lock);
527 self->w_collisions++;
528 else if (status == 0) {
529 data[element].data++;
530 data[element].updates++;
532 rwl_writeunlock (&data[element].lock);
534 err_abort (status, _("Try write lock"));
536 status = rwl_readtrylock (&data[element].lock);
538 self->r_collisions++;
539 else if (status != 0) {
540 err_abort (status, _("Try read lock"));
542 if (data[element].data != data[element].updates)
543 printf ("%d: data[%d] %d != %d\n",
544 self->thread_num, element,
545 data[element].data, data[element].updates);
546 rwl_readunlock (&data[element].lock);
551 if (element >= DATASIZE)
557 int main (int argc, char *argv[])
559 int count, data_count;
560 unsigned int seed = 1;
561 int thread_updates = 0, data_updates = 0;
566 * On Solaris 2.5, threads are not timesliced. To ensure
567 * that our threads can run concurrently, we need to
568 * increase the concurrency level to THREADS.
570 DPRINTF (("Setting concurrency level to %d\n", THREADS));
571 thr_setconcurrency (THREADS);
575 * Initialize the shared data.
577 for (data_count = 0; data_count < DATASIZE; data_count++) {
578 data[data_count].data = 0;
579 data[data_count].updates = 0;
580 rwl_init (&data[data_count].lock);
584 * Create THREADS threads to access shared data.
586 for (count = 0; count < THREADS; count++) {
587 threads[count].thread_num = count;
588 threads[count].r_collisions = 0;
589 threads[count].w_collisions = 0;
590 threads[count].updates = 0;
591 threads[count].interval = rand_r (&seed) % ITERATIONS;
592 status = pthread_create (&threads[count].thread_id,
593 NULL, thread_routine, (void*)&threads[count]);
595 err_abort (status, _("Create thread"));
599 * Wait for all threads to complete, and collect
602 for (count = 0; count < THREADS; count++) {
603 status = pthread_join (threads[count].thread_id, NULL);
605 err_abort (status, _("Join thread"));
606 thread_updates += threads[count].updates;
607 printf (_("%02d: interval %d, updates %d, "
608 "r_collisions %d, w_collisions %d\n"),
609 count, threads[count].interval,
610 threads[count].updates,
611 threads[count].r_collisions, threads[count].w_collisions);
615 * Collect statistics for the data.
617 for (data_count = 0; data_count < DATASIZE; data_count++) {
618 data_updates += data[data_count].updates;
619 printf (_("data %02d: value %d, %d updates\n"),
620 data_count, data[data_count].data, data[data_count].updates);
621 rwl_destroy (&data[data_count].lock);