]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
Backport from BEE
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  * Bacula Thread Read/Write locking code. It permits
18  *  multiple readers but only one writer.  Note, however,
19  *  that the writer thread is permitted to make multiple
20  *  nested write lock calls.
21  *
22  *  Kern Sibbald, January MMI
23  *
24  *  This code adapted from "Programming with POSIX Threads", by
25  *    David R. Butenhof
26  *
27  */
28
29 #define _LOCKMGR_COMPLIANT
30 #include "bacula.h"
31
32 /*
33  * Initialize a read/write lock
34  *
35  *  Returns: 0 on success
36  *           errno on failure
37  */
38 int rwl_init(brwlock_t *rwl, int priority)
39 {
40    int stat;
41
42    rwl->r_active = rwl->w_active = 0;
43    rwl->r_wait = rwl->w_wait = 0;
44    rwl->priority = priority;
45    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
46       return stat;
47    }
48    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
49       pthread_mutex_destroy(&rwl->mutex);
50       return stat;
51    }
52    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
53       pthread_cond_destroy(&rwl->read);
54       pthread_mutex_destroy(&rwl->mutex);
55       return stat;
56    }
57    rwl->valid = RWLOCK_VALID;
58    return 0;
59 }
60
61 /*
62  * Destroy a read/write lock
63  *
64  * Returns: 0 on success
65  *          errno on failure
66  */
67 int rwl_destroy(brwlock_t *rwl)
68 {
69    int stat, stat1, stat2;
70
71   if (rwl->valid != RWLOCK_VALID) {
72      return EINVAL;
73   }
74   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
75      return stat;
76   }
77
78   /*
79    * If any threads are active, report EBUSY
80    */
81   if (rwl->r_active > 0 || rwl->w_active) {
82      pthread_mutex_unlock(&rwl->mutex);
83      return EBUSY;
84   }
85
86   /*
87    * If any threads are waiting, report EBUSY
88    */
89   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
90      pthread_mutex_unlock(&rwl->mutex);
91      return EBUSY;
92   }
93
94   rwl->valid = 0;
95   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
96      return stat;
97   }
98   stat  = pthread_mutex_destroy(&rwl->mutex);
99   stat1 = pthread_cond_destroy(&rwl->read);
100   stat2 = pthread_cond_destroy(&rwl->write);
101   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
102 }
103
104 bool rwl_is_init(brwlock_t *rwl)
105 {
106    return (rwl->valid == RWLOCK_VALID);
107 }
108
109 /*
110  * Handle cleanup when the read lock condition variable
111  * wait is released.
112  */
113 static void rwl_read_release(void *arg)
114 {
115    brwlock_t *rwl = (brwlock_t *)arg;
116
117    rwl->r_wait--;
118    pthread_mutex_unlock(&rwl->mutex);
119 }
120
121 /*
122  * Handle cleanup when the write lock condition variable wait
123  * is released.
124  */
125 static void rwl_write_release(void *arg)
126 {
127    brwlock_t *rwl = (brwlock_t *)arg;
128
129    rwl->w_wait--;
130    pthread_mutex_unlock(&rwl->mutex);
131 }
132
133 /*
134  * Lock for read access, wait until locked (or error).
135  */
136 int rwl_readlock(brwlock_t *rwl)
137 {
138    int stat;
139
140    if (rwl->valid != RWLOCK_VALID) {
141       return EINVAL;
142    }
143    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
144       return stat;
145    }
146    if (rwl->w_active) {
147       rwl->r_wait++;                  /* indicate that we are waiting */
148       pthread_cleanup_push(rwl_read_release, (void *)rwl);
149       while (rwl->w_active) {
150          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
151          if (stat != 0) {
152             break;                    /* error, bail out */
153          }
154       }
155       pthread_cleanup_pop(0);
156       rwl->r_wait--;                  /* we are no longer waiting */
157    }
158    if (stat == 0) {
159       rwl->r_active++;                /* we are running */
160    }
161    pthread_mutex_unlock(&rwl->mutex);
162    return stat;
163 }
164
165 /*
166  * Attempt to lock for read access, don't wait
167  */
168 int rwl_readtrylock(brwlock_t *rwl)
169 {
170    int stat, stat2;
171
172    if (rwl->valid != RWLOCK_VALID) {
173       return EINVAL;
174    }
175    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
176       return stat;
177    }
178    if (rwl->w_active) {
179       stat = EBUSY;
180    } else {
181       rwl->r_active++;                /* we are running */
182    }
183    stat2 = pthread_mutex_unlock(&rwl->mutex);
184    return (stat == 0 ? stat2 : stat);
185 }
186
187 /*
188  * Unlock read lock
189  */
190 int rwl_readunlock(brwlock_t *rwl)
191 {
192    int stat, stat2;
193
194    if (rwl->valid != RWLOCK_VALID) {
195       return EINVAL;
196    }
197    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
198       return stat;
199    }
200    rwl->r_active--;
201    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
202       stat = pthread_cond_broadcast(&rwl->write);
203    }
204    stat2 = pthread_mutex_unlock(&rwl->mutex);
205    return (stat == 0 ? stat2 : stat);
206 }
207
208
209 /*
210  * Lock for write access, wait until locked (or error).
211  *   Multiple nested write locking is permitted.
212  */
213 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
214 {
215    int stat;
216
217    if (rwl->valid != RWLOCK_VALID) {
218       return EINVAL;
219    }
220    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
221       return stat;
222    }
223    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
224       rwl->w_active++;
225       pthread_mutex_unlock(&rwl->mutex);
226       return 0;
227    }
228    lmgr_pre_lock(rwl, rwl->priority, file, line);
229    if (rwl->w_active || rwl->r_active > 0) {
230       rwl->w_wait++;                  /* indicate that we are waiting */
231       pthread_cleanup_push(rwl_write_release, (void *)rwl);
232       while (rwl->w_active || rwl->r_active > 0) {
233          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
234             lmgr_do_unlock(rwl);
235             break;                    /* error, bail out */
236          }
237       }
238       pthread_cleanup_pop(0);
239       rwl->w_wait--;                  /* we are no longer waiting */
240    }
241    if (stat == 0) {
242       rwl->w_active++;                /* we are running */
243       rwl->writer_id = pthread_self(); /* save writer thread's id */
244       lmgr_post_lock();
245    }
246    pthread_mutex_unlock(&rwl->mutex);
247    return stat;
248 }
249
250 /*
251  * Attempt to lock for write access, don't wait
252  */
253 int rwl_writetrylock(brwlock_t *rwl)
254 {
255    int stat, stat2;
256
257    if (rwl->valid != RWLOCK_VALID) {
258       return EINVAL;
259    }
260    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
261       return stat;
262    }
263    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
264       rwl->w_active++;
265       pthread_mutex_unlock(&rwl->mutex);
266       return 0;
267    }
268    if (rwl->w_active || rwl->r_active > 0) {
269       stat = EBUSY;
270    } else {
271       rwl->w_active = 1;              /* we are running */
272       rwl->writer_id = pthread_self(); /* save writer thread's id */
273       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
274    }
275    stat2 = pthread_mutex_unlock(&rwl->mutex);
276    return (stat == 0 ? stat2 : stat);
277 }
278
279 /*
280  * Unlock write lock
281  *  Start any waiting writers in preference to waiting readers
282  */
283 int rwl_writeunlock(brwlock_t *rwl)
284 {
285    int stat, stat2;
286
287    if (rwl->valid != RWLOCK_VALID) {
288       return EINVAL;
289    }
290    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
291       return stat;
292    }
293    if (rwl->w_active <= 0) {
294       pthread_mutex_unlock(&rwl->mutex);
295       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
296    }
297    rwl->w_active--;
298    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
299       pthread_mutex_unlock(&rwl->mutex);
300       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
301    }
302    if (rwl->w_active > 0) {
303       stat = 0;                       /* writers still active */
304    } else {
305       lmgr_do_unlock(rwl);
306       /* No more writers, awaken someone */
307       if (rwl->r_wait > 0) {         /* if readers waiting */
308          stat = pthread_cond_broadcast(&rwl->read);
309       } else if (rwl->w_wait > 0) {
310          stat = pthread_cond_broadcast(&rwl->write);
311       }
312    }
313    stat2 = pthread_mutex_unlock(&rwl->mutex);
314    return (stat == 0 ? stat2 : stat);
315 }
316
317 #ifdef TEST_RWLOCK
318
319 #define THREADS     300
320 #define DATASIZE   15
321 #define ITERATIONS 1000000
322
323 /*
324  * Keep statics for each thread.
325  */
326 typedef struct thread_tag {
327    int thread_num;
328    pthread_t thread_id;
329    int writes;
330    int reads;
331    int interval;
332 } thread_t;
333
334 /*
335  * Read/write lock and shared data.
336  */
337 typedef struct data_tag {
338    brwlock_t lock;
339    int data;
340    int writes;
341 } data_t;
342
343 static thread_t threads[THREADS];
344 static data_t data[DATASIZE];
345
346 /*
347  * Thread start routine that uses read/write locks.
348  */
349 void *thread_routine(void *arg)
350 {
351    thread_t *self = (thread_t *)arg;
352    int repeats = 0;
353    int iteration;
354    int element = 0;
355    int status;
356
357    for (iteration=0; iteration < ITERATIONS; iteration++) {
358       /*
359        * Each "self->interval" iterations, perform an
360        * update operation (write lock instead of read
361        * lock).
362        */
363 //      if ((iteration % self->interval) == 0) {
364          status = rwl_writelock(&data[element].lock);
365          if (status != 0) {
366             berrno be;
367             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
368             exit(1);
369          }
370          data[element].data = self->thread_num;
371          data[element].writes++;
372          self->writes++;
373          status = rwl_writelock(&data[element].lock);
374          if (status != 0) {
375             berrno be;
376             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
377             exit(1);
378          }
379          data[element].data = self->thread_num;
380          data[element].writes++;
381          self->writes++;
382          status = rwl_writeunlock(&data[element].lock);
383          if (status != 0) {
384             berrno be;
385             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
386             exit(1);
387          }
388          status = rwl_writeunlock(&data[element].lock);
389          if (status != 0) {
390             berrno be;
391             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
392             exit(1);
393          }
394
395 #ifdef xxx
396       } else {
397          /*
398           * Look at the current data element to see whether
399           * the current thread last updated it. Count the
400           * times to report later.
401           */
402           status = rwl_readlock(&data[element].lock);
403           if (status != 0) {
404              berrno be;
405              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
406              exit(1);
407           }
408           self->reads++;
409           if (data[element].data == self->thread_num)
410              repeats++;
411           status = rwl_readunlock(&data[element].lock);
412           if (status != 0) {
413              berrno be;
414              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
415              exit(1);
416           }
417       }
418 #endif
419       element++;
420       if (element >= DATASIZE) {
421          element = 0;
422       }
423    }
424    if (repeats > 0) {
425       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
426          self->thread_num, repeats);
427    }
428    return NULL;
429 }
430
431 int main (int argc, char *argv[])
432 {
433     int count;
434     int data_count;
435     int status;
436     unsigned int seed = 1;
437     int thread_writes = 0;
438     int data_writes = 0;
439
440 #ifdef USE_THR_SETCONCURRENCY
441     /*
442      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
443      * that our threads can run concurrently, we need to
444      * increase the concurrency level to THREADS.
445      */
446     thr_setconcurrency (THREADS);
447 #endif
448
449     /*
450      * Initialize the shared data.
451      */
452     for (data_count = 0; data_count < DATASIZE; data_count++) {
453         data[data_count].data = 0;
454         data[data_count].writes = 0;
455         status = rwl_init(&data[data_count].lock);
456         if (status != 0) {
457            berrno be;
458            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
459            exit(1);
460         }
461     }
462
463     /*
464      * Create THREADS threads to access shared data.
465      */
466     for (count = 0; count < THREADS; count++) {
467         threads[count].thread_num = count + 1;
468         threads[count].writes = 0;
469         threads[count].reads = 0;
470         threads[count].interval = rand_r(&seed) % 71;
471         if (threads[count].interval <= 0) {
472            threads[count].interval = 1;
473         }
474         status = pthread_create (&threads[count].thread_id,
475             NULL, thread_routine, (void*)&threads[count]);
476         if (status != 0 || (int)threads[count].thread_id == 0) {
477            berrno be;
478            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
479            exit(1);
480         }
481     }
482
483     /*
484      * Wait for all threads to complete, and collect
485      * statistics.
486      */
487     for (count = 0; count < THREADS; count++) {
488         status = pthread_join (threads[count].thread_id, NULL);
489         if (status != 0) {
490            berrno be;
491            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
492            exit(1);
493         }
494         thread_writes += threads[count].writes;
495         printf (_("%02d: interval %d, writes %d, reads %d\n"),
496             count, threads[count].interval,
497             threads[count].writes, threads[count].reads);
498     }
499
500     /*
501      * Collect statistics for the data.
502      */
503     for (data_count = 0; data_count < DATASIZE; data_count++) {
504         data_writes += data[data_count].writes;
505         printf (_("data %02d: value %d, %d writes\n"),
506             data_count, data[data_count].data, data[data_count].writes);
507         rwl_destroy (&data[data_count].lock);
508     }
509
510     printf (_("Total: %d thread writes, %d data writes\n"),
511         thread_writes, data_writes);
512     return 0;
513 }
514
515 #endif
516
517 #ifdef TEST_RW_TRY_LOCK
518 /*
519  * brwlock_try_main.c
520  *
521  * Demonstrate use of non-blocking read-write locks.
522  *
523  * Special notes: On older Solaris system, call thr_setconcurrency()
524  * to allow interleaved thread execution, since threads are not
525  * timesliced.
526  */
527 #include <pthread.h>
528 #include "rwlock.h"
529 #include "errors.h"
530
531 #define THREADS         5
532 #define ITERATIONS      1000
533 #define DATASIZE        15
534
535 /*
536  * Keep statistics for each thread.
537  */
538 typedef struct thread_tag {
539     int         thread_num;
540     pthread_t   thread_id;
541     int         r_collisions;
542     int         w_collisions;
543     int         updates;
544     int         interval;
545 } thread_t;
546
547 /*
548  * Read-write lock and shared data
549  */
550 typedef struct data_tag {
551     brwlock_t    lock;
552     int         data;
553     int         updates;
554 } data_t;
555
556 thread_t threads[THREADS];
557 data_t data[DATASIZE];
558
559 /*
560  * Thread start routine that uses read-write locks
561  */
562 void *thread_routine (void *arg)
563 {
564     thread_t *self = (thread_t*)arg;
565     int iteration;
566     int element;
567     int status;
568     lmgr_init_thread();
569     element = 0;                        /* Current data element */
570
571     for (iteration = 0; iteration < ITERATIONS; iteration++) {
572         if ((iteration % self->interval) == 0) {
573             status = rwl_writetrylock (&data[element].lock);
574             if (status == EBUSY)
575                 self->w_collisions++;
576             else if (status == 0) {
577                 data[element].data++;
578                 data[element].updates++;
579                 self->updates++;
580                 rwl_writeunlock (&data[element].lock);
581             } else
582                 err_abort (status, _("Try write lock"));
583         } else {
584             status = rwl_readtrylock (&data[element].lock);
585             if (status == EBUSY)
586                 self->r_collisions++;
587             else if (status != 0) {
588                 err_abort (status, _("Try read lock"));
589             } else {
590                 if (data[element].data != data[element].updates)
591                     printf ("%d: data[%d] %d != %d\n",
592                         self->thread_num, element,
593                         data[element].data, data[element].updates);
594                 rwl_readunlock (&data[element].lock);
595             }
596         }
597
598         element++;
599         if (element >= DATASIZE)
600             element = 0;
601     }
602     lmgr_cleanup_thread();
603     return NULL;
604 }
605
606 int main (int argc, char *argv[])
607 {
608     int count, data_count;
609     unsigned int seed = 1;
610     int thread_updates = 0, data_updates = 0;
611     int status;
612
613 #ifdef USE_THR_SETCONCURRENCY
614     /*
615      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
616      * that our threads can run concurrently, we need to
617      * increase the concurrency level to THREADS.
618      */
619     DPRINTF (("Setting concurrency level to %d\n", THREADS));
620     thr_setconcurrency (THREADS);
621 #endif
622
623     /*
624      * Initialize the shared data.
625      */
626     for (data_count = 0; data_count < DATASIZE; data_count++) {
627         data[data_count].data = 0;
628         data[data_count].updates = 0;
629         rwl_init(&data[data_count].lock);
630     }
631
632     /*
633      * Create THREADS threads to access shared data.
634      */
635     for (count = 0; count < THREADS; count++) {
636         threads[count].thread_num = count;
637         threads[count].r_collisions = 0;
638         threads[count].w_collisions = 0;
639         threads[count].updates = 0;
640         threads[count].interval = rand_r (&seed) % ITERATIONS;
641         status = pthread_create (&threads[count].thread_id,
642             NULL, thread_routine, (void*)&threads[count]);
643         if (status != 0)
644             err_abort (status, _("Create thread"));
645     }
646
647     /*
648      * Wait for all threads to complete, and collect
649      * statistics.
650      */
651     for (count = 0; count < THREADS; count++) {
652         status = pthread_join (threads[count].thread_id, NULL);
653         if (status != 0)
654             err_abort (status, _("Join thread"));
655         thread_updates += threads[count].updates;
656         printf (_("%02d: interval %d, updates %d, "
657                 "r_collisions %d, w_collisions %d\n"),
658             count, threads[count].interval,
659             threads[count].updates,
660             threads[count].r_collisions, threads[count].w_collisions);
661     }
662
663     /*
664      * Collect statistics for the data.
665      */
666     for (data_count = 0; data_count < DATASIZE; data_count++) {
667         data_updates += data[data_count].updates;
668         printf (_("data %02d: value %d, %d updates\n"),
669             data_count, data[data_count].data, data[data_count].updates);
670         rwl_destroy (&data[data_count].lock);
671     }
672
673     return 0;
674 }
675
676 #endif