]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
1a263bca1c18e99f2074b66ba6469f7f7f8a9713
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *   Version $Id$
37  *
38  *  This code adapted from "Programming with POSIX Threads", by
39  *    David R. Butenhof
40  *
41  */
42
43 #define _LOCKMGR_COMPLIANT
44 #include "bacula.h"
45
46 /*
47  * Initialize a read/write lock
48  *
49  *  Returns: 0 on success
50  *           errno on failure
51  */
52 int rwl_init(brwlock_t *rwl, int priority)
53 {
54    int stat;
55
56    rwl->r_active = rwl->w_active = 0;
57    rwl->r_wait = rwl->w_wait = 0;
58    rwl->priority = priority;
59    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
60       return stat;
61    }
62    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
63       pthread_mutex_destroy(&rwl->mutex);
64       return stat;
65    }
66    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
67       pthread_cond_destroy(&rwl->read);
68       pthread_mutex_destroy(&rwl->mutex);
69       return stat;
70    }
71    rwl->valid = RWLOCK_VALID;
72    return 0;
73 }
74
75 /*
76  * Destroy a read/write lock
77  *
78  * Returns: 0 on success
79  *          errno on failure
80  */
81 int rwl_destroy(brwlock_t *rwl)
82 {
83    int stat, stat1, stat2;
84
85   if (rwl->valid != RWLOCK_VALID) {
86      return EINVAL;
87   }
88   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
89      return stat;
90   }
91
92   /*
93    * If any threads are active, report EBUSY
94    */
95   if (rwl->r_active > 0 || rwl->w_active) {
96      pthread_mutex_unlock(&rwl->mutex);
97      return EBUSY;
98   }
99
100   /*
101    * If any threads are waiting, report EBUSY
102    */
103   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
104      pthread_mutex_unlock(&rwl->mutex);
105      return EBUSY;
106   }
107
108   rwl->valid = 0;
109   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
110      return stat;
111   }
112   stat  = pthread_mutex_destroy(&rwl->mutex);
113   stat1 = pthread_cond_destroy(&rwl->read);
114   stat2 = pthread_cond_destroy(&rwl->write);
115   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
116 }
117
118 /*
119  * Handle cleanup when the read lock condition variable
120  * wait is released.
121  */
122 static void rwl_read_release(void *arg)
123 {
124    brwlock_t *rwl = (brwlock_t *)arg;
125
126    rwl->r_wait--;
127    pthread_mutex_unlock(&rwl->mutex);
128 }
129
130 /*
131  * Handle cleanup when the write lock condition variable wait
132  * is released.
133  */
134 static void rwl_write_release(void *arg)
135 {
136    brwlock_t *rwl = (brwlock_t *)arg;
137
138    rwl->w_wait--;
139    pthread_mutex_unlock(&rwl->mutex);
140 }
141
142 /*
143  * Lock for read access, wait until locked (or error).
144  */
145 int rwl_readlock(brwlock_t *rwl)
146 {
147    int stat;
148
149    if (rwl->valid != RWLOCK_VALID) {
150       return EINVAL;
151    }
152    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
153       return stat;
154    }
155    if (rwl->w_active) {
156       rwl->r_wait++;                  /* indicate that we are waiting */
157       pthread_cleanup_push(rwl_read_release, (void *)rwl);
158       while (rwl->w_active) {
159          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
160          if (stat != 0) {
161             break;                    /* error, bail out */
162          }
163       }
164       pthread_cleanup_pop(0);
165       rwl->r_wait--;                  /* we are no longer waiting */
166    }
167    if (stat == 0) {
168       rwl->r_active++;                /* we are running */
169    }
170    pthread_mutex_unlock(&rwl->mutex);
171    return stat;
172 }
173
174 /*
175  * Attempt to lock for read access, don't wait
176  */
177 int rwl_readtrylock(brwlock_t *rwl)
178 {
179    int stat, stat2;
180
181    if (rwl->valid != RWLOCK_VALID) {
182       return EINVAL;
183    }
184    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
185       return stat;
186    }
187    if (rwl->w_active) {
188       stat = EBUSY;
189    } else {
190       rwl->r_active++;                /* we are running */
191    }
192    stat2 = pthread_mutex_unlock(&rwl->mutex);
193    return (stat == 0 ? stat2 : stat);
194 }
195
196 /*
197  * Unlock read lock
198  */
199 int rwl_readunlock(brwlock_t *rwl)
200 {
201    int stat, stat2;
202
203    if (rwl->valid != RWLOCK_VALID) {
204       return EINVAL;
205    }
206    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
207       return stat;
208    }
209    rwl->r_active--;
210    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
211       stat = pthread_cond_broadcast(&rwl->write);
212    }
213    stat2 = pthread_mutex_unlock(&rwl->mutex);
214    return (stat == 0 ? stat2 : stat);
215 }
216
217
218 /*
219  * Lock for write access, wait until locked (or error).
220  *   Multiple nested write locking is permitted.
221  */
222 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
223 {
224    int stat;
225
226    if (rwl->valid != RWLOCK_VALID) {
227       return EINVAL;
228    }
229    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
230       return stat;
231    }
232    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
233       rwl->w_active++;
234       pthread_mutex_unlock(&rwl->mutex);
235       return 0;
236    }
237    lmgr_pre_lock(rwl, rwl->priority, file, line);
238    if (rwl->w_active || rwl->r_active > 0) {
239       rwl->w_wait++;                  /* indicate that we are waiting */
240       pthread_cleanup_push(rwl_write_release, (void *)rwl);
241       while (rwl->w_active || rwl->r_active > 0) {
242          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
243             lmgr_do_unlock(rwl);
244             break;                    /* error, bail out */
245          }
246       }
247       pthread_cleanup_pop(0);
248       rwl->w_wait--;                  /* we are no longer waiting */
249    }
250    if (stat == 0) {
251       rwl->w_active++;                /* we are running */
252       rwl->writer_id = pthread_self(); /* save writer thread's id */
253       lmgr_post_lock();
254    } 
255    pthread_mutex_unlock(&rwl->mutex);
256    return stat;
257 }
258
259 /*
260  * Attempt to lock for write access, don't wait
261  */
262 int rwl_writetrylock(brwlock_t *rwl)
263 {
264    int stat, stat2;
265
266    if (rwl->valid != RWLOCK_VALID) {
267       return EINVAL;
268    }
269    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
270       return stat;
271    }
272    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
273       rwl->w_active++;
274       pthread_mutex_unlock(&rwl->mutex);
275       return 0;
276    }
277    if (rwl->w_active || rwl->r_active > 0) {
278       stat = EBUSY;
279    } else {
280       rwl->w_active = 1;              /* we are running */
281       rwl->writer_id = pthread_self(); /* save writer thread's id */
282       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
283    }
284    stat2 = pthread_mutex_unlock(&rwl->mutex);
285    return (stat == 0 ? stat2 : stat);
286 }
287
288 /*
289  * Unlock write lock
290  *  Start any waiting writers in preference to waiting readers
291  */
292 int rwl_writeunlock(brwlock_t *rwl)
293 {
294    int stat, stat2;
295
296    if (rwl->valid != RWLOCK_VALID) {
297       return EINVAL;
298    }
299    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
300       return stat;
301    }
302    if (rwl->w_active <= 0) {
303       pthread_mutex_unlock(&rwl->mutex);
304       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
305    }
306    rwl->w_active--;
307    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
308       pthread_mutex_unlock(&rwl->mutex);
309       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
310    }
311    if (rwl->w_active > 0) {
312       stat = 0;                       /* writers still active */
313    } else {
314       lmgr_do_unlock(rwl);
315       /* No more writers, awaken someone */
316       if (rwl->r_wait > 0) {         /* if readers waiting */
317          stat = pthread_cond_broadcast(&rwl->read);
318       } else if (rwl->w_wait > 0) {
319          stat = pthread_cond_broadcast(&rwl->write);
320       }
321    }
322    stat2 = pthread_mutex_unlock(&rwl->mutex);
323    return (stat == 0 ? stat2 : stat);
324 }
325
326 #ifdef TEST_RWLOCK
327
328 #define THREADS     300
329 #define DATASIZE   15
330 #define ITERATIONS 1000000
331
332 /*
333  * Keep statics for each thread.
334  */
335 typedef struct thread_tag {
336    int thread_num;
337    pthread_t thread_id;
338    int writes;
339    int reads;
340    int interval;
341 } thread_t;
342
343 /*
344  * Read/write lock and shared data.
345  */
346 typedef struct data_tag {
347    brwlock_t lock;
348    int data;
349    int writes;
350 } data_t;
351
352 static thread_t threads[THREADS];
353 static data_t data[DATASIZE];
354
355 /*
356  * Thread start routine that uses read/write locks.
357  */
358 void *thread_routine(void *arg)
359 {
360    thread_t *self = (thread_t *)arg;
361    int repeats = 0;
362    int iteration;
363    int element = 0;
364    int status;
365
366    for (iteration=0; iteration < ITERATIONS; iteration++) {
367       /*
368        * Each "self->interval" iterations, perform an
369        * update operation (write lock instead of read
370        * lock).
371        */
372 //      if ((iteration % self->interval) == 0) {
373          status = rwl_writelock(&data[element].lock);
374          if (status != 0) {
375             berrno be;
376             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
377             exit(1);
378          }
379          data[element].data = self->thread_num;
380          data[element].writes++;
381          self->writes++;
382          status = rwl_writelock(&data[element].lock);
383          if (status != 0) {
384             berrno be;
385             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
386             exit(1);
387          }
388          data[element].data = self->thread_num;
389          data[element].writes++;
390          self->writes++;
391          status = rwl_writeunlock(&data[element].lock);
392          if (status != 0) {
393             berrno be;
394             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
395             exit(1);
396          }
397          status = rwl_writeunlock(&data[element].lock);
398          if (status != 0) {
399             berrno be;
400             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
401             exit(1);
402          }
403
404 #ifdef xxx
405       } else {
406          /*
407           * Look at the current data element to see whether
408           * the current thread last updated it. Count the
409           * times to report later.
410           */
411           status = rwl_readlock(&data[element].lock);
412           if (status != 0) {
413              berrno be;
414              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
415              exit(1);
416           }
417           self->reads++;
418           if (data[element].data == self->thread_num)
419              repeats++;
420           status = rwl_readunlock(&data[element].lock);
421           if (status != 0) {
422              berrno be;
423              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
424              exit(1);
425           }
426       }
427 #endif
428       element++;
429       if (element >= DATASIZE) {
430          element = 0;
431       }
432    }
433    if (repeats > 0) {
434       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
435          self->thread_num, repeats);
436    }
437    return NULL;
438 }
439
440 int main (int argc, char *argv[])
441 {
442     int count;
443     int data_count;
444     int status;
445     unsigned int seed = 1;
446     int thread_writes = 0;
447     int data_writes = 0;
448
449 #ifdef USE_THR_SETCONCURRENCY
450     /*
451      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
452      * that our threads can run concurrently, we need to
453      * increase the concurrency level to THREADS.
454      */
455     thr_setconcurrency (THREADS);
456 #endif
457
458     /*
459      * Initialize the shared data.
460      */
461     for (data_count = 0; data_count < DATASIZE; data_count++) {
462         data[data_count].data = 0;
463         data[data_count].writes = 0;
464         status = rwl_init(&data[data_count].lock);
465         if (status != 0) {
466            berrno be;
467            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
468            exit(1);
469         }
470     }
471
472     /*
473      * Create THREADS threads to access shared data.
474      */
475     for (count = 0; count < THREADS; count++) {
476         threads[count].thread_num = count + 1;
477         threads[count].writes = 0;
478         threads[count].reads = 0;
479         threads[count].interval = rand_r(&seed) % 71;
480         if (threads[count].interval <= 0) {
481            threads[count].interval = 1;
482         }
483         status = pthread_create (&threads[count].thread_id,
484             NULL, thread_routine, (void*)&threads[count]);
485         if (status != 0 || (int)threads[count].thread_id == 0) {
486            berrno be;
487            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
488            exit(1);
489         }
490     }
491
492     /*
493      * Wait for all threads to complete, and collect
494      * statistics.
495      */
496     for (count = 0; count < THREADS; count++) {
497         status = pthread_join (threads[count].thread_id, NULL);
498         if (status != 0) {
499            berrno be;
500            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
501            exit(1);
502         }
503         thread_writes += threads[count].writes;
504         printf (_("%02d: interval %d, writes %d, reads %d\n"),
505             count, threads[count].interval,
506             threads[count].writes, threads[count].reads);
507     }
508
509     /*
510      * Collect statistics for the data.
511      */
512     for (data_count = 0; data_count < DATASIZE; data_count++) {
513         data_writes += data[data_count].writes;
514         printf (_("data %02d: value %d, %d writes\n"),
515             data_count, data[data_count].data, data[data_count].writes);
516         rwl_destroy (&data[data_count].lock);
517     }
518
519     printf (_("Total: %d thread writes, %d data writes\n"),
520         thread_writes, data_writes);
521     return 0;
522 }
523
524 #endif
525
526 #ifdef TEST_RW_TRY_LOCK
527 /*
528  * brwlock_try_main.c
529  *
530  * Demonstrate use of non-blocking read-write locks.
531  *
532  * Special notes: On older Solaris system, call thr_setconcurrency()
533  * to allow interleaved thread execution, since threads are not
534  * timesliced.
535  */
536 #include <pthread.h>
537 #include "rwlock.h"
538 #include "errors.h"
539
540 #define THREADS         5
541 #define ITERATIONS      1000
542 #define DATASIZE        15
543
544 /*
545  * Keep statistics for each thread.
546  */
547 typedef struct thread_tag {
548     int         thread_num;
549     pthread_t   thread_id;
550     int         r_collisions;
551     int         w_collisions;
552     int         updates;
553     int         interval;
554 } thread_t;
555
556 /*
557  * Read-write lock and shared data
558  */
559 typedef struct data_tag {
560     brwlock_t    lock;
561     int         data;
562     int         updates;
563 } data_t;
564
565 thread_t threads[THREADS];
566 data_t data[DATASIZE];
567
568 /*
569  * Thread start routine that uses read-write locks
570  */
571 void *thread_routine (void *arg)
572 {
573     thread_t *self = (thread_t*)arg;
574     int iteration;
575     int element;
576     int status;
577     lmgr_init_thread();
578     element = 0;                        /* Current data element */
579
580     for (iteration = 0; iteration < ITERATIONS; iteration++) {
581         if ((iteration % self->interval) == 0) {
582             status = rwl_writetrylock (&data[element].lock);
583             if (status == EBUSY)
584                 self->w_collisions++;
585             else if (status == 0) {
586                 data[element].data++;
587                 data[element].updates++;
588                 self->updates++;
589                 rwl_writeunlock (&data[element].lock);
590             } else
591                 err_abort (status, _("Try write lock"));
592         } else {
593             status = rwl_readtrylock (&data[element].lock);
594             if (status == EBUSY)
595                 self->r_collisions++;
596             else if (status != 0) {
597                 err_abort (status, _("Try read lock"));
598             } else {
599                 if (data[element].data != data[element].updates)
600                     printf ("%d: data[%d] %d != %d\n",
601                         self->thread_num, element,
602                         data[element].data, data[element].updates);
603                 rwl_readunlock (&data[element].lock);
604             }
605         }
606
607         element++;
608         if (element >= DATASIZE)
609             element = 0;
610     }
611     lmgr_cleanup_thread();
612     return NULL;
613 }
614
615 int main (int argc, char *argv[])
616 {
617     int count, data_count;
618     unsigned int seed = 1;
619     int thread_updates = 0, data_updates = 0;
620     int status;
621
622 #ifdef USE_THR_SETCONCURRENCY
623     /*
624      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
625      * that our threads can run concurrently, we need to
626      * increase the concurrency level to THREADS.
627      */
628     DPRINTF (("Setting concurrency level to %d\n", THREADS));
629     thr_setconcurrency (THREADS);
630 #endif
631
632     /*
633      * Initialize the shared data.
634      */
635     for (data_count = 0; data_count < DATASIZE; data_count++) {
636         data[data_count].data = 0;
637         data[data_count].updates = 0;
638         rwl_init(&data[data_count].lock);
639     }
640
641     /*
642      * Create THREADS threads to access shared data.
643      */
644     for (count = 0; count < THREADS; count++) {
645         threads[count].thread_num = count;
646         threads[count].r_collisions = 0;
647         threads[count].w_collisions = 0;
648         threads[count].updates = 0;
649         threads[count].interval = rand_r (&seed) % ITERATIONS;
650         status = pthread_create (&threads[count].thread_id,
651             NULL, thread_routine, (void*)&threads[count]);
652         if (status != 0)
653             err_abort (status, _("Create thread"));
654     }
655
656     /*
657      * Wait for all threads to complete, and collect
658      * statistics.
659      */
660     for (count = 0; count < THREADS; count++) {
661         status = pthread_join (threads[count].thread_id, NULL);
662         if (status != 0)
663             err_abort (status, _("Join thread"));
664         thread_updates += threads[count].updates;
665         printf (_("%02d: interval %d, updates %d, "
666                 "r_collisions %d, w_collisions %d\n"),
667             count, threads[count].interval,
668             threads[count].updates,
669             threads[count].r_collisions, threads[count].w_collisions);
670     }
671
672     /*
673      * Collect statistics for the data.
674      */
675     for (data_count = 0; data_count < DATASIZE; data_count++) {
676         data_updates += data[data_count].updates;
677         printf (_("data %02d: value %d, %d updates\n"),
678             data_count, data[data_count].data, data[data_count].updates);
679         rwl_destroy (&data[data_count].lock);
680     }
681
682     return 0;
683 }
684
685 #endif