]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/devlock.c
First cut new device lock code
[bacula/bacula] / bacula / src / lib / devlock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Thread Read/Write locking code. It permits
30  *  multiple readers but only one writer.  Note, however,
31  *  that the writer thread is permitted to make multiple
32  *  nested write lock calls.
33  *
34  *  Kern Sibbald, January MMI
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  */
40
41 #define _LOCKMGR_COMPLIANT
42 #include "bacula.h"
43 #include "devlock.h"
44
45 /*
46  * Initialize a read/write lock
47  *
48  *  Returns: 0 on success
49  *           errno on failure
50  */
51
52 devlock *new_devlock()                          
53 {
54    devlock *lock;
55    lock = (devlock *)malloc(sizeof (devlock));
56    memset(lock, 0, sizeof(devlock));
57    return lock;
58 }
59
60 int devlock::init(int priority)                            
61 {
62    int stat;
63    devlock *rwl = this;
64
65    rwl->r_active = rwl->w_active = 0;
66    rwl->r_wait = rwl->w_wait = 0;
67    rwl->priority = priority;
68    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
69       return stat;
70    }
71    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
72       pthread_mutex_destroy(&rwl->mutex);
73       return stat;
74    }
75    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
76       pthread_cond_destroy(&rwl->read);
77       pthread_mutex_destroy(&rwl->mutex);
78       return stat;
79    }
80    rwl->valid = DEVLOCK_VALID;
81    return 0;
82 }
83
84 /*
85  * Destroy a read/write lock
86  *
87  * Returns: 0 on success
88  *          errno on failure
89  */
90 int devlock::destroy()
91 {
92    devlock *rwl = this;
93    int stat, stat1, stat2;
94
95    if (rwl->valid != DEVLOCK_VALID) {
96       return EINVAL;
97    }
98    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
99       return stat;
100    }
101
102    /*
103     * If any threads are active, report EBUSY
104     */
105    if (rwl->r_active > 0 || rwl->w_active) {
106       pthread_mutex_unlock(&rwl->mutex);
107       return EBUSY;
108    }
109
110    /*
111     * If any threads are waiting, report EBUSY
112     */
113    if (rwl->r_wait > 0 || rwl->w_wait > 0) {
114       pthread_mutex_unlock(&rwl->mutex);
115       return EBUSY;
116    }
117
118    rwl->valid = 0;
119    if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
120       return stat;
121    }
122    stat  = pthread_mutex_destroy(&rwl->mutex);
123    stat1 = pthread_cond_destroy(&rwl->read);
124    stat2 = pthread_cond_destroy(&rwl->write);
125    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
126 }
127
128 /*
129  * Handle cleanup when the read lock condition variable
130  * wait is released.
131  */
132 static void devlock_read_release(void *arg)
133 {
134    devlock *rwl = (devlock *)arg;
135    rwl->read_release();
136 }
137
138 void devlock::read_release()
139 {
140    r_wait--;
141    pthread_mutex_unlock(&mutex);
142 }
143
144 /*
145  * Handle cleanup when the write lock condition variable wait
146  * is released.
147  */
148 static void devlock_write_release(void *arg)
149 {
150    devlock *rwl = (devlock *)arg;
151    rwl->write_release();
152 }
153
154 void devlock::write_release()
155 {
156    w_wait--;
157    pthread_mutex_unlock(&mutex);
158 }
159
160 /*
161  * Lock for read access, wait until locked (or error).
162  */
163 int devlock::readlock()
164 {
165    devlock *rwl = this;
166    int stat;
167
168    if (rwl->valid != DEVLOCK_VALID) {
169       return EINVAL;
170    }
171    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
172       return stat;
173    }
174    if (rwl->w_active) {
175       rwl->r_wait++;                  /* indicate that we are waiting */
176       pthread_cleanup_push(devlock_read_release, (void *)rwl);
177       while (rwl->w_active) {
178          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
179          if (stat != 0) {
180             break;                    /* error, bail out */
181          }
182       }
183       pthread_cleanup_pop(0);
184       rwl->r_wait--;                  /* we are no longer waiting */
185    }
186    if (stat == 0) {
187       rwl->r_active++;                /* we are running */
188    }
189    pthread_mutex_unlock(&rwl->mutex);
190    return stat;
191 }
192
193 /*
194  * Attempt to lock for read access, don't wait
195  */
196 int devlock::readtrylock()
197 {
198    devlock *rwl = this;
199    int stat, stat2;
200
201    if (rwl->valid != DEVLOCK_VALID) {
202       return EINVAL;
203    }
204    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
205       return stat;
206    }
207    if (rwl->w_active) {
208       stat = EBUSY;
209    } else {
210       rwl->r_active++;                /* we are running */
211    }
212    stat2 = pthread_mutex_unlock(&rwl->mutex);
213    return (stat == 0 ? stat2 : stat);
214 }
215
216 /*
217  * Unlock read lock
218  */
219 int devlock::readunlock()
220 {
221    devlock *rwl = this;
222    int stat, stat2;
223
224    if (rwl->valid != DEVLOCK_VALID) {
225       return EINVAL;
226    }
227    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
228       return stat;
229    }
230    rwl->r_active--;
231    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
232       stat = pthread_cond_broadcast(&rwl->write);
233    }
234    stat2 = pthread_mutex_unlock(&rwl->mutex);
235    return (stat == 0 ? stat2 : stat);
236 }
237
238
239 /*
240  * Lock for write access, wait until locked (or error).
241  *   Multiple nested write locking is permitted.
242  */
243 int devlock::writelock(int areason, bool acan_steal)
244 {
245    devlock *rwl = this;
246    int stat;
247
248    if (rwl->valid != DEVLOCK_VALID) {
249       return EINVAL;
250    }
251    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
252       return stat;
253    }
254    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
255       rwl->w_active++;
256       pthread_mutex_unlock(&rwl->mutex);
257       return 0;
258    }
259    lmgr_pre_lock(rwl, rwl->priority, __FILE__, __LINE__);
260    if (rwl->w_active || rwl->r_active > 0) {
261       rwl->w_wait++;                  /* indicate that we are waiting */
262       pthread_cleanup_push(devlock_write_release, (void *)rwl);
263       while (rwl->w_active || rwl->r_active > 0) {
264          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
265             lmgr_do_unlock(rwl);
266             break;                    /* error, bail out */
267          }
268       }
269       pthread_cleanup_pop(0);
270       rwl->w_wait--;                  /* we are no longer waiting */
271    }
272    if (stat == 0) {
273       rwl->w_active++;                /* we are running */
274       rwl->writer_id = pthread_self(); /* save writer thread's id */
275       lmgr_post_lock();
276    } 
277    rwl->reason = areason;
278    rwl->can_steal = acan_steal;
279    pthread_mutex_unlock(&rwl->mutex);
280    return stat;
281 }
282
283 /*
284  * Attempt to lock for write access, don't wait
285  */
286 int devlock::writetrylock()
287 {
288    devlock *rwl = this;
289    int stat, stat2;
290
291    if (rwl->valid != DEVLOCK_VALID) {
292       return EINVAL;
293    }
294    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
295       return stat;
296    }
297    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
298       rwl->w_active++;
299       pthread_mutex_unlock(&rwl->mutex);
300       return 0;
301    }
302    if (rwl->w_active || rwl->r_active > 0) {
303       stat = EBUSY;
304    } else {
305       rwl->w_active = 1;              /* we are running */
306       rwl->writer_id = pthread_self(); /* save writer thread's id */
307       lmgr_do_lock(rwl, rwl->priority, __FILE__, __LINE__);
308    }
309    stat2 = pthread_mutex_unlock(&rwl->mutex);
310    return (stat == 0 ? stat2 : stat);
311 }
312
313 /*
314  * Unlock write lock
315  *  Start any waiting writers in preference to waiting readers
316  */
317 int devlock::writeunlock()
318 {
319    devlock *rwl = this;
320    int stat, stat2;
321
322    if (rwl->valid != DEVLOCK_VALID) {
323       return EINVAL;
324    }
325    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
326       return stat;
327    }
328    if (rwl->w_active <= 0) {
329       pthread_mutex_unlock(&rwl->mutex);
330       Jmsg0(NULL, M_ABORT, 0, _("writeunlock called too many times.\n"));
331    }
332    rwl->w_active--;
333    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
334       pthread_mutex_unlock(&rwl->mutex);
335       Jmsg0(NULL, M_ABORT, 0, _("writeunlock by non-owner.\n"));
336    }
337    if (rwl->w_active > 0) {
338       stat = 0;                       /* writers still active */
339    } else {
340       lmgr_do_unlock(rwl);
341       /* No more writers, awaken someone */
342       if (rwl->r_wait > 0) {         /* if readers waiting */
343          stat = pthread_cond_broadcast(&rwl->read);
344       } else if (rwl->w_wait > 0) {
345          stat = pthread_cond_broadcast(&rwl->write);
346       }
347    }
348    stat2 = pthread_mutex_unlock(&rwl->mutex);
349    return (stat == 0 ? stat2 : stat);
350 }
351
352 #ifdef TEST_RWLOCK
353
354 #define THREADS     300
355 #define DATASIZE   15
356 #define ITERATIONS 1000000
357
358 /*
359  * Keep statics for each thread.
360  */
361 typedef struct thread_tag {
362    int thread_num;
363    pthread_t thread_id;
364    int writes;
365    int reads;
366    int interval;
367 } thread_t;
368
369 /*
370  * Read/write lock and shared data.
371  */
372 typedef struct data_tag {
373    brwlock_t lock;
374    int data;
375    int writes;
376 } data_t;
377
378 static thread_t threads[THREADS];
379 static data_t data[DATASIZE];
380
381 /*
382  * Thread start routine that uses read/write locks.
383  */
384 void *thread_routine(void *arg)
385 {
386    thread_t *self = (thread_t *)arg;
387    int repeats = 0;
388    int iteration;
389    int element = 0;
390    int status;
391
392    for (iteration=0; iteration < ITERATIONS; iteration++) {
393       /*
394        * Each "self->interval" iterations, perform an
395        * update operation (write lock instead of read
396        * lock).
397        */
398 //      if ((iteration % self->interval) == 0) {
399          status = writelock(&data[element].lock);
400          if (status != 0) {
401             berrno be;
402             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
403             exit(1);
404          }
405          data[element].data = self->thread_num;
406          data[element].writes++;
407          self->writes++;
408          status = writelock(&data[element].lock);
409          if (status != 0) {
410             berrno be;
411             printf("Write lock failed. ERR=%s\n", be.bstrerror(status));
412             exit(1);
413          }
414          data[element].data = self->thread_num;
415          data[element].writes++;
416          self->writes++;
417          status = writeunlock(&data[element].lock);
418          if (status != 0) {
419             berrno be;
420             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
421             exit(1);
422          }
423          status = writeunlock(&data[element].lock);
424          if (status != 0) {
425             berrno be;
426             printf("Write unlock failed. ERR=%s\n", be.bstrerror(status));
427             exit(1);
428          }
429
430 #ifdef xxx
431       } else {
432          /*
433           * Look at the current data element to see whether
434           * the current thread last updated it. Count the
435           * times to report later.
436           */
437           status = readlock(&data[element].lock);
438           if (status != 0) {
439              berrno be;
440              printf("Read lock failed. ERR=%s\n", be.bstrerror(status));
441              exit(1);
442           }
443           self->reads++;
444           if (data[element].data == self->thread_num)
445              repeats++;
446           status = readunlock(&data[element].lock);
447           if (status != 0) {
448              berrno be;
449              printf("Read unlock failed. ERR=%s\n", be.bstrerror(status));
450              exit(1);
451           }
452       }
453 #endif
454       element++;
455       if (element >= DATASIZE) {
456          element = 0;
457       }
458    }
459    if (repeats > 0) {
460       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
461          self->thread_num, repeats);
462    }
463    return NULL;
464 }
465
466 int main (int argc, char *argv[])
467 {
468     int count;
469     int data_count;
470     int status;
471     unsigned int seed = 1;
472     int thread_writes = 0;
473     int data_writes = 0;
474
475 #ifdef sun
476     /*
477      * On Solaris 2.5, threads are not timesliced. To ensure
478      * that our threads can run concurrently, we need to
479      * increase the concurrency level to THREADS.
480      */
481     thr_setconcurrency (THREADS);
482 #endif
483
484     /*
485      * Initialize the shared data.
486      */
487     for (data_count = 0; data_count < DATASIZE; data_count++) {
488         data[data_count].data = 0;
489         data[data_count].writes = 0;
490         status = rwl_init(&data[data_count].lock);
491         if (status != 0) {
492            berrno be;
493            printf("Init rwlock failed. ERR=%s\n", be.bstrerror(status));
494            exit(1);
495         }
496     }
497
498     /*
499      * Create THREADS threads to access shared data.
500      */
501     for (count = 0; count < THREADS; count++) {
502         threads[count].thread_num = count + 1;
503         threads[count].writes = 0;
504         threads[count].reads = 0;
505         threads[count].interval = rand_r(&seed) % 71;
506         if (threads[count].interval <= 0) {
507            threads[count].interval = 1;
508         }
509         status = pthread_create (&threads[count].thread_id,
510             NULL, thread_routine, (void*)&threads[count]);
511         if (status != 0 || (int)threads[count].thread_id == 0) {
512            berrno be;
513            printf("Create thread failed. ERR=%s\n", be.bstrerror(status));
514            exit(1);
515         }
516     }
517
518     /*
519      * Wait for all threads to complete, and collect
520      * statistics.
521      */
522     for (count = 0; count < THREADS; count++) {
523         status = pthread_join (threads[count].thread_id, NULL);
524         if (status != 0) {
525            berrno be;
526            printf("Join thread failed. ERR=%s\n", be.bstrerror(status));
527            exit(1);
528         }
529         thread_writes += threads[count].writes;
530         printf (_("%02d: interval %d, writes %d, reads %d\n"),
531             count, threads[count].interval,
532             threads[count].writes, threads[count].reads);
533     }
534
535     /*
536      * Collect statistics for the data.
537      */
538     for (data_count = 0; data_count < DATASIZE; data_count++) {
539         data_writes += data[data_count].writes;
540         printf (_("data %02d: value %d, %d writes\n"),
541             data_count, data[data_count].data, data[data_count].writes);
542         rwl_destroy (&data[data_count].lock);
543     }
544
545     printf (_("Total: %d thread writes, %d data writes\n"),
546         thread_writes, data_writes);
547     return 0;
548 }
549
550 #endif
551
552 #ifdef TEST_RW_TRY_LOCK
553 /*
554  * brwlock_try_main.c
555  *
556  * Demonstrate use of non-blocking read-write locks.
557  *
558  * Special notes: On a Solaris system, call thr_setconcurrency()
559  * to allow interleaved thread execution, since threads are not
560  * timesliced.
561  */
562 #include <pthread.h>
563 #include "rwlock.h"
564 #include "errors.h"
565
566 #define THREADS         5
567 #define ITERATIONS      1000
568 #define DATASIZE        15
569
570 /*
571  * Keep statistics for each thread.
572  */
573 typedef struct thread_tag {
574     int         thread_num;
575     pthread_t   thread_id;
576     int         r_collisions;
577     int         w_collisions;
578     int         updates;
579     int         interval;
580 } thread_t;
581
582 /*
583  * Read-write lock and shared data
584  */
585 typedef struct data_tag {
586     brwlock_t    lock;
587     int         data;
588     int         updates;
589 } data_t;
590
591 thread_t threads[THREADS];
592 data_t data[DATASIZE];
593
594 /*
595  * Thread start routine that uses read-write locks
596  */
597 void *thread_routine (void *arg)
598 {
599     thread_t *self = (thread_t*)arg;
600     int iteration;
601     int element;
602     int status;
603     lmgr_init_thread();
604     element = 0;                        /* Current data element */
605
606     for (iteration = 0; iteration < ITERATIONS; iteration++) {
607         if ((iteration % self->interval) == 0) {
608             status = rwl_writetrylock (&data[element].lock);
609             if (status == EBUSY)
610                 self->w_collisions++;
611             else if (status == 0) {
612                 data[element].data++;
613                 data[element].updates++;
614                 self->updates++;
615                 rwl_writeunlock (&data[element].lock);
616             } else
617                 err_abort (status, _("Try write lock"));
618         } else {
619             status = rwl_readtrylock (&data[element].lock);
620             if (status == EBUSY)
621                 self->r_collisions++;
622             else if (status != 0) {
623                 err_abort (status, _("Try read lock"));
624             } else {
625                 if (data[element].data != data[element].updates)
626                     printf ("%d: data[%d] %d != %d\n",
627                         self->thread_num, element,
628                         data[element].data, data[element].updates);
629                 rwl_readunlock (&data[element].lock);
630             }
631         }
632
633         element++;
634         if (element >= DATASIZE)
635             element = 0;
636     }
637     lmgr_cleanup_thread();
638     return NULL;
639 }
640
641 int main (int argc, char *argv[])
642 {
643     int count, data_count;
644     unsigned int seed = 1;
645     int thread_updates = 0, data_updates = 0;
646     int status;
647
648 #ifdef sun
649     /*
650      * On Solaris 2.5, threads are not timesliced. To ensure
651      * that our threads can run concurrently, we need to
652      * increase the concurrency level to THREADS.
653      */
654     DPRINTF (("Setting concurrency level to %d\n", THREADS));
655     thr_setconcurrency (THREADS);
656 #endif
657
658     /*
659      * Initialize the shared data.
660      */
661     for (data_count = 0; data_count < DATASIZE; data_count++) {
662         data[data_count].data = 0;
663         data[data_count].updates = 0;
664         rwl_init(&data[data_count].lock);
665     }
666
667     /*
668      * Create THREADS threads to access shared data.
669      */
670     for (count = 0; count < THREADS; count++) {
671         threads[count].thread_num = count;
672         threads[count].r_collisions = 0;
673         threads[count].w_collisions = 0;
674         threads[count].updates = 0;
675         threads[count].interval = rand_r (&seed) % ITERATIONS;
676         status = pthread_create (&threads[count].thread_id,
677             NULL, thread_routine, (void*)&threads[count]);
678         if (status != 0)
679             err_abort (status, _("Create thread"));
680     }
681
682     /*
683      * Wait for all threads to complete, and collect
684      * statistics.
685      */
686     for (count = 0; count < THREADS; count++) {
687         status = pthread_join (threads[count].thread_id, NULL);
688         if (status != 0)
689             err_abort (status, _("Join thread"));
690         thread_updates += threads[count].updates;
691         printf (_("%02d: interval %d, updates %d, "
692                 "r_collisions %d, w_collisions %d\n"),
693             count, threads[count].interval,
694             threads[count].updates,
695             threads[count].r_collisions, threads[count].w_collisions);
696     }
697
698     /*
699      * Collect statistics for the data.
700      */
701     for (data_count = 0; data_count < DATASIZE; data_count++) {
702         data_updates += data[data_count].updates;
703         printf (_("data %02d: value %d, %d updates\n"),
704             data_count, data[data_count].data, data[data_count].updates);
705         rwl_destroy (&data[data_count].lock);
706     }
707
708     return 0;
709 }
710
711 #endif