]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
Add missing files for xattr and eliminate a few compiler complaints
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *   Version $Id$
37  *
38  *  This code adapted from "Programming with POSIX Threads", by
39  *    David R. Butenhof
40  *
41  */
42
43 #include "bacula.h"
44
45 /*
46  * Initialize a read/write lock
47  *
48  *  Returns: 0 on success
49  *           errno on failure
50  */
51 int rwl_init(brwlock_t *rwl)
52 {
53    int stat;
54
55    rwl->r_active = rwl->w_active = 0;
56    rwl->r_wait = rwl->w_wait = 0;
57    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
58       return stat;
59    }
60    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61       pthread_mutex_destroy(&rwl->mutex);
62       return stat;
63    }
64    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65       pthread_cond_destroy(&rwl->read);
66       pthread_mutex_destroy(&rwl->mutex);
67       return stat;
68    }
69    rwl->valid = RWLOCK_VALID;
70    return 0;
71 }
72
73 /*
74  * Destroy a read/write lock
75  *
76  * Returns: 0 on success
77  *          errno on failure
78  */
79 int rwl_destroy(brwlock_t *rwl)
80 {
81    int stat, stat1, stat2;
82
83   if (rwl->valid != RWLOCK_VALID) {
84      return EINVAL;
85   }
86   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87      return stat;
88   }
89
90   /*
91    * If any threads are active, report EBUSY
92    */
93   if (rwl->r_active > 0 || rwl->w_active) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   /*
99    * If any threads are waiting, report EBUSY
100    */
101   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102      pthread_mutex_unlock(&rwl->mutex);
103      return EBUSY;
104   }
105
106   rwl->valid = 0;
107   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108      return stat;
109   }
110   stat  = pthread_mutex_destroy(&rwl->mutex);
111   stat1 = pthread_cond_destroy(&rwl->read);
112   stat2 = pthread_cond_destroy(&rwl->write);
113   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 /*
117  * Handle cleanup when the read lock condition variable
118  * wait is released.
119  */
120 static void rwl_read_release(void *arg)
121 {
122    brwlock_t *rwl = (brwlock_t *)arg;
123
124    rwl->r_wait--;
125    pthread_mutex_unlock(&rwl->mutex);
126 }
127
128 /*
129  * Handle cleanup when the write lock condition variable wait
130  * is released.
131  */
132 static void rwl_write_release(void *arg)
133 {
134    brwlock_t *rwl = (brwlock_t *)arg;
135
136    rwl->w_wait--;
137    pthread_mutex_unlock(&rwl->mutex);
138 }
139
140 /*
141  * Lock for read access, wait until locked (or error).
142  */
143 int rwl_readlock(brwlock_t *rwl)
144 {
145    int stat;
146
147    if (rwl->valid != RWLOCK_VALID) {
148       return EINVAL;
149    }
150    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
151       return stat;
152    }
153    if (rwl->w_active) {
154       rwl->r_wait++;                  /* indicate that we are waiting */
155       pthread_cleanup_push(rwl_read_release, (void *)rwl);
156       while (rwl->w_active) {
157          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
158          if (stat != 0) {
159             break;                    /* error, bail out */
160          }
161       }
162       pthread_cleanup_pop(0);
163       rwl->r_wait--;                  /* we are no longer waiting */
164    }
165    if (stat == 0) {
166       rwl->r_active++;                /* we are running */
167    }
168    pthread_mutex_unlock(&rwl->mutex);
169    return stat;
170 }
171
172 /*
173  * Attempt to lock for read access, don't wait
174  */
175 int rwl_readtrylock(brwlock_t *rwl)
176 {
177    int stat, stat2;
178
179    if (rwl->valid != RWLOCK_VALID) {
180       return EINVAL;
181    }
182    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
183       return stat;
184    }
185    if (rwl->w_active) {
186       stat = EBUSY;
187    } else {
188       rwl->r_active++;                /* we are running */
189    }
190    stat2 = pthread_mutex_unlock(&rwl->mutex);
191    return (stat == 0 ? stat2 : stat);
192 }
193
194 /*
195  * Unlock read lock
196  */
197 int rwl_readunlock(brwlock_t *rwl)
198 {
199    int stat, stat2;
200
201    if (rwl->valid != RWLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    rwl->r_active--;
208    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209       stat = pthread_cond_broadcast(&rwl->write);
210    }
211    stat2 = pthread_mutex_unlock(&rwl->mutex);
212    return (stat == 0 ? stat2 : stat);
213 }
214
215
216 /*
217  * Lock for write access, wait until locked (or error).
218  *   Multiple nested write locking is permitted.
219  */
220 int rwl_writelock(brwlock_t *rwl)
221 {
222    int stat;
223
224    if (rwl->valid != RWLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
231       rwl->w_active++;
232       pthread_mutex_unlock(&rwl->mutex);
233       return 0;
234    }
235    if (rwl->w_active || rwl->r_active > 0) {
236       rwl->w_wait++;                  /* indicate that we are waiting */
237       pthread_cleanup_push(rwl_write_release, (void *)rwl);
238       while (rwl->w_active || rwl->r_active > 0) {
239          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
240             break;                    /* error, bail out */
241          }
242       }
243       pthread_cleanup_pop(0);
244       rwl->w_wait--;                  /* we are no longer waiting */
245    }
246    if (stat == 0) {
247       rwl->w_active++;                /* we are running */
248       rwl->writer_id = pthread_self(); /* save writer thread's id */
249    }
250    pthread_mutex_unlock(&rwl->mutex);
251    return stat;
252 }
253
254 /*
255  * Attempt to lock for write access, don't wait
256  */
257 int rwl_writetrylock(brwlock_t *rwl)
258 {
259    int stat, stat2;
260
261    if (rwl->valid != RWLOCK_VALID) {
262       return EINVAL;
263    }
264    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
265       return stat;
266    }
267    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
268       rwl->w_active++;
269       pthread_mutex_unlock(&rwl->mutex);
270       return 0;
271    }
272    if (rwl->w_active || rwl->r_active > 0) {
273       stat = EBUSY;
274    } else {
275       rwl->w_active = 1;              /* we are running */
276       rwl->writer_id = pthread_self(); /* save writer thread's id */
277    }
278    stat2 = pthread_mutex_unlock(&rwl->mutex);
279    return (stat == 0 ? stat2 : stat);
280 }
281
282 /*
283  * Unlock write lock
284  *  Start any waiting writers in preference to waiting readers
285  */
286 int rwl_writeunlock(brwlock_t *rwl)
287 {
288    int stat, stat2;
289
290    if (rwl->valid != RWLOCK_VALID) {
291       return EINVAL;
292    }
293    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
294       return stat;
295    }
296    if (rwl->w_active <= 0) {
297       pthread_mutex_unlock(&rwl->mutex);
298       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
299    }
300    rwl->w_active--;
301    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
302       pthread_mutex_unlock(&rwl->mutex);
303       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
304    }
305    if (rwl->w_active > 0) {
306       stat = 0;                       /* writers still active */
307    } else {
308       /* No more writers, awaken someone */
309       if (rwl->r_wait > 0) {         /* if readers waiting */
310          stat = pthread_cond_broadcast(&rwl->read);
311       } else if (rwl->w_wait > 0) {
312          stat = pthread_cond_broadcast(&rwl->write);
313       }
314    }
315    stat2 = pthread_mutex_unlock(&rwl->mutex);
316    return (stat == 0 ? stat2 : stat);
317 }
318
319 #ifdef TEST_RWLOCK
320
321 #define THREADS     300
322 #define DATASIZE   15
323 #define ITERATIONS 1000000
324
325 /*
326  * Keep statics for each thread.
327  */
328 typedef struct thread_tag {
329    int thread_num;
330    pthread_t thread_id;
331    int writes;
332    int reads;
333    int interval;
334 } thread_t;
335
336 /*
337  * Read/write lock and shared data.
338  */
339 typedef struct data_tag {
340    brwlock_t lock;
341    int data;
342    int writes;
343 } data_t;
344
345 static thread_t threads[THREADS];
346 static data_t data[DATASIZE];
347
348 /*
349  * Thread start routine that uses read/write locks.
350  */
351 void *thread_routine(void *arg)
352 {
353    thread_t *self = (thread_t *)arg;
354    int repeats = 0;
355    int iteration;
356    int element = 0;
357    int status;
358
359    for (iteration=0; iteration < ITERATIONS; iteration++) {
360       /*
361        * Each "self->interval" iterations, perform an
362        * update operation (write lock instead of read
363        * lock).
364        */
365 //      if ((iteration % self->interval) == 0) {
366          status = rwl_writelock(&data[element].lock);
367          if (status != 0) {
368             berrno be;
369             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
370             exit(1);
371          }
372          data[element].data = self->thread_num;
373          data[element].writes++;
374          self->writes++;
375          status = rwl_writelock(&data[element].lock);
376          if (status != 0) {
377             berrno be;
378             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
379             exit(1);
380          }
381          data[element].data = self->thread_num;
382          data[element].writes++;
383          self->writes++;
384          status = rwl_writeunlock(&data[element].lock);
385          if (status != 0) {
386             berrno be;
387             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
388             exit(1);
389          }
390          status = rwl_writeunlock(&data[element].lock);
391          if (status != 0) {
392             berrno be;
393             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
394             exit(1);
395          }
396
397 #ifdef xxx
398       } else {
399          /*
400           * Look at the current data element to see whether
401           * the current thread last updated it. Count the
402           * times to report later.
403           */
404           status = rwl_readlock(&data[element].lock);
405           if (status != 0) {
406              berrno be;
407              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
408              exit(1);
409           }
410           self->reads++;
411           if (data[element].data == self->thread_num)
412              repeats++;
413           status = rwl_readunlock(&data[element].lock);
414           if (status != 0) {
415              berrno be;
416              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
417              exit(1);
418           }
419       }
420 #endif
421       element++;
422       if (element >= DATASIZE) {
423          element = 0;
424       }
425    }
426    if (repeats > 0) {
427       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
428          self->thread_num, repeats);
429    }
430    return NULL;
431 }
432
433 int main (int argc, char *argv[])
434 {
435     int count;
436     int data_count;
437     int status;
438     unsigned int seed = 1;
439     int thread_writes = 0;
440     int data_writes = 0;
441
442 #ifdef sun
443     /*
444      * On Solaris 2.5, threads are not timesliced. To ensure
445      * that our threads can run concurrently, we need to
446      * increase the concurrency level to THREADS.
447      */
448     thr_setconcurrency (THREADS);
449 #endif
450
451     /*
452      * Initialize the shared data.
453      */
454     for (data_count = 0; data_count < DATASIZE; data_count++) {
455         data[data_count].data = 0;
456         data[data_count].writes = 0;
457         status = rwl_init (&data[data_count].lock);
458         if (status != 0) {
459            berrno be;
460            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
461            exit(1);
462         }
463     }
464
465     /*
466      * Create THREADS threads to access shared data.
467      */
468     for (count = 0; count < THREADS; count++) {
469         threads[count].thread_num = count + 1;
470         threads[count].writes = 0;
471         threads[count].reads = 0;
472         threads[count].interval = rand_r(&seed) % 71;
473         if (threads[count].interval <= 0) {
474            threads[count].interval = 1;
475         }
476         status = pthread_create (&threads[count].thread_id,
477             NULL, thread_routine, (void*)&threads[count]);
478         if (status != 0 || (int)threads[count].thread_id == 0) {
479            berrno be;
480            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
481            exit(1);
482         }
483     }
484
485     /*
486      * Wait for all threads to complete, and collect
487      * statistics.
488      */
489     for (count = 0; count < THREADS; count++) {
490         status = pthread_join (threads[count].thread_id, NULL);
491         if (status != 0) {
492            berrno be;
493            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
494            exit(1);
495         }
496         thread_writes += threads[count].writes;
497         printf (_("%02d: interval %d, writes %d, reads %d\n"),
498             count, threads[count].interval,
499             threads[count].writes, threads[count].reads);
500     }
501
502     /*
503      * Collect statistics for the data.
504      */
505     for (data_count = 0; data_count < DATASIZE; data_count++) {
506         data_writes += data[data_count].writes;
507         printf (_("data %02d: value %d, %d writes\n"),
508             data_count, data[data_count].data, data[data_count].writes);
509         rwl_destroy (&data[data_count].lock);
510     }
511
512     printf (_("Total: %d thread writes, %d data writes\n"),
513         thread_writes, data_writes);
514     return 0;
515 }
516
517 #endif
518
519 #ifdef TEST_RW_TRY_LOCK
520 /*
521  * brwlock_try_main.c
522  *
523  * Demonstrate use of non-blocking read-write locks.
524  *
525  * Special notes: On a Solaris system, call thr_setconcurrency()
526  * to allow interleaved thread execution, since threads are not
527  * timesliced.
528  */
529 #include <pthread.h>
530 #include "rwlock.h"
531 #include "errors.h"
532
533 #define THREADS         5
534 #define ITERATIONS      1000
535 #define DATASIZE        15
536
537 /*
538  * Keep statistics for each thread.
539  */
540 typedef struct thread_tag {
541     int         thread_num;
542     pthread_t   thread_id;
543     int         r_collisions;
544     int         w_collisions;
545     int         updates;
546     int         interval;
547 } thread_t;
548
549 /*
550  * Read-write lock and shared data
551  */
552 typedef struct data_tag {
553     brwlock_t    lock;
554     int         data;
555     int         updates;
556 } data_t;
557
558 thread_t threads[THREADS];
559 data_t data[DATASIZE];
560
561 /*
562  * Thread start routine that uses read-write locks
563  */
564 void *thread_routine (void *arg)
565 {
566     thread_t *self = (thread_t*)arg;
567     int iteration;
568     int element;
569     int status;
570
571     element = 0;                        /* Current data element */
572
573     for (iteration = 0; iteration < ITERATIONS; iteration++) {
574         if ((iteration % self->interval) == 0) {
575             status = rwl_writetrylock (&data[element].lock);
576             if (status == EBUSY)
577                 self->w_collisions++;
578             else if (status == 0) {
579                 data[element].data++;
580                 data[element].updates++;
581                 self->updates++;
582                 rwl_writeunlock (&data[element].lock);
583             } else
584                 err_abort (status, _("Try write lock"));
585         } else {
586             status = rwl_readtrylock (&data[element].lock);
587             if (status == EBUSY)
588                 self->r_collisions++;
589             else if (status != 0) {
590                 err_abort (status, _("Try read lock"));
591             } else {
592                 if (data[element].data != data[element].updates)
593                     printf ("%d: data[%d] %d != %d\n",
594                         self->thread_num, element,
595                         data[element].data, data[element].updates);
596                 rwl_readunlock (&data[element].lock);
597             }
598         }
599
600         element++;
601         if (element >= DATASIZE)
602             element = 0;
603     }
604     return NULL;
605 }
606
607 int main (int argc, char *argv[])
608 {
609     int count, data_count;
610     unsigned int seed = 1;
611     int thread_updates = 0, data_updates = 0;
612     int status;
613
614 #ifdef sun
615     /*
616      * On Solaris 2.5, threads are not timesliced. To ensure
617      * that our threads can run concurrently, we need to
618      * increase the concurrency level to THREADS.
619      */
620     DPRINTF (("Setting concurrency level to %d\n", THREADS));
621     thr_setconcurrency (THREADS);
622 #endif
623
624     /*
625      * Initialize the shared data.
626      */
627     for (data_count = 0; data_count < DATASIZE; data_count++) {
628         data[data_count].data = 0;
629         data[data_count].updates = 0;
630         rwl_init (&data[data_count].lock);
631     }
632
633     /*
634      * Create THREADS threads to access shared data.
635      */
636     for (count = 0; count < THREADS; count++) {
637         threads[count].thread_num = count;
638         threads[count].r_collisions = 0;
639         threads[count].w_collisions = 0;
640         threads[count].updates = 0;
641         threads[count].interval = rand_r (&seed) % ITERATIONS;
642         status = pthread_create (&threads[count].thread_id,
643             NULL, thread_routine, (void*)&threads[count]);
644         if (status != 0)
645             err_abort (status, _("Create thread"));
646     }
647
648     /*
649      * Wait for all threads to complete, and collect
650      * statistics.
651      */
652     for (count = 0; count < THREADS; count++) {
653         status = pthread_join (threads[count].thread_id, NULL);
654         if (status != 0)
655             err_abort (status, _("Join thread"));
656         thread_updates += threads[count].updates;
657         printf (_("%02d: interval %d, updates %d, "
658                 "r_collisions %d, w_collisions %d\n"),
659             count, threads[count].interval,
660             threads[count].updates,
661             threads[count].r_collisions, threads[count].w_collisions);
662     }
663
664     /*
665      * Collect statistics for the data.
666      */
667     for (data_count = 0; data_count < DATASIZE; data_count++) {
668         data_updates += data[data_count].updates;
669         printf (_("data %02d: value %d, %d updates\n"),
670             data_count, data[data_count].data, data[data_count].updates);
671         rwl_destroy (&data[data_count].lock);
672     }
673
674     return 0;
675 }
676
677 #endif