]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/rwlock.c
- Convert more atoi to str_to_int64() for DB.
[bacula/bacula] / bacula / src / lib / rwlock.c
1 /*
2  * Bacula Thread Read/Write locking code. It permits
3  *  multiple readers but only one writer.  Note, however,
4  *  that the writer thread is permitted to make multiple
5  *  nested write lock calls.
6  *
7  *  Kern Sibbald, January MMI
8  *
9  *   Version $Id$
10  *
11  *  This code adapted from "Programming with POSIX Threads", by
12  *    David R. Butenhof
13  *
14  */
15 /*
16    Copyright (C) 2000-2004 Kern Sibbald and John Walker
17
18    This program is free software; you can redistribute it and/or
19    modify it under the terms of the GNU General Public License as
20    published by the Free Software Foundation; either version 2 of
21    the License, or (at your option) any later version.
22
23    This program is distributed in the hope that it will be useful,
24    but WITHOUT ANY WARRANTY; without even the implied warranty of
25    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26    General Public License for more details.
27
28    You should have received a copy of the GNU General Public
29    License along with this program; if not, write to the Free
30    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
31    MA 02111-1307, USA.
32
33  */
34
35 #include "bacula.h"
36
37 /*
38  * Initialize a read/write lock
39  *
40  *  Returns: 0 on success
41  *           errno on failure
42  */
43 int rwl_init(brwlock_t *rwl)
44 {
45    int stat;
46
47    rwl->r_active = rwl->w_active = 0;
48    rwl->r_wait = rwl->w_wait = 0;
49    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
50       return stat;
51    }
52    if ((stat = pthread_cond_init(&rwl->read, NULL)) != 0) {
53       pthread_mutex_destroy(&rwl->mutex);
54       return stat;
55    }
56    if ((stat = pthread_cond_init(&rwl->write, NULL)) != 0) {
57       pthread_cond_destroy(&rwl->read);
58       pthread_mutex_destroy(&rwl->mutex);
59       return stat;
60    }
61    rwl->valid = RWLOCK_VALID;
62    return 0;
63 }
64
65 /*
66  * Destroy a read/write lock
67  *
68  * Returns: 0 on success
69  *          errno on failure
70  */
71 int rwl_destroy(brwlock_t *rwl)
72 {
73    int stat, stat1, stat2;
74
75   if (rwl->valid != RWLOCK_VALID) {
76      return EINVAL;
77   }
78   if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
79      return stat;
80   }
81
82   /*
83    * If any threads are active, report EBUSY
84    */
85   if (rwl->r_active > 0 || rwl->w_active) {
86      pthread_mutex_unlock(&rwl->mutex);
87      return EBUSY;
88   }
89
90   /*
91    * If any threads are waiting, report EBUSY
92    */
93   if (rwl->r_wait > 0 || rwl->w_wait > 0) {
94      pthread_mutex_unlock(&rwl->mutex);
95      return EBUSY;
96   }
97
98   rwl->valid = 0;
99   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
100      return stat;
101   }
102   stat  = pthread_mutex_destroy(&rwl->mutex);
103   stat1 = pthread_cond_destroy(&rwl->read);
104   stat2 = pthread_cond_destroy(&rwl->write);
105   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
106 }
107
108 /*
109  * Handle cleanup when the read lock condition variable
110  * wait is released.
111  */
112 static void rwl_read_release(void *arg)
113 {
114    brwlock_t *rwl = (brwlock_t *)arg;
115
116    rwl->r_wait--;
117    pthread_mutex_unlock(&rwl->mutex);
118 }
119
120 /*
121  * Handle cleanup when the write lock condition variable wait
122  * is released.
123  */
124 static void rwl_write_release(void *arg)
125 {
126    brwlock_t *rwl = (brwlock_t *)arg;
127
128    rwl->w_wait--;
129    pthread_mutex_unlock(&rwl->mutex);
130 }
131
132 /*
133  * Lock for read access, wait until locked (or error).
134  */
135 int rwl_readlock(brwlock_t *rwl)
136 {
137    int stat;
138
139    if (rwl->valid != RWLOCK_VALID) {
140       return EINVAL;
141    }
142    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
143       return stat;
144    }
145    if (rwl->w_active) {
146       rwl->r_wait++;                  /* indicate that we are waiting */
147       pthread_cleanup_push(rwl_read_release, (void *)rwl);
148       while (rwl->w_active) {
149          stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
150          if (stat != 0) {
151             break;                    /* error, bail out */
152          }
153       }
154       pthread_cleanup_pop(0);
155       rwl->r_wait--;                  /* we are no longer waiting */
156    }
157    if (stat == 0) {
158       rwl->r_active++;                /* we are running */
159    }
160    pthread_mutex_unlock(&rwl->mutex);
161    return stat;
162 }
163
164 /*
165  * Attempt to lock for read access, don't wait
166  */
167 int rwl_readtrylock(brwlock_t *rwl)
168 {
169    int stat, stat2;
170
171    if (rwl->valid != RWLOCK_VALID) {
172       return EINVAL;
173    }
174    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
175       return stat;
176    }
177    if (rwl->w_active) {
178       stat = EBUSY;
179    } else {
180       rwl->r_active++;                /* we are running */
181    }
182    stat2 = pthread_mutex_unlock(&rwl->mutex);
183    return (stat == 0 ? stat2 : stat);
184 }
185
186 /*
187  * Unlock read lock
188  */
189 int rwl_readunlock(brwlock_t *rwl)
190 {
191    int stat, stat2;
192
193    if (rwl->valid != RWLOCK_VALID) {
194       return EINVAL;
195    }
196    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
197       return stat;
198    }
199    rwl->r_active--;
200    if (rwl->r_active == 0 && rwl->w_wait > 0) { /* if writers waiting */
201       stat = pthread_cond_broadcast(&rwl->write);
202    }
203    stat2 = pthread_mutex_unlock(&rwl->mutex);
204    return (stat == 0 ? stat2 : stat);
205 }
206
207
208 /*
209  * Lock for write access, wait until locked (or error).
210  *   Multiple nested write locking is permitted.
211  */
212 int rwl_writelock(brwlock_t *rwl)
213 {
214    int stat;
215
216    if (rwl->valid != RWLOCK_VALID) {
217       return EINVAL;
218    }
219    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
220       return stat;
221    }
222    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
223       rwl->w_active++;
224       pthread_mutex_unlock(&rwl->mutex);
225       return 0;
226    }
227    if (rwl->w_active || rwl->r_active > 0) {
228       rwl->w_wait++;                  /* indicate that we are waiting */
229       pthread_cleanup_push(rwl_write_release, (void *)rwl);
230       while (rwl->w_active || rwl->r_active > 0) {
231          if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
232             break;                    /* error, bail out */
233          }
234       }
235       pthread_cleanup_pop(0);
236       rwl->w_wait--;                  /* we are no longer waiting */
237    }
238    if (stat == 0) {
239       rwl->w_active++;                /* we are running */
240       rwl->writer_id = pthread_self(); /* save writer thread's id */
241    }
242    pthread_mutex_unlock(&rwl->mutex);
243    return stat;
244 }
245
246 /*
247  * Attempt to lock for write access, don't wait
248  */
249 int rwl_writetrylock(brwlock_t *rwl)
250 {
251    int stat, stat2;
252
253    if (rwl->valid != RWLOCK_VALID) {
254       return EINVAL;
255    }
256    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
257       return stat;
258    }
259    if (rwl->w_active && pthread_equal(rwl->writer_id, pthread_self())) {
260       rwl->w_active++;
261       pthread_mutex_unlock(&rwl->mutex);
262       return 0;
263    }
264    if (rwl->w_active || rwl->r_active > 0) {
265       stat = EBUSY;
266    } else {
267       rwl->w_active = 1;              /* we are running */
268       rwl->writer_id = pthread_self(); /* save writer thread's id */
269    }
270    stat2 = pthread_mutex_unlock(&rwl->mutex);
271    return (stat == 0 ? stat2 : stat);
272 }
273
274 /*
275  * Unlock write lock
276  *  Start any waiting writers in preference to waiting readers
277  */
278 int rwl_writeunlock(brwlock_t *rwl)
279 {
280    int stat, stat2;
281
282    if (rwl->valid != RWLOCK_VALID) {
283       return EINVAL;
284    }
285    if ((stat = pthread_mutex_lock(&rwl->mutex)) != 0) {
286       return stat;
287    }
288    if (rwl->w_active <= 0) {
289       Emsg0(M_ABORT, 0, "rwl_writeunlock called too many times.\n");
290    }
291    rwl->w_active--;
292    if (!pthread_equal(pthread_self(), rwl->writer_id)) {
293       Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n");
294    }
295    if (rwl->w_active > 0) {
296       stat = 0;                       /* writers still active */
297    } else {
298       /* No more writers, awaken someone */
299       if (rwl->r_wait > 0) {         /* if readers waiting */
300          stat = pthread_cond_broadcast(&rwl->read);
301       } else if (rwl->w_wait > 0) {
302          stat = pthread_cond_broadcast(&rwl->write);
303       }
304    }
305    stat2 = pthread_mutex_unlock(&rwl->mutex);
306    return (stat == 0 ? stat2 : stat);
307 }
308
309 #ifdef TEST_RWLOCK
310
311 #define THREADS     5
312 #define DATASIZE   15
313 #define ITERATIONS 10000
314
315 /*
316  * Keep statics for each thread.
317  */
318 typedef struct thread_tag {
319    int thread_num;
320    pthread_t thread_id;
321    int writes;
322    int reads;
323    int interval;
324 } thread_t;
325
326 /*
327  * Read/write lock and shared data.
328  */
329 typedef struct data_tag {
330    brwlock_t lock;
331    int data;
332    int writes;
333 } data_t;
334
335 thread_t threads[THREADS];
336 data_t data[DATASIZE];
337
338 /*
339  * Thread start routine that uses read/write locks.
340  */
341 void *thread_routine(void *arg)
342 {
343    thread_t *self = (thread_t *)arg;
344    int repeats = 0;
345    int iteration;
346    int element = 0;
347    int status;
348
349    for (iteration=0; iteration < ITERATIONS; iteration++) {
350       /*
351        * Each "self->interval" iterations, perform an
352        * update operation (write lock instead of read
353        * lock).
354        */
355       if ((iteration % self->interval) == 0) {
356          status = rwl_writelock(&data[element].lock);
357          if (status != 0) {
358             Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
359          }
360          data[element].data = self->thread_num;
361          data[element].writes++;
362          self->writes++;
363          status = rwl_writeunlock(&data[element].lock);
364          if (status != 0) {
365             Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
366          }
367       } else {
368          /*
369           * Look at the current data element to see whether
370           * the current thread last updated it. Count the
371           * times to report later.
372           */
373           status = rwl_readlock(&data[element].lock);
374           if (status != 0) {
375              Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
376           }
377           self->reads++;
378           if (data[element].data == self->thread_num)
379              repeats++;
380           status = rwl_readunlock(&data[element].lock);
381           if (status != 0) {
382              Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
383           }
384       }
385       element++;
386       if (element >= DATASIZE) {
387          element = 0;
388       }
389    }
390    if (repeats > 0) {
391       Pmsg2(000, "Thread %d found unchanged elements %d times\n",
392          self->thread_num, repeats);
393    }
394    return NULL;
395 }
396
397 int main (int argc, char *argv[])
398 {
399     int count;
400     int data_count;
401     int status;
402     unsigned int seed = 1;
403     int thread_writes = 0;
404     int data_writes = 0;
405
406 #ifdef sun
407     /*
408      * On Solaris 2.5, threads are not timesliced. To ensure
409      * that our threads can run concurrently, we need to
410      * increase the concurrency level to THREADS.
411      */
412     thr_setconcurrency (THREADS);
413 #endif
414
415     /*
416      * Initialize the shared data.
417      */
418     for (data_count = 0; data_count < DATASIZE; data_count++) {
419         data[data_count].data = 0;
420         data[data_count].writes = 0;
421         status = rwl_init (&data[data_count].lock);
422         if (status != 0) {
423            Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
424         }
425     }
426
427     /*
428      * Create THREADS threads to access shared data.
429      */
430     for (count = 0; count < THREADS; count++) {
431         threads[count].thread_num = count + 1;
432         threads[count].writes = 0;
433         threads[count].reads = 0;
434         threads[count].interval = rand_r (&seed) % 71;
435         status = pthread_create (&threads[count].thread_id,
436             NULL, thread_routine, (void*)&threads[count]);
437         if (status != 0) {
438            Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
439         }
440     }
441
442     /*
443      * Wait for all threads to complete, and collect
444      * statistics.
445      */
446     for (count = 0; count < THREADS; count++) {
447         status = pthread_join (threads[count].thread_id, NULL);
448         if (status != 0) {
449            Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
450         }
451         thread_writes += threads[count].writes;
452         printf ("%02d: interval %d, writes %d, reads %d\n",
453             count, threads[count].interval,
454             threads[count].writes, threads[count].reads);
455     }
456
457     /*
458      * Collect statistics for the data.
459      */
460     for (data_count = 0; data_count < DATASIZE; data_count++) {
461         data_writes += data[data_count].writes;
462         printf ("data %02d: value %d, %d writes\n",
463             data_count, data[data_count].data, data[data_count].writes);
464         rwl_destroy (&data[data_count].lock);
465     }
466
467     printf ("Total: %d thread writes, %d data writes\n",
468         thread_writes, data_writes);
469     return 0;
470 }
471
472 #endif
473
474 #ifdef TEST_RW_TRY_LOCK
475 /*
476  * brwlock_try_main.c
477  *
478  * Demonstrate use of non-blocking read-write locks.
479  *
480  * Special notes: On a Solaris system, call thr_setconcurrency()
481  * to allow interleaved thread execution, since threads are not
482  * timesliced.
483  */
484 #include <pthread.h>
485 #include "rwlock.h"
486 #include "errors.h"
487
488 #define THREADS         5
489 #define ITERATIONS      1000
490 #define DATASIZE        15
491
492 /*
493  * Keep statistics for each thread.
494  */
495 typedef struct thread_tag {
496     int         thread_num;
497     pthread_t   thread_id;
498     int         r_collisions;
499     int         w_collisions;
500     int         updates;
501     int         interval;
502 } thread_t;
503
504 /*
505  * Read-write lock and shared data
506  */
507 typedef struct data_tag {
508     brwlock_t    lock;
509     int         data;
510     int         updates;
511 } data_t;
512
513 thread_t threads[THREADS];
514 data_t data[DATASIZE];
515
516 /*
517  * Thread start routine that uses read-write locks
518  */
519 void *thread_routine (void *arg)
520 {
521     thread_t *self = (thread_t*)arg;
522     int iteration;
523     int element;
524     int status;
525
526     element = 0;                        /* Current data element */
527
528     for (iteration = 0; iteration < ITERATIONS; iteration++) {
529         if ((iteration % self->interval) == 0) {
530             status = rwl_writetrylock (&data[element].lock);
531             if (status == EBUSY)
532                 self->w_collisions++;
533             else if (status == 0) {
534                 data[element].data++;
535                 data[element].updates++;
536                 self->updates++;
537                 rwl_writeunlock (&data[element].lock);
538             } else
539                 err_abort (status, "Try write lock");
540         } else {
541             status = rwl_readtrylock (&data[element].lock);
542             if (status == EBUSY)
543                 self->r_collisions++;
544             else if (status != 0) {
545                 err_abort (status, "Try read lock");
546             } else {
547                 if (data[element].data != data[element].updates)
548                     printf ("%d: data[%d] %d != %d\n",
549                         self->thread_num, element,
550                         data[element].data, data[element].updates);
551                 rwl_readunlock (&data[element].lock);
552             }
553         }
554
555         element++;
556         if (element >= DATASIZE)
557             element = 0;
558     }
559     return NULL;
560 }
561
562 int main (int argc, char *argv[])
563 {
564     int count, data_count;
565     unsigned int seed = 1;
566     int thread_updates = 0, data_updates = 0;
567     int status;
568
569 #ifdef sun
570     /*
571      * On Solaris 2.5, threads are not timesliced. To ensure
572      * that our threads can run concurrently, we need to
573      * increase the concurrency level to THREADS.
574      */
575     DPRINTF (("Setting concurrency level to %d\n", THREADS));
576     thr_setconcurrency (THREADS);
577 #endif
578
579     /*
580      * Initialize the shared data.
581      */
582     for (data_count = 0; data_count < DATASIZE; data_count++) {
583         data[data_count].data = 0;
584         data[data_count].updates = 0;
585         rwl_init (&data[data_count].lock);
586     }
587
588     /*
589      * Create THREADS threads to access shared data.
590      */
591     for (count = 0; count < THREADS; count++) {
592         threads[count].thread_num = count;
593         threads[count].r_collisions = 0;
594         threads[count].w_collisions = 0;
595         threads[count].updates = 0;
596         threads[count].interval = rand_r (&seed) % ITERATIONS;
597         status = pthread_create (&threads[count].thread_id,
598             NULL, thread_routine, (void*)&threads[count]);
599         if (status != 0)
600             err_abort (status, "Create thread");
601     }
602
603     /*
604      * Wait for all threads to complete, and collect
605      * statistics.
606      */
607     for (count = 0; count < THREADS; count++) {
608         status = pthread_join (threads[count].thread_id, NULL);
609         if (status != 0)
610             err_abort (status, "Join thread");
611         thread_updates += threads[count].updates;
612         printf ("%02d: interval %d, updates %d, "
613                 "r_collisions %d, w_collisions %d\n",
614             count, threads[count].interval,
615             threads[count].updates,
616             threads[count].r_collisions, threads[count].w_collisions);
617     }
618
619     /*
620      * Collect statistics for the data.
621      */
622     for (data_count = 0; data_count < DATASIZE; data_count++) {
623         data_updates += data[data_count].updates;
624         printf ("data %02d: value %d, %d updates\n",
625             data_count, data[data_count].data, data[data_count].updates);
626         rwl_destroy (&data[data_count].lock);
627     }
628
629     return 0;
630 }
631
632 #endif