]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
dhb Medialist : created context menu function and moved lines which create
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2  * Bacula Thread Read/Write locking code. It permits
3  *  multiple readers but only one writer.  Note, however,
4  *  that the writer thread is permitted to make multiple
5  *  nested write lock calls.
6  *
7  *  Kern Sibbald, January MMI
8  *
9  *   Version $Id$
10  *
11  *  This code adapted from "Programming with POSIX Threads", by
12  *    David R. Butenhof
13  *
14  */
15 /*
16    Bacula® - The Network Backup Solution
17
18    Copyright (C) 2001-2006 Free Software Foundation Europe e.V.
19
20    The main author of Bacula is Kern Sibbald, with contributions from
21    many others, a complete list can be found in the file AUTHORS.
22    This program is Free Software; you can redistribute it and/or
23    modify it under the terms of version two of the GNU General Public
24    License as published by the Free Software Foundation plus additions
25    that are listed in the file LICENSE.
26
27    This program is distributed in the hope that it will be useful, but
28    WITHOUT ANY WARRANTY; without even the implied warranty of
29    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
30    General Public License for more details.
31
32    You should have received a copy of the GNU General Public License
33    along with this program; if not, write to the Free Software
34    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
35    02110-1301, USA.
36
37    Bacula® is a registered trademark of John Walker.
38    The licensor of Bacula is the Free Software Foundation Europe
39    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
40    Switzerland, email:ftf@fsfeurope.org.
41 */
42
43 #include "bacula.h"
44
45 /*
46  * Initialize a read/write lock
47  *
48  *  Returns: 0 on success
49  *           errno on failure
50  */
51 int rwl_init(brwlock_t *rwl)
52 {
53    int stat;
54
55    rwl->r_active = rwl->w_active = 0;
56    rwl->r_wait = rwl->w_wait = 0;
57    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
58       return stat;
59    }
60    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61       pthread_mutex_destroy(&rwl->mutex);
62       return stat;
63    }
64    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65       pthread_cond_destroy(&rwl->read);
66       pthread_mutex_destroy(&rwl->mutex);
67       return stat;
68    }
69    rwl->valid = RWLOCK_VALID;
70    return 0;
71 }
72
73 /*
74  * Destroy a read/write lock
75  *
76  * Returns: 0 on success
77  *          errno on failure
78  */
79 int rwl_destroy(brwlock_t *rwl)
80 {
81    int stat, stat1, stat2;
82
83   if (rwl->valid != RWLOCK_VALID) {
84      return EINVAL;
85   }
86   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87      return stat;
88   }
89
90   /*
91    * If any threads are active, report EBUSY
92    */
93   if (rwl->r_active > 0 || rwl->w_active) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   /*
99    * If any threads are waiting, report EBUSY
100    */
101   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102      pthread_mutex_unlock(&rwl->mutex);
103      return EBUSY;
104   }
105
106   rwl->valid = 0;
107   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108      return stat;
109   }
110   stat  = pthread_mutex_destroy(&rwl->mutex);
111   stat1 = pthread_cond_destroy(&rwl->read);
112   stat2 = pthread_cond_destroy(&rwl->write);
113   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 /*
117  * Handle cleanup when the read lock condition variable
118  * wait is released.
119  */
120 static void rwl_read_release(void *arg)
121 {
122    brwlock_t *rwl = (brwlock_t *)arg;
123
124    rwl->r_wait--;
125    pthread_mutex_unlock(&rwl->mutex);
126 }
127
128 /*
129  * Handle cleanup when the write lock condition variable wait
130  * is released.
131  */
132 static void rwl_write_release(void *arg)
133 {
134    brwlock_t *rwl = (brwlock_t *)arg;
135
136    rwl->w_wait--;
137    pthread_mutex_unlock(&rwl->mutex);
138 }
139
140 /*
141  * Lock for read access, wait until locked (or error).
142  */
143 int rwl_readlock(brwlock_t *rwl)
144 {
145    int stat;
146
147    if (rwl->valid != RWLOCK_VALID) {
148       return EINVAL;
149    }
150    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
151       return stat;
152    }
153    if (rwl->w_active) {
154       rwl->r_wait++;                  /* indicate that we are waiting */
155       pthread_cleanup_push(rwl_read_release, (void *)rwl);
156       while (rwl->w_active) {
157          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
158          if (stat != 0) {
159             break;                    /* error, bail out */
160          }
161       }
162       pthread_cleanup_pop(0);
163       rwl->r_wait--;                  /* we are no longer waiting */
164    }
165    if (stat == 0) {
166       rwl->r_active++;                /* we are running */
167    }
168    pthread_mutex_unlock(&rwl->mutex);
169    return stat;
170 }
171
172 /*
173  * Attempt to lock for read access, don't wait
174  */
175 int rwl_readtrylock(brwlock_t *rwl)
176 {
177    int stat, stat2;
178
179    if (rwl->valid != RWLOCK_VALID) {
180       return EINVAL;
181    }
182    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
183       return stat;
184    }
185    if (rwl->w_active) {
186       stat = EBUSY;
187    } else {
188       rwl->r_active++;                /* we are running */
189    }
190    stat2 = pthread_mutex_unlock(&rwl->mutex);
191    return (stat == 0 ? stat2 : stat);
192 }
193
194 /*
195  * Unlock read lock
196  */
197 int rwl_readunlock(brwlock_t *rwl)
198 {
199    int stat, stat2;
200
201    if (rwl->valid != RWLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    rwl->r_active--;
208    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209       stat = pthread_cond_broadcast(&rwl->write);
210    }
211    stat2 = pthread_mutex_unlock(&rwl->mutex);
212    return (stat == 0 ? stat2 : stat);
213 }
214
215
216 /*
217  * Lock for write access, wait until locked (or error).
218  *   Multiple nested write locking is permitted.
219  */
220 int rwl_writelock(brwlock_t *rwl)
221 {
222    int stat;
223
224    if (rwl->valid != RWLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
231       rwl->w_active++;
232       pthread_mutex_unlock(&rwl->mutex);
233       return 0;
234    }
235    if (rwl->w_active || rwl->r_active > 0) {
236       rwl->w_wait++;                  /* indicate that we are waiting */
237       pthread_cleanup_push(rwl_write_release, (void *)rwl);
238       while (rwl->w_active || rwl->r_active > 0) {
239          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
240             break;                    /* error, bail out */
241          }
242       }
243       pthread_cleanup_pop(0);
244       rwl->w_wait--;                  /* we are no longer waiting */
245    }
246    if (stat == 0) {
247       rwl->w_active++;                /* we are running */
248       rwl->writer_id = pthread_self(); /* save writer thread's id */
249    }
250    pthread_mutex_unlock(&rwl->mutex);
251    return stat;
252 }
253
254 /*
255  * Attempt to lock for write access, don't wait
256  */
257 int rwl_writetrylock(brwlock_t *rwl)
258 {
259    int stat, stat2;
260
261    if (rwl->valid != RWLOCK_VALID) {
262       return EINVAL;
263    }
264    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
265       return stat;
266    }
267    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
268       rwl->w_active++;
269       pthread_mutex_unlock(&rwl->mutex);
270       return 0;
271    }
272    if (rwl->w_active || rwl->r_active > 0) {
273       stat = EBUSY;
274    } else {
275       rwl->w_active = 1;              /* we are running */
276       rwl->writer_id = pthread_self(); /* save writer thread's id */
277    }
278    stat2 = pthread_mutex_unlock(&rwl->mutex);
279    return (stat == 0 ? stat2 : stat);
280 }
281
282 /*
283  * Unlock write lock
284  *  Start any waiting writers in preference to waiting readers
285  */
286 int rwl_writeunlock(brwlock_t *rwl)
287 {
288    int stat, stat2;
289
290    if (rwl->valid != RWLOCK_VALID) {
291       return EINVAL;
292    }
293    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
294       return stat;
295    }
296    if (rwl->w_active <= 0) {
297       Emsg0(M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
298    }
299    rwl->w_active--;
300    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
301       Emsg0(M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
302    }
303    if (rwl->w_active > 0) {
304       stat = 0;                       /* writers still active */
305    } else {
306       /* No more writers, awaken someone */
307       if (rwl->r_wait > 0) {         /* if readers waiting */
308          stat = pthread_cond_broadcast(&rwl->read);
309       } else if (rwl->w_wait > 0) {
310          stat = pthread_cond_broadcast(&rwl->write);
311       }
312    }
313    stat2 = pthread_mutex_unlock(&rwl->mutex);
314    return (stat == 0 ? stat2 : stat);
315 }
316
317 #ifdef TEST_RWLOCK
318
319 #define THREADS     5
320 #define DATASIZE   15
321 #define ITERATIONS 10000
322
323 /*
324  * Keep statics for each thread.
325  */
326 typedef struct thread_tag {
327    int thread_num;
328    pthread_t thread_id;
329    int writes;
330    int reads;
331    int interval;
332 } thread_t;
333
334 /*
335  * Read/write lock and shared data.
336  */
337 typedef struct data_tag {
338    brwlock_t lock;
339    int data;
340    int writes;
341 } data_t;
342
343 thread_t threads[THREADS];
344 data_t data[DATASIZE];
345
346 /*
347  * Thread start routine that uses read/write locks.
348  */
349 void *thread_routine(void *arg)
350 {
351    thread_t *self = (thread_t *)arg;
352    int repeats = 0;
353    int iteration;
354    int element = 0;
355    int status;
356
357    for (iteration=0; iteration < ITERATIONS; iteration++) {
358       /*
359        * Each "self->interval" iterations, perform an
360        * update operation (write lock instead of read
361        * lock).
362        */
363       if ((iteration % self->interval) == 0) {
364          status = rwl_writelock(&data[element].lock);
365          if (status != 0) {
366             Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), strerror(status));
367          }
368          data[element].data = self->thread_num;
369          data[element].writes++;
370          self->writes++;
371          status = rwl_writeunlock(&data[element].lock);
372          if (status != 0) {
373             Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), strerror(status));
374          }
375       } else {
376          /*
377           * Look at the current data element to see whether
378           * the current thread last updated it. Count the
379           * times to report later.
380           */
381           status = rwl_readlock(&data[element].lock);
382           if (status != 0) {
383              Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), strerror(status));
384           }
385           self->reads++;
386           if (data[element].data == self->thread_num)
387              repeats++;
388           status = rwl_readunlock(&data[element].lock);
389           if (status != 0) {
390              Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), strerror(status));
391           }
392       }
393       element++;
394       if (element >= DATASIZE) {
395          element = 0;
396       }
397    }
398    if (repeats > 0) {
399       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
400          self->thread_num, repeats);
401    }
402    return NULL;
403 }
404
405 int main (int argc, char *argv[])
406 {
407     int count;
408     int data_count;
409     int status;
410     unsigned int seed = 1;
411     int thread_writes = 0;
412     int data_writes = 0;
413
414 #ifdef sun
415     /*
416      * On Solaris 2.5, threads are not timesliced. To ensure
417      * that our threads can run concurrently, we need to
418      * increase the concurrency level to THREADS.
419      */
420     thr_setconcurrency (THREADS);
421 #endif
422
423     /*
424      * Initialize the shared data.
425      */
426     for (data_count = 0; data_count < DATASIZE; data_count++) {
427         data[data_count].data = 0;
428         data[data_count].writes = 0;
429         status = rwl_init (&data[data_count].lock);
430         if (status != 0) {
431            Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), strerror(status));
432         }
433     }
434
435     /*
436      * Create THREADS threads to access shared data.
437      */
438     for (count = 0; count < THREADS; count++) {
439         threads[count].thread_num = count + 1;
440         threads[count].writes = 0;
441         threads[count].reads = 0;
442         threads[count].interval = rand_r (&seed) % 71;
443         status = pthread_create (&threads[count].thread_id,
444             NULL, thread_routine, (void*)&threads[count]);
445         if (status != 0) {
446            Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), strerror(status));
447         }
448     }
449
450     /*
451      * Wait for all threads to complete, and collect
452      * statistics.
453      */
454     for (count = 0; count < THREADS; count++) {
455         status = pthread_join (threads[count].thread_id, NULL);
456         if (status != 0) {
457            Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), strerror(status));
458         }
459         thread_writes += threads[count].writes;
460         printf (_("%02d: interval %d, writes %d, reads %d\n"),
461             count, threads[count].interval,
462             threads[count].writes, threads[count].reads);
463     }
464
465     /*
466      * Collect statistics for the data.
467      */
468     for (data_count = 0; data_count < DATASIZE; data_count++) {
469         data_writes += data[data_count].writes;
470         printf (_("data %02d: value %d, %d writes\n"),
471             data_count, data[data_count].data, data[data_count].writes);
472         rwl_destroy (&data[data_count].lock);
473     }
474
475     printf (_("Total: %d thread writes, %d data writes\n"),
476         thread_writes, data_writes);
477     return 0;
478 }
479
480 #endif
481
482 #ifdef TEST_RW_TRY_LOCK
483 /*
484  * brwlock_try_main.c
485  *
486  * Demonstrate use of non-blocking read-write locks.
487  *
488  * Special notes: On a Solaris system, call thr_setconcurrency()
489  * to allow interleaved thread execution, since threads are not
490  * timesliced.
491  */
492 #include <pthread.h>
493 #include "rwlock.h"
494 #include "errors.h"
495
496 #define THREADS         5
497 #define ITERATIONS      1000
498 #define DATASIZE        15
499
500 /*
501  * Keep statistics for each thread.
502  */
503 typedef struct thread_tag {
504     int         thread_num;
505     pthread_t   thread_id;
506     int         r_collisions;
507     int         w_collisions;
508     int         updates;
509     int         interval;
510 } thread_t;
511
512 /*
513  * Read-write lock and shared data
514  */
515 typedef struct data_tag {
516     brwlock_t    lock;
517     int         data;
518     int         updates;
519 } data_t;
520
521 thread_t threads[THREADS];
522 data_t data[DATASIZE];
523
524 /*
525  * Thread start routine that uses read-write locks
526  */
527 void *thread_routine (void *arg)
528 {
529     thread_t *self = (thread_t*)arg;
530     int iteration;
531     int element;
532     int status;
533
534     element = 0;                        /* Current data element */
535
536     for (iteration = 0; iteration < ITERATIONS; iteration++) {
537         if ((iteration % self->interval) == 0) {
538             status = rwl_writetrylock (&data[element].lock);
539             if (status == EBUSY)
540                 self->w_collisions++;
541             else if (status == 0) {
542                 data[element].data++;
543                 data[element].updates++;
544                 self->updates++;
545                 rwl_writeunlock (&data[element].lock);
546             } else
547                 err_abort (status, _("Try write lock"));
548         } else {
549             status = rwl_readtrylock (&data[element].lock);
550             if (status == EBUSY)
551                 self->r_collisions++;
552             else if (status != 0) {
553                 err_abort (status, _("Try read lock"));
554             } else {
555                 if (data[element].data != data[element].updates)
556                     printf ("%d: data[%d] %d != %d\n",
557                         self->thread_num, element,
558                         data[element].data, data[element].updates);
559                 rwl_readunlock (&data[element].lock);
560             }
561         }
562
563         element++;
564         if (element >= DATASIZE)
565             element = 0;
566     }
567     return NULL;
568 }
569
570 int main (int argc, char *argv[])
571 {
572     int count, data_count;
573     unsigned int seed = 1;
574     int thread_updates = 0, data_updates = 0;
575     int status;
576
577 #ifdef sun
578     /*
579      * On Solaris 2.5, threads are not timesliced. To ensure
580      * that our threads can run concurrently, we need to
581      * increase the concurrency level to THREADS.
582      */
583     DPRINTF (("Setting concurrency level to %d\n", THREADS));
584     thr_setconcurrency (THREADS);
585 #endif
586
587     /*
588      * Initialize the shared data.
589      */
590     for (data_count = 0; data_count < DATASIZE; data_count++) {
591         data[data_count].data = 0;
592         data[data_count].updates = 0;
593         rwl_init (&data[data_count].lock);
594     }
595
596     /*
597      * Create THREADS threads to access shared data.
598      */
599     for (count = 0; count < THREADS; count++) {
600         threads[count].thread_num = count;
601         threads[count].r_collisions = 0;
602         threads[count].w_collisions = 0;
603         threads[count].updates = 0;
604         threads[count].interval = rand_r (&seed) % ITERATIONS;
605         status = pthread_create (&threads[count].thread_id,
606             NULL, thread_routine, (void*)&threads[count]);
607         if (status != 0)
608             err_abort (status, _("Create thread"));
609     }
610
611     /*
612      * Wait for all threads to complete, and collect
613      * statistics.
614      */
615     for (count = 0; count < THREADS; count++) {
616         status = pthread_join (threads[count].thread_id, NULL);
617         if (status != 0)
618             err_abort (status, _("Join thread"));
619         thread_updates += threads[count].updates;
620         printf (_("%02d: interval %d, updates %d, "
621                 "r_collisions %d, w_collisions %d\n"),
622             count, threads[count].interval,
623             threads[count].updates,
624             threads[count].r_collisions, threads[count].w_collisions);
625     }
626
627     /*
628      * Collect statistics for the data.
629      */
630     for (data_count = 0; data_count < DATASIZE; data_count++) {
631         data_updates += data[data_count].updates;
632         printf (_("data %02d: value %d, %d updates\n"),
633             data_count, data[data_count].data, data[data_count].updates);
634         rwl_destroy (&data[data_count].lock);
635     }
636
637     return 0;
638 }
639
640 #endif