]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
Tweak version date
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  */
40
41 #define _LOCKMGR_COMPLIANT
42 #include "bacula.h"
43
44 /*
45  * Initialize a read/write lock
46  *
47  *  Returns: 0 on success
48  *           errno on failure
49  */
50 int rwl_init(brwlock_t *rwl, int priority)
51 {
52    int stat;
53
54    rwl->r_active = rwl->w_active = 0;
55    rwl->r_wait = rwl->w_wait = 0;
56    rwl->priority = priority;
57    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
58       return stat;
59    }
60    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61       pthread_mutex_destroy(&rwl->mutex);
62       return stat;
63    }
64    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65       pthread_cond_destroy(&rwl->read);
66       pthread_mutex_destroy(&rwl->mutex);
67       return stat;
68    }
69    rwl->valid = RWLOCK_VALID;
70    return 0;
71 }
72
73 /*
74  * Destroy a read/write lock
75  *
76  * Returns: 0 on success
77  *          errno on failure
78  */
79 int rwl_destroy(brwlock_t *rwl)
80 {
81    int stat, stat1, stat2;
82
83   if (rwl->valid != RWLOCK_VALID) {
84      return EINVAL;
85   }
86   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87      return stat;
88   }
89
90   /*
91    * If any threads are active, report EBUSY
92    */
93   if (rwl->r_active > 0 || rwl->w_active) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   /*
99    * If any threads are waiting, report EBUSY
100    */
101   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102      pthread_mutex_unlock(&rwl->mutex);
103      return EBUSY;
104   }
105
106   rwl->valid = 0;
107   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108      return stat;
109   }
110   stat  = pthread_mutex_destroy(&rwl->mutex);
111   stat1 = pthread_cond_destroy(&rwl->read);
112   stat2 = pthread_cond_destroy(&rwl->write);
113   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 /*
117  * Handle cleanup when the read lock condition variable
118  * wait is released.
119  */
120 static void rwl_read_release(void *arg)
121 {
122    brwlock_t *rwl = (brwlock_t *)arg;
123
124    rwl->r_wait--;
125    pthread_mutex_unlock(&rwl->mutex);
126 }
127
128 /*
129  * Handle cleanup when the write lock condition variable wait
130  * is released.
131  */
132 static void rwl_write_release(void *arg)
133 {
134    brwlock_t *rwl = (brwlock_t *)arg;
135
136    rwl->w_wait--;
137    pthread_mutex_unlock(&rwl->mutex);
138 }
139
140 /*
141  * Lock for read access, wait until locked (or error).
142  */
143 int rwl_readlock(brwlock_t *rwl)
144 {
145    int stat;
146
147    if (rwl->valid != RWLOCK_VALID) {
148       return EINVAL;
149    }
150    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
151       return stat;
152    }
153    if (rwl->w_active) {
154       rwl->r_wait++;                  /* indicate that we are waiting */
155       pthread_cleanup_push(rwl_read_release, (void *)rwl);
156       while (rwl->w_active) {
157          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
158          if (stat != 0) {
159             break;                    /* error, bail out */
160          }
161       }
162       pthread_cleanup_pop(0);
163       rwl->r_wait--;                  /* we are no longer waiting */
164    }
165    if (stat == 0) {
166       rwl->r_active++;                /* we are running */
167    }
168    pthread_mutex_unlock(&rwl->mutex);
169    return stat;
170 }
171
172 /*
173  * Attempt to lock for read access, don't wait
174  */
175 int rwl_readtrylock(brwlock_t *rwl)
176 {
177    int stat, stat2;
178
179    if (rwl->valid != RWLOCK_VALID) {
180       return EINVAL;
181    }
182    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
183       return stat;
184    }
185    if (rwl->w_active) {
186       stat = EBUSY;
187    } else {
188       rwl->r_active++;                /* we are running */
189    }
190    stat2 = pthread_mutex_unlock(&rwl->mutex);
191    return (stat == 0 ? stat2 : stat);
192 }
193
194 /*
195  * Unlock read lock
196  */
197 int rwl_readunlock(brwlock_t *rwl)
198 {
199    int stat, stat2;
200
201    if (rwl->valid != RWLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    rwl->r_active--;
208    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209       stat = pthread_cond_broadcast(&rwl->write);
210    }
211    stat2 = pthread_mutex_unlock(&rwl->mutex);
212    return (stat == 0 ? stat2 : stat);
213 }
214
215
216 /*
217  * Lock for write access, wait until locked (or error).
218  *   Multiple nested write locking is permitted.
219  */
220 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
221 {
222    int stat;
223
224    if (rwl->valid != RWLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
231       rwl->w_active++;
232       pthread_mutex_unlock(&rwl->mutex);
233       return 0;
234    }
235    lmgr_pre_lock(rwl, rwl->priority, file, line);
236    if (rwl->w_active || rwl->r_active > 0) {
237       rwl->w_wait++;                  /* indicate that we are waiting */
238       pthread_cleanup_push(rwl_write_release, (void *)rwl);
239       while (rwl->w_active || rwl->r_active > 0) {
240          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
241             lmgr_do_unlock(rwl);
242             break;                    /* error, bail out */
243          }
244       }
245       pthread_cleanup_pop(0);
246       rwl->w_wait--;                  /* we are no longer waiting */
247    }
248    if (stat == 0) {
249       rwl->w_active++;                /* we are running */
250       rwl->writer_id = pthread_self(); /* save writer thread's id */
251       lmgr_post_lock();
252    } 
253    pthread_mutex_unlock(&rwl->mutex);
254    return stat;
255 }
256
257 /*
258  * Attempt to lock for write access, don't wait
259  */
260 int rwl_writetrylock(brwlock_t *rwl)
261 {
262    int stat, stat2;
263
264    if (rwl->valid != RWLOCK_VALID) {
265       return EINVAL;
266    }
267    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
268       return stat;
269    }
270    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
271       rwl->w_active++;
272       pthread_mutex_unlock(&rwl->mutex);
273       return 0;
274    }
275    if (rwl->w_active || rwl->r_active > 0) {
276       stat = EBUSY;
277    } else {
278       rwl->w_active = 1;              /* we are running */
279       rwl->writer_id = pthread_self(); /* save writer thread's id */
280       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
281    }
282    stat2 = pthread_mutex_unlock(&rwl->mutex);
283    return (stat == 0 ? stat2 : stat);
284 }
285
286 /*
287  * Unlock write lock
288  *  Start any waiting writers in preference to waiting readers
289  */
290 int rwl_writeunlock(brwlock_t *rwl)
291 {
292    int stat, stat2;
293
294    if (rwl->valid != RWLOCK_VALID) {
295       return EINVAL;
296    }
297    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
298       return stat;
299    }
300    if (rwl->w_active <= 0) {
301       pthread_mutex_unlock(&rwl->mutex);
302       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
303    }
304    rwl->w_active--;
305    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
306       pthread_mutex_unlock(&rwl->mutex);
307       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
308    }
309    if (rwl->w_active > 0) {
310       stat = 0;                       /* writers still active */
311    } else {
312       lmgr_do_unlock(rwl);
313       /* No more writers, awaken someone */
314       if (rwl->r_wait > 0) {         /* if readers waiting */
315          stat = pthread_cond_broadcast(&rwl->read);
316       } else if (rwl->w_wait > 0) {
317          stat = pthread_cond_broadcast(&rwl->write);
318       }
319    }
320    stat2 = pthread_mutex_unlock(&rwl->mutex);
321    return (stat == 0 ? stat2 : stat);
322 }
323
324 #ifdef TEST_RWLOCK
325
326 #define THREADS     300
327 #define DATASIZE   15
328 #define ITERATIONS 1000000
329
330 /*
331  * Keep statics for each thread.
332  */
333 typedef struct thread_tag {
334    int thread_num;
335    pthread_t thread_id;
336    int writes;
337    int reads;
338    int interval;
339 } thread_t;
340
341 /*
342  * Read/write lock and shared data.
343  */
344 typedef struct data_tag {
345    brwlock_t lock;
346    int data;
347    int writes;
348 } data_t;
349
350 static thread_t threads[THREADS];
351 static data_t data[DATASIZE];
352
353 /*
354  * Thread start routine that uses read/write locks.
355  */
356 void *thread_routine(void *arg)
357 {
358    thread_t *self = (thread_t *)arg;
359    int repeats = 0;
360    int iteration;
361    int element = 0;
362    int status;
363
364    for (iteration=0; iteration < ITERATIONS; iteration++) {
365       /*
366        * Each "self->interval" iterations, perform an
367        * update operation (write lock instead of read
368        * lock).
369        */
370 //      if ((iteration % self->interval) == 0) {
371          status = rwl_writelock(&data[element].lock);
372          if (status != 0) {
373             berrno be;
374             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
375             exit(1);
376          }
377          data[element].data = self->thread_num;
378          data[element].writes++;
379          self->writes++;
380          status = rwl_writelock(&data[element].lock);
381          if (status != 0) {
382             berrno be;
383             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
384             exit(1);
385          }
386          data[element].data = self->thread_num;
387          data[element].writes++;
388          self->writes++;
389          status = rwl_writeunlock(&data[element].lock);
390          if (status != 0) {
391             berrno be;
392             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
393             exit(1);
394          }
395          status = rwl_writeunlock(&data[element].lock);
396          if (status != 0) {
397             berrno be;
398             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
399             exit(1);
400          }
401
402 #ifdef xxx
403       } else {
404          /*
405           * Look at the current data element to see whether
406           * the current thread last updated it. Count the
407           * times to report later.
408           */
409           status = rwl_readlock(&data[element].lock);
410           if (status != 0) {
411              berrno be;
412              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
413              exit(1);
414           }
415           self->reads++;
416           if (data[element].data == self->thread_num)
417              repeats++;
418           status = rwl_readunlock(&data[element].lock);
419           if (status != 0) {
420              berrno be;
421              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
422              exit(1);
423           }
424       }
425 #endif
426       element++;
427       if (element >= DATASIZE) {
428          element = 0;
429       }
430    }
431    if (repeats > 0) {
432       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
433          self->thread_num, repeats);
434    }
435    return NULL;
436 }
437
438 int main (int argc, char *argv[])
439 {
440     int count;
441     int data_count;
442     int status;
443     unsigned int seed = 1;
444     int thread_writes = 0;
445     int data_writes = 0;
446
447 #ifdef USE_THR_SETCONCURRENCY
448     /*
449      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
450      * that our threads can run concurrently, we need to
451      * increase the concurrency level to THREADS.
452      */
453     thr_setconcurrency (THREADS);
454 #endif
455
456     /*
457      * Initialize the shared data.
458      */
459     for (data_count = 0; data_count < DATASIZE; data_count++) {
460         data[data_count].data = 0;
461         data[data_count].writes = 0;
462         status = rwl_init(&data[data_count].lock);
463         if (status != 0) {
464            berrno be;
465            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
466            exit(1);
467         }
468     }
469
470     /*
471      * Create THREADS threads to access shared data.
472      */
473     for (count = 0; count < THREADS; count++) {
474         threads[count].thread_num = count + 1;
475         threads[count].writes = 0;
476         threads[count].reads = 0;
477         threads[count].interval = rand_r(&seed) % 71;
478         if (threads[count].interval <= 0) {
479            threads[count].interval = 1;
480         }
481         status = pthread_create (&threads[count].thread_id,
482             NULL, thread_routine, (void*)&threads[count]);
483         if (status != 0 || (int)threads[count].thread_id == 0) {
484            berrno be;
485            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
486            exit(1);
487         }
488     }
489
490     /*
491      * Wait for all threads to complete, and collect
492      * statistics.
493      */
494     for (count = 0; count < THREADS; count++) {
495         status = pthread_join (threads[count].thread_id, NULL);
496         if (status != 0) {
497            berrno be;
498            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
499            exit(1);
500         }
501         thread_writes += threads[count].writes;
502         printf (_("%02d: interval %d, writes %d, reads %d\n"),
503             count, threads[count].interval,
504             threads[count].writes, threads[count].reads);
505     }
506
507     /*
508      * Collect statistics for the data.
509      */
510     for (data_count = 0; data_count < DATASIZE; data_count++) {
511         data_writes += data[data_count].writes;
512         printf (_("data %02d: value %d, %d writes\n"),
513             data_count, data[data_count].data, data[data_count].writes);
514         rwl_destroy (&data[data_count].lock);
515     }
516
517     printf (_("Total: %d thread writes, %d data writes\n"),
518         thread_writes, data_writes);
519     return 0;
520 }
521
522 #endif
523
524 #ifdef TEST_RW_TRY_LOCK
525 /*
526  * brwlock_try_main.c
527  *
528  * Demonstrate use of non-blocking read-write locks.
529  *
530  * Special notes: On older Solaris system, call thr_setconcurrency()
531  * to allow interleaved thread execution, since threads are not
532  * timesliced.
533  */
534 #include <pthread.h>
535 #include "rwlock.h"
536 #include "errors.h"
537
538 #define THREADS         5
539 #define ITERATIONS      1000
540 #define DATASIZE        15
541
542 /*
543  * Keep statistics for each thread.
544  */
545 typedef struct thread_tag {
546     int         thread_num;
547     pthread_t   thread_id;
548     int         r_collisions;
549     int         w_collisions;
550     int         updates;
551     int         interval;
552 } thread_t;
553
554 /*
555  * Read-write lock and shared data
556  */
557 typedef struct data_tag {
558     brwlock_t    lock;
559     int         data;
560     int         updates;
561 } data_t;
562
563 thread_t threads[THREADS];
564 data_t data[DATASIZE];
565
566 /*
567  * Thread start routine that uses read-write locks
568  */
569 void *thread_routine (void *arg)
570 {
571     thread_t *self = (thread_t*)arg;
572     int iteration;
573     int element;
574     int status;
575     lmgr_init_thread();
576     element = 0;                        /* Current data element */
577
578     for (iteration = 0; iteration < ITERATIONS; iteration++) {
579         if ((iteration % self->interval) == 0) {
580             status = rwl_writetrylock (&data[element].lock);
581             if (status == EBUSY)
582                 self->w_collisions++;
583             else if (status == 0) {
584                 data[element].data++;
585                 data[element].updates++;
586                 self->updates++;
587                 rwl_writeunlock (&data[element].lock);
588             } else
589                 err_abort (status, _("Try write lock"));
590         } else {
591             status = rwl_readtrylock (&data[element].lock);
592             if (status == EBUSY)
593                 self->r_collisions++;
594             else if (status != 0) {
595                 err_abort (status, _("Try read lock"));
596             } else {
597                 if (data[element].data != data[element].updates)
598                     printf ("%d: data[%d] %d != %d\n",
599                         self->thread_num, element,
600                         data[element].data, data[element].updates);
601                 rwl_readunlock (&data[element].lock);
602             }
603         }
604
605         element++;
606         if (element >= DATASIZE)
607             element = 0;
608     }
609     lmgr_cleanup_thread();
610     return NULL;
611 }
612
613 int main (int argc, char *argv[])
614 {
615     int count, data_count;
616     unsigned int seed = 1;
617     int thread_updates = 0, data_updates = 0;
618     int status;
619
620 #ifdef USE_THR_SETCONCURRENCY
621     /*
622      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
623      * that our threads can run concurrently, we need to
624      * increase the concurrency level to THREADS.
625      */
626     DPRINTF (("Setting concurrency level to %d\n", THREADS));
627     thr_setconcurrency (THREADS);
628 #endif
629
630     /*
631      * Initialize the shared data.
632      */
633     for (data_count = 0; data_count < DATASIZE; data_count++) {
634         data[data_count].data = 0;
635         data[data_count].updates = 0;
636         rwl_init(&data[data_count].lock);
637     }
638
639     /*
640      * Create THREADS threads to access shared data.
641      */
642     for (count = 0; count < THREADS; count++) {
643         threads[count].thread_num = count;
644         threads[count].r_collisions = 0;
645         threads[count].w_collisions = 0;
646         threads[count].updates = 0;
647         threads[count].interval = rand_r (&seed) % ITERATIONS;
648         status = pthread_create (&threads[count].thread_id,
649             NULL, thread_routine, (void*)&threads[count]);
650         if (status != 0)
651             err_abort (status, _("Create thread"));
652     }
653
654     /*
655      * Wait for all threads to complete, and collect
656      * statistics.
657      */
658     for (count = 0; count < THREADS; count++) {
659         status = pthread_join (threads[count].thread_id, NULL);
660         if (status != 0)
661             err_abort (status, _("Join thread"));
662         thread_updates += threads[count].updates;
663         printf (_("%02d: interval %d, updates %d, "
664                 "r_collisions %d, w_collisions %d\n"),
665             count, threads[count].interval,
666             threads[count].updates,
667             threads[count].r_collisions, threads[count].w_collisions);
668     }
669
670     /*
671      * Collect statistics for the data.
672      */
673     for (data_count = 0; data_count < DATASIZE; data_count++) {
674         data_updates += data[data_count].updates;
675         printf (_("data %02d: value %d, %d updates\n"),
676             data_count, data[data_count].data, data[data_count].updates);
677         rwl_destroy (&data[data_count].lock);
678     }
679
680     return 0;
681 }
682
683 #endif