]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2  * Bacula Thread Read/Write locking code. It permits
3  *  multiple readers but only one writer.  Note, however,
4  *  that the writer thread is permitted to make multiple
5  *  nested write lock calls.
6  *
7  *  Kern Sibbald, January MMI
8  *
9  *   Version $Id$
10  *
11  *  This code adapted from "Programming with POSIX Threads", by
12  *    David R. Butenhof
13  *
14  */
15 /*
16    Copyright (C) 2001-2006 Kern Sibbald
17
18    This program is free software; you can redistribute it and/or
19    modify it under the terms of the GNU General Public License
20    version 2 as amended with additional clauses defined in the
21    file LICENSE in the main source directory.
22
23    This program is distributed in the hope that it will be useful,
24    but WITHOUT ANY WARRANTY; without even the implied warranty of
25    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
26    the file LICENSE for additional details.
27
28  */
29
30 #include "bacula.h"
31
32 /*
33  * Initialize a read/write lock
34  *
35  *  Returns: 0 on success
36  *           errno on failure
37  */
38 int rwl_init(brwlock_t *rwl)
39 {
40    int stat;
41
42    rwl->r_active = rwl->w_active = 0;
43    rwl->r_wait = rwl->w_wait = 0;
44    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
45       return stat;
46    }
47    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
48       pthread_mutex_destroy(&rwl->mutex);
49       return stat;
50    }
51    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
52       pthread_cond_destroy(&rwl->read);
53       pthread_mutex_destroy(&rwl->mutex);
54       return stat;
55    }
56    rwl->valid = RWLOCK_VALID;
57    return 0;
58 }
59
60 /*
61  * Destroy a read/write lock
62  *
63  * Returns: 0 on success
64  *          errno on failure
65  */
66 int rwl_destroy(brwlock_t *rwl)
67 {
68    int stat, stat1, stat2;
69
70   if (rwl->valid != RWLOCK_VALID) {
71      return EINVAL;
72   }
73   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
74      return stat;
75   }
76
77   /*
78    * If any threads are active, report EBUSY
79    */
80   if (rwl->r_active > 0 || rwl->w_active) {
81      pthread_mutex_unlock(&rwl->mutex);
82      return EBUSY;
83   }
84
85   /*
86    * If any threads are waiting, report EBUSY
87    */
88   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
89      pthread_mutex_unlock(&rwl->mutex);
90      return EBUSY;
91   }
92
93   rwl->valid = 0;
94   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
95      return stat;
96   }
97   stat  = pthread_mutex_destroy(&rwl->mutex);
98   stat1 = pthread_cond_destroy(&rwl->read);
99   stat2 = pthread_cond_destroy(&rwl->write);
100   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
101 }
102
103 /*
104  * Handle cleanup when the read lock condition variable
105  * wait is released.
106  */
107 static void rwl_read_release(void *arg)
108 {
109    brwlock_t *rwl = (brwlock_t *)arg;
110
111    rwl->r_wait--;
112    pthread_mutex_unlock(&rwl->mutex);
113 }
114
115 /*
116  * Handle cleanup when the write lock condition variable wait
117  * is released.
118  */
119 static void rwl_write_release(void *arg)
120 {
121    brwlock_t *rwl = (brwlock_t *)arg;
122
123    rwl->w_wait--;
124    pthread_mutex_unlock(&rwl->mutex);
125 }
126
127 /*
128  * Lock for read access, wait until locked (or error).
129  */
130 int rwl_readlock(brwlock_t *rwl)
131 {
132    int stat;
133
134    if (rwl->valid != RWLOCK_VALID) {
135       return EINVAL;
136    }
137    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
138       return stat;
139    }
140    if (rwl->w_active) {
141       rwl->r_wait++;                  /* indicate that we are waiting */
142       pthread_cleanup_push(rwl_read_release, (void *)rwl);
143       while (rwl->w_active) {
144          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
145          if (stat != 0) {
146             break;                    /* error, bail out */
147          }
148       }
149       pthread_cleanup_pop(0);
150       rwl->r_wait--;                  /* we are no longer waiting */
151    }
152    if (stat == 0) {
153       rwl->r_active++;                /* we are running */
154    }
155    pthread_mutex_unlock(&rwl->mutex);
156    return stat;
157 }
158
159 /*
160  * Attempt to lock for read access, don't wait
161  */
162 int rwl_readtrylock(brwlock_t *rwl)
163 {
164    int stat, stat2;
165
166    if (rwl->valid != RWLOCK_VALID) {
167       return EINVAL;
168    }
169    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
170       return stat;
171    }
172    if (rwl->w_active) {
173       stat = EBUSY;
174    } else {
175       rwl->r_active++;                /* we are running */
176    }
177    stat2 = pthread_mutex_unlock(&rwl->mutex);
178    return (stat == 0 ? stat2 : stat);
179 }
180
181 /*
182  * Unlock read lock
183  */
184 int rwl_readunlock(brwlock_t *rwl)
185 {
186    int stat, stat2;
187
188    if (rwl->valid != RWLOCK_VALID) {
189       return EINVAL;
190    }
191    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
192       return stat;
193    }
194    rwl->r_active--;
195    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
196       stat = pthread_cond_broadcast(&rwl->write);
197    }
198    stat2 = pthread_mutex_unlock(&rwl->mutex);
199    return (stat == 0 ? stat2 : stat);
200 }
201
202
203 /*
204  * Lock for write access, wait until locked (or error).
205  *   Multiple nested write locking is permitted.
206  */
207 int rwl_writelock(brwlock_t *rwl)
208 {
209    int stat;
210
211    if (rwl->valid != RWLOCK_VALID) {
212       return EINVAL;
213    }
214    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
215       return stat;
216    }
217    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
218       rwl->w_active++;
219       pthread_mutex_unlock(&rwl->mutex);
220       return 0;
221    }
222    if (rwl->w_active || rwl->r_active > 0) {
223       rwl->w_wait++;                  /* indicate that we are waiting */
224       pthread_cleanup_push(rwl_write_release, (void *)rwl);
225       while (rwl->w_active || rwl->r_active > 0) {
226          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
227             break;                    /* error, bail out */
228          }
229       }
230       pthread_cleanup_pop(0);
231       rwl->w_wait--;                  /* we are no longer waiting */
232    }
233    if (stat == 0) {
234       rwl->w_active++;                /* we are running */
235       rwl->writer_id = pthread_self(); /* save writer thread's id */
236    }
237    pthread_mutex_unlock(&rwl->mutex);
238    return stat;
239 }
240
241 /*
242  * Attempt to lock for write access, don't wait
243  */
244 int rwl_writetrylock(brwlock_t *rwl)
245 {
246    int stat, stat2;
247
248    if (rwl->valid != RWLOCK_VALID) {
249       return EINVAL;
250    }
251    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
252       return stat;
253    }
254    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
255       rwl->w_active++;
256       pthread_mutex_unlock(&rwl->mutex);
257       return 0;
258    }
259    if (rwl->w_active || rwl->r_active > 0) {
260       stat = EBUSY;
261    } else {
262       rwl->w_active = 1;              /* we are running */
263       rwl->writer_id = pthread_self(); /* save writer thread's id */
264    }
265    stat2 = pthread_mutex_unlock(&rwl->mutex);
266    return (stat == 0 ? stat2 : stat);
267 }
268
269 /*
270  * Unlock write lock
271  *  Start any waiting writers in preference to waiting readers
272  */
273 int rwl_writeunlock(brwlock_t *rwl)
274 {
275    int stat, stat2;
276
277    if (rwl->valid != RWLOCK_VALID) {
278       return EINVAL;
279    }
280    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
281       return stat;
282    }
283    if (rwl->w_active <= 0) {
284       Emsg0(M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
285    }
286    rwl->w_active--;
287    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
288       Emsg0(M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
289    }
290    if (rwl->w_active > 0) {
291       stat = 0;                       /* writers still active */
292    } else {
293       /* No more writers, awaken someone */
294       if (rwl->r_wait > 0) {         /* if readers waiting */
295          stat = pthread_cond_broadcast(&rwl->read);
296       } else if (rwl->w_wait > 0) {
297          stat = pthread_cond_broadcast(&rwl->write);
298       }
299    }
300    stat2 = pthread_mutex_unlock(&rwl->mutex);
301    return (stat == 0 ? stat2 : stat);
302 }
303
304 #ifdef TEST_RWLOCK
305
306 #define THREADS     5
307 #define DATASIZE   15
308 #define ITERATIONS 10000
309
310 /*
311  * Keep statics for each thread.
312  */
313 typedef struct thread_tag {
314    int thread_num;
315    pthread_t thread_id;
316    int writes;
317    int reads;
318    int interval;
319 } thread_t;
320
321 /*
322  * Read/write lock and shared data.
323  */
324 typedef struct data_tag {
325    brwlock_t lock;
326    int data;
327    int writes;
328 } data_t;
329
330 thread_t threads[THREADS];
331 data_t data[DATASIZE];
332
333 /*
334  * Thread start routine that uses read/write locks.
335  */
336 void *thread_routine(void *arg)
337 {
338    thread_t *self = (thread_t *)arg;
339    int repeats = 0;
340    int iteration;
341    int element = 0;
342    int status;
343
344    for (iteration=0; iteration < ITERATIONS; iteration++) {
345       /*
346        * Each "self->interval" iterations, perform an
347        * update operation (write lock instead of read
348        * lock).
349        */
350       if ((iteration % self->interval) == 0) {
351          status = rwl_writelock(&data[element].lock);
352          if (status != 0) {
353             Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), strerror(status));
354          }
355          data[element].data = self->thread_num;
356          data[element].writes++;
357          self->writes++;
358          status = rwl_writeunlock(&data[element].lock);
359          if (status != 0) {
360             Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), strerror(status));
361          }
362       } else {
363          /*
364           * Look at the current data element to see whether
365           * the current thread last updated it. Count the
366           * times to report later.
367           */
368           status = rwl_readlock(&data[element].lock);
369           if (status != 0) {
370              Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), strerror(status));
371           }
372           self->reads++;
373           if (data[element].data == self->thread_num)
374              repeats++;
375           status = rwl_readunlock(&data[element].lock);
376           if (status != 0) {
377              Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), strerror(status));
378           }
379       }
380       element++;
381       if (element >= DATASIZE) {
382          element = 0;
383       }
384    }
385    if (repeats > 0) {
386       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
387          self->thread_num, repeats);
388    }
389    return NULL;
390 }
391
392 int main (int argc, char *argv[])
393 {
394     int count;
395     int data_count;
396     int status;
397     unsigned int seed = 1;
398     int thread_writes = 0;
399     int data_writes = 0;
400
401 #ifdef sun
402     /*
403      * On Solaris 2.5, threads are not timesliced. To ensure
404      * that our threads can run concurrently, we need to
405      * increase the concurrency level to THREADS.
406      */
407     thr_setconcurrency (THREADS);
408 #endif
409
410     /*
411      * Initialize the shared data.
412      */
413     for (data_count = 0; data_count < DATASIZE; data_count++) {
414         data[data_count].data = 0;
415         data[data_count].writes = 0;
416         status = rwl_init (&data[data_count].lock);
417         if (status != 0) {
418            Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), strerror(status));
419         }
420     }
421
422     /*
423      * Create THREADS threads to access shared data.
424      */
425     for (count = 0; count < THREADS; count++) {
426         threads[count].thread_num = count + 1;
427         threads[count].writes = 0;
428         threads[count].reads = 0;
429         threads[count].interval = rand_r (&seed) % 71;
430         status = pthread_create (&threads[count].thread_id,
431             NULL, thread_routine, (void*)&threads[count]);
432         if (status != 0) {
433            Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), strerror(status));
434         }
435     }
436
437     /*
438      * Wait for all threads to complete, and collect
439      * statistics.
440      */
441     for (count = 0; count < THREADS; count++) {
442         status = pthread_join (threads[count].thread_id, NULL);
443         if (status != 0) {
444            Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), strerror(status));
445         }
446         thread_writes += threads[count].writes;
447         printf (_("%02d: interval %d, writes %d, reads %d\n"),
448             count, threads[count].interval,
449             threads[count].writes, threads[count].reads);
450     }
451
452     /*
453      * Collect statistics for the data.
454      */
455     for (data_count = 0; data_count < DATASIZE; data_count++) {
456         data_writes += data[data_count].writes;
457         printf (_("data %02d: value %d, %d writes\n"),
458             data_count, data[data_count].data, data[data_count].writes);
459         rwl_destroy (&data[data_count].lock);
460     }
461
462     printf (_("Total: %d thread writes, %d data writes\n"),
463         thread_writes, data_writes);
464     return 0;
465 }
466
467 #endif
468
469 #ifdef TEST_RW_TRY_LOCK
470 /*
471  * brwlock_try_main.c
472  *
473  * Demonstrate use of non-blocking read-write locks.
474  *
475  * Special notes: On a Solaris system, call thr_setconcurrency()
476  * to allow interleaved thread execution, since threads are not
477  * timesliced.
478  */
479 #include <pthread.h>
480 #include "rwlock.h"
481 #include "errors.h"
482
483 #define THREADS         5
484 #define ITERATIONS      1000
485 #define DATASIZE        15
486
487 /*
488  * Keep statistics for each thread.
489  */
490 typedef struct thread_tag {
491     int         thread_num;
492     pthread_t   thread_id;
493     int         r_collisions;
494     int         w_collisions;
495     int         updates;
496     int         interval;
497 } thread_t;
498
499 /*
500  * Read-write lock and shared data
501  */
502 typedef struct data_tag {
503     brwlock_t    lock;
504     int         data;
505     int         updates;
506 } data_t;
507
508 thread_t threads[THREADS];
509 data_t data[DATASIZE];
510
511 /*
512  * Thread start routine that uses read-write locks
513  */
514 void *thread_routine (void *arg)
515 {
516     thread_t *self = (thread_t*)arg;
517     int iteration;
518     int element;
519     int status;
520
521     element = 0;                        /* Current data element */
522
523     for (iteration = 0; iteration < ITERATIONS; iteration++) {
524         if ((iteration % self->interval) == 0) {
525             status = rwl_writetrylock (&data[element].lock);
526             if (status == EBUSY)
527                 self->w_collisions++;
528             else if (status == 0) {
529                 data[element].data++;
530                 data[element].updates++;
531                 self->updates++;
532                 rwl_writeunlock (&data[element].lock);
533             } else
534                 err_abort (status, _("Try write lock"));
535         } else {
536             status = rwl_readtrylock (&data[element].lock);
537             if (status == EBUSY)
538                 self->r_collisions++;
539             else if (status != 0) {
540                 err_abort (status, _("Try read lock"));
541             } else {
542                 if (data[element].data != data[element].updates)
543                     printf ("%d: data[%d] %d != %d\n",
544                         self->thread_num, element,
545                         data[element].data, data[element].updates);
546                 rwl_readunlock (&data[element].lock);
547             }
548         }
549
550         element++;
551         if (element >= DATASIZE)
552             element = 0;
553     }
554     return NULL;
555 }
556
557 int main (int argc, char *argv[])
558 {
559     int count, data_count;
560     unsigned int seed = 1;
561     int thread_updates = 0, data_updates = 0;
562     int status;
563
564 #ifdef sun
565     /*
566      * On Solaris 2.5, threads are not timesliced. To ensure
567      * that our threads can run concurrently, we need to
568      * increase the concurrency level to THREADS.
569      */
570     DPRINTF (("Setting concurrency level to %d\n", THREADS));
571     thr_setconcurrency (THREADS);
572 #endif
573
574     /*
575      * Initialize the shared data.
576      */
577     for (data_count = 0; data_count < DATASIZE; data_count++) {
578         data[data_count].data = 0;
579         data[data_count].updates = 0;
580         rwl_init (&data[data_count].lock);
581     }
582
583     /*
584      * Create THREADS threads to access shared data.
585      */
586     for (count = 0; count < THREADS; count++) {
587         threads[count].thread_num = count;
588         threads[count].r_collisions = 0;
589         threads[count].w_collisions = 0;
590         threads[count].updates = 0;
591         threads[count].interval = rand_r (&seed) % ITERATIONS;
592         status = pthread_create (&threads[count].thread_id,
593             NULL, thread_routine, (void*)&threads[count]);
594         if (status != 0)
595             err_abort (status, _("Create thread"));
596     }
597
598     /*
599      * Wait for all threads to complete, and collect
600      * statistics.
601      */
602     for (count = 0; count < THREADS; count++) {
603         status = pthread_join (threads[count].thread_id, NULL);
604         if (status != 0)
605             err_abort (status, _("Join thread"));
606         thread_updates += threads[count].updates;
607         printf (_("%02d: interval %d, updates %d, "
608                 "r_collisions %d, w_collisions %d\n"),
609             count, threads[count].interval,
610             threads[count].updates,
611             threads[count].r_collisions, threads[count].w_collisions);
612     }
613
614     /*
615      * Collect statistics for the data.
616      */
617     for (data_count = 0; data_count < DATASIZE; data_count++) {
618         data_updates += data[data_count].updates;
619         printf (_("data %02d: value %d, %d updates\n"),
620             data_count, data[data_count].data, data[data_count].updates);
621         rwl_destroy (&data[data_count].lock);
622     }
623
624     return 0;
625 }
626
627 #endif