]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/devlock.c
Change copyright as per agreement with FSFE
[bacula/bacula] / bacula / src / lib / devlock.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2016 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Thread Read/Write locking code. It permits
21  *  multiple readers but only one writer.  Note, however,
22  *  that the writer thread is permitted to make multiple
23  *  nested write lock calls.
24  *
25  *  Kern Sibbald, January MMI
26  *
27  *  This code adapted from "Programming with POSIX Threads", by
28  *    David R. Butenhof
29  *
30  */
31
32 #define LOCKMGR_COMPLIANT
33 #include "bacula.h"
34 #include "devlock.h"
35
36 /*
37  * Initialize a read/write lock
38  *
39  *  Returns: 0 on success
40  *           errno on failure
41  */
42
43 devlock *new_devlock()
44 {
45    devlock *lock;
46    lock = (devlock *)malloc(sizeof (devlock));
47    memset(lock, 0, sizeof(devlock));
48    return lock;
49 }
50
51 int devlock::init(int init_priority)
52 {
53    int stat;
54    devlock *rwl = this;
55
56    rwl->r_active = rwl->w_active = 0;
57    rwl->r_wait = rwl->w_wait = 0;
58    rwl->priority = init_priority;
59    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
60       return stat;
61    }
62    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
63       pthread_mutex_destroy(&rwl->mutex);
64       return stat;
65    }
66    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
67       pthread_cond_destroy(&rwl->read);
68       pthread_mutex_destroy(&rwl->mutex);
69       return stat;
70    }
71    rwl->valid = DEVLOCK_VALID;
72    return 0;
73 }
74
75 /*
76  * Destroy a read/write lock
77  *
78  * Returns: 0 on success
79  *          errno on failure
80  */
81 int devlock::destroy()
82 {
83    devlock *rwl = this;
84    int stat, stat1, stat2;
85
86    if (rwl->valid != DEVLOCK_VALID) {
87       return EINVAL;
88    }
89    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
90       return stat;
91    }
92
93    /*
94     * If any threads are active, report EBUSY
95     */
96    if (rwl->r_active > 0 || rwl->w_active) {
97       pthread_mutex_unlock(&rwl->mutex);
98       return EBUSY;
99    }
100
101    /*
102     * If any threads are waiting, report EBUSY
103     */
104    if (rwl->r_wait > 0 || rwl->w_wait > 0) {
105       pthread_mutex_unlock(&rwl->mutex);
106       return EBUSY;
107    }
108
109    rwl->valid = 0;
110    if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
111       return stat;
112    }
113    stat  = pthread_mutex_destroy(&rwl->mutex);
114    stat1 = pthread_cond_destroy(&rwl->read);
115    stat2 = pthread_cond_destroy(&rwl->write);
116    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
117 }
118
119 /*
120  * Handle cleanup when the read lock condition variable
121  * wait is released.
122  */
123 static void devlock_read_release(void *arg)
124 {
125    devlock *rwl = (devlock *)arg;
126    rwl->read_release();
127 }
128
129 void devlock::read_release()
130 {
131    r_wait--;
132    pthread_mutex_unlock(&mutex);
133 }
134
135 /*
136  * Handle cleanup when the write lock condition variable wait
137  * is released.
138  */
139 static void devlock_write_release(void *arg)
140 {
141    devlock *rwl = (devlock *)arg;
142    rwl->write_release();
143 }
144
145 void devlock::write_release()
146 {
147    w_wait--;
148    pthread_mutex_unlock(&mutex);
149 }
150
151 /*
152  * Lock for read access, wait until locked (or error).
153  */
154 int devlock::readlock()
155 {
156    devlock *rwl = this;
157    int stat;
158
159    if (rwl->valid != DEVLOCK_VALID) {
160       return EINVAL;
161    }
162    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
163       return stat;
164    }
165    if (rwl->w_active) {
166       rwl->r_wait++;                  /* indicate that we are waiting */
167       pthread_cleanup_push(devlock_read_release, (void *)rwl);
168       while (rwl->w_active) {
169          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
170          if (stat != 0) {
171             break;                    /* error, bail out */
172          }
173       }
174       pthread_cleanup_pop(0);
175       rwl->r_wait--;                  /* we are no longer waiting */
176    }
177    if (stat == 0) {
178       rwl->r_active++;                /* we are running */
179    }
180    pthread_mutex_unlock(&rwl->mutex);
181    return stat;
182 }
183
184 /*
185  * Attempt to lock for read access, don't wait
186  */
187 int devlock::readtrylock()
188 {
189    devlock *rwl = this;
190    int stat, stat2;
191
192    if (rwl->valid != DEVLOCK_VALID) {
193       return EINVAL;
194    }
195    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
196       return stat;
197    }
198    if (rwl->w_active) {
199       stat = EBUSY;
200    } else {
201       rwl->r_active++;                /* we are running */
202    }
203    stat2 = pthread_mutex_unlock(&rwl->mutex);
204    return (stat == 0 ? stat2 : stat);
205 }
206
207 /*
208  * Unlock read lock
209  */
210 int devlock::readunlock()
211 {
212    devlock *rwl = this;
213    int stat, stat2;
214
215    if (rwl->valid != DEVLOCK_VALID) {
216       return EINVAL;
217    }
218    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
219       return stat;
220    }
221    rwl->r_active--;
222    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
223       stat = pthread_cond_broadcast(&rwl->write);
224    }
225    stat2 = pthread_mutex_unlock(&rwl->mutex);
226    return (stat == 0 ? stat2 : stat);
227 }
228
229
230 /*
231  * Lock for write access, wait until locked (or error).
232  *   Multiple nested write locking is permitted.
233  */
234 int devlock::writelock(int areason, bool acan_take)
235 {
236    devlock *rwl = this;
237    int stat;
238
239    if (rwl->valid != DEVLOCK_VALID) {
240       return EINVAL;
241    }
242    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
243       return stat;
244    }
245    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
246       rwl->w_active++;
247       pthread_mutex_unlock(&rwl->mutex);
248       return 0;
249    }
250    lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
251    if (rwl->w_active || rwl->r_active > 0) {
252       rwl->w_wait++;                  /* indicate that we are waiting */
253       pthread_cleanup_push(devlock_write_release, (void *)rwl);
254       while (rwl->w_active || rwl->r_active > 0) {
255          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
256             lmgr_do_unlock(rwl);
257             break;                    /* error, bail out */
258          }
259       }
260       pthread_cleanup_pop(0);
261       rwl->w_wait--;                  /* we are no longer waiting */
262    }
263    if (stat == 0) {
264       rwl->w_active++;                /* we are running */
265       rwl->writer_id = pthread_self(); /* save writer thread's id */
266       lmgr_post_lock();
267    }
268    rwl->reason = areason;
269    rwl->can_take = acan_take;
270    pthread_mutex_unlock(&rwl->mutex);
271    return stat;
272 }
273
274 /*
275  * Attempt to lock for write access, don't wait
276  */
277 int devlock::writetrylock()
278 {
279    devlock *rwl = this;
280    int stat, stat2;
281
282    if (rwl->valid != DEVLOCK_VALID) {
283       return EINVAL;
284    }
285    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
286       return stat;
287    }
288    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
289       rwl->w_active++;
290       pthread_mutex_unlock(&rwl->mutex);
291       return 0;
292    }
293    if (rwl->w_active || rwl->r_active > 0) {
294       stat = EBUSY;
295    } else {
296       rwl->w_active = 1;              /* we are running */
297       rwl->writer_id = pthread_self(); /* save writer thread's id */
298       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
299    }
300    stat2 = pthread_mutex_unlock(&rwl->mutex);
301    return (stat == 0 ? stat2 : stat);
302 }
303
304 /*
305  * Unlock write lock
306  *  Start any waiting writers in preference to waiting readers
307  */
308 int devlock::writeunlock()
309 {
310    devlock *rwl = this;
311    int stat, stat2;
312
313    if (rwl->valid != DEVLOCK_VALID) {
314       return EINVAL;
315    }
316    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
317       return stat;
318    }
319    if (rwl->w_active <= 0) {
320       pthread_mutex_unlock(&rwl->mutex);
321       Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
322    }
323    rwl->w_active--;
324    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
325       pthread_mutex_unlock(&rwl->mutex);
326       Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
327    }
328    if (rwl->w_active > 0) {
329       stat = 0;                       /* writers still active */
330    } else {
331       lmgr_do_unlock(rwl);
332       /* No more writers, awaken someone */
333       if (rwl->r_wait > 0) {         /* if readers waiting */
334          stat = pthread_cond_broadcast(&rwl->read);
335       } else if (rwl->w_wait > 0) {
336          stat = pthread_cond_broadcast(&rwl->write);
337       }
338    }
339    stat2 = pthread_mutex_unlock(&rwl->mutex);
340    return (stat == 0 ? stat2 : stat);
341 }
342
343 int devlock::take_lock(take_lock_t *hold, int areason)
344 {
345    int stat;
346
347    if (valid != DEVLOCK_VALID) {
348       return EINVAL;
349    }
350    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
351       return stat;
352    }
353    hold->reason = reason;
354    hold->prev_reason = prev_reason;
355    hold->writer_id = writer_id;
356    reason = areason;
357    writer_id = pthread_self();
358    stat = pthread_mutex_unlock(&mutex);
359    return stat;
360 }
361
362 int devlock::return_lock(take_lock_t *hold)
363 {
364    int stat, stat2;
365
366    if (valid != DEVLOCK_VALID) {
367       return EINVAL;
368    }
369    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
370       return stat;
371    }
372    reason = hold->reason;
373    prev_reason = hold->prev_reason;
374    writer_id = hold->writer_id;
375    writer_id = pthread_self();
376    stat2 = pthread_mutex_unlock(&mutex);
377    if (w_active || w_wait) {
378       stat = pthread_cond_broadcast(&write);
379    }
380    return (stat == 0 ? stat2 : stat);
381
382 }
383
384 #ifdef TEST_RWLOCK
385
386 #define THREADS     300
387 #define DATASIZE   15
388 #define ITERATIONS 1000000
389
390 /*
391  * Keep statics for each thread.
392  */
393 typedef struct thread_tag {
394    int thread_num;
395    pthread_t thread_id;
396    int writes;
397    int reads;
398    int interval;
399 } thread_t;
400
401 /*
402  * Read/write lock and shared data.
403  */
404 typedef struct data_tag {
405    brwlock_t lock;
406    int data;
407    int writes;
408 } data_t;
409
410 static thread_t threads[THREADS];
411 static data_t data[DATASIZE];
412
413 /*
414  * Thread start routine that uses read/write locks.
415  */
416 void *thread_routine(void *arg)
417 {
418    thread_t *self = (thread_t *)arg;
419    int repeats = 0;
420    int iteration;
421    int element = 0;
422    int status;
423
424    for (iteration=0; iteration < ITERATIONS; iteration++) {
425       /*
426        * Each "self->interval" iterations, perform an
427        * update operation (write lock instead of read
428        * lock).
429        */
430 //      if ((iteration % self->interval) == 0) {
431          status = writelock(&data[element].lock);
432          if (status != 0) {
433             berrno be;
434             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
435             exit(1);
436          }
437          data[element].data = self->thread_num;
438          data[element].writes++;
439          self->writes++;
440          status = writelock(&data[element].lock);
441          if (status != 0) {
442             berrno be;
443             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
444             exit(1);
445          }
446          data[element].data = self->thread_num;
447          data[element].writes++;
448          self->writes++;
449          status = writeunlock(&data[element].lock);
450          if (status != 0) {
451             berrno be;
452             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
453             exit(1);
454          }
455          status = writeunlock(&data[element].lock);
456          if (status != 0) {
457             berrno be;
458             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
459             exit(1);
460          }
461
462 #ifdef xxx
463       } else {
464          /*
465           * Look at the current data element to see whether
466           * the current thread last updated it. Count the
467           * times to report later.
468           */
469           status = readlock(&data[element].lock);
470           if (status != 0) {
471              berrno be;
472              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
473              exit(1);
474           }
475           self->reads++;
476           if (data[element].data == self->thread_num)
477              repeats++;
478           status = readunlock(&data[element].lock);
479           if (status != 0) {
480              berrno be;
481              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
482              exit(1);
483           }
484       }
485 #endif
486       element++;
487       if (element >= DATASIZE) {
488          element = 0;
489       }
490    }
491    if (repeats > 0) {
492       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
493          self->thread_num, repeats);
494    }
495    return NULL;
496 }
497
498 int main (int argc, char *argv[])
499 {
500     int count;
501     int data_count;
502     int status;
503     unsigned int seed = 1;
504     int thread_writes = 0;
505     int data_writes = 0;
506
507     /*
508      * For Solaris 2.5,2.6,7 and 8 threads are not timesliced.
509      *  To ensure our threads can run concurrently, we specifically
510      *  set the concurrency level to THREADS.
511      */
512     thr_setconcurrency(THREADS); /* Turned on only for Solaris */
513
514     /*
515      * Initialize the shared data.
516      */
517     for (data_count = 0; data_count < DATASIZE; data_count++) {
518         data[data_count].data = 0;
519         data[data_count].writes = 0;
520         status = rwl_init(&data[data_count].lock);
521         if (status != 0) {
522            berrno be;
523            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
524            exit(1);
525         }
526     }
527
528     /*
529      * Create THREADS threads to access shared data.
530      */
531     for (count = 0; count < THREADS; count++) {
532         threads[count].thread_num = count + 1;
533         threads[count].writes = 0;
534         threads[count].reads = 0;
535         threads[count].interval = rand_r(&seed) % 71;
536         if (threads[count].interval <= 0) {
537            threads[count].interval = 1;
538         }
539         status = pthread_create (&threads[count].thread_id,
540             NULL, thread_routine, (void*)&threads[count]);
541         if (status != 0 || (int)threads[count].thread_id == 0) {
542            berrno be;
543            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
544            exit(1);
545         }
546     }
547
548     /*
549      * Wait for all threads to complete, and collect
550      * statistics.
551      */
552     for (count = 0; count < THREADS; count++) {
553         status = pthread_join (threads[count].thread_id, NULL);
554         if (status != 0) {
555            berrno be;
556            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
557            exit(1);
558         }
559         thread_writes += threads[count].writes;
560         printf (_("%02d: interval %d, writes %d, reads %d\n"),
561             count, threads[count].interval,
562             threads[count].writes, threads[count].reads);
563     }
564
565     /*
566      * Collect statistics for the data.
567      */
568     for (data_count = 0; data_count < DATASIZE; data_count++) {
569         data_writes += data[data_count].writes;
570         printf (_("data %02d: value %d, %d writes\n"),
571             data_count, data[data_count].data, data[data_count].writes);
572         rwl_destroy (&data[data_count].lock);
573     }
574
575     printf (_("Total: %d thread writes, %d data writes\n"),
576         thread_writes, data_writes);
577     return 0;
578 }
579
580 #endif
581
582 #ifdef TEST_RW_TRY_LOCK
583 /*
584  * brwlock_try_main.c
585  *
586  * Demonstrate use of non-blocking read-write locks.
587  *
588  * On older Solaris system, call thr_setconcurrency()
589  * to allow interleaved thread execution, since threads are not
590  * timesliced.
591  */
592 #include <pthread.h>
593 #include "rwlock.h"
594 #include "errors.h"
595
596 #define THREADS         5
597 #define ITERATIONS      1000
598 #define DATASIZE        15
599
600 /*
601  * Keep statistics for each thread.
602  */
603 typedef struct thread_tag {
604     int         thread_num;
605     pthread_t   thread_id;
606     int         r_collisions;
607     int         w_collisions;
608     int         updates;
609     int         interval;
610 } thread_t;
611
612 /*
613  * Read-write lock and shared data
614  */
615 typedef struct data_tag {
616     brwlock_t    lock;
617     int         data;
618     int         updates;
619 } data_t;
620
621 thread_t threads[THREADS];
622 data_t data[DATASIZE];
623
624 /*
625  * Thread start routine that uses read-write locks
626  */
627 void *thread_routine (void *arg)
628 {
629     thread_t *self = (thread_t*)arg;
630     int iteration;
631     int element;
632     int status;
633     lmgr_init_thread();
634     element = 0;                        /* Current data element */
635
636     for (iteration = 0; iteration < ITERATIONS; iteration++) {
637         if ((iteration % self->interval) == 0) {
638             status = rwl_writetrylock (&data[element].lock);
639             if (status == EBUSY)
640                 self->w_collisions++;
641             else if (status == 0) {
642                 data[element].data++;
643                 data[element].updates++;
644                 self->updates++;
645                 rwl_writeunlock (&data[element].lock);
646             } else
647                 err_abort (status, _("Try write lock"));
648         } else {
649             status = rwl_readtrylock (&data[element].lock);
650             if (status == EBUSY)
651                 self->r_collisions++;
652             else if (status != 0) {
653                 err_abort (status, _("Try read lock"));
654             } else {
655                 if (data[element].data != data[element].updates)
656                     printf ("%d: data[%d] %d != %d\n",
657                         self->thread_num, element,
658                         data[element].data, data[element].updates);
659                 rwl_readunlock (&data[element].lock);
660             }
661         }
662
663         element++;
664         if (element >= DATASIZE)
665             element = 0;
666     }
667     lmgr_cleanup_thread();
668     return NULL;
669 }
670
671 int main (int argc, char *argv[])
672 {
673     int count, data_count;
674     unsigned int seed = 1;
675     int thread_updates = 0, data_updates = 0;
676     int status;
677
678     /*
679      * For Solaris 2.5,2.6,7 and 8 threads are not timesliced.
680      *  To ensure our threads can run concurrently, we specifically
681      *  set the concurrency level to THREADS.
682      */
683     DPRINTF (("Setting concurrency level to %d\n", THREADS));
684     thr_setconcurrency(THREADS); /* Turned on only for Solaris */
685
686     /*
687      * Initialize the shared data.
688      */
689     for (data_count = 0; data_count < DATASIZE; data_count++) {
690         data[data_count].data = 0;
691         data[data_count].updates = 0;
692         rwl_init(&data[data_count].lock);
693     }
694
695     /*
696      * Create THREADS threads to access shared data.
697      */
698     for (count = 0; count < THREADS; count++) {
699         threads[count].thread_num = count;
700         threads[count].r_collisions = 0;
701         threads[count].w_collisions = 0;
702         threads[count].updates = 0;
703         threads[count].interval = rand_r (&seed) % ITERATIONS;
704         status = pthread_create (&threads[count].thread_id,
705             NULL, thread_routine, (void*)&threads[count]);
706         if (status != 0)
707             err_abort (status, _("Create thread"));
708     }
709
710     /*
711      * Wait for all threads to complete, and collect
712      * statistics.
713      */
714     for (count = 0; count < THREADS; count++) {
715         status = pthread_join (threads[count].thread_id, NULL);
716         if (status != 0)
717             err_abort (status, _("Join thread"));
718         thread_updates += threads[count].updates;
719         printf (_("%02d: interval %d, updates %d, "
720                 "r_collisions %d, w_collisions %d\n"),
721             count, threads[count].interval,
722             threads[count].updates,
723             threads[count].r_collisions, threads[count].w_collisions);
724     }
725
726     /*
727      * Collect statistics for the data.
728      */
729     for (data_count = 0; data_count < DATASIZE; data_count++) {
730         data_updates += data[data_count].updates;
731         printf (_("data %02d: value %d, %d updates\n"),
732             data_count, data[data_count].data, data[data_count].updates);
733         rwl_destroy (&data[data_count].lock);
734     }
735
736     return 0;
737 }
738
739 #endif