]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/devlock.c
Backport from BEE
[bacula/bacula] / bacula / src / lib / devlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  * Bacula Thread Read/Write locking code. It permits
18  *  multiple readers but only one writer.  Note, however,
19  *  that the writer thread is permitted to make multiple
20  *  nested write lock calls.
21  *
22  *  Kern Sibbald, January MMI
23  *
24  *  This code adapted from "Programming with POSIX Threads", by
25  *    David R. Butenhof
26  *
27  */
28
29 #define _LOCKMGR_COMPLIANT
30 #include "bacula.h"
31 #include "devlock.h"
32
33 /*
34  * Initialize a read/write lock
35  *
36  *  Returns: 0 on success
37  *           errno on failure
38  */
39
40 devlock *new_devlock()
41 {
42    devlock *lock;
43    lock = (devlock *)malloc(sizeof (devlock));
44    memset(lock, 0, sizeof(devlock));
45    return lock;
46 }
47
48 int devlock::init(int init_priority)
49 {
50    int stat;
51    devlock *rwl = this;
52
53    rwl->r_active = rwl->w_active = 0;
54    rwl->r_wait = rwl->w_wait = 0;
55    rwl->priority = init_priority;
56    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
57       return stat;
58    }
59    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
60       pthread_mutex_destroy(&rwl->mutex);
61       return stat;
62    }
63    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
64       pthread_cond_destroy(&rwl->read);
65       pthread_mutex_destroy(&rwl->mutex);
66       return stat;
67    }
68    rwl->valid = DEVLOCK_VALID;
69    return 0;
70 }
71
72 /*
73  * Destroy a read/write lock
74  *
75  * Returns: 0 on success
76  *          errno on failure
77  */
78 int devlock::destroy()
79 {
80    devlock *rwl = this;
81    int stat, stat1, stat2;
82
83    if (rwl->valid != DEVLOCK_VALID) {
84       return EINVAL;
85    }
86    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
87       return stat;
88    }
89
90    /*
91     * If any threads are active, report EBUSY
92     */
93    if (rwl->r_active > 0 || rwl->w_active) {
94       pthread_mutex_unlock(&rwl->mutex);
95       return EBUSY;
96    }
97
98    /*
99     * If any threads are waiting, report EBUSY
100     */
101    if (rwl->r_wait > 0 || rwl->w_wait > 0) {
102       pthread_mutex_unlock(&rwl->mutex);
103       return EBUSY;
104    }
105
106    rwl->valid = 0;
107    if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
108       return stat;
109    }
110    stat  = pthread_mutex_destroy(&rwl->mutex);
111    stat1 = pthread_cond_destroy(&rwl->read);
112    stat2 = pthread_cond_destroy(&rwl->write);
113    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
114 }
115
116 /*
117  * Handle cleanup when the read lock condition variable
118  * wait is released.
119  */
120 static void devlock_read_release(void *arg)
121 {
122    devlock *rwl = (devlock *)arg;
123    rwl->read_release();
124 }
125
126 void devlock::read_release()
127 {
128    r_wait--;
129    pthread_mutex_unlock(&mutex);
130 }
131
132 /*
133  * Handle cleanup when the write lock condition variable wait
134  * is released.
135  */
136 static void devlock_write_release(void *arg)
137 {
138    devlock *rwl = (devlock *)arg;
139    rwl->write_release();
140 }
141
142 void devlock::write_release()
143 {
144    w_wait--;
145    pthread_mutex_unlock(&mutex);
146 }
147
148 /*
149  * Lock for read access, wait until locked (or error).
150  */
151 int devlock::readlock()
152 {
153    devlock *rwl = this;
154    int stat;
155
156    if (rwl->valid != DEVLOCK_VALID) {
157       return EINVAL;
158    }
159    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
160       return stat;
161    }
162    if (rwl->w_active) {
163       rwl->r_wait++;                  /* indicate that we are waiting */
164       pthread_cleanup_push(devlock_read_release, (void *)rwl);
165       while (rwl->w_active) {
166          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
167          if (stat != 0) {
168             break;                    /* error, bail out */
169          }
170       }
171       pthread_cleanup_pop(0);
172       rwl->r_wait--;                  /* we are no longer waiting */
173    }
174    if (stat == 0) {
175       rwl->r_active++;                /* we are running */
176    }
177    pthread_mutex_unlock(&rwl->mutex);
178    return stat;
179 }
180
181 /*
182  * Attempt to lock for read access, don't wait
183  */
184 int devlock::readtrylock()
185 {
186    devlock *rwl = this;
187    int stat, stat2;
188
189    if (rwl->valid != DEVLOCK_VALID) {
190       return EINVAL;
191    }
192    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
193       return stat;
194    }
195    if (rwl->w_active) {
196       stat = EBUSY;
197    } else {
198       rwl->r_active++;                /* we are running */
199    }
200    stat2 = pthread_mutex_unlock(&rwl->mutex);
201    return (stat == 0 ? stat2 : stat);
202 }
203
204 /*
205  * Unlock read lock
206  */
207 int devlock::readunlock()
208 {
209    devlock *rwl = this;
210    int stat, stat2;
211
212    if (rwl->valid != DEVLOCK_VALID) {
213       return EINVAL;
214    }
215    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
216       return stat;
217    }
218    rwl->r_active--;
219    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
220       stat = pthread_cond_broadcast(&rwl->write);
221    }
222    stat2 = pthread_mutex_unlock(&rwl->mutex);
223    return (stat == 0 ? stat2 : stat);
224 }
225
226
227 /*
228  * Lock for write access, wait until locked (or error).
229  *   Multiple nested write locking is permitted.
230  */
231 int devlock::writelock(int areason, bool acan_take)
232 {
233    devlock *rwl = this;
234    int stat;
235
236    if (rwl->valid != DEVLOCK_VALID) {
237       return EINVAL;
238    }
239    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
240       return stat;
241    }
242    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
243       rwl->w_active++;
244       pthread_mutex_unlock(&rwl->mutex);
245       return 0;
246    }
247    lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
248    if (rwl->w_active || rwl->r_active > 0) {
249       rwl->w_wait++;                  /* indicate that we are waiting */
250       pthread_cleanup_push(devlock_write_release, (void *)rwl);
251       while (rwl->w_active || rwl->r_active > 0) {
252          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
253             lmgr_do_unlock(rwl);
254             break;                    /* error, bail out */
255          }
256       }
257       pthread_cleanup_pop(0);
258       rwl->w_wait--;                  /* we are no longer waiting */
259    }
260    if (stat == 0) {
261       rwl->w_active++;                /* we are running */
262       rwl->writer_id = pthread_self(); /* save writer thread's id */
263       lmgr_post_lock();
264    }
265    rwl->reason = areason;
266    rwl->can_take = acan_take;
267    pthread_mutex_unlock(&rwl->mutex);
268    return stat;
269 }
270
271 /*
272  * Attempt to lock for write access, don't wait
273  */
274 int devlock::writetrylock()
275 {
276    devlock *rwl = this;
277    int stat, stat2;
278
279    if (rwl->valid != DEVLOCK_VALID) {
280       return EINVAL;
281    }
282    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
283       return stat;
284    }
285    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
286       rwl->w_active++;
287       pthread_mutex_unlock(&rwl->mutex);
288       return 0;
289    }
290    if (rwl->w_active || rwl->r_active > 0) {
291       stat = EBUSY;
292    } else {
293       rwl->w_active = 1;              /* we are running */
294       rwl->writer_id = pthread_self(); /* save writer thread's id */
295       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
296    }
297    stat2 = pthread_mutex_unlock(&rwl->mutex);
298    return (stat == 0 ? stat2 : stat);
299 }
300
301 /*
302  * Unlock write lock
303  *  Start any waiting writers in preference to waiting readers
304  */
305 int devlock::writeunlock()
306 {
307    devlock *rwl = this;
308    int stat, stat2;
309
310    if (rwl->valid != DEVLOCK_VALID) {
311       return EINVAL;
312    }
313    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
314       return stat;
315    }
316    if (rwl->w_active <= 0) {
317       pthread_mutex_unlock(&rwl->mutex);
318       Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
319    }
320    rwl->w_active--;
321    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
322       pthread_mutex_unlock(&rwl->mutex);
323       Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
324    }
325    if (rwl->w_active > 0) {
326       stat = 0;                       /* writers still active */
327    } else {
328       lmgr_do_unlock(rwl);
329       /* No more writers, awaken someone */
330       if (rwl->r_wait > 0) {         /* if readers waiting */
331          stat = pthread_cond_broadcast(&rwl->read);
332       } else if (rwl->w_wait > 0) {
333          stat = pthread_cond_broadcast(&rwl->write);
334       }
335    }
336    stat2 = pthread_mutex_unlock(&rwl->mutex);
337    return (stat == 0 ? stat2 : stat);
338 }
339
340 int devlock::take_lock(take_lock_t *hold, int areason)
341 {
342    int stat;
343
344    if (valid != DEVLOCK_VALID) {
345       return EINVAL;
346    }
347    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
348       return stat;
349    }
350    hold->reason = reason;
351    hold->prev_reason = prev_reason;
352    hold->writer_id = writer_id;
353    reason = areason;
354    writer_id = pthread_self();
355    stat = pthread_mutex_unlock(&mutex);
356    return stat;
357 }
358
359 int devlock::return_lock(take_lock_t *hold)
360 {
361    int stat, stat2;
362
363    if (valid != DEVLOCK_VALID) {
364       return EINVAL;
365    }
366    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
367       return stat;
368    }
369    reason = hold->reason;
370    prev_reason = hold->prev_reason;
371    writer_id = hold->writer_id;
372    writer_id = pthread_self();
373    stat2 = pthread_mutex_unlock(&mutex);
374    if (w_active || w_wait) {
375       stat = pthread_cond_broadcast(&write);
376    }
377    return (stat == 0 ? stat2 : stat);
378
379 }
380
381 #ifdef TEST_RWLOCK
382
383 #define THREADS     300
384 #define DATASIZE   15
385 #define ITERATIONS 1000000
386
387 /*
388  * Keep statics for each thread.
389  */
390 typedef struct thread_tag {
391    int thread_num;
392    pthread_t thread_id;
393    int writes;
394    int reads;
395    int interval;
396 } thread_t;
397
398 /*
399  * Read/write lock and shared data.
400  */
401 typedef struct data_tag {
402    brwlock_t lock;
403    int data;
404    int writes;
405 } data_t;
406
407 static thread_t threads[THREADS];
408 static data_t data[DATASIZE];
409
410 /*
411  * Thread start routine that uses read/write locks.
412  */
413 void *thread_routine(void *arg)
414 {
415    thread_t *self = (thread_t *)arg;
416    int repeats = 0;
417    int iteration;
418    int element = 0;
419    int status;
420
421    for (iteration=0; iteration < ITERATIONS; iteration++) {
422       /*
423        * Each "self->interval" iterations, perform an
424        * update operation (write lock instead of read
425        * lock).
426        */
427 //      if ((iteration % self->interval) == 0) {
428          status = writelock(&data[element].lock);
429          if (status != 0) {
430             berrno be;
431             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
432             exit(1);
433          }
434          data[element].data = self->thread_num;
435          data[element].writes++;
436          self->writes++;
437          status = writelock(&data[element].lock);
438          if (status != 0) {
439             berrno be;
440             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
441             exit(1);
442          }
443          data[element].data = self->thread_num;
444          data[element].writes++;
445          self->writes++;
446          status = writeunlock(&data[element].lock);
447          if (status != 0) {
448             berrno be;
449             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
450             exit(1);
451          }
452          status = writeunlock(&data[element].lock);
453          if (status != 0) {
454             berrno be;
455             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
456             exit(1);
457          }
458
459 #ifdef xxx
460       } else {
461          /*
462           * Look at the current data element to see whether
463           * the current thread last updated it. Count the
464           * times to report later.
465           */
466           status = readlock(&data[element].lock);
467           if (status != 0) {
468              berrno be;
469              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
470              exit(1);
471           }
472           self->reads++;
473           if (data[element].data == self->thread_num)
474              repeats++;
475           status = readunlock(&data[element].lock);
476           if (status != 0) {
477              berrno be;
478              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
479              exit(1);
480           }
481       }
482 #endif
483       element++;
484       if (element >= DATASIZE) {
485          element = 0;
486       }
487    }
488    if (repeats > 0) {
489       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
490          self->thread_num, repeats);
491    }
492    return NULL;
493 }
494
495 int main (int argc, char *argv[])
496 {
497     int count;
498     int data_count;
499     int status;
500     unsigned int seed = 1;
501     int thread_writes = 0;
502     int data_writes = 0;
503
504 #ifdef USE_THR_SETCONCURRENCY
505     /*
506      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
507      * that our threads can run concurrently, we need to
508      * increase the concurrency level to THREADS.
509      */
510     thr_setconcurrency (THREADS);
511 #endif
512
513     /*
514      * Initialize the shared data.
515      */
516     for (data_count = 0; data_count < DATASIZE; data_count++) {
517         data[data_count].data = 0;
518         data[data_count].writes = 0;
519         status = rwl_init(&data[data_count].lock);
520         if (status != 0) {
521            berrno be;
522            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
523            exit(1);
524         }
525     }
526
527     /*
528      * Create THREADS threads to access shared data.
529      */
530     for (count = 0; count < THREADS; count++) {
531         threads[count].thread_num = count + 1;
532         threads[count].writes = 0;
533         threads[count].reads = 0;
534         threads[count].interval = rand_r(&seed) % 71;
535         if (threads[count].interval <= 0) {
536            threads[count].interval = 1;
537         }
538         status = pthread_create (&threads[count].thread_id,
539             NULL, thread_routine, (void*)&threads[count]);
540         if (status != 0 || (int)threads[count].thread_id == 0) {
541            berrno be;
542            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
543            exit(1);
544         }
545     }
546
547     /*
548      * Wait for all threads to complete, and collect
549      * statistics.
550      */
551     for (count = 0; count < THREADS; count++) {
552         status = pthread_join (threads[count].thread_id, NULL);
553         if (status != 0) {
554            berrno be;
555            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
556            exit(1);
557         }
558         thread_writes += threads[count].writes;
559         printf (_("%02d: interval %d, writes %d, reads %d\n"),
560             count, threads[count].interval,
561             threads[count].writes, threads[count].reads);
562     }
563
564     /*
565      * Collect statistics for the data.
566      */
567     for (data_count = 0; data_count < DATASIZE; data_count++) {
568         data_writes += data[data_count].writes;
569         printf (_("data %02d: value %d, %d writes\n"),
570             data_count, data[data_count].data, data[data_count].writes);
571         rwl_destroy (&data[data_count].lock);
572     }
573
574     printf (_("Total: %d thread writes, %d data writes\n"),
575         thread_writes, data_writes);
576     return 0;
577 }
578
579 #endif
580
581 #ifdef TEST_RW_TRY_LOCK
582 /*
583  * brwlock_try_main.c
584  *
585  * Demonstrate use of non-blocking read-write locks.
586  *
587  * Special notes: On older Solaris system, call thr_setconcurrency()
588  * to allow interleaved thread execution, since threads are not
589  * timesliced.
590  */
591 #include <pthread.h>
592 #include "rwlock.h"
593 #include "errors.h"
594
595 #define THREADS         5
596 #define ITERATIONS      1000
597 #define DATASIZE        15
598
599 /*
600  * Keep statistics for each thread.
601  */
602 typedef struct thread_tag {
603     int         thread_num;
604     pthread_t   thread_id;
605     int         r_collisions;
606     int         w_collisions;
607     int         updates;
608     int         interval;
609 } thread_t;
610
611 /*
612  * Read-write lock and shared data
613  */
614 typedef struct data_tag {
615     brwlock_t    lock;
616     int         data;
617     int         updates;
618 } data_t;
619
620 thread_t threads[THREADS];
621 data_t data[DATASIZE];
622
623 /*
624  * Thread start routine that uses read-write locks
625  */
626 void *thread_routine (void *arg)
627 {
628     thread_t *self = (thread_t*)arg;
629     int iteration;
630     int element;
631     int status;
632     lmgr_init_thread();
633     element = 0;                        /* Current data element */
634
635     for (iteration = 0; iteration < ITERATIONS; iteration++) {
636         if ((iteration % self->interval) == 0) {
637             status = rwl_writetrylock (&data[element].lock);
638             if (status == EBUSY)
639                 self->w_collisions++;
640             else if (status == 0) {
641                 data[element].data++;
642                 data[element].updates++;
643                 self->updates++;
644                 rwl_writeunlock (&data[element].lock);
645             } else
646                 err_abort (status, _("Try write lock"));
647         } else {
648             status = rwl_readtrylock (&data[element].lock);
649             if (status == EBUSY)
650                 self->r_collisions++;
651             else if (status != 0) {
652                 err_abort (status, _("Try read lock"));
653             } else {
654                 if (data[element].data != data[element].updates)
655                     printf ("%d: data[%d] %d != %d\n",
656                         self->thread_num, element,
657                         data[element].data, data[element].updates);
658                 rwl_readunlock (&data[element].lock);
659             }
660         }
661
662         element++;
663         if (element >= DATASIZE)
664             element = 0;
665     }
666     lmgr_cleanup_thread();
667     return NULL;
668 }
669
670 int main (int argc, char *argv[])
671 {
672     int count, data_count;
673     unsigned int seed = 1;
674     int thread_updates = 0, data_updates = 0;
675     int status;
676
677 #ifdef USE_THR_SETCONCURRENCY
678     /*
679      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
680      * that our threads can run concurrently, we need to
681      * increase the concurrency level to THREADS.
682      */
683     DPRINTF (("Setting concurrency level to %d\n", THREADS));
684     thr_setconcurrency (THREADS);
685 #endif
686
687     /*
688      * Initialize the shared data.
689      */
690     for (data_count = 0; data_count < DATASIZE; data_count++) {
691         data[data_count].data = 0;
692         data[data_count].updates = 0;
693         rwl_init(&data[data_count].lock);
694     }
695
696     /*
697      * Create THREADS threads to access shared data.
698      */
699     for (count = 0; count < THREADS; count++) {
700         threads[count].thread_num = count;
701         threads[count].r_collisions = 0;
702         threads[count].w_collisions = 0;
703         threads[count].updates = 0;
704         threads[count].interval = rand_r (&seed) % ITERATIONS;
705         status = pthread_create (&threads[count].thread_id,
706             NULL, thread_routine, (void*)&threads[count]);
707         if (status != 0)
708             err_abort (status, _("Create thread"));
709     }
710
711     /*
712      * Wait for all threads to complete, and collect
713      * statistics.
714      */
715     for (count = 0; count < THREADS; count++) {
716         status = pthread_join (threads[count].thread_id, NULL);
717         if (status != 0)
718             err_abort (status, _("Join thread"));
719         thread_updates += threads[count].updates;
720         printf (_("%02d: interval %d, updates %d, "
721                 "r_collisions %d, w_collisions %d\n"),
722             count, threads[count].interval,
723             threads[count].updates,
724             threads[count].r_collisions, threads[count].w_collisions);
725     }
726
727     /*
728      * Collect statistics for the data.
729      */
730     for (data_count = 0; data_count < DATASIZE; data_count++) {
731         data_updates += data[data_count].updates;
732         printf (_("data %02d: value %d, %d updates\n"),
733             data_count, data[data_count].data, data[data_count].updates);
734         rwl_destroy (&data[data_count].lock);
735     }
736
737     return 0;
738 }
739
740 #endif