]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
Fix bug #1893
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  */
40
41 #define _LOCKMGR_COMPLIANT
42 #include "bacula.h"
43
44 /*
45  * Initialize a read/write lock
46  *
47  *  Returns: 0 on success
48  *           errno on failure
49  */
50 int rwl_init(brwlock_t *rwl, int priority)
51 {
52    int stat;
53
54    rwl->r_active = rwl->w_active = 0;
55    rwl->r_wait = rwl->w_wait = 0;
56    rwl->priority = priority;
57    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
58       return stat;
59    }
60    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61       pthread_mutex_destroy(&rwl->mutex);
62       return stat;
63    }
64    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65       pthread_cond_destroy(&rwl->read);
66       pthread_mutex_destroy(&rwl->mutex);
67       return stat;
68    }
69    rwl->valid = RWLOCK_VALID;
70    return 0;
71 }
72
73 /*
74  * Destroy a read/write lock
75  *
76  * Returns: 0 on success
77  *          errno on failure
78  */
79 int rwl_destroy(brwlock_t *rwl)
80 {
81    int stat, stat1, stat2;
82
83   if (rwl->valid != RWLOCK_VALID) {
84      return EINVAL;
85   }
86   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87      return stat;
88   }
89
90   /*
91    * If any threads are active, report EBUSY
92    */
93   if (rwl->r_active > 0 || rwl->w_active) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   /*
99    * If any threads are waiting, report EBUSY
100    */
101   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102      pthread_mutex_unlock(&rwl->mutex);
103      return EBUSY;
104   }
105
106   rwl->valid = 0;
107   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108      return stat;
109   }
110   stat  = pthread_mutex_destroy(&rwl->mutex);
111   stat1 = pthread_cond_destroy(&rwl->read);
112   stat2 = pthread_cond_destroy(&rwl->write);
113   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 bool rwl_is_init(brwlock_t *rwl)
117 {
118    return (rwl->valid == RWLOCK_VALID);
119 }
120
121 /*
122  * Handle cleanup when the read lock condition variable
123  * wait is released.
124  */
125 static void rwl_read_release(void *arg)
126 {
127    brwlock_t *rwl = (brwlock_t *)arg;
128
129    rwl->r_wait--;
130    pthread_mutex_unlock(&rwl->mutex);
131 }
132
133 /*
134  * Handle cleanup when the write lock condition variable wait
135  * is released.
136  */
137 static void rwl_write_release(void *arg)
138 {
139    brwlock_t *rwl = (brwlock_t *)arg;
140
141    rwl->w_wait--;
142    pthread_mutex_unlock(&rwl->mutex);
143 }
144
145 /*
146  * Lock for read access, wait until locked (or error).
147  */
148 int rwl_readlock(brwlock_t *rwl)
149 {
150    int stat;
151
152    if (rwl->valid != RWLOCK_VALID) {
153       return EINVAL;
154    }
155    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
156       return stat;
157    }
158    if (rwl->w_active) {
159       rwl->r_wait++;                  /* indicate that we are waiting */
160       pthread_cleanup_push(rwl_read_release, (void *)rwl);
161       while (rwl->w_active) {
162          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
163          if (stat != 0) {
164             break;                    /* error, bail out */
165          }
166       }
167       pthread_cleanup_pop(0);
168       rwl->r_wait--;                  /* we are no longer waiting */
169    }
170    if (stat == 0) {
171       rwl->r_active++;                /* we are running */
172    }
173    pthread_mutex_unlock(&rwl->mutex);
174    return stat;
175 }
176
177 /*
178  * Attempt to lock for read access, don't wait
179  */
180 int rwl_readtrylock(brwlock_t *rwl)
181 {
182    int stat, stat2;
183
184    if (rwl->valid != RWLOCK_VALID) {
185       return EINVAL;
186    }
187    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
188       return stat;
189    }
190    if (rwl->w_active) {
191       stat = EBUSY;
192    } else {
193       rwl->r_active++;                /* we are running */
194    }
195    stat2 = pthread_mutex_unlock(&rwl->mutex);
196    return (stat == 0 ? stat2 : stat);
197 }
198
199 /*
200  * Unlock read lock
201  */
202 int rwl_readunlock(brwlock_t *rwl)
203 {
204    int stat, stat2;
205
206    if (rwl->valid != RWLOCK_VALID) {
207       return EINVAL;
208    }
209    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
210       return stat;
211    }
212    rwl->r_active--;
213    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
214       stat = pthread_cond_broadcast(&rwl->write);
215    }
216    stat2 = pthread_mutex_unlock(&rwl->mutex);
217    return (stat == 0 ? stat2 : stat);
218 }
219
220
221 /*
222  * Lock for write access, wait until locked (or error).
223  *   Multiple nested write locking is permitted.
224  */
225 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
226 {
227    int stat;
228
229    if (rwl->valid != RWLOCK_VALID) {
230       return EINVAL;
231    }
232    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
233       return stat;
234    }
235    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
236       rwl->w_active++;
237       pthread_mutex_unlock(&rwl->mutex);
238       return 0;
239    }
240    lmgr_pre_lock(rwl, rwl->priority, file, line);
241    if (rwl->w_active || rwl->r_active > 0) {
242       rwl->w_wait++;                  /* indicate that we are waiting */
243       pthread_cleanup_push(rwl_write_release, (void *)rwl);
244       while (rwl->w_active || rwl->r_active > 0) {
245          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
246             lmgr_do_unlock(rwl);
247             break;                    /* error, bail out */
248          }
249       }
250       pthread_cleanup_pop(0);
251       rwl->w_wait--;                  /* we are no longer waiting */
252    }
253    if (stat == 0) {
254       rwl->w_active++;                /* we are running */
255       rwl->writer_id = pthread_self(); /* save writer thread's id */
256       lmgr_post_lock();
257    } 
258    pthread_mutex_unlock(&rwl->mutex);
259    return stat;
260 }
261
262 /*
263  * Attempt to lock for write access, don't wait
264  */
265 int rwl_writetrylock(brwlock_t *rwl)
266 {
267    int stat, stat2;
268
269    if (rwl->valid != RWLOCK_VALID) {
270       return EINVAL;
271    }
272    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
273       return stat;
274    }
275    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
276       rwl->w_active++;
277       pthread_mutex_unlock(&rwl->mutex);
278       return 0;
279    }
280    if (rwl->w_active || rwl->r_active > 0) {
281       stat = EBUSY;
282    } else {
283       rwl->w_active = 1;              /* we are running */
284       rwl->writer_id = pthread_self(); /* save writer thread's id */
285       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
286    }
287    stat2 = pthread_mutex_unlock(&rwl->mutex);
288    return (stat == 0 ? stat2 : stat);
289 }
290
291 /*
292  * Unlock write lock
293  *  Start any waiting writers in preference to waiting readers
294  */
295 int rwl_writeunlock(brwlock_t *rwl)
296 {
297    int stat, stat2;
298
299    if (rwl->valid != RWLOCK_VALID) {
300       return EINVAL;
301    }
302    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
303       return stat;
304    }
305    if (rwl->w_active <= 0) {
306       pthread_mutex_unlock(&rwl->mutex);
307       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
308    }
309    rwl->w_active--;
310    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
311       pthread_mutex_unlock(&rwl->mutex);
312       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
313    }
314    if (rwl->w_active > 0) {
315       stat = 0;                       /* writers still active */
316    } else {
317       lmgr_do_unlock(rwl);
318       /* No more writers, awaken someone */
319       if (rwl->r_wait > 0) {         /* if readers waiting */
320          stat = pthread_cond_broadcast(&rwl->read);
321       } else if (rwl->w_wait > 0) {
322          stat = pthread_cond_broadcast(&rwl->write);
323       }
324    }
325    stat2 = pthread_mutex_unlock(&rwl->mutex);
326    return (stat == 0 ? stat2 : stat);
327 }
328
329 #ifdef TEST_RWLOCK
330
331 #define THREADS     300
332 #define DATASIZE   15
333 #define ITERATIONS 1000000
334
335 /*
336  * Keep statics for each thread.
337  */
338 typedef struct thread_tag {
339    int thread_num;
340    pthread_t thread_id;
341    int writes;
342    int reads;
343    int interval;
344 } thread_t;
345
346 /*
347  * Read/write lock and shared data.
348  */
349 typedef struct data_tag {
350    brwlock_t lock;
351    int data;
352    int writes;
353 } data_t;
354
355 static thread_t threads[THREADS];
356 static data_t data[DATASIZE];
357
358 /*
359  * Thread start routine that uses read/write locks.
360  */
361 void *thread_routine(void *arg)
362 {
363    thread_t *self = (thread_t *)arg;
364    int repeats = 0;
365    int iteration;
366    int element = 0;
367    int status;
368
369    for (iteration=0; iteration < ITERATIONS; iteration++) {
370       /*
371        * Each "self->interval" iterations, perform an
372        * update operation (write lock instead of read
373        * lock).
374        */
375 //      if ((iteration % self->interval) == 0) {
376          status = rwl_writelock(&data[element].lock);
377          if (status != 0) {
378             berrno be;
379             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
380             exit(1);
381          }
382          data[element].data = self->thread_num;
383          data[element].writes++;
384          self->writes++;
385          status = rwl_writelock(&data[element].lock);
386          if (status != 0) {
387             berrno be;
388             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
389             exit(1);
390          }
391          data[element].data = self->thread_num;
392          data[element].writes++;
393          self->writes++;
394          status = rwl_writeunlock(&data[element].lock);
395          if (status != 0) {
396             berrno be;
397             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
398             exit(1);
399          }
400          status = rwl_writeunlock(&data[element].lock);
401          if (status != 0) {
402             berrno be;
403             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
404             exit(1);
405          }
406
407 #ifdef xxx
408       } else {
409          /*
410           * Look at the current data element to see whether
411           * the current thread last updated it. Count the
412           * times to report later.
413           */
414           status = rwl_readlock(&data[element].lock);
415           if (status != 0) {
416              berrno be;
417              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
418              exit(1);
419           }
420           self->reads++;
421           if (data[element].data == self->thread_num)
422              repeats++;
423           status = rwl_readunlock(&data[element].lock);
424           if (status != 0) {
425              berrno be;
426              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
427              exit(1);
428           }
429       }
430 #endif
431       element++;
432       if (element >= DATASIZE) {
433          element = 0;
434       }
435    }
436    if (repeats > 0) {
437       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
438          self->thread_num, repeats);
439    }
440    return NULL;
441 }
442
443 int main (int argc, char *argv[])
444 {
445     int count;
446     int data_count;
447     int status;
448     unsigned int seed = 1;
449     int thread_writes = 0;
450     int data_writes = 0;
451
452 #ifdef USE_THR_SETCONCURRENCY
453     /*
454      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
455      * that our threads can run concurrently, we need to
456      * increase the concurrency level to THREADS.
457      */
458     thr_setconcurrency (THREADS);
459 #endif
460
461     /*
462      * Initialize the shared data.
463      */
464     for (data_count = 0; data_count < DATASIZE; data_count++) {
465         data[data_count].data = 0;
466         data[data_count].writes = 0;
467         status = rwl_init(&data[data_count].lock);
468         if (status != 0) {
469            berrno be;
470            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
471            exit(1);
472         }
473     }
474
475     /*
476      * Create THREADS threads to access shared data.
477      */
478     for (count = 0; count < THREADS; count++) {
479         threads[count].thread_num = count + 1;
480         threads[count].writes = 0;
481         threads[count].reads = 0;
482         threads[count].interval = rand_r(&seed) % 71;
483         if (threads[count].interval <= 0) {
484            threads[count].interval = 1;
485         }
486         status = pthread_create (&threads[count].thread_id,
487             NULL, thread_routine, (void*)&threads[count]);
488         if (status != 0 || (int)threads[count].thread_id == 0) {
489            berrno be;
490            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
491            exit(1);
492         }
493     }
494
495     /*
496      * Wait for all threads to complete, and collect
497      * statistics.
498      */
499     for (count = 0; count < THREADS; count++) {
500         status = pthread_join (threads[count].thread_id, NULL);
501         if (status != 0) {
502            berrno be;
503            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
504            exit(1);
505         }
506         thread_writes += threads[count].writes;
507         printf (_("%02d: interval %d, writes %d, reads %d\n"),
508             count, threads[count].interval,
509             threads[count].writes, threads[count].reads);
510     }
511
512     /*
513      * Collect statistics for the data.
514      */
515     for (data_count = 0; data_count < DATASIZE; data_count++) {
516         data_writes += data[data_count].writes;
517         printf (_("data %02d: value %d, %d writes\n"),
518             data_count, data[data_count].data, data[data_count].writes);
519         rwl_destroy (&data[data_count].lock);
520     }
521
522     printf (_("Total: %d thread writes, %d data writes\n"),
523         thread_writes, data_writes);
524     return 0;
525 }
526
527 #endif
528
529 #ifdef TEST_RW_TRY_LOCK
530 /*
531  * brwlock_try_main.c
532  *
533  * Demonstrate use of non-blocking read-write locks.
534  *
535  * Special notes: On older Solaris system, call thr_setconcurrency()
536  * to allow interleaved thread execution, since threads are not
537  * timesliced.
538  */
539 #include <pthread.h>
540 #include "rwlock.h"
541 #include "errors.h"
542
543 #define THREADS         5
544 #define ITERATIONS      1000
545 #define DATASIZE        15
546
547 /*
548  * Keep statistics for each thread.
549  */
550 typedef struct thread_tag {
551     int         thread_num;
552     pthread_t   thread_id;
553     int         r_collisions;
554     int         w_collisions;
555     int         updates;
556     int         interval;
557 } thread_t;
558
559 /*
560  * Read-write lock and shared data
561  */
562 typedef struct data_tag {
563     brwlock_t    lock;
564     int         data;
565     int         updates;
566 } data_t;
567
568 thread_t threads[THREADS];
569 data_t data[DATASIZE];
570
571 /*
572  * Thread start routine that uses read-write locks
573  */
574 void *thread_routine (void *arg)
575 {
576     thread_t *self = (thread_t*)arg;
577     int iteration;
578     int element;
579     int status;
580     lmgr_init_thread();
581     element = 0;                        /* Current data element */
582
583     for (iteration = 0; iteration < ITERATIONS; iteration++) {
584         if ((iteration % self->interval) == 0) {
585             status = rwl_writetrylock (&data[element].lock);
586             if (status == EBUSY)
587                 self->w_collisions++;
588             else if (status == 0) {
589                 data[element].data++;
590                 data[element].updates++;
591                 self->updates++;
592                 rwl_writeunlock (&data[element].lock);
593             } else
594                 err_abort (status, _("Try write lock"));
595         } else {
596             status = rwl_readtrylock (&data[element].lock);
597             if (status == EBUSY)
598                 self->r_collisions++;
599             else if (status != 0) {
600                 err_abort (status, _("Try read lock"));
601             } else {
602                 if (data[element].data != data[element].updates)
603                     printf ("%d: data[%d] %d != %d\n",
604                         self->thread_num, element,
605                         data[element].data, data[element].updates);
606                 rwl_readunlock (&data[element].lock);
607             }
608         }
609
610         element++;
611         if (element >= DATASIZE)
612             element = 0;
613     }
614     lmgr_cleanup_thread();
615     return NULL;
616 }
617
618 int main (int argc, char *argv[])
619 {
620     int count, data_count;
621     unsigned int seed = 1;
622     int thread_updates = 0, data_updates = 0;
623     int status;
624
625 #ifdef USE_THR_SETCONCURRENCY
626     /*
627      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
628      * that our threads can run concurrently, we need to
629      * increase the concurrency level to THREADS.
630      */
631     DPRINTF (("Setting concurrency level to %d\n", THREADS));
632     thr_setconcurrency (THREADS);
633 #endif
634
635     /*
636      * Initialize the shared data.
637      */
638     for (data_count = 0; data_count < DATASIZE; data_count++) {
639         data[data_count].data = 0;
640         data[data_count].updates = 0;
641         rwl_init(&data[data_count].lock);
642     }
643
644     /*
645      * Create THREADS threads to access shared data.
646      */
647     for (count = 0; count < THREADS; count++) {
648         threads[count].thread_num = count;
649         threads[count].r_collisions = 0;
650         threads[count].w_collisions = 0;
651         threads[count].updates = 0;
652         threads[count].interval = rand_r (&seed) % ITERATIONS;
653         status = pthread_create (&threads[count].thread_id,
654             NULL, thread_routine, (void*)&threads[count]);
655         if (status != 0)
656             err_abort (status, _("Create thread"));
657     }
658
659     /*
660      * Wait for all threads to complete, and collect
661      * statistics.
662      */
663     for (count = 0; count < THREADS; count++) {
664         status = pthread_join (threads[count].thread_id, NULL);
665         if (status != 0)
666             err_abort (status, _("Join thread"));
667         thread_updates += threads[count].updates;
668         printf (_("%02d: interval %d, updates %d, "
669                 "r_collisions %d, w_collisions %d\n"),
670             count, threads[count].interval,
671             threads[count].updates,
672             threads[count].r_collisions, threads[count].w_collisions);
673     }
674
675     /*
676      * Collect statistics for the data.
677      */
678     for (data_count = 0; data_count < DATASIZE; data_count++) {
679         data_updates += data[data_count].updates;
680         printf (_("data %02d: value %d, %d updates\n"),
681             data_count, data[data_count].data, data[data_count].updates);
682         rwl_destroy (&data[data_count].lock);
683     }
684
685     return 0;
686 }
687
688 #endif