/*
 * Bacula Thread Read/Write locking code. It permits
 * multiple readers but only one writer.  Note, however,
 * that the writer thread is permitted to make multiple
 * nested write lock calls.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *  David R. Butenhof
 */
/*
   Copyright (C) 2000-2003 Kern Sibbald and John Walker

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of
   the License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public
   License along with this program; if not, write to the Free
   Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
   MA 02111-1307, USA.
 */
38 * Initialize a read/write lock
40 * Returns: 0 on success
43 int rwl_init(brwlock_t *rwl)
47 rwl->r_active = rwl->w_active = 0;
48 rwl->r_wait = rwl->w_wait = 0;
49 if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
52 if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
53 pthread_mutex_destroy(&rwl->mutex);
56 if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
57 pthread_cond_destroy(&rwl->read);
58 pthread_mutex_destroy(&rwl->mutex);
61 rwl->valid = RWLOCK_VALID;
66 * Destroy a read/write lock
68 * Returns: 0 on success
71 int rwl_destroy(brwlock_t *rwl)
73 int stat, stat1, stat2;
75 if (rwl->valid != RWLOCK_VALID) {
78 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
83 * If any threads are active, report EBUSY
85 if (rwl->r_active > 0 || rwl->w_active) {
86 pthread_mutex_unlock(&rwl->mutex);
91 * If any threads are waiting, report EBUSY
93 if (rwl->r_wait > 0 || rwl->w_wait > 0) {
94 pthread_mutex_unlock(&rwl->mutex);
99 if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
102 stat = pthread_mutex_destroy(&rwl->mutex);
103 stat1 = pthread_cond_destroy(&rwl->read);
104 stat2 = pthread_cond_destroy(&rwl->write);
105 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
109 * Handle cleanup when the read lock condition variable
112 static void rwl_read_release(void *arg)
114 brwlock_t *rwl = (brwlock_t *)arg;
117 pthread_mutex_unlock(&rwl->mutex);
121 * Handle cleanup when the write lock condition variable wait
124 static void rwl_write_release(void *arg)
126 brwlock_t *rwl = (brwlock_t *)arg;
129 pthread_mutex_unlock(&rwl->mutex);
133 * Lock for read access, wait until locked (or error).
135 int rwl_readlock(brwlock_t *rwl)
139 if (rwl->valid != RWLOCK_VALID) {
142 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
146 rwl->r_wait++; /* indicate that we are waiting */
147 pthread_cleanup_push(rwl_read_release, (void *)rwl);
148 while (rwl->w_active) {
149 stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
151 break; /* error, bail out */
154 pthread_cleanup_pop(0);
155 rwl->r_wait--; /* we are no longer waiting */
158 rwl->r_active++; /* we are running */
160 pthread_mutex_unlock(&rwl->mutex);
165 * Attempt to lock for read access, don't wait
167 int rwl_readtrylock(brwlock_t *rwl)
171 if (rwl->valid != RWLOCK_VALID) {
174 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
180 rwl->r_active++; /* we are running */
182 stat2 = pthread_mutex_unlock(&rwl->mutex);
183 return (stat == 0 ? stat2 : stat);
189 int rwl_readunlock(brwlock_t *rwl)
193 if (rwl->valid != RWLOCK_VALID) {
196 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
200 if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
201 stat = pthread_cond_signal(&rwl->write);
203 stat2 = pthread_mutex_unlock(&rwl->mutex);
204 return (stat == 0 ? stat2 : stat);
209 * Lock for write access, wait until locked (or error).
210 * Multiple nested write locking is permitted.
212 int rwl_writelock(brwlock_t *rwl)
216 if (rwl->valid != RWLOCK_VALID) {
219 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
222 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
224 pthread_mutex_unlock(&rwl->mutex);
227 if (rwl->w_active || rwl->r_active > 0) {
228 rwl->w_wait++; /* indicate that we are waiting */
229 pthread_cleanup_push(rwl_write_release, (void *)rwl);
230 while (rwl->w_active || rwl->r_active > 0) {
231 if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
232 break; /* error, bail out */
235 pthread_cleanup_pop(0);
236 rwl->w_wait--; /* we are no longer waiting */
239 rwl->w_active = 1; /* we are running */
240 rwl->writer_id = pthread_self(); /* save writer thread's id */
242 pthread_mutex_unlock(&rwl->mutex);
247 * Attempt to lock for write access, don't wait
249 int rwl_writetrylock(brwlock_t *rwl)
253 if (rwl->valid != RWLOCK_VALID) {
256 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
259 if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
261 pthread_mutex_unlock(&rwl->mutex);
264 if (rwl->w_active || rwl->r_active > 0) {
267 rwl->w_active = 1; /* we are running */
268 rwl->writer_id = pthread_self(); /* save writer thread's id */
270 stat2 = pthread_mutex_unlock(&rwl->mutex);
271 return (stat == 0 ? stat2 : stat);
276 * Start any waiting writers in preference to waiting readers
278 int rwl_writeunlock(brwlock_t *rwl)
282 if (rwl->valid != RWLOCK_VALID) {
285 if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
289 if (rwl->w_active < 0 || !pthread_equal(pthread_self(), rwl->writer_id)) {
290 Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n");
292 if (rwl->w_active > 0) {
293 stat = 0; /* writers still active */
295 /* No more writers, awaken someone */
296 if (rwl->r_wait > 0) { /* if readers waiting */
297 stat = pthread_cond_broadcast(&rwl->read);
298 } else if (rwl->w_wait > 0) {
299 stat = pthread_cond_signal(&rwl->write);
302 stat2 = pthread_mutex_unlock(&rwl->mutex);
303 return (stat == 0 ? stat2 : stat);
#ifdef TEST_RWLOCK

#define THREADS    5      /* NOTE(review): THREADS/DATASIZE values not visible in   */
#define DATASIZE   15     /*   this listing; 5/15 follow Butenhof's original -- confirm */
#define ITERATIONS 10000

/*
 * Keep statistics for each thread.
 */
typedef struct thread_tag {
   int thread_num;                    /* thread serial number (1..THREADS) */
   pthread_t thread_id;               /* id returned by pthread_create() */
   int writes;                        /* number of write operations performed */
   int reads;                         /* number of read operations performed */
   int interval;                      /* do a write every "interval" iterations */
} thread_t;

/*
 * Read/write lock and shared data.
 */
typedef struct data_tag {
   brwlock_t lock;                    /* protects this element */
   int data;                          /* thread_num of the last writer */
   int writes;                        /* number of writes to this element */
} data_t;

static thread_t threads[THREADS];
static data_t data[DATASIZE];

/*
 * Thread start routine that uses read/write locks.
 */
void *thread_routine(void *arg)
{
   thread_t *self = (thread_t *)arg;
   int repeats = 0;                   /* times our own last write was still there */
   int iteration;
   int element = 0;                   /* current data element */
   int status;

   for (iteration = 0; iteration < ITERATIONS; iteration++) {
      /*
       * Each "self->interval" iterations, perform an
       * update operation (write lock instead of read lock).
       */
      if ((iteration % self->interval) == 0) {
         status = rwl_writelock(&data[element].lock);
         if (status != 0) {
            Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
         }
         data[element].data = self->thread_num;  /* tag element with our id */
         data[element].writes++;
         self->writes++;
         status = rwl_writeunlock(&data[element].lock);
         if (status != 0) {
            Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
         }
      } else {
         /*
          * Look at the current data element to see whether
          * the current thread last updated it. Count the
          * times to report later.
          */
         status = rwl_readlock(&data[element].lock);
         if (status != 0) {
            Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
         }
         self->reads++;
         if (data[element].data == self->thread_num) {
            repeats++;                /* we were still the last writer */
         }
         status = rwl_readunlock(&data[element].lock);
         if (status != 0) {
            Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
         }
      }
      element++;                      /* cycle through the data array */
      if (element >= DATASIZE) {
         element = 0;
      }
   }
   if (repeats > 0) {
      Dmsg2(000, "Thread %d found unchanged elements %d times\n",
            self->thread_num, repeats);
   }
   return NULL;
}

int main (int argc, char *argv[])
{
   int count, data_count, status;
   unsigned int seed = 1;             /* fixed seed => reproducible run */
   int thread_writes = 0;
   int data_writes = 0;

#ifdef sun
   /*
    * On Solaris 2.5, threads are not timesliced. To ensure
    * that our threads can run concurrently, we need to
    * increase the concurrency level to THREADS.
    */
   thr_setconcurrency (THREADS);
#endif

   /*
    * Initialize the shared data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data[data_count].data = 0;
      data[data_count].writes = 0;
      status = rwl_init (&data[data_count].lock);
      if (status != 0) {
         Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
      }
   }

   /*
    * Create THREADS threads to access shared data.
    */
   for (count = 0; count < THREADS; count++) {
      threads[count].thread_num = count + 1;
      threads[count].writes = 0;
      threads[count].reads = 0;
      threads[count].interval = rand_r (&seed) % 71;
      if (threads[count].interval <= 0) {
         threads[count].interval = 1; /* avoid iteration % 0 (UB) in thread_routine */
      }
      status = pthread_create (&threads[count].thread_id,
          NULL, thread_routine, (void*)&threads[count]);
      if (status != 0) {
         Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
      }
   }

   /*
    * Wait for all threads to complete, and collect
    * statistics.
    */
   for (count = 0; count < THREADS; count++) {
      status = pthread_join (threads[count].thread_id, NULL);
      if (status != 0) {
         Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
      }
      thread_writes += threads[count].writes;
      printf ("%02d: interval %d, writes %d, reads %d\n",
          count, threads[count].interval,
          threads[count].writes, threads[count].reads);
   }

   /*
    * Collect statistics for the data.
    */
   for (data_count = 0; data_count < DATASIZE; data_count++) {
      data_writes += data[data_count].writes;
      printf ("data %02d: value %d, %d writes\n",
          data_count, data[data_count].data, data[data_count].writes);
      rwl_destroy (&data[data_count].lock);
   }

   printf ("Total: %d thread writes, %d data writes\n",
       thread_writes, data_writes);
   return 0;
}

#endif /* TEST_RWLOCK */
471 #ifdef TEST_RW_TRY_LOCK
475 * Demonstrate use of non-blocking read-write locks.
477 * Special notes: On a Solaris system, call thr_setconcurrency()
478 * to allow interleaved thread execution, since threads are not
486 #define ITERATIONS 1000
490 * Keep statistics for each thread.
492 typedef struct thread_tag {
502 * Read-write lock and shared data
504 typedef struct data_tag {
510 thread_t threads[THREADS];
511 data_t data[DATASIZE];
514 * Thread start routine that uses read-write locks
516 void *thread_routine (void *arg)
518 thread_t *self = (thread_t*)arg;
523 element = 0; /* Current data element */
525 for (iteration = 0; iteration < ITERATIONS; iteration++) {
526 if ((iteration % self->interval) == 0) {
527 status = rwl_writetrylock (&data[element].lock);
529 self->w_collisions++;
530 else if (status == 0) {
531 data[element].data++;
532 data[element].updates++;
534 rwl_writeunlock (&data[element].lock);
536 err_abort (status, "Try write lock");
538 status = rwl_readtrylock (&data[element].lock);
540 self->r_collisions++;
541 else if (status != 0) {
542 err_abort (status, "Try read lock");
544 if (data[element].data != data[element].updates)
545 printf ("%d: data[%d] %d != %d\n",
546 self->thread_num, element,
547 data[element].data, data[element].updates);
548 rwl_readunlock (&data[element].lock);
553 if (element >= DATASIZE)
559 int main (int argc, char *argv[])
561 int count, data_count;
562 unsigned int seed = 1;
563 int thread_updates = 0, data_updates = 0;
568 * On Solaris 2.5, threads are not timesliced. To ensure
569 * that our threads can run concurrently, we need to
570 * increase the concurrency level to THREADS.
572 DPRINTF (("Setting concurrency level to %d\n", THREADS));
573 thr_setconcurrency (THREADS);
577 * Initialize the shared data.
579 for (data_count = 0; data_count < DATASIZE; data_count++) {
580 data[data_count].data = 0;
581 data[data_count].updates = 0;
582 rwl_init (&data[data_count].lock);
586 * Create THREADS threads to access shared data.
588 for (count = 0; count < THREADS; count++) {
589 threads[count].thread_num = count;
590 threads[count].r_collisions = 0;
591 threads[count].w_collisions = 0;
592 threads[count].updates = 0;
593 threads[count].interval = rand_r (&seed) % ITERATIONS;
594 status = pthread_create (&threads[count].thread_id,
595 NULL, thread_routine, (void*)&threads[count]);
597 err_abort (status, "Create thread");
601 * Wait for all threads to complete, and collect
604 for (count = 0; count < THREADS; count++) {
605 status = pthread_join (threads[count].thread_id, NULL);
607 err_abort (status, "Join thread");
608 thread_updates += threads[count].updates;
609 printf ("%02d: interval %d, updates %d, "
610 "r_collisions %d, w_collisions %d\n",
611 count, threads[count].interval,
612 threads[count].updates,
613 threads[count].r_collisions, threads[count].w_collisions);
617 * Collect statistics for the data.
619 for (data_count = 0; data_count < DATASIZE; data_count++) {
620 data_updates += data[data_count].updates;
621 printf ("data %02d: value %d, %d updates\n",
622 data_count, data[data_count].data, data[data_count].updates);
623 rwl_destroy (&data[data_count].lock);