]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/devlock.c
Pull src/lib changes from master
[bacula/bacula] / bacula / src / lib / devlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  */
40
41 #define _LOCKMGR_COMPLIANT
42 #include "bacula.h"
43 #include "devlock.h"
44
45 /*
46  * Initialize a read/write lock
47  *
48  *  Returns: 0 on success
49  *           errno on failure
50  */
51
52 devlock *new_devlock()                          
53 {
54    devlock *lock;
55    lock = (devlock *)malloc(sizeof (devlock));
56    memset(lock, 0, sizeof(devlock));
57    return lock;
58 }
59
60 int devlock::init(int priority)                            
61 {
62    int stat;
63    devlock *rwl = this;
64
65    rwl->r_active = rwl->w_active = 0;
66    rwl->r_wait = rwl->w_wait = 0;
67    rwl->priority = priority;
68    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
69       return stat;
70    }
71    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
72       pthread_mutex_destroy(&rwl->mutex);
73       return stat;
74    }
75    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
76       pthread_cond_destroy(&rwl->read);
77       pthread_mutex_destroy(&rwl->mutex);
78       return stat;
79    }
80    rwl->valid = DEVLOCK_VALID;
81    return 0;
82 }
83
84 /*
85  * Destroy a read/write lock
86  *
87  * Returns: 0 on success
88  *          errno on failure
89  */
90 int devlock::destroy()
91 {
92    devlock *rwl = this;
93    int stat, stat1, stat2;
94
95    if (rwl->valid != DEVLOCK_VALID) {
96       return EINVAL;
97    }
98    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
99       return stat;
100    }
101
102    /*
103     * If any threads are active, report EBUSY
104     */
105    if (rwl->r_active > 0 || rwl->w_active) {
106       pthread_mutex_unlock(&rwl->mutex);
107       return EBUSY;
108    }
109
110    /*
111     * If any threads are waiting, report EBUSY
112     */
113    if (rwl->r_wait > 0 || rwl->w_wait > 0) {
114       pthread_mutex_unlock(&rwl->mutex);
115       return EBUSY;
116    }
117
118    rwl->valid = 0;
119    if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
120       return stat;
121    }
122    stat  = pthread_mutex_destroy(&rwl->mutex);
123    stat1 = pthread_cond_destroy(&rwl->read);
124    stat2 = pthread_cond_destroy(&rwl->write);
125    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
126 }
127
128 /*
129  * Handle cleanup when the read lock condition variable
130  * wait is released.
131  */
132 static void devlock_read_release(void *arg)
133 {
134    devlock *rwl = (devlock *)arg;
135    rwl->read_release();
136 }
137
138 void devlock::read_release()
139 {
140    r_wait--;
141    pthread_mutex_unlock(&mutex);
142 }
143
144 /*
145  * Handle cleanup when the write lock condition variable wait
146  * is released.
147  */
148 static void devlock_write_release(void *arg)
149 {
150    devlock *rwl = (devlock *)arg;
151    rwl->write_release();
152 }
153
154 void devlock::write_release()
155 {
156    w_wait--;
157    pthread_mutex_unlock(&mutex);
158 }
159
160 /*
161  * Lock for read access, wait until locked (or error).
162  */
163 int devlock::readlock()
164 {
165    devlock *rwl = this;
166    int stat;
167
168    if (rwl->valid != DEVLOCK_VALID) {
169       return EINVAL;
170    }
171    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
172       return stat;
173    }
174    if (rwl->w_active) {
175       rwl->r_wait++;                  /* indicate that we are waiting */
176       pthread_cleanup_push(devlock_read_release, (void *)rwl);
177       while (rwl->w_active) {
178          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
179          if (stat != 0) {
180             break;                    /* error, bail out */
181          }
182       }
183       pthread_cleanup_pop(0);
184       rwl->r_wait--;                  /* we are no longer waiting */
185    }
186    if (stat == 0) {
187       rwl->r_active++;                /* we are running */
188    }
189    pthread_mutex_unlock(&rwl->mutex);
190    return stat;
191 }
192
193 /*
194  * Attempt to lock for read access, don't wait
195  */
196 int devlock::readtrylock()
197 {
198    devlock *rwl = this;
199    int stat, stat2;
200
201    if (rwl->valid != DEVLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    if (rwl->w_active) {
208       stat = EBUSY;
209    } else {
210       rwl->r_active++;                /* we are running */
211    }
212    stat2 = pthread_mutex_unlock(&rwl->mutex);
213    return (stat == 0 ? stat2 : stat);
214 }
215
216 /*
217  * Unlock read lock
218  */
219 int devlock::readunlock()
220 {
221    devlock *rwl = this;
222    int stat, stat2;
223
224    if (rwl->valid != DEVLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    rwl->r_active--;
231    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
232       stat = pthread_cond_broadcast(&rwl->write);
233    }
234    stat2 = pthread_mutex_unlock(&rwl->mutex);
235    return (stat == 0 ? stat2 : stat);
236 }
237
238
239 /*
240  * Lock for write access, wait until locked (or error).
241  *   Multiple nested write locking is permitted.
242  */
243 int devlock::writelock(int areason, bool acan_take)
244 {
245    devlock *rwl = this;
246    int stat;
247
248    if (rwl->valid != DEVLOCK_VALID) {
249       return EINVAL;
250    }
251    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
252       return stat;
253    }
254    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
255       rwl->w_active++;
256       pthread_mutex_unlock(&rwl->mutex);
257       return 0;
258    }
259    lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
260    if (rwl->w_active || rwl->r_active > 0) {
261       rwl->w_wait++;                  /* indicate that we are waiting */
262       pthread_cleanup_push(devlock_write_release, (void *)rwl);
263       while (rwl->w_active || rwl->r_active > 0) {
264          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
265             lmgr_do_unlock(rwl);
266             break;                    /* error, bail out */
267          }
268       }
269       pthread_cleanup_pop(0);
270       rwl->w_wait--;                  /* we are no longer waiting */
271    }
272    if (stat == 0) {
273       rwl->w_active++;                /* we are running */
274       rwl->writer_id = pthread_self(); /* save writer thread's id */
275       lmgr_post_lock();
276    } 
277    rwl->reason = areason;
278    rwl->can_take = acan_take;
279    pthread_mutex_unlock(&rwl->mutex);
280    return stat;
281 }
282
283 /*
284  * Attempt to lock for write access, don't wait
285  */
286 int devlock::writetrylock()
287 {
288    devlock *rwl = this;
289    int stat, stat2;
290
291    if (rwl->valid != DEVLOCK_VALID) {
292       return EINVAL;
293    }
294    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
295       return stat;
296    }
297    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
298       rwl->w_active++;
299       pthread_mutex_unlock(&rwl->mutex);
300       return 0;
301    }
302    if (rwl->w_active || rwl->r_active > 0) {
303       stat = EBUSY;
304    } else {
305       rwl->w_active = 1;              /* we are running */
306       rwl->writer_id = pthread_self(); /* save writer thread's id */
307       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
308    }
309    stat2 = pthread_mutex_unlock(&rwl->mutex);
310    return (stat == 0 ? stat2 : stat);
311 }
312
313 /*
314  * Unlock write lock
315  *  Start any waiting writers in preference to waiting readers
316  */
317 int devlock::writeunlock()
318 {
319    devlock *rwl = this;
320    int stat, stat2;
321
322    if (rwl->valid != DEVLOCK_VALID) {
323       return EINVAL;
324    }
325    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
326       return stat;
327    }
328    if (rwl->w_active <= 0) {
329       pthread_mutex_unlock(&rwl->mutex);
330       Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
331    }
332    rwl->w_active--;
333    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
334       pthread_mutex_unlock(&rwl->mutex);
335       Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
336    }
337    if (rwl->w_active > 0) {
338       stat = 0;                       /* writers still active */
339    } else {
340       lmgr_do_unlock(rwl);
341       /* No more writers, awaken someone */
342       if (rwl->r_wait > 0) {         /* if readers waiting */
343          stat = pthread_cond_broadcast(&rwl->read);
344       } else if (rwl->w_wait > 0) {
345          stat = pthread_cond_broadcast(&rwl->write);
346       }
347    }
348    stat2 = pthread_mutex_unlock(&rwl->mutex);
349    return (stat == 0 ? stat2 : stat);
350 }
351
352 int devlock::take_lock(take_lock_t *hold, int areason)
353 {
354    int stat;
355
356    if (valid != DEVLOCK_VALID) {
357       return EINVAL;
358    }
359    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
360       return stat;
361    }
362    hold->reason = reason;
363    hold->prev_reason = prev_reason;
364    hold->writer_id = writer_id;
365    reason = areason;
366    writer_id = pthread_self();
367    stat = pthread_mutex_unlock(&mutex);
368    return stat;
369 }
370
371 int devlock::return_lock(take_lock_t *hold)
372 {
373    int stat, stat2;
374
375    if (valid != DEVLOCK_VALID) {
376       return EINVAL;
377    }
378    if ((stat = pthread_mutex_lock(&mutex)) != 0) {
379       return stat;
380    }
381    reason = hold->reason;
382    prev_reason = hold->prev_reason;
383    writer_id = hold->writer_id;
384    writer_id = pthread_self();
385    stat2 = pthread_mutex_unlock(&mutex);
386    if (w_active || w_wait) {
387       stat = pthread_cond_broadcast(&write);
388    }
389    return (stat == 0 ? stat2 : stat);
390
391 }
392
393 #ifdef TEST_RWLOCK
394
395 #define THREADS     300
396 #define DATASIZE   15
397 #define ITERATIONS 1000000
398
399 /*
400  * Keep statics for each thread.
401  */
402 typedef struct thread_tag {
403    int thread_num;
404    pthread_t thread_id;
405    int writes;
406    int reads;
407    int interval;
408 } thread_t;
409
410 /*
411  * Read/write lock and shared data.
412  */
413 typedef struct data_tag {
414    brwlock_t lock;
415    int data;
416    int writes;
417 } data_t;
418
419 static thread_t threads[THREADS];
420 static data_t data[DATASIZE];
421
422 /*
423  * Thread start routine that uses read/write locks.
424  */
425 void *thread_routine(void *arg)
426 {
427    thread_t *self = (thread_t *)arg;
428    int repeats = 0;
429    int iteration;
430    int element = 0;
431    int status;
432
433    for (iteration=0; iteration < ITERATIONS; iteration++) {
434       /*
435        * Each "self->interval" iterations, perform an
436        * update operation (write lock instead of read
437        * lock).
438        */
439 //      if ((iteration % self->interval) == 0) {
440          status = writelock(&data[element].lock);
441          if (status != 0) {
442             berrno be;
443             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
444             exit(1);
445          }
446          data[element].data = self->thread_num;
447          data[element].writes++;
448          self->writes++;
449          status = writelock(&data[element].lock);
450          if (status != 0) {
451             berrno be;
452             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
453             exit(1);
454          }
455          data[element].data = self->thread_num;
456          data[element].writes++;
457          self->writes++;
458          status = writeunlock(&data[element].lock);
459          if (status != 0) {
460             berrno be;
461             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
462             exit(1);
463          }
464          status = writeunlock(&data[element].lock);
465          if (status != 0) {
466             berrno be;
467             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
468             exit(1);
469          }
470
471 #ifdef xxx
472       } else {
473          /*
474           * Look at the current data element to see whether
475           * the current thread last updated it. Count the
476           * times to report later.
477           */
478           status = readlock(&data[element].lock);
479           if (status != 0) {
480              berrno be;
481              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
482              exit(1);
483           }
484           self->reads++;
485           if (data[element].data == self->thread_num)
486              repeats++;
487           status = readunlock(&data[element].lock);
488           if (status != 0) {
489              berrno be;
490              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
491              exit(1);
492           }
493       }
494 #endif
495       element++;
496       if (element >= DATASIZE) {
497          element = 0;
498       }
499    }
500    if (repeats > 0) {
501       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
502          self->thread_num, repeats);
503    }
504    return NULL;
505 }
506
507 int main (int argc, char *argv[])
508 {
509     int count;
510     int data_count;
511     int status;
512     unsigned int seed = 1;
513     int thread_writes = 0;
514     int data_writes = 0;
515
516 #ifdef USE_THR_SETCONCURRENCY
517     /*
518      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
519      * that our threads can run concurrently, we need to
520      * increase the concurrency level to THREADS.
521      */
522     thr_setconcurrency (THREADS);
523 #endif
524
525     /*
526      * Initialize the shared data.
527      */
528     for (data_count = 0; data_count < DATASIZE; data_count++) {
529         data[data_count].data = 0;
530         data[data_count].writes = 0;
531         status = rwl_init(&data[data_count].lock);
532         if (status != 0) {
533            berrno be;
534            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
535            exit(1);
536         }
537     }
538
539     /*
540      * Create THREADS threads to access shared data.
541      */
542     for (count = 0; count < THREADS; count++) {
543         threads[count].thread_num = count + 1;
544         threads[count].writes = 0;
545         threads[count].reads = 0;
546         threads[count].interval = rand_r(&seed) % 71;
547         if (threads[count].interval <= 0) {
548            threads[count].interval = 1;
549         }
550         status = pthread_create (&threads[count].thread_id,
551             NULL, thread_routine, (void*)&threads[count]);
552         if (status != 0 || (int)threads[count].thread_id == 0) {
553            berrno be;
554            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
555            exit(1);
556         }
557     }
558
559     /*
560      * Wait for all threads to complete, and collect
561      * statistics.
562      */
563     for (count = 0; count < THREADS; count++) {
564         status = pthread_join (threads[count].thread_id, NULL);
565         if (status != 0) {
566            berrno be;
567            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
568            exit(1);
569         }
570         thread_writes += threads[count].writes;
571         printf (_("%02d: interval %d, writes %d, reads %d\n"),
572             count, threads[count].interval,
573             threads[count].writes, threads[count].reads);
574     }
575
576     /*
577      * Collect statistics for the data.
578      */
579     for (data_count = 0; data_count < DATASIZE; data_count++) {
580         data_writes += data[data_count].writes;
581         printf (_("data %02d: value %d, %d writes\n"),
582             data_count, data[data_count].data, data[data_count].writes);
583         rwl_destroy (&data[data_count].lock);
584     }
585
586     printf (_("Total: %d thread writes, %d data writes\n"),
587         thread_writes, data_writes);
588     return 0;
589 }
590
591 #endif
592
593 #ifdef TEST_RW_TRY_LOCK
594 /*
595  * brwlock_try_main.c
596  *
597  * Demonstrate use of non-blocking read-write locks.
598  *
599  * Special notes: On older Solaris system, call thr_setconcurrency()
600  * to allow interleaved thread execution, since threads are not
601  * timesliced.
602  */
603 #include <pthread.h>
604 #include "rwlock.h"
605 #include "errors.h"
606
607 #define THREADS         5
608 #define ITERATIONS      1000
609 #define DATASIZE        15
610
611 /*
612  * Keep statistics for each thread.
613  */
614 typedef struct thread_tag {
615     int         thread_num;
616     pthread_t   thread_id;
617     int         r_collisions;
618     int         w_collisions;
619     int         updates;
620     int         interval;
621 } thread_t;
622
623 /*
624  * Read-write lock and shared data
625  */
626 typedef struct data_tag {
627     brwlock_t    lock;
628     int         data;
629     int         updates;
630 } data_t;
631
632 thread_t threads[THREADS];
633 data_t data[DATASIZE];
634
635 /*
636  * Thread start routine that uses read-write locks
637  */
638 void *thread_routine (void *arg)
639 {
640     thread_t *self = (thread_t*)arg;
641     int iteration;
642     int element;
643     int status;
644     lmgr_init_thread();
645     element = 0;                        /* Current data element */
646
647     for (iteration = 0; iteration < ITERATIONS; iteration++) {
648         if ((iteration % self->interval) == 0) {
649             status = rwl_writetrylock (&data[element].lock);
650             if (status == EBUSY)
651                 self->w_collisions++;
652             else if (status == 0) {
653                 data[element].data++;
654                 data[element].updates++;
655                 self->updates++;
656                 rwl_writeunlock (&data[element].lock);
657             } else
658                 err_abort (status, _("Try write lock"));
659         } else {
660             status = rwl_readtrylock (&data[element].lock);
661             if (status == EBUSY)
662                 self->r_collisions++;
663             else if (status != 0) {
664                 err_abort (status, _("Try read lock"));
665             } else {
666                 if (data[element].data != data[element].updates)
667                     printf ("%d: data[%d] %d != %d\n",
668                         self->thread_num, element,
669                         data[element].data, data[element].updates);
670                 rwl_readunlock (&data[element].lock);
671             }
672         }
673
674         element++;
675         if (element >= DATASIZE)
676             element = 0;
677     }
678     lmgr_cleanup_thread();
679     return NULL;
680 }
681
682 int main (int argc, char *argv[])
683 {
684     int count, data_count;
685     unsigned int seed = 1;
686     int thread_updates = 0, data_updates = 0;
687     int status;
688
689 #ifdef USE_THR_SETCONCURRENCY
690     /*
691      * On Solaris 2.5,2.6,7 and 8 threads are not timesliced. To ensure
692      * that our threads can run concurrently, we need to
693      * increase the concurrency level to THREADS.
694      */
695     DPRINTF (("Setting concurrency level to %d\n", THREADS));
696     thr_setconcurrency (THREADS);
697 #endif
698
699     /*
700      * Initialize the shared data.
701      */
702     for (data_count = 0; data_count < DATASIZE; data_count++) {
703         data[data_count].data = 0;
704         data[data_count].updates = 0;
705         rwl_init(&data[data_count].lock);
706     }
707
708     /*
709      * Create THREADS threads to access shared data.
710      */
711     for (count = 0; count < THREADS; count++) {
712         threads[count].thread_num = count;
713         threads[count].r_collisions = 0;
714         threads[count].w_collisions = 0;
715         threads[count].updates = 0;
716         threads[count].interval = rand_r (&seed) % ITERATIONS;
717         status = pthread_create (&threads[count].thread_id,
718             NULL, thread_routine, (void*)&threads[count]);
719         if (status != 0)
720             err_abort (status, _("Create thread"));
721     }
722
723     /*
724      * Wait for all threads to complete, and collect
725      * statistics.
726      */
727     for (count = 0; count < THREADS; count++) {
728         status = pthread_join (threads[count].thread_id, NULL);
729         if (status != 0)
730             err_abort (status, _("Join thread"));
731         thread_updates += threads[count].updates;
732         printf (_("%02d: interval %d, updates %d, "
733                 "r_collisions %d, w_collisions %d\n"),
734             count, threads[count].interval,
735             threads[count].updates,
736             threads[count].r_collisions, threads[count].w_collisions);
737     }
738
739     /*
740      * Collect statistics for the data.
741      */
742     for (data_count = 0; data_count < DATASIZE; data_count++) {
743         data_updates += data[data_count].updates;
744         printf (_("data %02d: value %d, %d updates\n"),
745             data_count, data[data_count].data, data[data_count].updates);
746         rwl_destroy (&data[data_count].lock);
747     }
748
749     return 0;
750 }
751
752 #endif