]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/devlock.c
Backport from Bacula Enterprise
[bacula/bacula] / bacula / src / lib / devlock.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2001-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  * Bacula Thread Read/Write locking code. It permits
22  *  multiple readers but only one writer.  Note, however,
23  *  that the writer thread is permitted to make multiple
24  *  nested write lock calls.
25  *
26  *  Kern Sibbald, January MMI
27  *
28  *  This code adapted from "Programming with POSIX Threads", by
29  *    David R. Butenhof
30  *
31  */
32
33 #define LOCKMGR_COMPLIANT
34 #include "bacula.h"
35 #include "devlock.h"
36
37 /*
38  * Initialize a read/write lock
39  *
40  *  Returns: 0 on success
41  *           errno on failure
42  */
43
44 devlock *new_devlock()
45 {
46    devlock *lock;
47    lock = (devlock *)malloc(sizeof (devlock));
48    memset(lock, 0, sizeof(devlock));
49    return lock;
50 }
51
52 int devlock::init(int init_priority)
53 {
54    int stat;
55    devlock *rwl = this;
56
57    rwl->r_active = rwl->w_active = 0;
58    rwl->r_wait = rwl->w_wait = 0;
59    rwl->priority = init_priority;
60    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
61       return stat;
62    }
63    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
64       pthread_mutex_destroy(&rwl->mutex);
65       return stat;
66    }
67    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
68       pthread_cond_destroy(&rwl->read);
69       pthread_mutex_destroy(&rwl->mutex);
70       return stat;
71    }
72    rwl->valid = DEVLOCK_VALID;
73    return 0;
74 }
75
76 /*
77  * Destroy a read/write lock
78  *
79  * Returns: 0 on success
80  *          errno on failure
81  */
82 int devlock::destroy()
83 {
84    devlock *rwl = this;
85    int stat, stat1, stat2;
86
87    if (rwl->valid != DEVLOCK_VALID) {
88       return EINVAL;
89    }
90    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
91       return stat;
92    }
93
94    /*
95     * If any threads are active, report EBUSY
96     */
97    if (rwl->r_active > 0 || rwl->w_active) {
98       pthread_mutex_unlock(&rwl->mutex);
99       return EBUSY;
100    }
101
102    /*
103     * If any threads are waiting, report EBUSY
104     */
105    if (rwl->r_wait > 0 || rwl->w_wait > 0) {
106       pthread_mutex_unlock(&rwl->mutex);
107       return EBUSY;
108    }
109
110    rwl->valid = 0;
111    if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
112       return stat;
113    }
114    stat  = pthread_mutex_destroy(&rwl->mutex);
115    stat1 = pthread_cond_destroy(&rwl->read);
116    stat2 = pthread_cond_destroy(&rwl->write);
117    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
118 }
119
120 /*
121  * Handle cleanup when the read lock condition variable
122  * wait is released.
123  */
124 static void devlock_read_release(void *arg)
125 {
126    devlock *rwl = (devlock *)arg;
127    rwl->read_release();
128 }
129
130 void devlock::read_release()
131 {
132    r_wait--;
133    pthread_mutex_unlock(&mutex);
134 }
135
136 /*
137  * Handle cleanup when the write lock condition variable wait
138  * is released.
139  */
140 static void devlock_write_release(void *arg)
141 {
142    devlock *rwl = (devlock *)arg;
143    rwl->write_release();
144 }
145
146 void devlock::write_release()
147 {
148    w_wait--;
149    pthread_mutex_unlock(&mutex);
150 }
151
152 /*
153  * Lock for read access, wait until locked (or error).
154  */
155 int devlock::readlock()
156 {
157    devlock *rwl = this;
158    int stat;
159
160    if (rwl->valid != DEVLOCK_VALID) {
161       return EINVAL;
162    }
163    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
164       return stat;
165    }
166    if (rwl->w_active) {
167       rwl->r_wait++;                  /* indicate that we are waiting */
168       pthread_cleanup_push(devlock_read_release, (void *)rwl);
169       while (rwl->w_active) {
170          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
171          if (stat != 0) {
172             break;                    /* error, bail out */
173          }
174       }
175       pthread_cleanup_pop(0);
176       rwl->r_wait--;                  /* we are no longer waiting */
177    }
178    if (stat == 0) {
179       rwl->r_active++;                /* we are running */
180    }
181    pthread_mutex_unlock(&rwl->mutex);
182    return stat;
183 }
184
185 /*
186  * Attempt to lock for read access, don't wait
187  */
188 int devlock::readtrylock()
189 {
190    devlock *rwl = this;
191    int stat, stat2;
192
193    if (rwl->valid != DEVLOCK_VALID) {
194       return EINVAL;
195    }
196    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
197       return stat;
198    }
199    if (rwl->w_active) {
200       stat = EBUSY;
201    } else {
202       rwl->r_active++;                /* we are running */
203    }
204    stat2 = pthread_mutex_unlock(&rwl->mutex);
205    return (stat == 0 ? stat2 : stat);
206 }
207
208 /*
209  * Unlock read lock
210  */
211 int devlock::readunlock()
212 {
213    devlock *rwl = this;
214    int stat, stat2;
215
216    if (rwl->valid != DEVLOCK_VALID) {
217       return EINVAL;
218    }
219    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
220       return stat;
221    }
222    rwl->r_active--;
223    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
224       stat = pthread_cond_broadcast(&rwl->write);
225    }
226    stat2 = pthread_mutex_unlock(&rwl->mutex);
227    return (stat == 0 ? stat2 : stat);
228 }
229
230
231 /*
232  * Lock for write access, wait until locked (or error).
233  *   Multiple nested write locking is permitted.
234  */
235 int devlock::writelock(int areason, bool acan_take)
236 {
237    devlock *rwl = this;
238    int stat;
239
240    if (rwl->valid != DEVLOCK_VALID) {
241       return EINVAL;
242    }
243    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
244       return stat;
245    }
246    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
247       rwl->w_active++;
248       pthread_mutex_unlock(&rwl->mutex);
249       return 0;
250    }
251    lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
252    if (rwl->w_active || rwl->r_active > 0) {
253       rwl->w_wait++;                  /* indicate that we are waiting */
254       pthread_cleanup_push(devlock_write_release, (void *)rwl);
255       while (rwl->w_active || rwl->r_active > 0) {
256          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
257             lmgr_do_unlock(rwl);
258             break;                    /* error, bail out */
259          }
260       }
261       pthread_cleanup_pop(0);
262       rwl->w_wait--;                  /* we are no longer waiting */
263    }
264    if (stat == 0) {
265       rwl->w_active++;                /* we are running */
266       rwl->writer_id = pthread_self(); /* save writer thread's id */
267       lmgr_post_lock();
268    }
269    rwl->reason = areason;
270    rwl->can_take = acan_take;
271    pthread_mutex_unlock(&rwl->mutex);
272    return stat;
273 }
274
275 /*
276  * Attempt to lock for write access, don't wait
277  */
278 int devlock::writetrylock()
279 {
280    devlock *rwl = this;
281    int stat, stat2;
282
283    if (rwl->valid != DEVLOCK_VALID) {
284       return EINVAL;
285    }
286    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
287       return stat;
288    }
289    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
290       rwl->w_active++;
291       pthread_mutex_unlock(&rwl->mutex);
292       return 0;
293    }
294    if (rwl->w_active || rwl->r_active > 0) {
295       stat = EBUSY;
296    } else {
297       rwl->w_active = 1;              /* we are running */
298       rwl->writer_id = pthread_self(); /* save writer thread's id */
299       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
300    }
301    stat2 = pthread_mutex_unlock(&rwl->mutex);
302    return (stat == 0 ? stat2 : stat);
303 }
304
305 /*
306  * Unlock write lock
307  *  Start any waiting writers in preference to waiting readers
308  */
309 int devlock::writeunlock()
310 {
311    devlock *rwl = this;
312    int stat, stat2;
313
314    if (rwl->valid != DEVLOCK_VALID) {
315       return EINVAL;
316    }
317    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
318       return stat;
319    }
320    if (rwl->w_active <= 0) {
321       pthread_mutex_unlock(&rwl->mutex);
322       Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
323    }
324    rwl->w_active--;
325    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
326       pthread_mutex_unlock(&rwl->mutex);
327       Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
328    }
329    if (rwl->w_active > 0) {
330       stat = 0;                       /* writers still active */
331    } else {
332       lmgr_do_unlock(rwl);
333       /* No more writers, awaken someone */
334       if (rwl->r_wait > 0) {         /* if readers waiting */
335          stat = pthread_cond_broadcast(&rwl->read);
336       } else if (rwl->w_wait > 0) {
337          stat = pthread_cond_broadcast(&rwl->write);
338       }
339    }
340    stat2 = pthread_mutex_unlock(&rwl->mutex);
341    return (stat == 0 ? stat2 : stat);
342 }
343
344 int devlock::take_lock(take_lock_t *hold, int areason)
345 {
346    int stat;
347
348    if (valid != DEVLOCK_VALID) {
349       return EINVAL;
350    }
351    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
352       return stat;
353    }
354    hold->reason = reason;
355    hold->prev_reason = prev_reason;
356    hold->writer_id = writer_id;
357    reason = areason;
358    writer_id = pthread_self();
359    stat = pthread_mutex_unlock(&mutex);
360    return stat;
361 }
362
363 int devlock::return_lock(take_lock_t *hold)
364 {
365    int stat, stat2;
366
367    if (valid != DEVLOCK_VALID) {
368       return EINVAL;
369    }
370    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
371       return stat;
372    }
373    reason = hold->reason;
374    prev_reason = hold->prev_reason;
375    writer_id = hold->writer_id;
376    writer_id = pthread_self();
377    stat2 = pthread_mutex_unlock(&mutex);
378    if (w_active || w_wait) {
379       stat = pthread_cond_broadcast(&write);
380    }
381    return (stat == 0 ? stat2 : stat);
382
383 }
384
385 #ifdef TEST_RWLOCK
386
387 #define THREADS     300
388 #define DATASIZE   15
389 #define ITERATIONS 1000000
390
391 /*
392  * Keep statics for each thread.
393  */
394 typedef struct thread_tag {
395    int thread_num;
396    pthread_t thread_id;
397    int writes;
398    int reads;
399    int interval;
400 } thread_t;
401
402 /*
403  * Read/write lock and shared data.
404  */
405 typedef struct data_tag {
406    brwlock_t lock;
407    int data;
408    int writes;
409 } data_t;
410
411 static thread_t threads[THREADS];
412 static data_t data[DATASIZE];
413
414 /*
415  * Thread start routine that uses read/write locks.
416  */
417 void *thread_routine(void *arg)
418 {
419    thread_t *self = (thread_t *)arg;
420    int repeats = 0;
421    int iteration;
422    int element = 0;
423    int status;
424
425    for (iteration=0; iteration < ITERATIONS; iteration++) {
426       /*
427        * Each "self->interval" iterations, perform an
428        * update operation (write lock instead of read
429        * lock).
430        */
431 //      if ((iteration % self->interval) == 0) {
432          status = writelock(&data[element].lock);
433          if (status != 0) {
434             berrno be;
435             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
436             exit(1);
437          }
438          data[element].data = self->thread_num;
439          data[element].writes++;
440          self->writes++;
441          status = writelock(&data[element].lock);
442          if (status != 0) {
443             berrno be;
444             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
445             exit(1);
446          }
447          data[element].data = self->thread_num;
448          data[element].writes++;
449          self->writes++;
450          status = writeunlock(&data[element].lock);
451          if (status != 0) {
452             berrno be;
453             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
454             exit(1);
455          }
456          status = writeunlock(&data[element].lock);
457          if (status != 0) {
458             berrno be;
459             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
460             exit(1);
461          }
462
463 #ifdef xxx
464       } else {
465          /*
466           * Look at the current data element to see whether
467           * the current thread last updated it. Count the
468           * times to report later.
469           */
470           status = readlock(&data[element].lock);
471           if (status != 0) {
472              berrno be;
473              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
474              exit(1);
475           }
476           self->reads++;
477           if (data[element].data == self->thread_num)
478              repeats++;
479           status = readunlock(&data[element].lock);
480           if (status != 0) {
481              berrno be;
482              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
483              exit(1);
484           }
485       }
486 #endif
487       element++;
488       if (element >= DATASIZE) {
489          element = 0;
490       }
491    }
492    if (repeats > 0) {
493       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
494          self->thread_num, repeats);
495    }
496    return NULL;
497 }
498
499 int main (int argc, char *argv[])
500 {
501     int count;
502     int data_count;
503     int status;
504     unsigned int seed = 1;
505     int thread_writes = 0;
506     int data_writes = 0;
507
508     /*
509      * For Solaris 2.5,2.6,7 and 8 threads are not timesliced.
510      *  To ensure our threads can run concurrently, we specifically
511      *  set the concurrency level to THREADS.
512      */
513     thr_setconcurrency(THREADS); /* Turned on only for Solaris */
514
515     /*
516      * Initialize the shared data.
517      */
518     for (data_count = 0; data_count < DATASIZE; data_count++) {
519         data[data_count].data = 0;
520         data[data_count].writes = 0;
521         status = rwl_init(&data[data_count].lock);
522         if (status != 0) {
523            berrno be;
524            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
525            exit(1);
526         }
527     }
528
529     /*
530      * Create THREADS threads to access shared data.
531      */
532     for (count = 0; count < THREADS; count++) {
533         threads[count].thread_num = count + 1;
534         threads[count].writes = 0;
535         threads[count].reads = 0;
536         threads[count].interval = rand_r(&seed) % 71;
537         if (threads[count].interval <= 0) {
538            threads[count].interval = 1;
539         }
540         status = pthread_create (&threads[count].thread_id,
541             NULL, thread_routine, (void*)&threads[count]);
542         if (status != 0 || (int)threads[count].thread_id == 0) {
543            berrno be;
544            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
545            exit(1);
546         }
547     }
548
549     /*
550      * Wait for all threads to complete, and collect
551      * statistics.
552      */
553     for (count = 0; count < THREADS; count++) {
554         status = pthread_join (threads[count].thread_id, NULL);
555         if (status != 0) {
556            berrno be;
557            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
558            exit(1);
559         }
560         thread_writes += threads[count].writes;
561         printf (_("%02d: interval %d, writes %d, reads %d\n"),
562             count, threads[count].interval,
563             threads[count].writes, threads[count].reads);
564     }
565
566     /*
567      * Collect statistics for the data.
568      */
569     for (data_count = 0; data_count < DATASIZE; data_count++) {
570         data_writes += data[data_count].writes;
571         printf (_("data %02d: value %d, %d writes\n"),
572             data_count, data[data_count].data, data[data_count].writes);
573         rwl_destroy (&data[data_count].lock);
574     }
575
576     printf (_("Total: %d thread writes, %d data writes\n"),
577         thread_writes, data_writes);
578     return 0;
579 }
580
581 #endif
582
583 #ifdef TEST_RW_TRY_LOCK
584 /*
585  * brwlock_try_main.c
586  *
587  * Demonstrate use of non-blocking read-write locks.
588  *
589  * On older Solaris system, call thr_setconcurrency()
590  * to allow interleaved thread execution, since threads are not
591  * timesliced.
592  */
593 #include <pthread.h>
594 #include "rwlock.h"
595 #include "errors.h"
596
597 #define THREADS         5
598 #define ITERATIONS      1000
599 #define DATASIZE        15
600
601 /*
602  * Keep statistics for each thread.
603  */
604 typedef struct thread_tag {
605     int         thread_num;
606     pthread_t   thread_id;
607     int         r_collisions;
608     int         w_collisions;
609     int         updates;
610     int         interval;
611 } thread_t;
612
613 /*
614  * Read-write lock and shared data
615  */
616 typedef struct data_tag {
617     brwlock_t    lock;
618     int         data;
619     int         updates;
620 } data_t;
621
622 thread_t threads[THREADS];
623 data_t data[DATASIZE];
624
625 /*
626  * Thread start routine that uses read-write locks
627  */
628 void *thread_routine (void *arg)
629 {
630     thread_t *self = (thread_t*)arg;
631     int iteration;
632     int element;
633     int status;
634     lmgr_init_thread();
635     element = 0;                        /* Current data element */
636
637     for (iteration = 0; iteration < ITERATIONS; iteration++) {
638         if ((iteration % self->interval) == 0) {
639             status = rwl_writetrylock (&data[element].lock);
640             if (status == EBUSY)
641                 self->w_collisions++;
642             else if (status == 0) {
643                 data[element].data++;
644                 data[element].updates++;
645                 self->updates++;
646                 rwl_writeunlock (&data[element].lock);
647             } else
648                 err_abort (status, _("Try write lock"));
649         } else {
650             status = rwl_readtrylock (&data[element].lock);
651             if (status == EBUSY)
652                 self->r_collisions++;
653             else if (status != 0) {
654                 err_abort (status, _("Try read lock"));
655             } else {
656                 if (data[element].data != data[element].updates)
657                     printf ("%d: data[%d] %d != %d\n",
658                         self->thread_num, element,
659                         data[element].data, data[element].updates);
660                 rwl_readunlock (&data[element].lock);
661             }
662         }
663
664         element++;
665         if (element >= DATASIZE)
666             element = 0;
667     }
668     lmgr_cleanup_thread();
669     return NULL;
670 }
671
672 int main (int argc, char *argv[])
673 {
674     int count, data_count;
675     unsigned int seed = 1;
676     int thread_updates = 0, data_updates = 0;
677     int status;
678
679     /*
680      * For Solaris 2.5,2.6,7 and 8 threads are not timesliced.
681      *  To ensure our threads can run concurrently, we specifically
682      *  set the concurrency level to THREADS.
683      */
684     DPRINTF (("Setting concurrency level to %d\n", THREADS));
685     thr_setconcurrency(THREADS); /* Turned on only for Solaris */
686
687     /*
688      * Initialize the shared data.
689      */
690     for (data_count = 0; data_count < DATASIZE; data_count++) {
691         data[data_count].data = 0;
692         data[data_count].updates = 0;
693         rwl_init(&data[data_count].lock);
694     }
695
696     /*
697      * Create THREADS threads to access shared data.
698      */
699     for (count = 0; count < THREADS; count++) {
700         threads[count].thread_num = count;
701         threads[count].r_collisions = 0;
702         threads[count].w_collisions = 0;
703         threads[count].updates = 0;
704         threads[count].interval = rand_r (&seed) % ITERATIONS;
705         status = pthread_create (&threads[count].thread_id,
706             NULL, thread_routine, (void*)&threads[count]);
707         if (status != 0)
708             err_abort (status, _("Create thread"));
709     }
710
711     /*
712      * Wait for all threads to complete, and collect
713      * statistics.
714      */
715     for (count = 0; count < THREADS; count++) {
716         status = pthread_join (threads[count].thread_id, NULL);
717         if (status != 0)
718             err_abort (status, _("Join thread"));
719         thread_updates += threads[count].updates;
720         printf (_("%02d: interval %d, updates %d, "
721                 "r_collisions %d, w_collisions %d\n"),
722             count, threads[count].interval,
723             threads[count].updates,
724             threads[count].r_collisions, threads[count].w_collisions);
725     }
726
727     /*
728      * Collect statistics for the data.
729      */
730     for (data_count = 0; data_count < DATASIZE; data_count++) {
731         data_updates += data[data_count].updates;
732         printf (_("data %02d: value %d, %d updates\n"),
733             data_count, data[data_count].data, data[data_count].updates);
734         rwl_destroy (&data[data_count].lock);
735     }
736
737     return 0;
738 }
739
740 #endif