/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer.  Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *  David R. Butenhof
 */
/*
   Copyright (C) 2000-2004 Kern Sibbald and John Walker

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of
   the License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public
   License along with this program; if not, write to the Free
   Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
   MA 02111-1307, USA.
 */
38 * Initialize a read/write lock
40 * Returns: 0 on success
43 int rwl_init(brwlock_t *rwl)
47 rwl->r_active = rwl->w_active = 0;
48 rwl->r_wait = rwl->w_wait = 0;
49 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
52 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
53 pthread_mutex_destroy(&rwl->mutex);
56 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
57 pthread_cond_destroy(&rwl->read);
58 pthread_mutex_destroy(&rwl->mutex);
61 rwl->valid = RWLOCK_VALID;
66 * Destroy a read/write lock
68 * Returns: 0 on success
71 int rwl_destroy(brwlock_t *rwl)
73 int stat, stat1, stat2;
75 if (rwl->valid != RWLOCK_VALID) {
78 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
83 * If any threads are active, report EBUSY
85 if (rwl->r_active > 0 || rwl->w_active) {
86 pthread_mutex_unlock(&rwl->mutex);
91 * If any threads are waiting, report EBUSY
93 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
94 pthread_mutex_unlock(&rwl->mutex);
99 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
102 stat = pthread_mutex_destroy(&rwl->mutex);
103 stat1 = pthread_cond_destroy(&rwl->read);
104 stat2 = pthread_cond_destroy(&rwl->write);
105 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
109 * Handle cleanup when the read lock condition variable
112 static void rwl_read_release(void *arg)
114 brwlock_t *rwl = (brwlock_t *)arg;
117 pthread_mutex_unlock(&rwl->mutex);
121 * Handle cleanup when the write lock condition variable wait
124 static void rwl_write_release(void *arg)
126 brwlock_t *rwl = (brwlock_t *)arg;
129 pthread_mutex_unlock(&rwl->mutex);
133 * Lock for read access, wait until locked (or error).
135 int rwl_readlock(brwlock_t *rwl)
139 if (rwl->valid != RWLOCK_VALID) {
142 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
146 rwl->r_wait++; /* indicate that we are waiting */
147 pthread_cleanup_push(rwl_read_release, (void *)rwl);
148 while (rwl->w_active) {
149 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
151 break; /* error, bail out */
154 pthread_cleanup_pop(0);
155 rwl->r_wait--; /* we are no longer waiting */
158 rwl->r_active++; /* we are running */
160 pthread_mutex_unlock(&rwl->mutex);
165 * Attempt to lock for read access, don't wait
167 int rwl_readtrylock(brwlock_t *rwl)
171 if (rwl->valid != RWLOCK_VALID) {
174 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
180 rwl->r_active++; /* we are running */
182 stat2 = pthread_mutex_unlock(&rwl->mutex);
183 return (stat == 0 ? stat2 : stat);
189 int rwl_readunlock(brwlock_t *rwl)
193 if (rwl->valid != RWLOCK_VALID) {
196 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
200 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
201 stat = pthread_cond_broadcast(&rwl->write);
203 stat2 = pthread_mutex_unlock(&rwl->mutex);
204 return (stat == 0 ? stat2 : stat);
209 * Lock for write access, wait until locked (or error).
210 * Multiple nested write locking is permitted.
212 int rwl_writelock(brwlock_t *rwl)
216 if (rwl->valid != RWLOCK_VALID) {
219 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
222 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
224 pthread_mutex_unlock(&rwl->mutex);
227 if (rwl->w_active || rwl->r_active > 0) {
228 rwl->w_wait++; /* indicate that we are waiting */
229 pthread_cleanup_push(rwl_write_release, (void *)rwl);
230 while (rwl->w_active || rwl->r_active > 0) {
231 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
232 break; /* error, bail out */
235 pthread_cleanup_pop(0);
236 rwl->w_wait--; /* we are no longer waiting */
239 rwl->w_active++; /* we are running */
240 rwl->writer_id = pthread_self(); /* save writer thread's id */
242 pthread_mutex_unlock(&rwl->mutex);
247 * Attempt to lock for write access, don't wait
249 int rwl_writetrylock(brwlock_t *rwl)
253 if (rwl->valid != RWLOCK_VALID) {
256 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
259 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
261 pthread_mutex_unlock(&rwl->mutex);
264 if (rwl->w_active || rwl->r_active > 0) {
267 rwl->w_active = 1; /* we are running */
268 rwl->writer_id = pthread_self(); /* save writer thread's id */
270 stat2 = pthread_mutex_unlock(&rwl->mutex);
271 return (stat == 0 ? stat2 : stat);
276 * Start any waiting writers in preference to waiting readers
278 int rwl_writeunlock(brwlock_t *rwl)
282 if (rwl->valid != RWLOCK_VALID) {
285 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
288 if (rwl->w_active <= 0) {
289 Emsg0(M_ABORT, 0, "rwl_writeunlock called too many times.\n");
292 if (!pthread_equal(pthread_self(), rwl->writer_id)) {
293 Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n");
295 if (rwl->w_active > 0) {
296 stat = 0; /* writers still active */
298 /* No more writers, awaken someone */
299 if (rwl->r_wait > 0) { /* if readers waiting */
300 stat = pthread_cond_broadcast(&rwl->read);
301 } else if (rwl->w_wait > 0) {
302 stat = pthread_cond_broadcast(&rwl->write);
305 stat2 = pthread_mutex_unlock(&rwl->mutex);
306 return (stat == 0 ? stat2 : stat);
313 #define ITERATIONS 10000
316 * Keep statics for each thread.
318 typedef struct thread_tag {
327 * Read/write lock and shared data.
329 typedef struct data_tag {
335 thread_t threads[THREADS];
336 data_t data[DATASIZE];
339 * Thread start routine that uses read/write locks.
341 void *thread_routine(void *arg)
343 thread_t *self = (thread_t *)arg;
349 for (iteration=0; iteration < ITERATIONS; iteration++) {
351 * Each "self->interval" iterations, perform an
352 * update operation (write lock instead of read
355 if ((iteration % self->interval) == 0) {
356 status = rwl_writelock(&data[element].lock);
358 Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
360 data[element].data = self->thread_num;
361 data[element].writes++;
363 status = rwl_writeunlock(&data[element].lock);
365 Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
369 * Look at the current data element to see whether
370 * the current thread last updated it. Count the
371 * times to report later.
373 status = rwl_readlock(&data[element].lock);
375 Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
378 if (data[element].data == self->thread_num)
380 status = rwl_readunlock(&data[element].lock);
382 Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
386 if (element >= DATASIZE) {
391 Pmsg2(000, "Thread %d found unchanged elements %d times\n",
392 self->thread_num, repeats);
397 int main (int argc, char *argv[])
402 unsigned int seed = 1;
403 int thread_writes = 0;
408 * On Solaris 2.5, threads are not timesliced. To ensure
409 * that our threads can run concurrently, we need to
410 * increase the concurrency level to THREADS.
412 thr_setconcurrency (THREADS);
416 * Initialize the shared data.
418 for (data_count = 0; data_count < DATASIZE; data_count++) {
419 data[data_count].data = 0;
420 data[data_count].writes = 0;
421 status = rwl_init (&data[data_count].lock);
423 Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
428 * Create THREADS threads to access shared data.
430 for (count = 0; count < THREADS; count++) {
431 threads[count].thread_num = count + 1;
432 threads[count].writes = 0;
433 threads[count].reads = 0;
434 threads[count].interval = rand_r (&seed) % 71;
435 status = pthread_create (&threads[count].thread_id,
436 NULL, thread_routine, (void*)&threads[count]);
438 Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
443 * Wait for all threads to complete, and collect
446 for (count = 0; count < THREADS; count++) {
447 status = pthread_join (threads[count].thread_id, NULL);
449 Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
451 thread_writes += threads[count].writes;
452 printf ("%02d: interval %d, writes %d, reads %d\n",
453 count, threads[count].interval,
454 threads[count].writes, threads[count].reads);
458 * Collect statistics for the data.
460 for (data_count = 0; data_count < DATASIZE; data_count++) {
461 data_writes += data[data_count].writes;
462 printf ("data %02d: value %d, %d writes\n",
463 data_count, data[data_count].data, data[data_count].writes);
464 rwl_destroy (&data[data_count].lock);
467 printf ("Total: %d thread writes, %d data writes\n",
468 thread_writes, data_writes);
474 #ifdef TEST_RW_TRY_LOCK
478 * Demonstrate use of non-blocking read-write locks.
480 * Special notes: On a Solaris system, call thr_setconcurrency()
481 * to allow interleaved thread execution, since threads are not
489 #define ITERATIONS 1000
493 * Keep statistics for each thread.
495 typedef struct thread_tag {
505 * Read-write lock and shared data
507 typedef struct data_tag {
513 thread_t threads[THREADS];
514 data_t data[DATASIZE];
517 * Thread start routine that uses read-write locks
519 void *thread_routine (void *arg)
521 thread_t *self = (thread_t*)arg;
526 element = 0; /* Current data element */
528 for (iteration = 0; iteration < ITERATIONS; iteration++) {
529 if ((iteration % self->interval) == 0) {
530 status = rwl_writetrylock (&data[element].lock);
532 self->w_collisions++;
533 else if (status == 0) {
534 data[element].data++;
535 data[element].updates++;
537 rwl_writeunlock (&data[element].lock);
539 err_abort (status, "Try write lock");
541 status = rwl_readtrylock (&data[element].lock);
543 self->r_collisions++;
544 else if (status != 0) {
545 err_abort (status, "Try read lock");
547 if (data[element].data != data[element].updates)
548 printf ("%d: data[%d] %d != %d\n",
549 self->thread_num, element,
550 data[element].data, data[element].updates);
551 rwl_readunlock (&data[element].lock);
556 if (element >= DATASIZE)
562 int main (int argc, char *argv[])
564 int count, data_count;
565 unsigned int seed = 1;
566 int thread_updates = 0, data_updates = 0;
571 * On Solaris 2.5, threads are not timesliced. To ensure
572 * that our threads can run concurrently, we need to
573 * increase the concurrency level to THREADS.
575 DPRINTF (("Setting concurrency level to %d\n", THREADS));
576 thr_setconcurrency (THREADS);
580 * Initialize the shared data.
582 for (data_count = 0; data_count < DATASIZE; data_count++) {
583 data[data_count].data = 0;
584 data[data_count].updates = 0;
585 rwl_init (&data[data_count].lock);
589 * Create THREADS threads to access shared data.
591 for (count = 0; count < THREADS; count++) {
592 threads[count].thread_num = count;
593 threads[count].r_collisions = 0;
594 threads[count].w_collisions = 0;
595 threads[count].updates = 0;
596 threads[count].interval = rand_r (&seed) % ITERATIONS;
597 status = pthread_create (&threads[count].thread_id,
598 NULL, thread_routine, (void*)&threads[count]);
600 err_abort (status, "Create thread");
604 * Wait for all threads to complete, and collect
607 for (count = 0; count < THREADS; count++) {
608 status = pthread_join (threads[count].thread_id, NULL);
610 err_abort (status, "Join thread");
611 thread_updates += threads[count].updates;
612 printf ("%02d: interval %d, updates %d, "
613 "r_collisions %d, w_collisions %d\n",
614 count, threads[count].interval,
615 threads[count].updates,
616 threads[count].r_collisions, threads[count].w_collisions);
620 * Collect statistics for the data.
622 for (data_count = 0; data_count < DATASIZE; data_count++) {
623 data_updates += data[data_count].updates;
624 printf ("data %02d: value %d, %d updates\n",
625 data_count, data[data_count].data, data[data_count].updates);
626 rwl_destroy (&data[data_count].lock);