2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Bacula worker class. It permits creating a worker thread,
21 * then sending data via a fifo queue to it.
23 * Kern Sibbald, August 2014
27 #define LOCKMGR_COMPLIANT
31 int worker::init(int fifo_size)
35 if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
38 if ((stat = pthread_mutex_init(&fmutex, NULL)) != 0) {
39 pthread_mutex_destroy(&mutex);
42 if ((stat = pthread_cond_init(&full_wait, NULL)) != 0) {
43 pthread_mutex_destroy(&mutex);
44 pthread_mutex_destroy(&fmutex);
47 if ((stat = pthread_cond_init(&empty_wait, NULL)) != 0) {
48 pthread_cond_destroy(&full_wait);
49 pthread_mutex_destroy(&mutex);
50 pthread_mutex_destroy(&fmutex);
53 if ((stat = pthread_cond_init(&m_wait, NULL)) != 0) {
54 pthread_cond_destroy(&empty_wait);
55 pthread_cond_destroy(&full_wait);
56 pthread_mutex_destroy(&mutex);
57 pthread_mutex_destroy(&fmutex);
61 fifo = New(flist(fifo_size));
62 fpool = New(alist(fifo_size + 2, false));
63 worker_running = false;
69 * Handle cleanup when the lock is released.
71 static void worker_cleanup(void *arg)
73 worker *wrk = (worker *)arg;
78 void worker::release_lock()
80 pthread_mutex_unlock(&mutex);
84 void worker::set_wait_state()
86 m_state = WORKER_WAIT;
89 void worker::set_run_state()
91 if (is_quit_state()) return;
94 pthread_cond_signal(&m_wait);
98 void worker::set_quit_state()
101 m_state = WORKER_QUIT;
102 pthread_cond_signal(&m_wait);
103 pthread_cond_signal(&empty_wait);
108 /* Empty the fifo putting in free pool */
109 void worker::discard_queue()
115 while ((item = fifo->dequeue())) {
123 * Destroy a read/write lock
125 * Returns: 0 on success
128 int worker::destroy()
130 int stat, stat1, stat2, stat3, stat4;
133 m_state = WORKER_QUIT;
134 pthread_cond_signal(&m_wait);
135 pthread_cond_signal(&empty_wait);
138 /* Release free pool */
139 while ((item = (POOLMEM *)fpool->pop())) {
140 free_pool_memory(item);
146 /* Release work queue */
147 while ((item = (POOLMEM *)fifo->dequeue())) {
148 free_pool_memory(item);
151 worker_running = false;
156 stat = pthread_mutex_destroy(&mutex);
157 stat1 = pthread_mutex_destroy(&fmutex);
158 stat2 = pthread_cond_destroy(&full_wait);
159 stat3 = pthread_cond_destroy(&empty_wait);
160 stat4 = pthread_cond_destroy(&m_wait);
161 if (stat != 0) return stat;
162 if (stat1 != 0) return stat1;
163 if (stat2 != 0) return stat2;
164 if (stat3 != 0) return stat3;
165 if (stat4 != 0) return stat4;
170 /* Start the worker thread */
171 int worker::start(void *(*auser_sub)(void *), void *auser_ctx)
175 if (valid != WORKER_VALID) {
178 user_sub = auser_sub;
179 user_ctx = auser_ctx;
180 if ((stat = pthread_create(&worker_id, NULL, user_sub, this) != 0)) {
183 /* Wait for thread to start, but not too long */
184 for (i=0; i<100 && !is_running(); i++) {
185 bmicrosleep(0, 5000);
191 /* Wait for the worker thread to empty the queue */
192 void worker::wait_queue_empty()
194 if (is_quit_state()) {
198 while (!empty() && !is_quit_state()) {
199 pthread_cond_wait(&empty_wait, &mutex);
205 /* Wait for the main thread to release us */
209 pthread_cleanup_push(worker_cleanup, (void *)this);
210 while (is_wait_state() && !is_quit_state()) {
211 worker_waiting = true;
212 pthread_cond_signal(&m_wait);
213 pthread_cond_wait(&m_wait, &mutex);
215 pthread_cleanup_pop(0);
216 worker_waiting = false;
220 /* Stop the worker thread */
223 if (valid != WORKER_VALID) {
226 m_state = WORKER_QUIT;
227 pthread_cond_signal(&m_wait);
228 pthread_cond_signal(&empty_wait);
230 if (!pthread_equal(worker_id, pthread_self())) {
231 pthread_cancel(worker_id);
232 pthread_join(worker_id, NULL);
239 * Queue an item for the worker thread. Called by main thread.
241 bool worker::queue(void *item)
243 bool was_empty = false;;
245 if (valid != WORKER_VALID || is_quit_state()) {
250 //pthread_cleanup_push(worker_cleanup, (void *)this);
251 while (full() && !is_quit_state()) {
252 pthread_cond_wait(&full_wait, &mutex);
254 //pthread_cleanup_pop(0);
255 /* Maybe this should be worker_running */
257 if (!fifo->queue(item)) {
258 /* Since we waited for !full this cannot happen */
260 ASSERT2(1, "Fifo queue failed.\n");
263 pthread_cond_signal(&empty_wait);
265 m_state = WORKER_RUN;
266 if (worker_waiting) {
267 pthread_cond_signal(&m_wait);
274 * Wait for work to complete
276 void worker::finish_work()
279 while (!empty() && !is_quit_state()) {
280 pthread_cond_wait(&empty_wait, &mutex);
282 done = true; /* Tell worker that work is done */
283 m_state = WORKER_WAIT; /* force worker into wait state */
284 V(mutex); /* pause for state transition */
285 if (waiting_on_empty) pthread_cond_signal(&empty_wait);
287 /* Wait until worker in wait state */
288 while (!worker_waiting && !is_quit_state()) {
289 if (waiting_on_empty) pthread_cond_signal(&empty_wait);
290 pthread_cond_wait(&m_wait, &mutex);
297 * Dequeue a work item. Called by worker thread.
299 void *worker::dequeue()
301 bool was_full = false;;
304 if (valid != WORKER_VALID || done || is_quit_state()) {
308 //pthread_cleanup_push(worker_cleanup, (void *)this);
309 while (empty() && !done && !is_quit_state()) {
310 waiting_on_empty = true;
311 pthread_cond_wait(&empty_wait, &mutex);
313 waiting_on_empty = false;
314 //pthread_cleanup_pop(0);
316 item = fifo->dequeue();
318 pthread_cond_signal(&full_wait);
321 pthread_cond_signal(&empty_wait);
328 * Pop a free buffer from the list, if one exists.
329 * Called by main thread to get a free buffer.
330 * If none exists (NULL returned), it must allocate
333 void *worker::pop_free_buffer()
338 free_buf = fpool->pop();
344 * Once a work item (buffer) has been processed by the
345 * worker thread, it will put it on the free buffer list
348 void worker::push_free_buffer(void *buf)
356 //=================================================
360 void *worker_prog(void *wctx)
363 worker *wrk = (worker *)wctx;
367 while (!wrk->is_quit_state()) {
368 if (wrk->is_wait_state()) {
372 buf = (POOLMEM *)wrk->dequeue();
374 printf("worker: got null stop\n");
377 printf("ctx=%lld worker: %s\n", (long long int)wrk->get_ctx(), buf);
378 wrk->push_free_buffer(buf);
380 printf("worker: asked to stop");
384 int main(int argc, char *argv[])
391 wrk = New(worker(10));
393 wrk->start(worker_prog, ctx);
395 for (i=1; i<=40; i++) {
396 buf = (POOLMEM *)wrk->pop_free_buffer();
398 buf = get_pool_memory(PM_BSOCK);
399 printf("Alloc %p\n", buf);
401 sprintf(buf, "This is item %d", i);
403 //printf("back from queue %d\n", i);
405 wrk->wait_queue_empty();
406 wrk->set_wait_state();
408 for (i=1; i<=5; i++) {
409 buf = (POOLMEM *)wrk->pop_free_buffer();
411 buf = get_pool_memory(PM_BSOCK);
412 printf("Alloc %p\n", buf);
414 sprintf(buf, "This is item %d", i);
416 //printf("back from queue %d\n", i);
418 wrk->set_run_state();
419 for (i=6; i<=40; i++) {
420 buf = (POOLMEM *)wrk->pop_free_buffer();
422 buf = get_pool_memory(PM_BSOCK);
423 printf("Alloc %p\n", buf);
425 sprintf(buf, "This is item %d", i);
427 //printf("back from queue %d\n", i);
429 wrk->wait_queue_empty();
435 sm_dump(false); /* test program */