2 * Bacula Semaphore code. This code permits setting up
3 * a semaphore that lets through a specified number
4 * of callers simultaneously. Once the number of callers
5 * exceeds the limit, they block.
7 * Kern Sibbald, March MMIII
9 * Derived from rwlock.h which was in turn derived from code in
10 * "Programming with POSIX Threads" By David R. Butenhof
16 Copyright (C) 2000-2004 Kern Sibbald and John Walker
18 This program is free software; you can redistribute it and/or
19 modify it under the terms of the GNU General Public License as
20 published by the Free Software Foundation; either version 2 of
21 the License, or (at your option) any later version.
23 This program is distributed in the hope that it will be useful,
24 but WITHOUT ANY WARRANTY; without even the implied warranty of
25 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 General Public License for more details.
28 You should have received a copy of the GNU General Public
29 License along with this program; if not, write to the Free
30 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
38 * Initialize a semaphore
40 * Returns: 0 on success
43 int sem_init(semlock_t *sem, int max_active)
47 sem->active = sem->waiting = 0;
48 sem->max_active = max_active;
49 if ((stat = pthread_mutex_init(&sem->mutex, NULL)) != 0) {
52 if ((stat = pthread_cond_init(&sem->wait, NULL)) != 0) {
53 pthread_mutex_destroy(&sem->mutex);
56 sem->valid = SEMLOCK_VALID;
63 * Returns: 0 on success
66 int sem_destroy(semlock_t *sem)
70 if (sem->valid != SEMLOCK_VALID) {
73 if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
78 * If any threads are active, report EBUSY
80 if (sem->active > 0) {
81 pthread_mutex_unlock(&sem->mutex);
86 * If any threads are waiting, report EBUSY
88 if (sem->waiting > 0) {
89 pthread_mutex_unlock(&sem->mutex);
94 if ((stat = pthread_mutex_unlock(&sem->mutex)) != 0) {
97 stat = pthread_mutex_destroy(&sem->mutex);
98 stat1 = pthread_cond_destroy(&sem->wait);
99 return (stat != 0 ? stat : stat1);
103 * Handle cleanup when the wait lock condition variable
106 static void sem_release(void *arg)
108 semlock_t *sem = (semlock_t *)arg;
111 pthread_mutex_unlock(&sem->mutex);
116 * Lock semaphore, wait until locked (or error).
118 int sem_lock(semlock_t *sem)
122 if (sem->valid != SEMLOCK_VALID) {
125 if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
128 if (sem->active >= sem->max_active) {
129 sem->waiting++; /* indicate that we are waiting */
130 pthread_cleanup_push(sem_release, (void *)sem);
131 while (sem->active >= sem->max_active) {
132 if ((stat = pthread_cond_wait(&sem->wait, &sem->mutex)) != 0) {
133 break; /* error, bail out */
136 pthread_cleanup_pop(0);
137 sem->waiting--; /* we are no longer waiting */
140 sem->active++; /* we are running */
142 pthread_mutex_unlock(&sem->mutex);
147 * Attempt to lock semaphore, don't wait
149 int sem_trylock(semlock_t *sem)
153 if (sem->valid != SEMLOCK_VALID) {
156 if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
160 if (sem->active >= sem->max_active) {
163 sem->active++; /* we are running */
165 stat1 = pthread_mutex_unlock(&sem->mutex);
166 return (stat == 0 ? stat1 : stat);
171 * Start any waiting callers
173 int sem_unlock(semlock_t *sem)
177 if (sem->valid != SEMLOCK_VALID) {
180 if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
184 if (sem->active < 0) {
185 Emsg0(M_ABORT, 0, _("sem_unlock by non-owner.\n"));
187 if (sem->active >= sem->max_active) {
188 stat = 0; /* caller(s) still active */
190 /* No more active, awaken someone */
191 if (sem->waiting > 0) { /* if someone waiting */
192 stat = pthread_cond_broadcast(&sem->wait);
195 stat1 = pthread_mutex_unlock(&sem->mutex);
196 return (stat == 0 ? stat1 : stat);
/* Number of passes each test thread makes over the shared data array. */
203 #define ITERATIONS 10000
/*
 * NOTE(review): the member list of thread_tag is not part of this listing.
 * The test code below accesses thread_num, thread_id, writes, reads and
 * interval through thread_t, so those members are presumably declared here.
 */
206 * Keep statics for each thread.
208 typedef struct thread_tag {
/*
 * NOTE(review): the member list of data_tag is not part of this listing.
 * The test code below accesses lock, data and writes through data_t;
 * lock is passed to the sem_*() calls, so it is presumably a semlock_t.
 */
217 * Semaphore lock and shared data.
219 typedef struct data_tag {
/*
 * One bookkeeping slot per test thread and the array of shared elements.
 * NOTE(review): THREADS and DATASIZE are not defined in the visible part
 * of this file -- confirm where they come from.
 */
225 thread_t threads[THREADS];
226 data_t data[DATASIZE];
229 * Thread start routine that uses semaphores locks.
231 void *thread_routine(void *arg)
233 thread_t *self = (thread_t *)arg;
239 for (iteration=0; iteration < ITERATIONS; iteration++) {
241 * Each "self->interval" iterations, perform an
242 * update operation (write lock instead of read
245 if ((iteration % self->interval) == 0) {
246 status = sem_writelock(&data[element].lock);
248 Emsg1(M_ABORT, 0, _("Write lock failed. ERR=%s\n"), strerror(status));
250 data[element].data = self->thread_num;
251 data[element].writes++;
253 status = sem_writeunlock(&data[element].lock);
255 Emsg1(M_ABORT, 0, _("Write unlock failed. ERR=%s\n"), strerror(status));
259 * Look at the current data element to see whether
260 * the current thread last updated it. Count the
261 * times to report later.
263 status = sem_readlock(&data[element].lock);
265 Emsg1(M_ABORT, 0, _("Read lock failed. ERR=%s\n"), strerror(status));
268 if (data[element].data == self->thread_num)
270 status = sem_readunlock(&data[element].lock);
272 Emsg1(M_ABORT, 0, _("Read unlock failed. ERR=%s\n"), strerror(status));
276 if (element >= DATASIZE) {
281 Pmsg2(000, _("Thread %d found unchanged elements %d times\n"),
282 self->thread_num, repeats);
287 int main (int argc, char *argv[])
292 unsigned int seed = 1;
293 int thread_writes = 0;
298 * On Solaris 2.5, threads are not timesliced. To ensure
299 * that our threads can run concurrently, we need to
300 * increase the concurrency level to THREADS.
302 thr_setconcurrency (THREADS);
306 * Initialize the shared data.
308 for (data_count = 0; data_count < DATASIZE; data_count++) {
309 data[data_count].data = 0;
310 data[data_count].writes = 0;
311 status = sem_init (&data[data_count].lock);
313 Emsg1(M_ABORT, 0, _("Init rwlock failed. ERR=%s\n"), strerror(status));
318 * Create THREADS threads to access shared data.
320 for (count = 0; count < THREADS; count++) {
321 threads[count].thread_num = count + 1;
322 threads[count].writes = 0;
323 threads[count].reads = 0;
324 threads[count].interval = rand_r (&seed) % 71;
325 status = pthread_create(&threads[count].thread_id,
326 NULL, thread_routine, (void*)&threads[count]);
328 Emsg1(M_ABORT, 0, _("Create thread failed. ERR=%s\n"), strerror(status));
333 * Wait for all threads to complete, and collect
336 for (count = 0; count < THREADS; count++) {
337 status = pthread_join (threads[count].thread_id, NULL);
339 Emsg1(M_ABORT, 0, _("Join thread failed. ERR=%s\n"), strerror(status));
341 thread_writes += threads[count].writes;
342 printf (_("%02d: interval %d, writes %d, reads %d\n"),
343 count, threads[count].interval,
344 threads[count].writes, threads[count].reads);
348 * Collect statistics for the data.
350 for (data_count = 0; data_count < DATASIZE; data_count++) {
351 data_writes += data[data_count].writes;
352 printf (_("data %02d: value %d, %d writes\n"),
353 data_count, data[data_count].data, data[data_count].writes);
354 sem_destroy (&data[data_count].lock);
357 printf (_("Total: %d thread writes, %d data writes\n"),
358 thread_writes, data_writes);
/*
 * Everything from here on is compiled only when TEST_SEM_TRY_LOCK is
 * defined. NOTE(review): the matching #endif is not visible in this part
 * of the file.
 */
364 #ifdef TEST_SEM_TRY_LOCK
368 * Demonstrate use of non-blocking read-write locks.
370 * Special notes: On a Solaris system, call thr_setconcurrency()
371 * to allow interleaved thread execution, since threads are not
/*
 * Number of passes each test thread makes. main() below also uses it as
 * the modulus when choosing each thread's interval.
 */
379 #define ITERATIONS 1000
/*
 * NOTE(review): the member list of thread_tag is not part of this listing.
 * The test code below accesses thread_num, thread_id, r_collisions,
 * w_collisions, updates and interval through thread_t.
 */
383 * Keep statistics for each thread.
385 typedef struct thread_tag {
/*
 * NOTE(review): the member list of data_tag is not part of this listing.
 * The test code below accesses lock, data and updates through data_t.
 */
395 * Read-write lock and shared data
397 typedef struct data_tag {
/*
 * One bookkeeping slot per test thread and the array of shared elements.
 * NOTE(review): THREADS and DATASIZE are not defined in the visible part
 * of this file -- confirm where they come from.
 */
403 thread_t threads[THREADS];
404 data_t data[DATASIZE];
/*
 * Test thread body for the try-lock variant of the test program.
 * arg is the calling thread's thread_t slot. On each of ITERATIONS passes
 * the routine either attempts an update of data[element] (when iteration
 * is a multiple of self->interval) or attempts a consistency check that
 * compares data[element].data with data[element].updates.
 *
 * NOTE(review): braces, local declarations and the conditions that guard
 * the w_collisions / r_collisions increments are missing from this
 * listing; presumably the counters are bumped when the try-lock reports
 * the lock as busy -- confirm against the complete source.
 * NOTE(review): sem_writetrylock(), sem_writeunlock(), sem_readtrylock()
 * and sem_readunlock() are not defined in the visible part of this file,
 * which provides sem_trylock() and sem_unlock() -- confirm before enabling
 * this test.
 * NOTE(review): main() sets interval to rand_r(&seed) % ITERATIONS, which
 * can be 0; the modulus below would then divide by zero.
 */
407 * Thread start routine that uses read-write locks
409 void *thread_routine (void *arg)
411 thread_t *self = (thread_t*)arg;
416 element = 0; /* Current data element */
418 for (iteration = 0; iteration < ITERATIONS; iteration++) {
/* Update path: taken on every self->interval-th iteration */
419 if ((iteration % self->interval) == 0) {
420 status = sem_writetrylock (&data[element].lock);
422 self->w_collisions++;
423 else if (status == 0) {
/* Lock obtained: data and updates are incremented together */
424 data[element].data++;
425 data[element].updates++;
427 sem_writeunlock (&data[element].lock);
429 err_abort (status, _("Try write lock"));
/* Read path: verify that data and updates still agree */
431 status = sem_readtrylock (&data[element].lock);
433 self->r_collisions++;
434 else if (status != 0) {
435 err_abort (status, _("Try read lock"));
437 if (data[element].data != data[element].updates)
438 printf ("%d: data[%d] %d != %d\n",
439 self->thread_num, element,
440 data[element].data, data[element].updates);
441 sem_readunlock (&data[element].lock);
/* Bounds check on the element index; the statement it guards is not shown */
446 if (element >= DATASIZE)
452 int main (int argc, char *argv[])
454 int count, data_count;
455 unsigned int seed = 1;
456 int thread_updates = 0, data_updates = 0;
461 * On Solaris 2.5, threads are not timesliced. To ensure
462 * that our threads can run concurrently, we need to
463 * increase the concurrency level to THREADS.
465 DPRINTF (("Setting concurrency level to %d\n", THREADS));
466 thr_setconcurrency (THREADS);
470 * Initialize the shared data.
472 for (data_count = 0; data_count < DATASIZE; data_count++) {
473 data[data_count].data = 0;
474 data[data_count].updates = 0;
475 sem_init (&data[data_count].lock);
479 * Create THREADS threads to access shared data.
481 for (count = 0; count < THREADS; count++) {
482 threads[count].thread_num = count;
483 threads[count].r_collisions = 0;
484 threads[count].w_collisions = 0;
485 threads[count].updates = 0;
486 threads[count].interval = rand_r (&seed) % ITERATIONS;
487 status = pthread_create (&threads[count].thread_id,
488 NULL, thread_routine, (void*)&threads[count]);
490 err_abort (status, _("Create thread"));
494 * Wait for all threads to complete, and collect
497 for (count = 0; count < THREADS; count++) {
498 status = pthread_join(threads[count].thread_id, NULL);
500 err_abort(status, _("Join thread"));
501 thread_updates += threads[count].updates;
502 printf (_("%02d: interval %d, updates %d, "
503 "r_collisions %d, w_collisions %d\n"),
504 count, threads[count].interval,
505 threads[count].updates,
506 threads[count].r_collisions, threads[count].w_collisions);
510 * Collect statistics for the data.
512 for (data_count = 0; data_count < DATASIZE; data_count++) {
513 data_updates += data[data_count].updates;
514 printf (_("data %02d: value %d, %d updates\n"),
515 data_count, data[data_count].data, data[data_count].updates);
516 sem_destroy(&data[data_count].lock);