]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
Add TwoEOF directive + enhance fill command
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2  * Bacula Thread Read/Write locking code. It permits
3  *  multiple readers but only one writer.  Note, however,
4  *  that the writer thread is permitted to make multiple
5  *  nested write lock calls.
6  *
7  *  Kern Sibbald, January MMI
8  *
9  *   Version $Id$
10  *
11  *  This code adapted from "Programming with POSIX Threads", by
12  *    David R. Butenhof
13  *
14  */
15 /*
16    Copyright (C) 2000-2003 Kern Sibbald and John Walker
17
18    This program is free software; you can redistribute it and/or
19    modify it under the terms of the GNU General Public License as
20    published by the Free Software Foundation; either version 2 of
21    the License, or (at your option) any later version.
22
23    This program is distributed in the hope that it will be useful,
24    but WITHOUT ANY WARRANTY; without even the implied warranty of
25    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26    General Public License for more details.
27
28    You should have received a copy of the GNU General Public
29    License along with this program; if not, write to the Free
30    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
31    MA 02111-1307, USA.
32
33  */
34
35 #include "bacula.h"
36
37 /*   
38  * Initialize a read/write lock
39  *
40  *  Returns: 0 on success
41  *           errno on failure
42  */
43 int rwl_init(brwlock_t *rwl)
44 {
45    int stat;
46                         
47    rwl->r_active = rwl->w_active = 0;
48    rwl->r_wait = rwl->w_wait = 0;
49    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
50       return stat;
51    }
52    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
53       pthread_mutex_destroy(&rwl->mutex);
54       return stat;
55    }
56    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
57       pthread_cond_destroy(&rwl->read);
58       pthread_mutex_destroy(&rwl->mutex);
59       return stat;
60    }
61    rwl->valid = RWLOCK_VALID;
62    return 0;
63 }
64
65 /*
66  * Destroy a read/write lock
67  *
68  * Returns: 0 on success
69  *          errno on failure
70  */
71 int rwl_destroy(brwlock_t *rwl)
72 {
73    int stat, stat1, stat2;
74
75   if (rwl->valid != RWLOCK_VALID) {
76      return EINVAL;
77   }
78   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
79      return stat;
80   }
81
82   /* 
83    * If any threads are active, report EBUSY
84    */
85   if (rwl->r_active > 0 || rwl->w_active) {
86      pthread_mutex_unlock(&rwl->mutex);
87      return EBUSY;
88   }
89
90   /*
91    * If any threads are waiting, report EBUSY
92    */
93   if (rwl->r_wait > 0 || rwl->w_wait > 0) { 
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   rwl->valid = 0;
99   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
100      return stat;
101   }
102   stat  = pthread_mutex_destroy(&rwl->mutex);
103   stat1 = pthread_cond_destroy(&rwl->read);
104   stat2 = pthread_cond_destroy(&rwl->write);
105   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
106 }
107
108 /*
109  * Handle cleanup when the read lock condition variable
110  * wait is released.
111  */
112 static void rwl_read_release(void *arg)
113 {
114    brwlock_t *rwl = (brwlock_t *)arg;
115
116    rwl->r_wait--;
117    pthread_mutex_unlock(&rwl->mutex);
118 }
119
120 /*
121  * Handle cleanup when the write lock condition variable wait
122  * is released.
123  */
124 static void rwl_write_release(void *arg)
125 {
126    brwlock_t *rwl = (brwlock_t *)arg;
127
128    rwl->w_wait--;
129    pthread_mutex_unlock(&rwl->mutex);
130 }
131
132 /*
133  * Lock for read access, wait until locked (or error).
134  */
135 int rwl_readlock(brwlock_t *rwl)
136 {
137    int stat;
138     
139    if (rwl->valid != RWLOCK_VALID) {
140       return EINVAL;
141    }
142    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
143       return stat;
144    }
145    if (rwl->w_active) {
146       rwl->r_wait++;                  /* indicate that we are waiting */
147       pthread_cleanup_push(rwl_read_release, (void *)rwl);
148       while (rwl->w_active) {
149          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
150          if (stat != 0) {
151             break;                    /* error, bail out */
152          }
153       }
154       pthread_cleanup_pop(0);
155       rwl->r_wait--;                  /* we are no longer waiting */
156    }
157    if (stat == 0) {
158       rwl->r_active++;                /* we are running */
159    }
160    pthread_mutex_unlock(&rwl->mutex);
161    return stat;
162 }
163
164 /* 
165  * Attempt to lock for read access, don't wait
166  */
167 int rwl_readtrylock(brwlock_t *rwl)
168 {
169    int stat, stat2;
170     
171    if (rwl->valid != RWLOCK_VALID) {
172       return EINVAL;
173    }
174    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
175       return stat;
176    }
177    if (rwl->w_active) {
178       stat = EBUSY;
179    } else {
180       rwl->r_active++;                /* we are running */
181    }
182    stat2 = pthread_mutex_unlock(&rwl->mutex);
183    return (stat == 0 ? stat2 : stat);
184 }
185    
186 /* 
187  * Unlock read lock
188  */
189 int rwl_readunlock(brwlock_t *rwl)
190 {
191    int stat, stat2;
192     
193    if (rwl->valid != RWLOCK_VALID) {
194       return EINVAL;
195    }
196    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
197       return stat;
198    }
199    rwl->r_active--;
200    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
201       stat = pthread_cond_signal(&rwl->write);
202    }
203    stat2 = pthread_mutex_unlock(&rwl->mutex);
204    return (stat == 0 ? stat2 : stat);
205 }
206
207
208 /*
209  * Lock for write access, wait until locked (or error).
210  *   Multiple nested write locking is permitted.
211  */
212 int rwl_writelock(brwlock_t *rwl)
213 {
214    int stat;
215     
216    if (rwl->valid != RWLOCK_VALID) {
217       return EINVAL;
218    }
219    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
220       return stat;
221    }
222    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
223       rwl->w_active++;
224       pthread_mutex_unlock(&rwl->mutex);
225       return 0;
226    }
227    if (rwl->w_active || rwl->r_active > 0) {
228       rwl->w_wait++;                  /* indicate that we are waiting */
229       pthread_cleanup_push(rwl_write_release, (void *)rwl);
230       while (rwl->w_active || rwl->r_active > 0) {
231          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
232             break;                    /* error, bail out */
233          }
234       }
235       pthread_cleanup_pop(0);
236       rwl->w_wait--;                  /* we are no longer waiting */
237    }
238    if (stat == 0) {
239       rwl->w_active = 1;              /* we are running */
240       rwl->writer_id = pthread_self(); /* save writer thread's id */
241    }
242    pthread_mutex_unlock(&rwl->mutex);
243    return stat;
244 }
245
246 /* 
247  * Attempt to lock for write access, don't wait
248  */
249 int rwl_writetrylock(brwlock_t *rwl)
250 {
251    int stat, stat2;
252     
253    if (rwl->valid != RWLOCK_VALID) {
254       return EINVAL;
255    }
256    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
257       return stat;
258    }
259    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
260       rwl->w_active++;
261       pthread_mutex_unlock(&rwl->mutex);
262       return 0;
263    }
264    if (rwl->w_active || rwl->r_active > 0) {
265       stat = EBUSY;
266    } else {
267       rwl->w_active = 1;              /* we are running */
268       rwl->writer_id = pthread_self(); /* save writer thread's id */
269    }
270    stat2 = pthread_mutex_unlock(&rwl->mutex);
271    return (stat == 0 ? stat2 : stat);
272 }
273    
274 /* 
275  * Unlock write lock
276  *  Start any waiting writers in preference to waiting readers
277  */
278 int rwl_writeunlock(brwlock_t *rwl)
279 {
280    int stat, stat2;
281     
282    if (rwl->valid != RWLOCK_VALID) {
283       return EINVAL;
284    }
285    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
286       return stat;
287    }
288    rwl->w_active--;
289    if (rwl->w_active < 0 || !pthread_equal(pthread_self(), rwl->writer_id)) {
290       Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n");
291    }
292    if (rwl->w_active > 0) {
293       stat = 0;                       /* writers still active */
294    } else {
295       /* No more writers, awaken someone */
296       if (rwl->r_wait > 0) {         /* if readers waiting */
297          stat = pthread_cond_broadcast(&rwl->read);
298       } else if (rwl->w_wait > 0) {
299          stat = pthread_cond_signal(&rwl->write);
300       }
301    }
302    stat2 = pthread_mutex_unlock(&rwl->mutex);
303    return (stat == 0 ? stat2 : stat);
304 }
305
306 #ifdef TEST_RWLOCK
307
308 #define THREADS     5
309 #define DATASIZE   15
310 #define ITERATIONS 10000
311
312 /*
313  * Keep statics for each thread.
314  */
315 typedef struct thread_tag {
316    int thread_num;
317    pthread_t thread_id;
318    int writes;
319    int reads;
320    int interval;
321 } thread_t;
322
323 /* 
324  * Read/write lock and shared data.
325  */
326 typedef struct data_tag {
327    brwlock_t lock;
328    int data;
329    int writes;
330 } data_t;
331
332 thread_t threads[THREADS];
333 data_t data[DATASIZE];
334
335 /* 
336  * Thread start routine that uses read/write locks.
337  */
338 void *thread_routine(void *arg)
339 {
340    thread_t *self = (thread_t *)arg;
341    int repeats = 0;
342    int iteration;
343    int element = 0;
344    int status;
345
346    for (iteration=0; iteration < ITERATIONS; iteration++) {
347       /*
348        * Each "self->interval" iterations, perform an
349        * update operation (write lock instead of read
350        * lock).
351        */
352       if ((iteration % self->interval) == 0) {
353          status = rwl_writelock(&data[element].lock);
354          if (status != 0) {
355             Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
356          }
357          data[element].data = self->thread_num;
358          data[element].writes++;
359          self->writes++;
360          status = rwl_writeunlock(&data[element].lock);
361          if (status != 0) {
362             Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
363          }
364       } else {
365          /*
366           * Look at the current data element to see whether
367           * the current thread last updated it. Count the
368           * times to report later.
369           */
370           status = rwl_readlock(&data[element].lock);
371           if (status != 0) {
372              Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
373           }
374           self->reads++;
375           if (data[element].data == self->thread_num)
376              repeats++;
377           status = rwl_readunlock(&data[element].lock);
378           if (status != 0) {
379              Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
380           }
381       }
382       element++;
383       if (element >= DATASIZE) {
384          element = 0;
385       }
386    }
387    if (repeats > 0) {
388       Pmsg2(000, "Thread %d found unchanged elements %d times\n",
389          self->thread_num, repeats);
390    }
391    return NULL;
392 }
393
394 int main (int argc, char *argv[])
395 {
396     int count;
397     int data_count;
398     int status;
399     unsigned int seed = 1;
400     int thread_writes = 0;
401     int data_writes = 0;
402
403 #ifdef sun
404     /*
405      * On Solaris 2.5, threads are not timesliced. To ensure
406      * that our threads can run concurrently, we need to
407      * increase the concurrency level to THREADS.
408      */
409     thr_setconcurrency (THREADS);
410 #endif
411
412     /*
413      * Initialize the shared data.
414      */
415     for (data_count = 0; data_count < DATASIZE; data_count++) {
416         data[data_count].data = 0;
417         data[data_count].writes = 0;
418         status = rwl_init (&data[data_count].lock);
419         if (status != 0) {
420            Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
421         }
422     }
423
424     /*
425      * Create THREADS threads to access shared data.
426      */
427     for (count = 0; count < THREADS; count++) {
428         threads[count].thread_num = count + 1;
429         threads[count].writes = 0;
430         threads[count].reads = 0;
431         threads[count].interval = rand_r (&seed) % 71;
432         status = pthread_create (&threads[count].thread_id,
433             NULL, thread_routine, (void*)&threads[count]);
434         if (status != 0) {
435            Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
436         }
437     }
438
439     /*
440      * Wait for all threads to complete, and collect
441      * statistics.
442      */
443     for (count = 0; count < THREADS; count++) {
444         status = pthread_join (threads[count].thread_id, NULL);
445         if (status != 0) {
446            Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
447         }
448         thread_writes += threads[count].writes;
449         printf ("%02d: interval %d, writes %d, reads %d\n",
450             count, threads[count].interval,
451             threads[count].writes, threads[count].reads);
452     }
453
454     /*
455      * Collect statistics for the data.
456      */
457     for (data_count = 0; data_count < DATASIZE; data_count++) {
458         data_writes += data[data_count].writes;
459         printf ("data %02d: value %d, %d writes\n",
460             data_count, data[data_count].data, data[data_count].writes);
461         rwl_destroy (&data[data_count].lock);
462     }
463
464     printf ("Total: %d thread writes, %d data writes\n",
465         thread_writes, data_writes);
466     return 0;
467 }
468
469 #endif
470
471 #ifdef TEST_RW_TRY_LOCK
472 /*
473  * brwlock_try_main.c
474  *
475  * Demonstrate use of non-blocking read-write locks.
476  *
477  * Special notes: On a Solaris system, call thr_setconcurrency()
478  * to allow interleaved thread execution, since threads are not
479  * timesliced.
480  */
481 #include <pthread.h>
482 #include "rwlock.h"
483 #include "errors.h"
484
485 #define THREADS         5
486 #define ITERATIONS      1000
487 #define DATASIZE        15
488
489 /*
490  * Keep statistics for each thread.
491  */
492 typedef struct thread_tag {
493     int         thread_num;
494     pthread_t   thread_id;
495     int         r_collisions;
496     int         w_collisions;
497     int         updates;
498     int         interval;
499 } thread_t;
500
501 /*
502  * Read-write lock and shared data
503  */
504 typedef struct data_tag {
505     brwlock_t    lock;
506     int         data;
507     int         updates;
508 } data_t;
509
510 thread_t threads[THREADS];
511 data_t data[DATASIZE];
512
513 /*
514  * Thread start routine that uses read-write locks
515  */
516 void *thread_routine (void *arg)
517 {
518     thread_t *self = (thread_t*)arg;
519     int iteration;
520     int element;
521     int status;
522
523     element = 0;                        /* Current data element */
524
525     for (iteration = 0; iteration < ITERATIONS; iteration++) {
526         if ((iteration % self->interval) == 0) {
527             status = rwl_writetrylock (&data[element].lock);
528             if (status == EBUSY)
529                 self->w_collisions++;
530             else if (status == 0) {
531                 data[element].data++;
532                 data[element].updates++;
533                 self->updates++;
534                 rwl_writeunlock (&data[element].lock);
535             } else
536                 err_abort (status, "Try write lock");
537         } else {
538             status = rwl_readtrylock (&data[element].lock);
539             if (status == EBUSY)
540                 self->r_collisions++;
541             else if (status != 0) {
542                 err_abort (status, "Try read lock");
543             } else {
544                 if (data[element].data != data[element].updates)
545                     printf ("%d: data[%d] %d != %d\n",
546                         self->thread_num, element,
547                         data[element].data, data[element].updates);
548                 rwl_readunlock (&data[element].lock);
549             }
550         }
551
552         element++;
553         if (element >= DATASIZE)
554             element = 0;
555     }
556     return NULL;
557 }
558
559 int main (int argc, char *argv[])
560 {
561     int count, data_count;
562     unsigned int seed = 1;
563     int thread_updates = 0, data_updates = 0;
564     int status;
565
566 #ifdef sun
567     /*
568      * On Solaris 2.5, threads are not timesliced. To ensure
569      * that our threads can run concurrently, we need to
570      * increase the concurrency level to THREADS.
571      */
572     DPRINTF (("Setting concurrency level to %d\n", THREADS));
573     thr_setconcurrency (THREADS);
574 #endif
575
576     /*
577      * Initialize the shared data.
578      */
579     for (data_count = 0; data_count < DATASIZE; data_count++) {
580         data[data_count].data = 0;
581         data[data_count].updates = 0;
582         rwl_init (&data[data_count].lock);
583     }
584
585     /*
586      * Create THREADS threads to access shared data.
587      */
588     for (count = 0; count < THREADS; count++) {
589         threads[count].thread_num = count;
590         threads[count].r_collisions = 0;
591         threads[count].w_collisions = 0;
592         threads[count].updates = 0;
593         threads[count].interval = rand_r (&seed) % ITERATIONS;
594         status = pthread_create (&threads[count].thread_id,
595             NULL, thread_routine, (void*)&threads[count]);
596         if (status != 0)
597             err_abort (status, "Create thread");
598     }
599
600     /*
601      * Wait for all threads to complete, and collect
602      * statistics.
603      */
604     for (count = 0; count < THREADS; count++) {
605         status = pthread_join (threads[count].thread_id, NULL);
606         if (status != 0)
607             err_abort (status, "Join thread");
608         thread_updates += threads[count].updates;
609         printf ("%02d: interval %d, updates %d, "
610                 "r_collisions %d, w_collisions %d\n",
611             count, threads[count].interval,
612             threads[count].updates,
613             threads[count].r_collisions, threads[count].w_collisions);
614     }
615
616     /*
617      * Collect statistics for the data.
618      */
619     for (data_count = 0; data_count < DATASIZE; data_count++) {
620         data_updates += data[data_count].updates;
621         printf ("data %02d: value %d, %d updates\n",
622             data_count, data[data_count].data, data[data_count].updates);
623         rwl_destroy (&data[data_count].lock);
624     }
625
626     return 0;
627 }
628
629 #endif