]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/semlock.c
17Mar06
[bacula/bacula] / bacula / src / lib / semlock.c
1 /*
2  * Bacula Semaphore code. This code permits setting up
3  *  a semaphore that lets through a specified number
4  *  of callers simultaneously. Once the number of callers
5  *  exceed the limit, they block.
6  *
7  *  Kern Sibbald, March MMIII
8  *
9  *   Derived from rwlock.h which was in turn derived from code in
10  *     "Programming with POSIX Threads" By David R. Butenhof
11  &
12  *   Version $Id$
13  *
14  */
15 /*
16    Copyright (C) 2000-2004 Kern Sibbald and John Walker
17
18    This program is free software; you can redistribute it and/or
19    modify it under the terms of the GNU General Public License as
20    published by the Free Software Foundation; either version 2 of
21    the License, or (at your option) any later version.
22
23    This program is distributed in the hope that it will be useful,
24    but WITHOUT ANY WARRANTY; without even the implied warranty of
25    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26    General Public License for more details.
27
28    You should have received a copy of the GNU General Public
29    License along with this program; if not, write to the Free
30    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
31    MA 02111-1307, USA.
32
33  */
34
35 #include "bacula.h"
36
37 /*
38  * Initialize a semaphore
39  *
40  *  Returns: 0 on success
41  *           errno on failure
42  */
43 int sem_init(semlock_t *sem, int max_active)
44 {
45    int stat;
46
47    sem->active = sem->waiting = 0;
48    sem->max_active = max_active;
49    if ((stat = pthread_mutex_init(&sem->mutex, NULL)) != 0) {
50       return stat;
51    }
52    if ((stat = pthread_cond_init(&sem->wait, NULL)) != 0) {
53       pthread_mutex_destroy(&sem->mutex);
54       return stat;
55    }
56    sem->valid = SEMLOCK_VALID;
57    return 0;
58 }
59
60 /*
61  * Destroy a semaphore
62  *
63  * Returns: 0 on success
64  *          errno on failure
65  */
66 int sem_destroy(semlock_t *sem)
67 {
68    int stat, stat1;
69
70   if (sem->valid != SEMLOCK_VALID) {
71      return EINVAL;
72   }
73   if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
74      return stat;
75   }
76
77   /*
78    * If any threads are active, report EBUSY
79    */
80   if (sem->active > 0) {
81      pthread_mutex_unlock(&sem->mutex);
82      return EBUSY;
83   }
84
85   /*
86    * If any threads are waiting, report EBUSY
87    */
88   if (sem->waiting > 0) {
89      pthread_mutex_unlock(&sem->mutex);
90      return EBUSY;
91   }
92
93   sem->valid = 0;
94   if ((stat = pthread_mutex_unlock(&sem->mutex)) != 0) {
95      return stat;
96   }
97   stat  = pthread_mutex_destroy(&sem->mutex);
98   stat1 = pthread_cond_destroy(&sem->wait);
99   return (stat != 0 ? stat : stat1);
100 }
101
102 /*
103  * Handle cleanup when the wait lock condition variable
104  *    wait is released.
105  */
106 static void sem_release(void *arg)
107 {
108    semlock_t *sem = (semlock_t *)arg;
109
110    sem->waiting--;
111    pthread_mutex_unlock(&sem->mutex);
112 }
113
114
115 /*
116  * Lock semaphore, wait until locked (or error).
117  */
118 int sem_lock(semlock_t *sem)
119 {
120    int stat;
121
122    if (sem->valid != SEMLOCK_VALID) {
123       return EINVAL;
124    }
125    if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
126       return stat;
127    }
128    if (sem->active >= sem->max_active) {
129       sem->waiting++;                 /* indicate that we are waiting */
130       pthread_cleanup_push(sem_release, (void *)sem);
131       while (sem->active >= sem->max_active) {
132          if ((stat = pthread_cond_wait(&sem->wait, &sem->mutex)) != 0) {
133             break;                    /* error, bail out */
134          }
135       }
136       pthread_cleanup_pop(0);
137       sem->waiting--;                 /* we are no longer waiting */
138    }
139    if (stat == 0) {
140       sem->active++;                  /* we are running */
141    }
142    pthread_mutex_unlock(&sem->mutex);
143    return stat;
144 }
145
146 /*
147  * Attempt to lock semaphore, don't wait
148  */
149 int sem_trylock(semlock_t *sem)
150 {
151    int stat, stat1;
152
153    if (sem->valid != SEMLOCK_VALID) {
154       return EINVAL;
155    }
156    if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
157       return stat;
158    }
159
160    if (sem->active >= sem->max_active) {
161       stat = EBUSY;
162    } else {
163       sem->active++;                 /* we are running */
164    }
165    stat1 = pthread_mutex_unlock(&sem->mutex);
166    return (stat == 0 ? stat1 : stat);
167 }
168
169 /*
170  * Unlock semaphore
171  *  Start any waiting callers
172  */
173 int sem_unlock(semlock_t *sem)
174 {
175    int stat, stat1;
176
177    if (sem->valid != SEMLOCK_VALID) {
178       return EINVAL;
179    }
180    if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
181       return stat;
182    }
183    sem->active--;
184    if (sem->active < 0) {
185       Emsg0(M_ABORT, 0, _("sem_unlock by non-owner.\n"));
186    }
187    if (sem->active >= sem->max_active) {
188       stat = 0;                       /* caller(s) still active */
189    } else {
190       /* No more active, awaken someone */
191       if (sem->waiting > 0) {         /* if someone waiting */
192          stat = pthread_cond_broadcast(&sem->wait);
193       }
194    }
195    stat1 = pthread_mutex_unlock(&sem->mutex);
196    return (stat == 0 ? stat1 : stat);
197 }
198
199 #ifdef TEST_SEMLOCK
200
201 #define THREADS     5
202 #define DATASIZE   15
203 #define ITERATIONS 10000
204
205 /*
206  * Keep statics for each thread.
207  */
208 typedef struct thread_tag {
209    int thread_num;
210    pthread_t thread_id;
211    int writes;
212    int reads;
213    int interval;
214 } thread_t;
215
216 /*
217  * Semaphore lock and shared data.
218  */
219 typedef struct data_tag {
220    semlock_t lock;
221    int data;
222    int writes;
223 } data_t;
224
225 thread_t threads[THREADS];
226 data_t data[DATASIZE];
227
228 /*
229  * Thread start routine that uses semaphores locks.
230  */
231 void *thread_routine(void *arg)
232 {
233    thread_t *self = (thread_t *)arg;
234    int repeats = 0;
235    int iteration;
236    int element = 0;
237    int status;
238
239    for (iteration=0; iteration < ITERATIONS; iteration++) {
240       /*
241        * Each "self->interval" iterations, perform an
242        * update operation (write lock instead of read
243        * lock).
244        */
245       if ((iteration % self->interval) == 0) {
246          status = sem_writelock(&data[element].lock);
247          if (status != 0) {
248             Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), strerror(status));
249          }
250          data[element].data = self->thread_num;
251          data[element].writes++;
252          self->writes++;
253          status = sem_writeunlock(&data[element].lock);
254          if (status != 0) {
255             Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), strerror(status));
256          }
257       } else {
258          /*
259           * Look at the current data element to see whether
260           * the current thread last updated it. Count the
261           * times to report later.
262           */
263           status = sem_readlock(&data[element].lock);
264           if (status != 0) {
265              Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), strerror(status));
266           }
267           self->reads++;
268           if (data[element].data == self->thread_num)
269              repeats++;
270           status = sem_readunlock(&data[element].lock);
271           if (status != 0) {
272              Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), strerror(status));
273           }
274       }
275       element++;
276       if (element >= DATASIZE) {
277          element = 0;
278       }
279    }
280    if (repeats > 0) {
281       Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
282          self->thread_num, repeats);
283    }
284    return NULL;
285 }
286
287 int main (int argc, char *argv[])
288 {
289     int count;
290     int data_count;
291     int status;
292     unsigned int seed = 1;
293     int thread_writes = 0;
294     int data_writes = 0;
295
296 #ifdef sun
297     /*
298      * On Solaris 2.5, threads are not timesliced. To ensure
299      * that our threads can run concurrently, we need to
300      * increase the concurrency level to THREADS.
301      */
302     thr_setconcurrency (THREADS);
303 #endif
304
305     /*
306      * Initialize the shared data.
307      */
308     for (data_count = 0; data_count < DATASIZE; data_count++) {
309         data[data_count].data = 0;
310         data[data_count].writes = 0;
311         status = sem_init (&data[data_count].lock);
312         if (status != 0) {
313            Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), strerror(status));
314         }
315     }
316
317     /*
318      * Create THREADS threads to access shared data.
319      */
320     for (count = 0; count < THREADS; count++) {
321         threads[count].thread_num = count + 1;
322         threads[count].writes = 0;
323         threads[count].reads = 0;
324         threads[count].interval = rand_r (&seed) % 71;
325         status = pthread_create(&threads[count].thread_id,
326             NULL, thread_routine, (void*)&threads[count]);
327         if (status != 0) {
328            Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), strerror(status));
329         }
330     }
331
332     /*
333      * Wait for all threads to complete, and collect
334      * statistics.
335      */
336     for (count = 0; count < THREADS; count++) {
337         status = pthread_join (threads[count].thread_id, NULL);
338         if (status != 0) {
339            Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), strerror(status));
340         }
341         thread_writes += threads[count].writes;
342         printf (_("%02d: interval %d, writes %d, reads %d\n"),
343             count, threads[count].interval,
344             threads[count].writes, threads[count].reads);
345     }
346
347     /*
348      * Collect statistics for the data.
349      */
350     for (data_count = 0; data_count < DATASIZE; data_count++) {
351         data_writes += data[data_count].writes;
352         printf (_("data %02d: value %d, %d writes\n"),
353             data_count, data[data_count].data, data[data_count].writes);
354         sem_destroy (&data[data_count].lock);
355     }
356
357     printf (_("Total: %d thread writes, %d data writes\n"),
358         thread_writes, data_writes);
359     return 0;
360 }
361
362 #endif
363
364 #ifdef TEST_SEM_TRY_LOCK
365 /*
366  * semlock_try_main.c
367  *
368  * Demonstrate use of non-blocking read-write locks.
369  *
370  * Special notes: On a Solaris system, call thr_setconcurrency()
371  * to allow interleaved thread execution, since threads are not
372  * timesliced.
373  */
374 #include <pthread.h>
375 #include "semlock.h"
376 #include "errors.h"
377
378 #define THREADS         5
379 #define ITERATIONS      1000
380 #define DATASIZE        15
381
382 /*
383  * Keep statistics for each thread.
384  */
385 typedef struct thread_tag {
386     int         thread_num;
387     pthread_t   thread_id;
388     int         r_collisions;
389     int         w_collisions;
390     int         updates;
391     int         interval;
392 } thread_t;
393
394 /*
395  * Read-write lock and shared data
396  */
397 typedef struct data_tag {
398     semlock_t    lock;
399     int         data;
400     int         updates;
401 } data_t;
402
403 thread_t threads[THREADS];
404 data_t data[DATASIZE];
405
406 /*
407  * Thread start routine that uses read-write locks
408  */
409 void *thread_routine (void *arg)
410 {
411     thread_t *self = (thread_t*)arg;
412     int iteration;
413     int element;
414     int status;
415
416     element = 0;                        /* Current data element */
417
418     for (iteration = 0; iteration < ITERATIONS; iteration++) {
419         if ((iteration % self->interval) == 0) {
420             status = sem_writetrylock (&data[element].lock);
421             if (status == EBUSY)
422                 self->w_collisions++;
423             else if (status == 0) {
424                 data[element].data++;
425                 data[element].updates++;
426                 self->updates++;
427                 sem_writeunlock (&data[element].lock);
428             } else
429                 err_abort (status, _("Try write lock"));
430         } else {
431             status = sem_readtrylock (&data[element].lock);
432             if (status == EBUSY)
433                 self->r_collisions++;
434             else if (status != 0) {
435                 err_abort (status, _("Try read lock"));
436             } else {
437                 if (data[element].data != data[element].updates)
438                     printf ("%d: data[%d] %d != %d\n",
439                         self->thread_num, element,
440                         data[element].data, data[element].updates);
441                 sem_readunlock (&data[element].lock);
442             }
443         }
444
445         element++;
446         if (element >= DATASIZE)
447             element = 0;
448     }
449     return NULL;
450 }
451
452 int main (int argc, char *argv[])
453 {
454     int count, data_count;
455     unsigned int seed = 1;
456     int thread_updates = 0, data_updates = 0;
457     int status;
458
459 #ifdef sun
460     /*
461      * On Solaris 2.5, threads are not timesliced. To ensure
462      * that our threads can run concurrently, we need to
463      * increase the concurrency level to THREADS.
464      */
465     DPRINTF (("Setting concurrency level to %d\n", THREADS));
466     thr_setconcurrency (THREADS);
467 #endif
468
469     /*
470      * Initialize the shared data.
471      */
472     for (data_count = 0; data_count < DATASIZE; data_count++) {
473         data[data_count].data = 0;
474         data[data_count].updates = 0;
475         sem_init (&data[data_count].lock);
476     }
477
478     /*
479      * Create THREADS threads to access shared data.
480      */
481     for (count = 0; count < THREADS; count++) {
482         threads[count].thread_num = count;
483         threads[count].r_collisions = 0;
484         threads[count].w_collisions = 0;
485         threads[count].updates = 0;
486         threads[count].interval = rand_r (&seed) % ITERATIONS;
487         status = pthread_create (&threads[count].thread_id,
488             NULL, thread_routine, (void*)&threads[count]);
489         if (status != 0)
490             err_abort (status, _("Create thread"));
491     }
492
493     /*
494      * Wait for all threads to complete, and collect
495      * statistics.
496      */
497     for (count = 0; count < THREADS; count++) {
498         status = pthread_join(threads[count].thread_id, NULL);
499         if (status != 0)
500             err_abort(status, _("Join thread"));
501         thread_updates += threads[count].updates;
502         printf (_("%02d: interval %d, updates %d, "
503                 "r_collisions %d, w_collisions %d\n"),
504             count, threads[count].interval,
505             threads[count].updates,
506             threads[count].r_collisions, threads[count].w_collisions);
507     }
508
509     /*
510      * Collect statistics for the data.
511      */
512     for (data_count = 0; data_count < DATASIZE; data_count++) {
513         data_updates += data[data_count].updates;
514         printf (_("data %02d: value %d, %d updates\n"),
515             data_count, data[data_count].data, data[data_count].updates);
516         sem_destroy(&data[data_count].lock);
517     }
518
519     return 0;
520 }
521
522 #endif