]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
update lock manager to get better traces
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *   Version $Id$
37  *
38  *  This code adapted from "Programming with POSIX Threads", by
39  *    David R. Butenhof
40  *
41  */
42
43 #define _LOCKMGR_COMPLIANT
44 #include "bacula.h"
45
46 /*
47  * Initialize a read/write lock
48  *
49  *  Returns: 0 on success
50  *           errno on failure
51  */
52 int rwl_init(brwlock_t *rwl)
53 {
54    int stat;
55
56    rwl->r_active = rwl->w_active = 0;
57    rwl->r_wait = rwl->w_wait = 0;
58    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
59       return stat;
60    }
61    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
62       pthread_mutex_destroy(&rwl->mutex);
63       return stat;
64    }
65    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
66       pthread_cond_destroy(&rwl->read);
67       pthread_mutex_destroy(&rwl->mutex);
68       return stat;
69    }
70    rwl->valid = RWLOCK_VALID;
71    return 0;
72 }
73
74 /*
75  * Destroy a read/write lock
76  *
77  * Returns: 0 on success
78  *          errno on failure
79  */
80 int rwl_destroy(brwlock_t *rwl)
81 {
82    int stat, stat1, stat2;
83
84   if (rwl->valid != RWLOCK_VALID) {
85      return EINVAL;
86   }
87   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
88      return stat;
89   }
90
91   /*
92    * If any threads are active, report EBUSY
93    */
94   if (rwl->r_active > 0 || rwl->w_active) {
95      pthread_mutex_unlock(&rwl->mutex);
96      return EBUSY;
97   }
98
99   /*
100    * If any threads are waiting, report EBUSY
101    */
102   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
103      pthread_mutex_unlock(&rwl->mutex);
104      return EBUSY;
105   }
106
107   rwl->valid = 0;
108   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
109      return stat;
110   }
111   stat  = pthread_mutex_destroy(&rwl->mutex);
112   stat1 = pthread_cond_destroy(&rwl->read);
113   stat2 = pthread_cond_destroy(&rwl->write);
114   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
115 }
116
117 /*
118  * Handle cleanup when the read lock condition variable
119  * wait is released.
120  */
121 static void rwl_read_release(void *arg)
122 {
123    brwlock_t *rwl = (brwlock_t *)arg;
124
125    rwl->r_wait--;
126    pthread_mutex_unlock(&rwl->mutex);
127 }
128
129 /*
130  * Handle cleanup when the write lock condition variable wait
131  * is released.
132  */
133 static void rwl_write_release(void *arg)
134 {
135    brwlock_t *rwl = (brwlock_t *)arg;
136
137    rwl->w_wait--;
138    pthread_mutex_unlock(&rwl->mutex);
139 }
140
141 /*
142  * Lock for read access, wait until locked (or error).
143  */
144 int rwl_readlock(brwlock_t *rwl)
145 {
146    int stat;
147
148    if (rwl->valid != RWLOCK_VALID) {
149       return EINVAL;
150    }
151    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
152       return stat;
153    }
154    if (rwl->w_active) {
155       rwl->r_wait++;                  /* indicate that we are waiting */
156       pthread_cleanup_push(rwl_read_release, (void *)rwl);
157       while (rwl->w_active) {
158          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
159          if (stat != 0) {
160             break;                    /* error, bail out */
161          }
162       }
163       pthread_cleanup_pop(0);
164       rwl->r_wait--;                  /* we are no longer waiting */
165    }
166    if (stat == 0) {
167       rwl->r_active++;                /* we are running */
168    }
169    pthread_mutex_unlock(&rwl->mutex);
170    return stat;
171 }
172
173 /*
174  * Attempt to lock for read access, don't wait
175  */
176 int rwl_readtrylock(brwlock_t *rwl)
177 {
178    int stat, stat2;
179
180    if (rwl->valid != RWLOCK_VALID) {
181       return EINVAL;
182    }
183    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
184       return stat;
185    }
186    if (rwl->w_active) {
187       stat = EBUSY;
188    } else {
189       rwl->r_active++;                /* we are running */
190    }
191    stat2 = pthread_mutex_unlock(&rwl->mutex);
192    return (stat == 0 ? stat2 : stat);
193 }
194
195 /*
196  * Unlock read lock
197  */
198 int rwl_readunlock(brwlock_t *rwl)
199 {
200    int stat, stat2;
201
202    if (rwl->valid != RWLOCK_VALID) {
203       return EINVAL;
204    }
205    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
206       return stat;
207    }
208    rwl->r_active--;
209    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
210       stat = pthread_cond_broadcast(&rwl->write);
211    }
212    stat2 = pthread_mutex_unlock(&rwl->mutex);
213    return (stat == 0 ? stat2 : stat);
214 }
215
216
217 /*
218  * Lock for write access, wait until locked (or error).
219  *   Multiple nested write locking is permitted.
220  */
221 int rwl_writelock(brwlock_t *rwl)
222 {
223    int stat;
224
225    if (rwl->valid != RWLOCK_VALID) {
226       return EINVAL;
227    }
228    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
229       return stat;
230    }
231    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
232       rwl->w_active++;
233       pthread_mutex_unlock(&rwl->mutex);
234       return 0;
235    }
236    lmgr_pre_lock(rwl, __FILE__, __LINE__);
237    if (rwl->w_active || rwl->r_active > 0) {
238       rwl->w_wait++;                  /* indicate that we are waiting */
239       pthread_cleanup_push(rwl_write_release, (void *)rwl);
240       while (rwl->w_active || rwl->r_active > 0) {
241          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
242             lmgr_do_unlock(rwl);
243             break;                    /* error, bail out */
244          }
245       }
246       pthread_cleanup_pop(0);
247       rwl->w_wait--;                  /* we are no longer waiting */
248    }
249    if (stat == 0) {
250       rwl->w_active++;                /* we are running */
251       rwl->writer_id = pthread_self(); /* save writer thread's id */
252       lmgr_post_lock();
253    } 
254    pthread_mutex_unlock(&rwl->mutex);
255    return stat;
256 }
257
258 /*
259  * Attempt to lock for write access, don't wait
260  */
261 int rwl_writetrylock(brwlock_t *rwl)
262 {
263    int stat, stat2;
264
265    if (rwl->valid != RWLOCK_VALID) {
266       return EINVAL;
267    }
268    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
269       return stat;
270    }
271    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
272       rwl->w_active++;
273       pthread_mutex_unlock(&rwl->mutex);
274       return 0;
275    }
276    if (rwl->w_active || rwl->r_active > 0) {
277       stat = EBUSY;
278    } else {
279       rwl->w_active = 1;              /* we are running */
280       rwl->writer_id = pthread_self(); /* save writer thread's id */
281       lmgr_do_lock(rwl, __FILE__, __LINE__);
282    }
283    stat2 = pthread_mutex_unlock(&rwl->mutex);
284    return (stat == 0 ? stat2 : stat);
285 }
286
287 /*
288  * Unlock write lock
289  *  Start any waiting writers in preference to waiting readers
290  */
291 int rwl_writeunlock(brwlock_t *rwl)
292 {
293    int stat, stat2;
294
295    if (rwl->valid != RWLOCK_VALID) {
296       return EINVAL;
297    }
298    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
299       return stat;
300    }
301    if (rwl->w_active <= 0) {
302       pthread_mutex_unlock(&rwl->mutex);
303       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
304    }
305    rwl->w_active--;
306    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
307       pthread_mutex_unlock(&rwl->mutex);
308       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
309    }
310    if (rwl->w_active > 0) {
311       stat = 0;                       /* writers still active */
312    } else {
313       lmgr_do_unlock(rwl);
314       /* No more writers, awaken someone */
315       if (rwl->r_wait > 0) {         /* if readers waiting */
316          stat = pthread_cond_broadcast(&rwl->read);
317       } else if (rwl->w_wait > 0) {
318          stat = pthread_cond_broadcast(&rwl->write);
319       }
320    }
321    stat2 = pthread_mutex_unlock(&rwl->mutex);
322    return (stat == 0 ? stat2 : stat);
323 }
324
325 #ifdef TEST_RWLOCK
326
327 #define THREADS     300
328 #define DATASIZE   15
329 #define ITERATIONS 1000000
330
331 /*
332  * Keep statics for each thread.
333  */
334 typedef struct thread_tag {
335    int thread_num;
336    pthread_t thread_id;
337    int writes;
338    int reads;
339    int interval;
340 } thread_t;
341
342 /*
343  * Read/write lock and shared data.
344  */
345 typedef struct data_tag {
346    brwlock_t lock;
347    int data;
348    int writes;
349 } data_t;
350
351 static thread_t threads[THREADS];
352 static data_t data[DATASIZE];
353
354 /*
355  * Thread start routine that uses read/write locks.
356  */
357 void *thread_routine(void *arg)
358 {
359    thread_t *self = (thread_t *)arg;
360    int repeats = 0;
361    int iteration;
362    int element = 0;
363    int status;
364
365    for (iteration=0; iteration < ITERATIONS; iteration++) {
366       /*
367        * Each "self->interval" iterations, perform an
368        * update operation (write lock instead of read
369        * lock).
370        */
371 //      if ((iteration % self->interval) == 0) {
372          status = rwl_writelock(&data[element].lock);
373          if (status != 0) {
374             berrno be;
375             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
376             exit(1);
377          }
378          data[element].data = self->thread_num;
379          data[element].writes++;
380          self->writes++;
381          status = rwl_writelock(&data[element].lock);
382          if (status != 0) {
383             berrno be;
384             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
385             exit(1);
386          }
387          data[element].data = self->thread_num;
388          data[element].writes++;
389          self->writes++;
390          status = rwl_writeunlock(&data[element].lock);
391          if (status != 0) {
392             berrno be;
393             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
394             exit(1);
395          }
396          status = rwl_writeunlock(&data[element].lock);
397          if (status != 0) {
398             berrno be;
399             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
400             exit(1);
401          }
402
403 #ifdef xxx
404       } else {
405          /*
406           * Look at the current data element to see whether
407           * the current thread last updated it. Count the
408           * times to report later.
409           */
410           status = rwl_readlock(&data[element].lock);
411           if (status != 0) {
412              berrno be;
413              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
414              exit(1);
415           }
416           self->reads++;
417           if (data[element].data == self->thread_num)
418              repeats++;
419           status = rwl_readunlock(&data[element].lock);
420           if (status != 0) {
421              berrno be;
422              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
423              exit(1);
424           }
425       }
426 #endif
427       element++;
428       if (element >= DATASIZE) {
429          element = 0;
430       }
431    }
432    if (repeats > 0) {
433       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
434          self->thread_num, repeats);
435    }
436    return NULL;
437 }
438
439 int main (int argc, char *argv[])
440 {
441     int count;
442     int data_count;
443     int status;
444     unsigned int seed = 1;
445     int thread_writes = 0;
446     int data_writes = 0;
447
448 #ifdef sun
449     /*
450      * On Solaris 2.5, threads are not timesliced. To ensure
451      * that our threads can run concurrently, we need to
452      * increase the concurrency level to THREADS.
453      */
454     thr_setconcurrency (THREADS);
455 #endif
456
457     /*
458      * Initialize the shared data.
459      */
460     for (data_count = 0; data_count < DATASIZE; data_count++) {
461         data[data_count].data = 0;
462         data[data_count].writes = 0;
463         status = rwl_init (&data[data_count].lock);
464         if (status != 0) {
465            berrno be;
466            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
467            exit(1);
468         }
469     }
470
471     /*
472      * Create THREADS threads to access shared data.
473      */
474     for (count = 0; count < THREADS; count++) {
475         threads[count].thread_num = count + 1;
476         threads[count].writes = 0;
477         threads[count].reads = 0;
478         threads[count].interval = rand_r(&seed) % 71;
479         if (threads[count].interval <= 0) {
480            threads[count].interval = 1;
481         }
482         status = pthread_create (&threads[count].thread_id,
483             NULL, thread_routine, (void*)&threads[count]);
484         if (status != 0 || (int)threads[count].thread_id == 0) {
485            berrno be;
486            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
487            exit(1);
488         }
489     }
490
491     /*
492      * Wait for all threads to complete, and collect
493      * statistics.
494      */
495     for (count = 0; count < THREADS; count++) {
496         status = pthread_join (threads[count].thread_id, NULL);
497         if (status != 0) {
498            berrno be;
499            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
500            exit(1);
501         }
502         thread_writes += threads[count].writes;
503         printf (_("%02d: interval %d, writes %d, reads %d\n"),
504             count, threads[count].interval,
505             threads[count].writes, threads[count].reads);
506     }
507
508     /*
509      * Collect statistics for the data.
510      */
511     for (data_count = 0; data_count < DATASIZE; data_count++) {
512         data_writes += data[data_count].writes;
513         printf (_("data %02d: value %d, %d writes\n"),
514             data_count, data[data_count].data, data[data_count].writes);
515         rwl_destroy (&data[data_count].lock);
516     }
517
518     printf (_("Total: %d thread writes, %d data writes\n"),
519         thread_writes, data_writes);
520     return 0;
521 }
522
523 #endif
524
525 #ifdef TEST_RW_TRY_LOCK
526 /*
527  * brwlock_try_main.c
528  *
529  * Demonstrate use of non-blocking read-write locks.
530  *
531  * Special notes: On a Solaris system, call thr_setconcurrency()
532  * to allow interleaved thread execution, since threads are not
533  * timesliced.
534  */
535 #include <pthread.h>
536 #include "rwlock.h"
537 #include "errors.h"
538
539 #define THREADS         5
540 #define ITERATIONS      1000
541 #define DATASIZE        15
542
543 /*
544  * Keep statistics for each thread.
545  */
546 typedef struct thread_tag {
547     int         thread_num;
548     pthread_t   thread_id;
549     int         r_collisions;
550     int         w_collisions;
551     int         updates;
552     int         interval;
553 } thread_t;
554
555 /*
556  * Read-write lock and shared data
557  */
558 typedef struct data_tag {
559     brwlock_t    lock;
560     int         data;
561     int         updates;
562 } data_t;
563
564 thread_t threads[THREADS];
565 data_t data[DATASIZE];
566
567 /*
568  * Thread start routine that uses read-write locks
569  */
570 void *thread_routine (void *arg)
571 {
572     thread_t *self = (thread_t*)arg;
573     int iteration;
574     int element;
575     int status;
576     lmgr_init_thread();
577     element = 0;                        /* Current data element */
578
579     for (iteration = 0; iteration < ITERATIONS; iteration++) {
580         if ((iteration % self->interval) == 0) {
581             status = rwl_writetrylock (&data[element].lock);
582             if (status == EBUSY)
583                 self->w_collisions++;
584             else if (status == 0) {
585                 data[element].data++;
586                 data[element].updates++;
587                 self->updates++;
588                 rwl_writeunlock (&data[element].lock);
589             } else
590                 err_abort (status, _("Try write lock"));
591         } else {
592             status = rwl_readtrylock (&data[element].lock);
593             if (status == EBUSY)
594                 self->r_collisions++;
595             else if (status != 0) {
596                 err_abort (status, _("Try read lock"));
597             } else {
598                 if (data[element].data != data[element].updates)
599                     printf ("%d: data[%d] %d != %d\n",
600                         self->thread_num, element,
601                         data[element].data, data[element].updates);
602                 rwl_readunlock (&data[element].lock);
603             }
604         }
605
606         element++;
607         if (element >= DATASIZE)
608             element = 0;
609     }
610     lmgr_cleanup_thread();
611     return NULL;
612 }
613
614 int main (int argc, char *argv[])
615 {
616     int count, data_count;
617     unsigned int seed = 1;
618     int thread_updates = 0, data_updates = 0;
619     int status;
620
621 #ifdef sun
622     /*
623      * On Solaris 2.5, threads are not timesliced. To ensure
624      * that our threads can run concurrently, we need to
625      * increase the concurrency level to THREADS.
626      */
627     DPRINTF (("Setting concurrency level to %d\n", THREADS));
628     thr_setconcurrency (THREADS);
629 #endif
630
631     /*
632      * Initialize the shared data.
633      */
634     for (data_count = 0; data_count < DATASIZE; data_count++) {
635         data[data_count].data = 0;
636         data[data_count].updates = 0;
637         rwl_init (&data[data_count].lock);
638     }
639
640     /*
641      * Create THREADS threads to access shared data.
642      */
643     for (count = 0; count < THREADS; count++) {
644         threads[count].thread_num = count;
645         threads[count].r_collisions = 0;
646         threads[count].w_collisions = 0;
647         threads[count].updates = 0;
648         threads[count].interval = rand_r (&seed) % ITERATIONS;
649         status = pthread_create (&threads[count].thread_id,
650             NULL, thread_routine, (void*)&threads[count]);
651         if (status != 0)
652             err_abort (status, _("Create thread"));
653     }
654
655     /*
656      * Wait for all threads to complete, and collect
657      * statistics.
658      */
659     for (count = 0; count < THREADS; count++) {
660         status = pthread_join (threads[count].thread_id, NULL);
661         if (status != 0)
662             err_abort (status, _("Join thread"));
663         thread_updates += threads[count].updates;
664         printf (_("%02d: interval %d, updates %d, "
665                 "r_collisions %d, w_collisions %d\n"),
666             count, threads[count].interval,
667             threads[count].updates,
668             threads[count].r_collisions, threads[count].w_collisions);
669     }
670
671     /*
672      * Collect statistics for the data.
673      */
674     for (data_count = 0; data_count < DATASIZE; data_count++) {
675         data_updates += data[data_count].updates;
676         printf (_("data %02d: value %d, %d updates\n"),
677             data_count, data[data_count].data, data[data_count].updates);
678         rwl_destroy (&data[data_count].lock);
679     }
680
681     return 0;
682 }
683
684 #endif