]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
ebl decrease the debug ouput
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *   Version $Id$
37  *
38  *  This code adapted from "Programming with POSIX Threads", by
39  *    David R. Butenhof
40  *
41  */
42
43 #include "bacula.h"
44
45 /*
46  * Initialize a read/write lock
47  *
48  *  Returns: 0 on success
49  *           errno on failure
50  */
51 int rwl_init(brwlock_t *rwl)
52 {
53    int stat;
54
55    rwl->r_active = rwl->w_active = 0;
56    rwl->r_wait = rwl->w_wait = 0;
57    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
58       return stat;
59    }
60    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61       pthread_mutex_destroy(&rwl->mutex);
62       return stat;
63    }
64    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65       pthread_cond_destroy(&rwl->read);
66       pthread_mutex_destroy(&rwl->mutex);
67       return stat;
68    }
69    rwl->valid = RWLOCK_VALID;
70    return 0;
71 }
72
73 /*
74  * Destroy a read/write lock
75  *
76  * Returns: 0 on success
77  *          errno on failure
78  */
79 int rwl_destroy(brwlock_t *rwl)
80 {
81    int stat, stat1, stat2;
82
83   if (rwl->valid != RWLOCK_VALID) {
84      return EINVAL;
85   }
86   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87      return stat;
88   }
89
90   /*
91    * If any threads are active, report EBUSY
92    */
93   if (rwl->r_active > 0 || rwl->w_active) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   /*
99    * If any threads are waiting, report EBUSY
100    */
101   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102      pthread_mutex_unlock(&rwl->mutex);
103      return EBUSY;
104   }
105
106   rwl->valid = 0;
107   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108      return stat;
109   }
110   stat  = pthread_mutex_destroy(&rwl->mutex);
111   stat1 = pthread_cond_destroy(&rwl->read);
112   stat2 = pthread_cond_destroy(&rwl->write);
113   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 /*
117  * Handle cleanup when the read lock condition variable
118  * wait is released.
119  */
120 static void rwl_read_release(void *arg)
121 {
122    brwlock_t *rwl = (brwlock_t *)arg;
123
124    rwl->r_wait--;
125    pthread_mutex_unlock(&rwl->mutex);
126 }
127
128 /*
129  * Handle cleanup when the write lock condition variable wait
130  * is released.
131  */
132 static void rwl_write_release(void *arg)
133 {
134    brwlock_t *rwl = (brwlock_t *)arg;
135
136    rwl->w_wait--;
137    pthread_mutex_unlock(&rwl->mutex);
138 }
139
140 /*
141  * Lock for read access, wait until locked (or error).
142  */
143 int rwl_readlock(brwlock_t *rwl)
144 {
145    int stat;
146
147    if (rwl->valid != RWLOCK_VALID) {
148       return EINVAL;
149    }
150    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
151       return stat;
152    }
153    if (rwl->w_active) {
154       rwl->r_wait++;                  /* indicate that we are waiting */
155       pthread_cleanup_push(rwl_read_release, (void *)rwl);
156       while (rwl->w_active) {
157          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
158          if (stat != 0) {
159             break;                    /* error, bail out */
160          }
161       }
162       pthread_cleanup_pop(0);
163       rwl->r_wait--;                  /* we are no longer waiting */
164    }
165    if (stat == 0) {
166       rwl->r_active++;                /* we are running */
167    }
168    pthread_mutex_unlock(&rwl->mutex);
169    return stat;
170 }
171
172 /*
173  * Attempt to lock for read access, don't wait
174  */
175 int rwl_readtrylock(brwlock_t *rwl)
176 {
177    int stat, stat2;
178
179    if (rwl->valid != RWLOCK_VALID) {
180       return EINVAL;
181    }
182    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
183       return stat;
184    }
185    if (rwl->w_active) {
186       stat = EBUSY;
187    } else {
188       rwl->r_active++;                /* we are running */
189    }
190    stat2 = pthread_mutex_unlock(&rwl->mutex);
191    return (stat == 0 ? stat2 : stat);
192 }
193
194 /*
195  * Unlock read lock
196  */
197 int rwl_readunlock(brwlock_t *rwl)
198 {
199    int stat, stat2;
200
201    if (rwl->valid != RWLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    rwl->r_active--;
208    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209       stat = pthread_cond_broadcast(&rwl->write);
210    }
211    stat2 = pthread_mutex_unlock(&rwl->mutex);
212    return (stat == 0 ? stat2 : stat);
213 }
214
215
216 /*
217  * Lock for write access, wait until locked (or error).
218  *   Multiple nested write locking is permitted.
219  */
220 int rwl_writelock(brwlock_t *rwl)
221 {
222    int stat;
223
224    if (rwl->valid != RWLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
231       rwl->w_active++;
232       pthread_mutex_unlock(&rwl->mutex);
233       return 0;
234    }
235    if (rwl->w_active || rwl->r_active > 0) {
236       rwl->w_wait++;                  /* indicate that we are waiting */
237       pthread_cleanup_push(rwl_write_release, (void *)rwl);
238       while (rwl->w_active || rwl->r_active > 0) {
239          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
240             break;                    /* error, bail out */
241          }
242       }
243       pthread_cleanup_pop(0);
244       rwl->w_wait--;                  /* we are no longer waiting */
245    }
246    if (stat == 0) {
247       rwl->w_active++;                /* we are running */
248       rwl->writer_id = pthread_self(); /* save writer thread's id */
249    }
250    pthread_mutex_unlock(&rwl->mutex);
251    return stat;
252 }
253
254 /*
255  * Attempt to lock for write access, don't wait
256  */
257 int rwl_writetrylock(brwlock_t *rwl)
258 {
259    int stat, stat2;
260
261    if (rwl->valid != RWLOCK_VALID) {
262       return EINVAL;
263    }
264    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
265       return stat;
266    }
267    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
268       rwl->w_active++;
269       pthread_mutex_unlock(&rwl->mutex);
270       return 0;
271    }
272    if (rwl->w_active || rwl->r_active > 0) {
273       stat = EBUSY;
274    } else {
275       rwl->w_active = 1;              /* we are running */
276       rwl->writer_id = pthread_self(); /* save writer thread's id */
277    }
278    stat2 = pthread_mutex_unlock(&rwl->mutex);
279    return (stat == 0 ? stat2 : stat);
280 }
281
282 /*
283  * Unlock write lock
284  *  Start any waiting writers in preference to waiting readers
285  */
286 int rwl_writeunlock(brwlock_t *rwl)
287 {
288    int stat, stat2;
289
290    if (rwl->valid != RWLOCK_VALID) {
291       return EINVAL;
292    }
293    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
294       return stat;
295    }
296    if (rwl->w_active <= 0) {
297       pthread_mutex_unlock(&rwl->mutex);
298       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
299    }
300    rwl->w_active--;
301    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
302       pthread_mutex_unlock(&rwl->mutex);
303       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
304    }
305    if (rwl->w_active > 0) {
306       stat = 0;                       /* writers still active */
307    } else {
308       /* No more writers, awaken someone */
309       if (rwl->r_wait > 0) {         /* if readers waiting */
310          stat = pthread_cond_broadcast(&rwl->read);
311       } else if (rwl->w_wait > 0) {
312          stat = pthread_cond_broadcast(&rwl->write);
313       }
314    }
315    stat2 = pthread_mutex_unlock(&rwl->mutex);
316    return (stat == 0 ? stat2 : stat);
317 }
318
319 #ifdef TEST_RWLOCK
320
321 #define THREADS     5
322 #define DATASIZE   15
323 #define ITERATIONS 10000
324
325 /*
326  * Keep statics for each thread.
327  */
328 typedef struct thread_tag {
329    int thread_num;
330    pthread_t thread_id;
331    int writes;
332    int reads;
333    int interval;
334 } thread_t;
335
336 /*
337  * Read/write lock and shared data.
338  */
339 typedef struct data_tag {
340    brwlock_t lock;
341    int data;
342    int writes;
343 } data_t;
344
345 thread_t threads[THREADS];
346 data_t data[DATASIZE];
347
348 /*
349  * Thread start routine that uses read/write locks.
350  */
351 void *thread_routine(void *arg)
352 {
353    thread_t *self = (thread_t *)arg;
354    int repeats = 0;
355    int iteration;
356    int element = 0;
357    int status;
358
359    for (iteration=0; iteration < ITERATIONS; iteration++) {
360       /*
361        * Each "self->interval" iterations, perform an
362        * update operation (write lock instead of read
363        * lock).
364        */
365       if ((iteration % self->interval) == 0) {
366          status = rwl_writelock(&data[element].lock);
367          if (status != 0) {
368             berrno be;
369             Jmsg1(NULL, M_ABORT, 0, _("Write lock failed. ERR=%s\n"), be.bstrerror(status));
370          }
371          data[element].data = self->thread_num;
372          data[element].writes++;
373          self->writes++;
374          status = rwl_writeunlock(&data[element].lock);
375          if (status != 0) {
376             berrno be;
377             Jmsg1(NULL, M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), be.bstrerror(status));
378          }
379       } else {
380          /*
381           * Look at the current data element to see whether
382           * the current thread last updated it. Count the
383           * times to report later.
384           */
385           status = rwl_readlock(&data[element].lock);
386           if (status != 0) {
387              berrno be;
388              Jmsg1(NULL, M_ABORT, 0, _("Read lock failed. ERR=%s\n"), be.bstrerror(status));
389           }
390           self->reads++;
391           if (data[element].data == self->thread_num)
392              repeats++;
393           status = rwl_readunlock(&data[element].lock);
394           if (status != 0) {
395              berrno be;
396              Jmsg1(NULL, M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), be.bstrerror(status));
397           }
398       }
399       element++;
400       if (element >= DATASIZE) {
401          element = 0;
402       }
403    }
404    if (repeats > 0) {
405       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
406          self->thread_num, repeats);
407    }
408    return NULL;
409 }
410
411 int main (int argc, char *argv[])
412 {
413     int count;
414     int data_count;
415     int status;
416     unsigned int seed = 1;
417     int thread_writes = 0;
418     int data_writes = 0;
419
420 #ifdef sun
421     /*
422      * On Solaris 2.5, threads are not timesliced. To ensure
423      * that our threads can run concurrently, we need to
424      * increase the concurrency level to THREADS.
425      */
426     thr_setconcurrency (THREADS);
427 #endif
428
429     /*
430      * Initialize the shared data.
431      */
432     for (data_count = 0; data_count < DATASIZE; data_count++) {
433         data[data_count].data = 0;
434         data[data_count].writes = 0;
435         status = rwl_init (&data[data_count].lock);
436         if (status != 0) {
437            berrno be;
438            Jmsg1(NULL, M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), be.bstrerror(status));
439         }
440     }
441
442     /*
443      * Create THREADS threads to access shared data.
444      */
445     for (count = 0; count < THREADS; count++) {
446         threads[count].thread_num = count + 1;
447         threads[count].writes = 0;
448         threads[count].reads = 0;
449         threads[count].interval = rand_r (&seed) % 71;
450         status = pthread_create (&threads[count].thread_id,
451             NULL, thread_routine, (void*)&threads[count]);
452         if (status != 0) {
453            berrno be;
454            Jmsg1(NULL, M_ABORT, 0, _("Create thread failed. ERR=%s\n"), be.bstrerror(status));
455         }
456     }
457
458     /*
459      * Wait for all threads to complete, and collect
460      * statistics.
461      */
462     for (count = 0; count < THREADS; count++) {
463         status = pthread_join (threads[count].thread_id, NULL);
464         if (status != 0) {
465            berrno be;
466            Jmsg1(NULL, M_ABORT, 0, _("Join thread failed. ERR=%s\n"), be.bstrerror(status));
467         }
468         thread_writes += threads[count].writes;
469         printf (_("%02d: interval %d, writes %d, reads %d\n"),
470             count, threads[count].interval,
471             threads[count].writes, threads[count].reads);
472     }
473
474     /*
475      * Collect statistics for the data.
476      */
477     for (data_count = 0; data_count < DATASIZE; data_count++) {
478         data_writes += data[data_count].writes;
479         printf (_("data %02d: value %d, %d writes\n"),
480             data_count, data[data_count].data, data[data_count].writes);
481         rwl_destroy (&data[data_count].lock);
482     }
483
484     printf (_("Total: %d thread writes, %d data writes\n"),
485         thread_writes, data_writes);
486     return 0;
487 }
488
489 #endif
490
491 #ifdef TEST_RW_TRY_LOCK
492 /*
493  * brwlock_try_main.c
494  *
495  * Demonstrate use of non-blocking read-write locks.
496  *
497  * Special notes: On a Solaris system, call thr_setconcurrency()
498  * to allow interleaved thread execution, since threads are not
499  * timesliced.
500  */
501 #include <pthread.h>
502 #include "rwlock.h"
503 #include "errors.h"
504
505 #define THREADS         5
506 #define ITERATIONS      1000
507 #define DATASIZE        15
508
509 /*
510  * Keep statistics for each thread.
511  */
512 typedef struct thread_tag {
513     int         thread_num;
514     pthread_t   thread_id;
515     int         r_collisions;
516     int         w_collisions;
517     int         updates;
518     int         interval;
519 } thread_t;
520
521 /*
522  * Read-write lock and shared data
523  */
524 typedef struct data_tag {
525     brwlock_t    lock;
526     int         data;
527     int         updates;
528 } data_t;
529
530 thread_t threads[THREADS];
531 data_t data[DATASIZE];
532
533 /*
534  * Thread start routine that uses read-write locks
535  */
536 void *thread_routine (void *arg)
537 {
538     thread_t *self = (thread_t*)arg;
539     int iteration;
540     int element;
541     int status;
542
543     element = 0;                        /* Current data element */
544
545     for (iteration = 0; iteration < ITERATIONS; iteration++) {
546         if ((iteration % self->interval) == 0) {
547             status = rwl_writetrylock (&data[element].lock);
548             if (status == EBUSY)
549                 self->w_collisions++;
550             else if (status == 0) {
551                 data[element].data++;
552                 data[element].updates++;
553                 self->updates++;
554                 rwl_writeunlock (&data[element].lock);
555             } else
556                 err_abort (status, _("Try write lock"));
557         } else {
558             status = rwl_readtrylock (&data[element].lock);
559             if (status == EBUSY)
560                 self->r_collisions++;
561             else if (status != 0) {
562                 err_abort (status, _("Try read lock"));
563             } else {
564                 if (data[element].data != data[element].updates)
565                     printf ("%d: data[%d] %d != %d\n",
566                         self->thread_num, element,
567                         data[element].data, data[element].updates);
568                 rwl_readunlock (&data[element].lock);
569             }
570         }
571
572         element++;
573         if (element >= DATASIZE)
574             element = 0;
575     }
576     return NULL;
577 }
578
579 int main (int argc, char *argv[])
580 {
581     int count, data_count;
582     unsigned int seed = 1;
583     int thread_updates = 0, data_updates = 0;
584     int status;
585
586 #ifdef sun
587     /*
588      * On Solaris 2.5, threads are not timesliced. To ensure
589      * that our threads can run concurrently, we need to
590      * increase the concurrency level to THREADS.
591      */
592     DPRINTF (("Setting concurrency level to %d\n", THREADS));
593     thr_setconcurrency (THREADS);
594 #endif
595
596     /*
597      * Initialize the shared data.
598      */
599     for (data_count = 0; data_count < DATASIZE; data_count++) {
600         data[data_count].data = 0;
601         data[data_count].updates = 0;
602         rwl_init (&data[data_count].lock);
603     }
604
605     /*
606      * Create THREADS threads to access shared data.
607      */
608     for (count = 0; count < THREADS; count++) {
609         threads[count].thread_num = count;
610         threads[count].r_collisions = 0;
611         threads[count].w_collisions = 0;
612         threads[count].updates = 0;
613         threads[count].interval = rand_r (&seed) % ITERATIONS;
614         status = pthread_create (&threads[count].thread_id,
615             NULL, thread_routine, (void*)&threads[count]);
616         if (status != 0)
617             err_abort (status, _("Create thread"));
618     }
619
620     /*
621      * Wait for all threads to complete, and collect
622      * statistics.
623      */
624     for (count = 0; count < THREADS; count++) {
625         status = pthread_join (threads[count].thread_id, NULL);
626         if (status != 0)
627             err_abort (status, _("Join thread"));
628         thread_updates += threads[count].updates;
629         printf (_("%02d: interval %d, updates %d, "
630                 "r_collisions %d, w_collisions %d\n"),
631             count, threads[count].interval,
632             threads[count].updates,
633             threads[count].r_collisions, threads[count].w_collisions);
634     }
635
636     /*
637      * Collect statistics for the data.
638      */
639     for (data_count = 0; data_count < DATASIZE; data_count++) {
640         data_updates += data[data_count].updates;
641         printf (_("data %02d: value %d, %d updates\n"),
642             data_count, data[data_count].data, data[data_count].updates);
643         rwl_destroy (&data[data_count].lock);
644     }
645
646     return 0;
647 }
648
649 #endif