]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/worker.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / lib / worker.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula worker class. It permits creating a worker thread,
21  *  then sending data via a fifo queue to it.
22  *
23  *  Kern Sibbald, August 2014
24  *
25  */
26
27 #define LOCKMGR_COMPLIANT
28 #include "bacula.h"
29 #include "worker.h"
30
31 int worker::init(int fifo_size)
32 {
33    int stat;
34
35    if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
36       return stat;
37    }
38    if ((stat = pthread_mutex_init(&fmutex, NULL)) != 0) {
39       pthread_mutex_destroy(&mutex);
40       return stat;
41    }
42    if ((stat = pthread_cond_init(&full_wait, NULL)) != 0) {
43       pthread_mutex_destroy(&mutex);
44       pthread_mutex_destroy(&fmutex);
45       return stat;
46    }
47    if ((stat = pthread_cond_init(&empty_wait, NULL)) != 0) {
48       pthread_cond_destroy(&full_wait);
49       pthread_mutex_destroy(&mutex);
50       pthread_mutex_destroy(&fmutex);
51       return stat;
52    }
53    if ((stat = pthread_cond_init(&m_wait, NULL)) != 0) {
54       pthread_cond_destroy(&empty_wait);
55       pthread_cond_destroy(&full_wait);
56       pthread_mutex_destroy(&mutex);
57       pthread_mutex_destroy(&fmutex);
58       return stat;
59    }
60    valid = WORKER_VALID;
61    fifo = New(flist(fifo_size));
62    fpool = New(alist(fifo_size + 2, false));
63    worker_running = false;
64    set_wait_state();
65    return 0;
66 }
67
68 /*
69  * Handle cleanup when the lock is released.
70  */
71 static void worker_cleanup(void *arg)
72 {
73    worker *wrk = (worker *)arg;
74    wrk->release_lock();
75 }
76
77
78 void worker::release_lock()
79 {
80    pthread_mutex_unlock(&mutex);
81 }
82
83
84 void worker::set_wait_state()
85 {
86    m_state = WORKER_WAIT;
87 }
88
89 void worker::set_run_state()
90 {
91    if (is_quit_state()) return;
92    m_state = WORKER_RUN;
93    if (worker_waiting) {
94       pthread_cond_signal(&m_wait);
95    }
96 }
97       
98 void worker::set_quit_state()
99 {
100    P(mutex);
101    m_state = WORKER_QUIT;
102    pthread_cond_signal(&m_wait);
103    pthread_cond_signal(&empty_wait);
104    V(mutex);
105 }
106
107
108 /* Empty the fifo putting in free pool */
109 void worker::discard_queue()
110 {
111   void *item;
112
113   P(mutex);
114   P(fmutex);
115   while ((item = fifo->dequeue())) {
116      fpool->push(item);
117   }
118   V(fmutex);
119   V(mutex);
120 }
121
122 /*
123  * Destroy a read/write lock
124  *
125  * Returns: 0 on success
126  *          errno on failure
127  */
128 int worker::destroy()
129 {
130    int stat, stat1, stat2, stat3, stat4;
131    POOLMEM *item;
132
133    m_state = WORKER_QUIT;
134    pthread_cond_signal(&m_wait);
135    pthread_cond_signal(&empty_wait);
136
137    P(fmutex);
138    /* Release free pool */
139    while ((item = (POOLMEM *)fpool->pop())) {
140       free_pool_memory(item);
141    }
142    V(fmutex);
143    fpool->destroy();   
144    free(fpool);
145
146    /* Release work queue */
147    while ((item = (POOLMEM *)fifo->dequeue())) {
148       free_pool_memory(item);
149    }
150    valid = 0;
151    worker_running = false;
152
153    fifo->destroy();
154    free(fifo);
155
156    stat  = pthread_mutex_destroy(&mutex);
157    stat1 = pthread_mutex_destroy(&fmutex);
158    stat2 = pthread_cond_destroy(&full_wait);
159    stat3 = pthread_cond_destroy(&empty_wait);
160    stat4 = pthread_cond_destroy(&m_wait);
161    if (stat != 0) return stat;
162    if (stat1 != 0) return stat1;
163    if (stat2 != 0) return stat2;
164    if (stat3 != 0) return stat3;
165    if (stat4 != 0) return stat4;
166    return 0;
167 }
168
169
170 /* Start the worker thread */
171 int worker::start(void *(*auser_sub)(void *), void *auser_ctx)
172 {
173    int stat;
174    int i;
175    if (valid != WORKER_VALID) {
176       return EINVAL;
177    }
178    user_sub = auser_sub;
179    user_ctx = auser_ctx;
180    if ((stat = pthread_create(&worker_id, NULL, user_sub, this) != 0)) {
181       return stat;
182    }
183    /* Wait for thread to start, but not too long */
184    for (i=0; i<100 && !is_running(); i++) {
185       bmicrosleep(0, 5000);
186    }
187    set_run_state();
188    return 0;
189 }
190
191 /* Wait for the worker thread to empty the queue */
192 void worker::wait_queue_empty()
193 {
194    if (is_quit_state()) {
195       return;
196    }
197    P(mutex);
198    while (!empty() && !is_quit_state()) {
199       pthread_cond_wait(&empty_wait, &mutex);
200    }
201    V(mutex);
202    return;
203 }
204
205 /* Wait for the main thread to release us */
206 void worker::wait()
207 {
208    P(mutex);
209    pthread_cleanup_push(worker_cleanup, (void *)this);
210    while (is_wait_state() && !is_quit_state()) {
211       worker_waiting = true;
212       pthread_cond_signal(&m_wait);
213       pthread_cond_wait(&m_wait, &mutex);
214    }
215    pthread_cleanup_pop(0);
216    worker_waiting = false;
217    V(mutex);
218 }
219
220 /* Stop the worker thread */
221 int worker::stop()
222 {
223    if (valid != WORKER_VALID) {
224       return EINVAL;
225    }
226    m_state = WORKER_QUIT;
227    pthread_cond_signal(&m_wait);
228    pthread_cond_signal(&empty_wait);
229
230    if (!pthread_equal(worker_id, pthread_self())) {
231       pthread_cancel(worker_id);
232       pthread_join(worker_id, NULL);
233    }
234    return 0;
235 }
236
237
238 /*
239  * Queue an item for the worker thread. Called by main thread.
240  */
241 bool worker::queue(void *item)
242 {
243    bool was_empty = false;;
244
245    if (valid != WORKER_VALID || is_quit_state()) {
246       return EINVAL;
247    }
248    P(mutex);
249    done = false;
250    //pthread_cleanup_push(worker_cleanup, (void *)this);
251    while (full() && !is_quit_state()) {
252       pthread_cond_wait(&full_wait, &mutex);
253    }
254    //pthread_cleanup_pop(0);
255    /* Maybe this should be worker_running */
256    was_empty = empty();
257    if (!fifo->queue(item)) {
258       /* Since we waited for !full this cannot happen */
259       V(mutex);
260       ASSERT2(1, "Fifo queue failed.\n");
261    }
262    if (was_empty) {
263       pthread_cond_signal(&empty_wait);
264    }
265    m_state = WORKER_RUN;
266    if (worker_waiting) {
267       pthread_cond_signal(&m_wait);
268    }
269    V(mutex);
270    return 1;
271 }
272
273 /*
274  * Wait for work to complete
275  */
276 void worker::finish_work()
277 {
278    P(mutex);
279    while (!empty() && !is_quit_state()) {
280       pthread_cond_wait(&empty_wait, &mutex);
281    }
282    done = true;                       /* Tell worker that work is done */
283    m_state = WORKER_WAIT;             /* force worker into wait state */
284    V(mutex);                          /* pause for state transition */
285    if (waiting_on_empty) pthread_cond_signal(&empty_wait);
286    P(mutex);
287    /* Wait until worker in wait state */
288    while (!worker_waiting && !is_quit_state()) {
289       if (waiting_on_empty) pthread_cond_signal(&empty_wait);
290       pthread_cond_wait(&m_wait, &mutex);
291    }
292    V(mutex);   
293    discard_queue();
294 }
295
296 /*
297  * Dequeue a work item. Called by worker thread.
298  */
299 void *worker::dequeue()
300 {
301    bool was_full = false;;
302    void *item = NULL;
303
304    if (valid != WORKER_VALID || done || is_quit_state()) {
305       return NULL;
306    }
307    P(mutex);
308    //pthread_cleanup_push(worker_cleanup, (void *)this);
309    while (empty() && !done && !is_quit_state()) {
310       waiting_on_empty = true;
311       pthread_cond_wait(&empty_wait, &mutex);
312    }
313    waiting_on_empty = false;
314    //pthread_cleanup_pop(0);
315    was_full = full();
316    item = fifo->dequeue();
317    if (was_full) {
318       pthread_cond_signal(&full_wait);
319    }
320    if (empty()) {
321       pthread_cond_signal(&empty_wait);
322    }
323    V(mutex);
324    return item;
325 }
326
327 /*
328  * Pop a free buffer from the list, if one exists.
329  *  Called by main thread to get a free buffer.
330  *  If none exists (NULL returned), it must allocate
331  *  one.
332  */
333 void *worker::pop_free_buffer()
334 {
335    void *free_buf;
336
337    P(fmutex);
338    free_buf = fpool->pop();
339    V(fmutex);
340    return free_buf;
341 }
342
343 /*
344  * Once a work item (buffer) has been processed by the
345  *  worker thread, it will put it on the free buffer list
346  *  (fpool).
347  */
348 void worker::push_free_buffer(void *buf)
349 {
350    P(fmutex);
351    fpool->push(buf);
352    V(fmutex);
353 }
354
355
356 //=================================================
357
358 #ifdef TEST_PROGRAM
359
360 void *worker_prog(void *wctx)
361 {
362    POOLMEM *buf;
363    worker *wrk = (worker *)wctx;
364
365    wrk->set_running();
366
367    while (!wrk->is_quit_state()) {
368       if (wrk->is_wait_state()) {
369          wrk->wait();
370          continue;
371       }   
372       buf = (POOLMEM *)wrk->dequeue();
373       if (!buf) {
374          printf("worker: got null stop\n");
375          return NULL;
376       }
377       printf("ctx=%lld worker: %s\n", (long long int)wrk->get_ctx(), buf);
378       wrk->push_free_buffer(buf);
379    }
380    printf("worker: asked to stop");
381    return NULL;
382 }
383
384 int main(int argc, char *argv[])
385 {
386    POOLMEM *buf;
387    int i;
388    worker *wrk;
389    void *ctx;
390
391    wrk = New(worker(10));
392    ctx = (void *)1;
393    wrk->start(worker_prog, ctx);
394
395    for (i=1; i<=40; i++) {
396       buf = (POOLMEM *)wrk->pop_free_buffer();
397       if (!buf) {
398          buf = get_pool_memory(PM_BSOCK);
399          printf("Alloc %p\n", buf);
400       }
401       sprintf(buf, "This is item %d", i);
402       wrk->queue(buf);
403       //printf("back from queue %d\n", i);
404    }
405    wrk->wait_queue_empty();
406    wrk->set_wait_state();
407    printf("======\n");
408    for (i=1; i<=5; i++) {
409       buf = (POOLMEM *)wrk->pop_free_buffer();
410       if (!buf) {
411          buf = get_pool_memory(PM_BSOCK);
412          printf("Alloc %p\n", buf);
413       }
414       sprintf(buf, "This is item %d", i);
415       wrk->queue(buf);
416       //printf("back from queue %d\n", i);
417    }
418    wrk->set_run_state();
419    for (i=6; i<=40; i++) {
420       buf = (POOLMEM *)wrk->pop_free_buffer();
421       if (!buf) {
422          buf = get_pool_memory(PM_BSOCK);
423          printf("Alloc %p\n", buf);
424       }
425       sprintf(buf, "This is item %d", i);
426       wrk->queue(buf);
427       //printf("back from queue %d\n", i);
428    }
429    wrk->wait_queue_empty();
430    wrk->stop();
431    wrk->destroy();
432    free(wrk);
433    
434    close_memory_pool();
435    sm_dump(false);       /* test program */
436 }
437 #endif