]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
ebl use tokyocabinet by default instead of htable
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *   Version $Id$
37  *
38  *  This code adapted from "Programming with POSIX Threads", by
39  *    David R. Butenhof
40  *
41  */
42
43 #include "bacula.h"
44
45 /*
46  * Initialize a read/write lock
47  *
48  *  Returns: 0 on success
49  *           errno on failure
50  */
51 int rwl_init(brwlock_t *rwl)
52 {
53    int stat;
54
55    rwl->r_active = rwl->w_active = 0;
56    rwl->r_wait = rwl->w_wait = 0;
57    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
58       return stat;
59    }
60    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
61       pthread_mutex_destroy(&rwl->mutex);
62       return stat;
63    }
64    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
65       pthread_cond_destroy(&rwl->read);
66       pthread_mutex_destroy(&rwl->mutex);
67       return stat;
68    }
69    rwl->valid = RWLOCK_VALID;
70    return 0;
71 }
72
73 /*
74  * Destroy a read/write lock
75  *
76  * Returns: 0 on success
77  *          errno on failure
78  */
79 int rwl_destroy(brwlock_t *rwl)
80 {
81    int stat, stat1, stat2;
82
83   if (rwl->valid != RWLOCK_VALID) {
84      return EINVAL;
85   }
86   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87      return stat;
88   }
89
90   /*
91    * If any threads are active, report EBUSY
92    */
93   if (rwl->r_active > 0 || rwl->w_active) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   /*
99    * If any threads are waiting, report EBUSY
100    */
101   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102      pthread_mutex_unlock(&rwl->mutex);
103      return EBUSY;
104   }
105
106   rwl->valid = 0;
107   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108      return stat;
109   }
110   stat  = pthread_mutex_destroy(&rwl->mutex);
111   stat1 = pthread_cond_destroy(&rwl->read);
112   stat2 = pthread_cond_destroy(&rwl->write);
113   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 /*
117  * Handle cleanup when the read lock condition variable
118  * wait is released.
119  */
120 static void rwl_read_release(void *arg)
121 {
122    brwlock_t *rwl = (brwlock_t *)arg;
123
124    rwl->r_wait--;
125    pthread_mutex_unlock(&rwl->mutex);
126 }
127
128 /*
129  * Handle cleanup when the write lock condition variable wait
130  * is released.
131  */
132 static void rwl_write_release(void *arg)
133 {
134    brwlock_t *rwl = (brwlock_t *)arg;
135
136    rwl->w_wait--;
137    pthread_mutex_unlock(&rwl->mutex);
138 }
139
140 /*
141  * Lock for read access, wait until locked (or error).
142  */
143 int rwl_readlock(brwlock_t *rwl)
144 {
145    int stat;
146
147    if (rwl->valid != RWLOCK_VALID) {
148       return EINVAL;
149    }
150    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
151       return stat;
152    }
153    if (rwl->w_active) {
154       rwl->r_wait++;                  /* indicate that we are waiting */
155       pthread_cleanup_push(rwl_read_release, (void *)rwl);
156       while (rwl->w_active) {
157          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
158          if (stat != 0) {
159             break;                    /* error, bail out */
160          }
161       }
162       pthread_cleanup_pop(0);
163       rwl->r_wait--;                  /* we are no longer waiting */
164    }
165    if (stat == 0) {
166       rwl->r_active++;                /* we are running */
167    }
168    pthread_mutex_unlock(&rwl->mutex);
169    return stat;
170 }
171
172 /*
173  * Attempt to lock for read access, don't wait
174  */
175 int rwl_readtrylock(brwlock_t *rwl)
176 {
177    int stat, stat2;
178
179    if (rwl->valid != RWLOCK_VALID) {
180       return EINVAL;
181    }
182    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
183       return stat;
184    }
185    if (rwl->w_active) {
186       stat = EBUSY;
187    } else {
188       rwl->r_active++;                /* we are running */
189    }
190    stat2 = pthread_mutex_unlock(&rwl->mutex);
191    return (stat == 0 ? stat2 : stat);
192 }
193
194 /*
195  * Unlock read lock
196  */
197 int rwl_readunlock(brwlock_t *rwl)
198 {
199    int stat, stat2;
200
201    if (rwl->valid != RWLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    rwl->r_active--;
208    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
209       stat = pthread_cond_broadcast(&rwl->write);
210    }
211    stat2 = pthread_mutex_unlock(&rwl->mutex);
212    return (stat == 0 ? stat2 : stat);
213 }
214
215
216 /*
217  * Lock for write access, wait until locked (or error).
218  *   Multiple nested write locking is permitted.
219  */
220 int rwl_writelock(brwlock_t *rwl)
221 {
222    int stat;
223
224    if (rwl->valid != RWLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
231       rwl->w_active++;
232       pthread_mutex_unlock(&rwl->mutex);
233       return 0;
234    }
235    if (rwl->w_active || rwl->r_active > 0) {
236       rwl->w_wait++;                  /* indicate that we are waiting */
237       pthread_cleanup_push(rwl_write_release, (void *)rwl);
238       while (rwl->w_active || rwl->r_active > 0) {
239          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
240             break;                    /* error, bail out */
241          }
242       }
243       pthread_cleanup_pop(0);
244       rwl->w_wait--;                  /* we are no longer waiting */
245    }
246    if (stat == 0) {
247       rwl->w_active++;                /* we are running */
248       rwl->writer_id = pthread_self(); /* save writer thread's id */
249    }
250    pthread_mutex_unlock(&rwl->mutex);
251    return stat;
252 }
253
254 /*
255  * Attempt to lock for write access, don't wait
256  */
257 int rwl_writetrylock(brwlock_t *rwl)
258 {
259    int stat, stat2;
260
261    if (rwl->valid != RWLOCK_VALID) {
262       return EINVAL;
263    }
264    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
265       return stat;
266    }
267    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
268       rwl->w_active++;
269       pthread_mutex_unlock(&rwl->mutex);
270       return 0;
271    }
272    if (rwl->w_active || rwl->r_active > 0) {
273       stat = EBUSY;
274    } else {
275       rwl->w_active = 1;              /* we are running */
276       rwl->writer_id = pthread_self(); /* save writer thread's id */
277    }
278    stat2 = pthread_mutex_unlock(&rwl->mutex);
279    return (stat == 0 ? stat2 : stat);
280 }
281
282 /*
283  * Unlock write lock
284  *  Start any waiting writers in preference to waiting readers
285  */
286 int rwl_writeunlock(brwlock_t *rwl)
287 {
288    int stat, stat2;
289
290    if (rwl->valid != RWLOCK_VALID) {
291       return EINVAL;
292    }
293    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
294       return stat;
295    }
296    if (rwl->w_active <= 0) {
297       Emsg0(M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
298    }
299    rwl->w_active--;
300    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
301       Emsg0(M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
302    }
303    if (rwl->w_active > 0) {
304       stat = 0;                       /* writers still active */
305    } else {
306       /* No more writers, awaken someone */
307       if (rwl->r_wait > 0) {         /* if readers waiting */
308          stat = pthread_cond_broadcast(&rwl->read);
309       } else if (rwl->w_wait > 0) {
310          stat = pthread_cond_broadcast(&rwl->write);
311       }
312    }
313    stat2 = pthread_mutex_unlock(&rwl->mutex);
314    return (stat == 0 ? stat2 : stat);
315 }
316
317 #ifdef TEST_RWLOCK
318
319 #define THREADS     5
320 #define DATASIZE   15
321 #define ITERATIONS 10000
322
323 /*
324  * Keep statics for each thread.
325  */
326 typedef struct thread_tag {
327    int thread_num;
328    pthread_t thread_id;
329    int writes;
330    int reads;
331    int interval;
332 } thread_t;
333
334 /*
335  * Read/write lock and shared data.
336  */
337 typedef struct data_tag {
338    brwlock_t lock;
339    int data;
340    int writes;
341 } data_t;
342
343 thread_t threads[THREADS];
344 data_t data[DATASIZE];
345
346 /*
347  * Thread start routine that uses read/write locks.
348  */
349 void *thread_routine(void *arg)
350 {
351    thread_t *self = (thread_t *)arg;
352    int repeats = 0;
353    int iteration;
354    int element = 0;
355    int status;
356
357    for (iteration=0; iteration < ITERATIONS; iteration++) {
358       /*
359        * Each "self->interval" iterations, perform an
360        * update operation (write lock instead of read
361        * lock).
362        */
363       if ((iteration % self->interval) == 0) {
364          status = rwl_writelock(&data[element].lock);
365          if (status != 0) {
366             berrno be;
367             Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), be.bstrerror(status));
368          }
369          data[element].data = self->thread_num;
370          data[element].writes++;
371          self->writes++;
372          status = rwl_writeunlock(&data[element].lock);
373          if (status != 0) {
374             berrno be;
375             Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), be.bstrerror(status));
376          }
377       } else {
378          /*
379           * Look at the current data element to see whether
380           * the current thread last updated it. Count the
381           * times to report later.
382           */
383           status = rwl_readlock(&data[element].lock);
384           if (status != 0) {
385              berrno be;
386              Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), be.bstrerror(status));
387           }
388           self->reads++;
389           if (data[element].data == self->thread_num)
390              repeats++;
391           status = rwl_readunlock(&data[element].lock);
392           if (status != 0) {
393              berrno be;
394              Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), be.bstrerror(status));
395           }
396       }
397       element++;
398       if (element >= DATASIZE) {
399          element = 0;
400       }
401    }
402    if (repeats > 0) {
403       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
404          self->thread_num, repeats);
405    }
406    return NULL;
407 }
408
409 int main (int argc, char *argv[])
410 {
411     int count;
412     int data_count;
413     int status;
414     unsigned int seed = 1;
415     int thread_writes = 0;
416     int data_writes = 0;
417
418 #ifdef sun
419     /*
420      * On Solaris 2.5, threads are not timesliced. To ensure
421      * that our threads can run concurrently, we need to
422      * increase the concurrency level to THREADS.
423      */
424     thr_setconcurrency (THREADS);
425 #endif
426
427     /*
428      * Initialize the shared data.
429      */
430     for (data_count = 0; data_count < DATASIZE; data_count++) {
431         data[data_count].data = 0;
432         data[data_count].writes = 0;
433         status = rwl_init (&data[data_count].lock);
434         if (status != 0) {
435            berrno be;
436            Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), be.bstrerror(status));
437         }
438     }
439
440     /*
441      * Create THREADS threads to access shared data.
442      */
443     for (count = 0; count < THREADS; count++) {
444         threads[count].thread_num = count + 1;
445         threads[count].writes = 0;
446         threads[count].reads = 0;
447         threads[count].interval = rand_r (&seed) % 71;
448         status = pthread_create (&threads[count].thread_id,
449             NULL, thread_routine, (void*)&threads[count]);
450         if (status != 0) {
451            berrno be;
452            Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), be.bstrerror(status));
453         }
454     }
455
456     /*
457      * Wait for all threads to complete, and collect
458      * statistics.
459      */
460     for (count = 0; count < THREADS; count++) {
461         status = pthread_join (threads[count].thread_id, NULL);
462         if (status != 0) {
463            berrno be;
464            Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), be.bstrerror(status));
465         }
466         thread_writes += threads[count].writes;
467         printf (_("%02d: interval %d, writes %d, reads %d\n"),
468             count, threads[count].interval,
469             threads[count].writes, threads[count].reads);
470     }
471
472     /*
473      * Collect statistics for the data.
474      */
475     for (data_count = 0; data_count < DATASIZE; data_count++) {
476         data_writes += data[data_count].writes;
477         printf (_("data %02d: value %d, %d writes\n"),
478             data_count, data[data_count].data, data[data_count].writes);
479         rwl_destroy (&data[data_count].lock);
480     }
481
482     printf (_("Total: %d thread writes, %d data writes\n"),
483         thread_writes, data_writes);
484     return 0;
485 }
486
487 #endif
488
489 #ifdef TEST_RW_TRY_LOCK
490 /*
491  * brwlock_try_main.c
492  *
493  * Demonstrate use of non-blocking read-write locks.
494  *
495  * Special notes: On a Solaris system, call thr_setconcurrency()
496  * to allow interleaved thread execution, since threads are not
497  * timesliced.
498  */
499 #include <pthread.h>
500 #include "rwlock.h"
501 #include "errors.h"
502
503 #define THREADS         5
504 #define ITERATIONS      1000
505 #define DATASIZE        15
506
507 /*
508  * Keep statistics for each thread.
509  */
510 typedef struct thread_tag {
511     int         thread_num;
512     pthread_t   thread_id;
513     int         r_collisions;
514     int         w_collisions;
515     int         updates;
516     int         interval;
517 } thread_t;
518
519 /*
520  * Read-write lock and shared data
521  */
522 typedef struct data_tag {
523     brwlock_t    lock;
524     int         data;
525     int         updates;
526 } data_t;
527
528 thread_t threads[THREADS];
529 data_t data[DATASIZE];
530
531 /*
532  * Thread start routine that uses read-write locks
533  */
534 void *thread_routine (void *arg)
535 {
536     thread_t *self = (thread_t*)arg;
537     int iteration;
538     int element;
539     int status;
540
541     element = 0;                        /* Current data element */
542
543     for (iteration = 0; iteration < ITERATIONS; iteration++) {
544         if ((iteration % self->interval) == 0) {
545             status = rwl_writetrylock (&data[element].lock);
546             if (status == EBUSY)
547                 self->w_collisions++;
548             else if (status == 0) {
549                 data[element].data++;
550                 data[element].updates++;
551                 self->updates++;
552                 rwl_writeunlock (&data[element].lock);
553             } else
554                 err_abort (status, _("Try write lock"));
555         } else {
556             status = rwl_readtrylock (&data[element].lock);
557             if (status == EBUSY)
558                 self->r_collisions++;
559             else if (status != 0) {
560                 err_abort (status, _("Try read lock"));
561             } else {
562                 if (data[element].data != data[element].updates)
563                     printf ("%d: data[%d] %d != %d\n",
564                         self->thread_num, element,
565                         data[element].data, data[element].updates);
566                 rwl_readunlock (&data[element].lock);
567             }
568         }
569
570         element++;
571         if (element >= DATASIZE)
572             element = 0;
573     }
574     return NULL;
575 }
576
577 int main (int argc, char *argv[])
578 {
579     int count, data_count;
580     unsigned int seed = 1;
581     int thread_updates = 0, data_updates = 0;
582     int status;
583
584 #ifdef sun
585     /*
586      * On Solaris 2.5, threads are not timesliced. To ensure
587      * that our threads can run concurrently, we need to
588      * increase the concurrency level to THREADS.
589      */
590     DPRINTF (("Setting concurrency level to %d\n", THREADS));
591     thr_setconcurrency (THREADS);
592 #endif
593
594     /*
595      * Initialize the shared data.
596      */
597     for (data_count = 0; data_count < DATASIZE; data_count++) {
598         data[data_count].data = 0;
599         data[data_count].updates = 0;
600         rwl_init (&data[data_count].lock);
601     }
602
603     /*
604      * Create THREADS threads to access shared data.
605      */
606     for (count = 0; count < THREADS; count++) {
607         threads[count].thread_num = count;
608         threads[count].r_collisions = 0;
609         threads[count].w_collisions = 0;
610         threads[count].updates = 0;
611         threads[count].interval = rand_r (&seed) % ITERATIONS;
612         status = pthread_create (&threads[count].thread_id,
613             NULL, thread_routine, (void*)&threads[count]);
614         if (status != 0)
615             err_abort (status, _("Create thread"));
616     }
617
618     /*
619      * Wait for all threads to complete, and collect
620      * statistics.
621      */
622     for (count = 0; count < THREADS; count++) {
623         status = pthread_join (threads[count].thread_id, NULL);
624         if (status != 0)
625             err_abort (status, _("Join thread"));
626         thread_updates += threads[count].updates;
627         printf (_("%02d: interval %d, updates %d, "
628                 "r_collisions %d, w_collisions %d\n"),
629             count, threads[count].interval,
630             threads[count].updates,
631             threads[count].r_collisions, threads[count].w_collisions);
632     }
633
634     /*
635      * Collect statistics for the data.
636      */
637     for (data_count = 0; data_count < DATASIZE; data_count++) {
638         data_updates += data[data_count].updates;
639         printf (_("data %02d: value %d, %d updates\n"),
640             data_count, data[data_count].data, data[data_count].updates);
641         rwl_destroy (&data[data_count].lock);
642     }
643
644     return 0;
645 }
646
647 #endif