]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
Make out of freespace non-fatal for removable devices -- i.e. behaves like tape
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2016 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Thread Read/Write locking code. It permits
21  *  multiple readers but only one writer.  Note, however,
22  *  that the writer thread is permitted to make multiple
23  *  nested write lock calls.
24  *
25  *  Kern Sibbald, January MMI
26  *
27  *  This code adapted from "Programming with POSIX Threads", by
28  *    David R. Butenhof
29  *
30  */
31
32 #define LOCKMGR_COMPLIANT
33 #include "bacula.h"
34
35 /*
36  * Initialize a read/write lock
37  *
38  *  Returns: 0 on success
39  *           errno on failure
40  */
41 int rwl_init(brwlock_t *rwl, int priority)
42 {
43    int stat;
44
45    rwl->r_active = rwl->w_active = 0;
46    rwl->r_wait = rwl->w_wait = 0;
47    rwl->priority = priority;
48    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
49       return stat;
50    }
51    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
52       pthread_mutex_destroy(&rwl->mutex);
53       return stat;
54    }
55    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
56       pthread_cond_destroy(&rwl->read);
57       pthread_mutex_destroy(&rwl->mutex);
58       return stat;
59    }
60    rwl->valid = RWLOCK_VALID;
61    return 0;
62 }
63
64 /*
65  * Destroy a read/write lock
66  *
67  * Returns: 0 on success
68  *          errno on failure
69  */
70 int rwl_destroy(brwlock_t *rwl)
71 {
72    int stat, stat1, stat2;
73
74   if (rwl->valid != RWLOCK_VALID) {
75      return EINVAL;
76   }
77   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
78      return stat;
79   }
80
81   /*
82    * If any threads are active, report EBUSY
83    */
84   if (rwl->r_active > 0 || rwl->w_active) {
85      pthread_mutex_unlock(&rwl->mutex);
86      return EBUSY;
87   }
88
89   /*
90    * If any threads are waiting, report EBUSY
91    */
92   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
93      pthread_mutex_unlock(&rwl->mutex);
94      return EBUSY;
95   }
96
97   rwl->valid = 0;
98   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
99      return stat;
100   }
101   stat  = pthread_mutex_destroy(&rwl->mutex);
102   stat1 = pthread_cond_destroy(&rwl->read);
103   stat2 = pthread_cond_destroy(&rwl->write);
104   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
105 }
106
107 /*
108  * Handle cleanup when the read lock condition variable
109  * wait is released.
110  */
111 static void rwl_read_release(void *arg)
112 {
113    brwlock_t *rwl = (brwlock_t *)arg;
114
115    rwl->r_wait--;
116    pthread_mutex_unlock(&rwl->mutex);
117 }
118
119 /*
120  * Handle cleanup when the write lock condition variable wait
121  * is released.
122  */
123 static void rwl_write_release(void *arg)
124 {
125    brwlock_t *rwl = (brwlock_t *)arg;
126
127    rwl->w_wait--;
128    pthread_mutex_unlock(&rwl->mutex);
129 }
130
131 /*
132  * Lock for read access, wait until locked (or error).
133  */
134 int rwl_readlock(brwlock_t *rwl)
135 {
136    int stat;
137
138    if (rwl->valid != RWLOCK_VALID) {
139       return EINVAL;
140    }
141    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
142       return stat;
143    }
144    if (rwl->w_active) {
145       rwl->r_wait++;                  /* indicate that we are waiting */
146       pthread_cleanup_push(rwl_read_release, (void *)rwl);
147       while (rwl->w_active) {
148          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
149          if (stat != 0) {
150             break;                    /* error, bail out */
151          }
152       }
153       pthread_cleanup_pop(0);
154       rwl->r_wait--;                  /* we are no longer waiting */
155    }
156    if (stat == 0) {
157       rwl->r_active++;                /* we are running */
158    }
159    pthread_mutex_unlock(&rwl->mutex);
160    return stat;
161 }
162
163 /*
164  * Attempt to lock for read access, don't wait
165  */
166 int rwl_readtrylock(brwlock_t *rwl)
167 {
168    int stat, stat2;
169
170    if (rwl->valid != RWLOCK_VALID) {
171       return EINVAL;
172    }
173    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
174       return stat;
175    }
176    if (rwl->w_active) {
177       stat = EBUSY;
178    } else {
179       rwl->r_active++;                /* we are running */
180    }
181    stat2 = pthread_mutex_unlock(&rwl->mutex);
182    return (stat == 0 ? stat2 : stat);
183 }
184
185 /*
186  * Unlock read lock
187  */
188 int rwl_readunlock(brwlock_t *rwl)
189 {
190    int stat, stat2;
191
192    if (rwl->valid != RWLOCK_VALID) {
193       return EINVAL;
194    }
195    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
196       return stat;
197    }
198    rwl->r_active--;
199    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
200       stat = pthread_cond_broadcast(&rwl->write);
201    }
202    stat2 = pthread_mutex_unlock(&rwl->mutex);
203    return (stat == 0 ? stat2 : stat);
204 }
205
206
207 /*
208  * Lock for write access, wait until locked (or error).
209  *   Multiple nested write locking is permitted.
210  */
211 int rwl_writelock_p(brwlock_t *rwl, const char *file, int line)
212 {
213    int stat;
214
215    if (rwl->valid != RWLOCK_VALID) {
216       return EINVAL;
217    }
218    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
219       return stat;
220    }
221    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
222       rwl->w_active++;
223       pthread_mutex_unlock(&rwl->mutex);
224       return 0;
225    }
226    lmgr_pre_lock(rwl, rwl->priority, file, line);
227    if (rwl->w_active || rwl->r_active > 0) {
228       rwl->w_wait++;                  /* indicate that we are waiting */
229       pthread_cleanup_push(rwl_write_release, (void *)rwl);
230       while (rwl->w_active || rwl->r_active > 0) {
231          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
232             lmgr_do_unlock(rwl);
233             break;                    /* error, bail out */
234          }
235       }
236       pthread_cleanup_pop(0);
237       rwl->w_wait--;                  /* we are no longer waiting */
238    }
239    if (stat == 0) {
240       rwl->w_active++;                /* we are running */
241       rwl->writer_id = pthread_self(); /* save writer thread's id */
242       lmgr_post_lock();
243    }
244    pthread_mutex_unlock(&rwl->mutex);
245    return stat;
246 }
247
248 /*
249  * Attempt to lock for write access, don't wait
250  */
251 int rwl_writetrylock(brwlock_t *rwl)
252 {
253    int stat, stat2;
254
255    if (rwl->valid != RWLOCK_VALID) {
256       return EINVAL;
257    }
258    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
259       return stat;
260    }
261    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
262       rwl->w_active++;
263       pthread_mutex_unlock(&rwl->mutex);
264       return 0;
265    }
266    if (rwl->w_active || rwl->r_active > 0) {
267       stat = EBUSY;
268    } else {
269       rwl->w_active = 1;              /* we are running */
270       rwl->writer_id = pthread_self(); /* save writer thread's id */
271       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
272    }
273    stat2 = pthread_mutex_unlock(&rwl->mutex);
274    return (stat == 0 ? stat2 : stat);
275 }
276
277 /*
278  * Unlock write lock
279  *  Start any waiting writers in preference to waiting readers
280  */
281 int rwl_writeunlock(brwlock_t *rwl)
282 {
283    int stat, stat2;
284
285    if (rwl->valid != RWLOCK_VALID) {
286       return EINVAL;
287    }
288    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
289       return stat;
290    }
291    if (rwl->w_active <= 0) {
292       pthread_mutex_unlock(&rwl->mutex);
293       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock called too many times.\n"));
294    }
295    rwl->w_active--;
296    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
297       pthread_mutex_unlock(&rwl->mutex);
298       Jmsg0(NULL, M_ABORT, 0, _("rwl_writeunlock by non-owner.\n"));
299    }
300    if (rwl->w_active > 0) {
301       stat = 0;                       /* writers still active */
302    } else {
303       lmgr_do_unlock(rwl);
304       /* No more writers, awaken someone */
305       if (rwl->r_wait > 0) {         /* if readers waiting */
306          stat = pthread_cond_broadcast(&rwl->read);
307       } else if (rwl->w_wait > 0) {
308          stat = pthread_cond_broadcast(&rwl->write);
309       }
310    }
311    stat2 = pthread_mutex_unlock(&rwl->mutex);
312    return (stat == 0 ? stat2 : stat);
313 }
314
315 bool is_rwl_valid(brwlock_t *rwl)
316 {
317    return (rwl->valid == RWLOCK_VALID);
318 }
319
320
321 #ifdef TEST_RWLOCK
322
323 #define THREADS     300
324 #define DATASIZE   15
325 #define ITERATIONS 1000000
326
327 /*
328  * Keep statics for each thread.
329  */
330 typedef struct thread_tag {
331    int thread_num;
332    pthread_t thread_id;
333    int writes;
334    int reads;
335    int interval;
336 } thread_t;
337
338 /*
339  * Read/write lock and shared data.
340  */
341 typedef struct data_tag {
342    brwlock_t lock;
343    int data;
344    int writes;
345 } data_t;
346
347 static thread_t threads[THREADS];
348 static data_t data[DATASIZE];
349
350 /*
351  * Thread start routine that uses read/write locks.
352  */
353 void *thread_routine(void *arg)
354 {
355    thread_t *self = (thread_t *)arg;
356    int repeats = 0;
357    int iteration;
358    int element = 0;
359    int status;
360
361    for (iteration=0; iteration < ITERATIONS; iteration++) {
362       /*
363        * Each "self->interval" iterations, perform an
364        * update operation (write lock instead of read
365        * lock).
366        */
367 //      if ((iteration % self->interval) == 0) {
368          status = rwl_writelock(&data[element].lock);
369          if (status != 0) {
370             berrno be;
371             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
372             exit(1);
373          }
374          data[element].data = self->thread_num;
375          data[element].writes++;
376          self->writes++;
377          status = rwl_writelock(&data[element].lock);
378          if (status != 0) {
379             berrno be;
380             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
381             exit(1);
382          }
383          data[element].data = self->thread_num;
384          data[element].writes++;
385          self->writes++;
386          status = rwl_writeunlock(&data[element].lock);
387          if (status != 0) {
388             berrno be;
389             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
390             exit(1);
391          }
392          status = rwl_writeunlock(&data[element].lock);
393          if (status != 0) {
394             berrno be;
395             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
396             exit(1);
397          }
398
399 #ifdef xxx
400       } else {
401          /*
402           * Look at the current data element to see whether
403           * the current thread last updated it. Count the
404           * times to report later.
405           */
406           status = rwl_readlock(&data[element].lock);
407           if (status != 0) {
408              berrno be;
409              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
410              exit(1);
411           }
412           self->reads++;
413           if (data[element].data == self->thread_num)
414              repeats++;
415           status = rwl_readunlock(&data[element].lock);
416           if (status != 0) {
417              berrno be;
418              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
419              exit(1);
420           }
421       }
422 #endif
423       element++;
424       if (element >= DATASIZE) {
425          element = 0;
426       }
427    }
428    if (repeats > 0) {
429       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
430          self->thread_num, repeats);
431    }
432    return NULL;
433 }
434
435 int main (int argc, char *argv[])
436 {
437     int count;
438     int data_count;
439     int status;
440     unsigned int seed = 1;
441     int thread_writes = 0;
442     int data_writes = 0;
443
444     /*
445      * For Solaris 2.5,2.6,7 and 8 threads are not timesliced.
446      * Ensure our threads can run concurrently.
447      */
448     thr_setconcurrency(THREADS);      /* Only implemented on Solaris */
449
450     /*
451      * Initialize the shared data.
452      */
453     for (data_count = 0; data_count < DATASIZE; data_count++) {
454         data[data_count].data = 0;
455         data[data_count].writes = 0;
456         status = rwl_init(&data[data_count].lock);
457         if (status != 0) {
458            berrno be;
459            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
460            exit(1);
461         }
462     }
463
464     /*
465      * Create THREADS threads to access shared data.
466      */
467     for (count = 0; count < THREADS; count++) {
468         threads[count].thread_num = count + 1;
469         threads[count].writes = 0;
470         threads[count].reads = 0;
471         threads[count].interval = rand_r(&seed) % 71;
472         if (threads[count].interval <= 0) {
473            threads[count].interval = 1;
474         }
475         status = pthread_create (&threads[count].thread_id,
476             NULL, thread_routine, (void*)&threads[count]);
477         if (status != 0 || (int)threads[count].thread_id == 0) {
478            berrno be;
479            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
480            exit(1);
481         }
482     }
483
484     /*
485      * Wait for all threads to complete, and collect
486      * statistics.
487      */
488     for (count = 0; count < THREADS; count++) {
489         status = pthread_join (threads[count].thread_id, NULL);
490         if (status != 0) {
491            berrno be;
492            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
493            exit(1);
494         }
495         thread_writes += threads[count].writes;
496         printf (_("%02d: interval %d, writes %d, reads %d\n"),
497             count, threads[count].interval,
498             threads[count].writes, threads[count].reads);
499     }
500
501     /*
502      * Collect statistics for the data.
503      */
504     for (data_count = 0; data_count < DATASIZE; data_count++) {
505         data_writes += data[data_count].writes;
506         printf (_("data %02d: value %d, %d writes\n"),
507             data_count, data[data_count].data, data[data_count].writes);
508         rwl_destroy (&data[data_count].lock);
509     }
510
511     printf (_("Total: %d thread writes, %d data writes\n"),
512         thread_writes, data_writes);
513     return 0;
514 }
515
516 #endif
517
518 #ifdef TEST_RW_TRY_LOCK
519 /*
520  * brwlock_try_main.c
521  *
522  * Demonstrate use of non-blocking read-write locks.
523  *
524  * On older Solaris systems, call thr_setconcurrency()
525  * to allow interleaved thread execution, since threads are not
526  * timesliced.
527  */
528 #include <pthread.h>
529 #include "rwlock.h"
530 #include "errors.h"
531
532 #define THREADS         5
533 #define ITERATIONS      1000
534 #define DATASIZE        15
535
536 /*
537  * Keep statistics for each thread.
538  */
539 typedef struct thread_tag {
540     int         thread_num;
541     pthread_t   thread_id;
542     int         r_collisions;
543     int         w_collisions;
544     int         updates;
545     int         interval;
546 } thread_t;
547
548 /*
549  * Read-write lock and shared data
550  */
551 typedef struct data_tag {
552     brwlock_t    lock;
553     int         data;
554     int         updates;
555 } data_t;
556
557 thread_t threads[THREADS];
558 data_t data[DATASIZE];
559
560 /*
561  * Thread start routine that uses read-write locks
562  */
563 void *thread_routine (void *arg)
564 {
565     thread_t *self = (thread_t*)arg;
566     int iteration;
567     int element;
568     int status;
569     lmgr_init_thread();
570     element = 0;                        /* Current data element */
571
572     for (iteration = 0; iteration < ITERATIONS; iteration++) {
573         if ((iteration % self->interval) == 0) {
574             status = rwl_writetrylock (&data[element].lock);
575             if (status == EBUSY)
576                 self->w_collisions++;
577             else if (status == 0) {
578                 data[element].data++;
579                 data[element].updates++;
580                 self->updates++;
581                 rwl_writeunlock (&data[element].lock);
582             } else
583                 err_abort (status, _("Try write lock"));
584         } else {
585             status = rwl_readtrylock (&data[element].lock);
586             if (status == EBUSY)
587                 self->r_collisions++;
588             else if (status != 0) {
589                 err_abort (status, _("Try read lock"));
590             } else {
591                 if (data[element].data != data[element].updates)
592                     printf ("%d: data[%d] %d != %d\n",
593                         self->thread_num, element,
594                         data[element].data, data[element].updates);
595                 rwl_readunlock (&data[element].lock);
596             }
597         }
598
599         element++;
600         if (element >= DATASIZE)
601             element = 0;
602     }
603     lmgr_cleanup_thread();
604     return NULL;
605 }
606
607 int main (int argc, char *argv[])
608 {
609     int count, data_count;
610     unsigned int seed = 1;
611     int thread_updates = 0, data_updates = 0;
612     int status;
613
614     /*
615      * For Solaris 2.5,2.6,7 and 8 threads are not timesliced.
616      * Ensure our threads can run concurrently.
617      */
618     DPRINTF (("Setting concurrency level to %d\n", THREADS));
619     thr_setconcurrency(THREADS);      /* Only implemented on Solaris */
620
621     /*
622      * Initialize the shared data.
623      */
624     for (data_count = 0; data_count < DATASIZE; data_count++) {
625         data[data_count].data = 0;
626         data[data_count].updates = 0;
627         rwl_init(&data[data_count].lock);
628     }
629
630     /*
631      * Create THREADS threads to access shared data.
632      */
633     for (count = 0; count < THREADS; count++) {
634         threads[count].thread_num = count;
635         threads[count].r_collisions = 0;
636         threads[count].w_collisions = 0;
637         threads[count].updates = 0;
638         threads[count].interval = rand_r (&seed) % ITERATIONS;
639         status = pthread_create (&threads[count].thread_id,
640             NULL, thread_routine, (void*)&threads[count]);
641         if (status != 0)
642             err_abort (status, _("Create thread"));
643     }
644
645     /*
646      * Wait for all threads to complete, and collect
647      * statistics.
648      */
649     for (count = 0; count < THREADS; count++) {
650         status = pthread_join (threads[count].thread_id, NULL);
651         if (status != 0)
652             err_abort (status, _("Join thread"));
653         thread_updates += threads[count].updates;
654         printf (_("%02d: interval %d, updates %d, "
655                 "r_collisions %d, w_collisions %d\n"),
656             count, threads[count].interval,
657             threads[count].updates,
658             threads[count].r_collisions, threads[count].w_collisions);
659     }
660
661     /*
662      * Collect statistics for the data.
663      */
664     for (data_count = 0; data_count < DATASIZE; data_count++) {
665         data_updates += data[data_count].updates;
666         printf (_("data %02d: value %d, %d updates\n"),
667             data_count, data[data_count].data, data[data_count].updates);
668         rwl_destroy (&data[data_count].lock);
669     }
670
671     return 0;
672 }
673
674 #endif