]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
bfed633a8d4be8fe80ae2f9b703ba69e300b8f07
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2016 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula work queue routines. Permits passing work to
21  *  multiple threads.
22  *
23  *  Kern Sibbald, January MMI
24  *
25  *  This code adapted from "Programming with POSIX Threads", by
26  *    David R. Butenhof
27  *
28  * Example:
29  *
30  * static workq_t job_wq;    define work queue
31  *
32  *  Initialize queue
33  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
34  *     berrno be;
35  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
36  *   }
37  *
38  *  Add an item to the queue
39  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
40  *      berrno be;
41  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
42  *   }
43  *
44  *  Terminate the queue
45  *  workq_destroy(workq_t *wq);
46  *
47  */
48
49 #include "bacula.h"
50 #include "jcr.h"
51
52 /* Forward referenced functions */
53 extern "C" void *workq_server(void *arg);
54
55 /*
56  * Initialize a work queue
57  *
58  *  Returns: 0 on success
59  *           errno on failure
60  */
61 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
62 {
63    int stat;
64
65    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&wq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73       pthread_attr_destroy(&wq->attr);
74       return stat;
75    }
76    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77       pthread_mutex_destroy(&wq->mutex);
78       pthread_attr_destroy(&wq->attr);
79       return stat;
80    }
81    wq->quit = 0;
82    wq->first = wq->last = NULL;
83    wq->max_workers = threads;         /* max threads to create */
84    wq->num_workers = 0;               /* no threads yet */
85    wq->idle_workers = 0;              /* no idle threads */
86    wq->engine = engine;               /* routine to run */
87    wq->valid = WORKQ_VALID;
88    return 0;
89 }
90
91 /*
92  * Destroy a work queue
93  *
94  * Returns: 0 on success
95  *          errno on failure
96  */
97 int workq_destroy(workq_t *wq)
98 {
99    int stat, stat1, stat2;
100
101   if (wq->valid != WORKQ_VALID) {
102      return EINVAL;
103   }
104   P(wq->mutex);
105   wq->valid = 0;                      /* prevent any more operations */
106
107   /*
108    * If any threads are active, wake them
109    */
110   if (wq->num_workers > 0) {
111      wq->quit = 1;
112      if (wq->idle_workers) {
113         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
114            V(wq->mutex);
115            return stat;
116         }
117      }
118      while (wq->num_workers > 0) {
119         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
120            V(wq->mutex);
121            return stat;
122         }
123      }
124   }
125   V(wq->mutex);
126   stat  = pthread_mutex_destroy(&wq->mutex);
127   stat1 = pthread_cond_destroy(&wq->work);
128   stat2 = pthread_attr_destroy(&wq->attr);
129   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
130 }
131
132
133 /*
134  *  Add work to a queue
135  *    wq is a queue that was created with workq_init
136  *    element is a user unique item that will be passed to the
137  *        processing routine
138  *    work_item will get internal work queue item -- if it is not NULL
139  *    priority if non-zero will cause the item to be placed on the
140  *        head of the list instead of the tail.
141  */
142 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
143 {
144    int stat=0;
145    workq_ele_t *item;
146    pthread_t id;
147
148    Dmsg0(1400, "workq_add\n");
149    if (wq->valid != WORKQ_VALID) {
150       return EINVAL;
151    }
152
153    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
154       return ENOMEM;
155    }
156    item->data = element;
157    item->next = NULL;
158    P(wq->mutex);
159
160    Dmsg0(1400, "add item to queue\n");
161    if (priority) {
162       /* Add to head of queue */
163       if (wq->first == NULL) {
164          wq->first = item;
165          wq->last = item;
166       } else {
167          item->next = wq->first;
168          wq->first = item;
169       }
170    } else {
171       /* Add to end of queue */
172       if (wq->first == NULL) {
173          wq->first = item;
174       } else {
175          wq->last->next = item;
176       }
177       wq->last = item;
178    }
179
180    /* if any threads are idle, wake one */
181    if (wq->idle_workers > 0) {
182       Dmsg0(1400, "Signal worker\n");
183       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
184          V(wq->mutex);
185          return stat;
186       }
187    } else if (wq->num_workers < wq->max_workers) {
188       Dmsg0(1400, "Create worker thread\n");
189       /* No idle threads so create a new one */
190       set_thread_concurrency(wq->max_workers + 1);
191       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
192          V(wq->mutex);
193          return stat;
194       }
195       wq->num_workers++;
196    }
197    V(wq->mutex);
198    Dmsg0(1400, "Return workq_add\n");
199    /* Return work_item if requested */
200    if (work_item) {
201       *work_item = item;
202    }
203    return stat;
204 }
205
206 /*
207  *  Remove work from a queue
208  *    wq is a queue that was created with workq_init
209  *    work_item is an element of work
210  *
211  *   Note, it is "removed" by immediately calling a processing routine.
212  *    if you want to cancel it, you need to provide some external means
213  *    of doing so.
214  */
215 int workq_remove(workq_t *wq, workq_ele_t *work_item)
216 {
217    int stat, found = 0;
218    pthread_t id;
219    workq_ele_t *item, *prev;
220
221    Dmsg0(1400, "workq_remove\n");
222    if (wq->valid != WORKQ_VALID) {
223       return EINVAL;
224    }
225
226    P(wq->mutex);
227
228    for (prev=item=wq->first; item; item=item->next) {
229       if (item == work_item) {
230          found = 1;
231          break;
232       }
233       prev = item;
234    }
235    if (!found) {
236       return EINVAL;
237    }
238
239    /* Move item to be first on list */
240    if (wq->first != work_item) {
241       prev->next = work_item->next;
242       if (wq->last == work_item) {
243          wq->last = prev;
244       }
245       work_item->next = wq->first;
246       wq->first = work_item;
247    }
248
249    /* if any threads are idle, wake one */
250    if (wq->idle_workers > 0) {
251       Dmsg0(1400, "Signal worker\n");
252       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
253          V(wq->mutex);
254          return stat;
255       }
256    } else {
257       Dmsg0(1400, "Create worker thread\n");
258       /* No idle threads so create a new one */
259       set_thread_concurrency(wq->max_workers + 1);
260       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
261          V(wq->mutex);
262          return stat;
263       }
264       wq->num_workers++;
265    }
266    V(wq->mutex);
267    Dmsg0(1400, "Return workq_remove\n");
268    return stat;
269 }
270
271
272 /*
273  * This is the worker thread that serves the work queue.
274  * In due course, it will call the user's engine.
275  */
276 extern "C"
277 void *workq_server(void *arg)
278 {
279    struct timespec timeout;
280    workq_t *wq = (workq_t *)arg;
281    workq_ele_t *we;
282    int stat, timedout;
283
284    Dmsg0(1400, "Start workq_server\n");
285    P(wq->mutex);
286    set_jcr_in_tsd(INVALID_JCR);
287
288    for (;;) {
289       struct timeval tv;
290       struct timezone tz;
291
292       Dmsg0(1400, "Top of for loop\n");
293       timedout = 0;
294       Dmsg0(1400, "gettimeofday()\n");
295       gettimeofday(&tv, &tz);
296       timeout.tv_nsec = 0;
297       timeout.tv_sec = tv.tv_sec + 2;
298
299       while (wq->first == NULL && !wq->quit) {
300          /*
301           * Wait 2 seconds, then if no more work, exit
302           */
303          Dmsg0(1400, "pthread_cond_timedwait()\n");
304          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
305          Dmsg1(1400, "timedwait=%d\n", stat);
306          if (stat == ETIMEDOUT) {
307             timedout = 1;
308             break;
309          } else if (stat != 0) {
310             /* This shouldn't happen */
311             Dmsg0(1400, "This shouldn't happen\n");
312             wq->num_workers--;
313             V(wq->mutex);
314             return NULL;
315          }
316       }
317       we = wq->first;
318       if (we != NULL) {
319          wq->first = we->next;
320          if (wq->last == we) {
321             wq->last = NULL;
322          }
323          V(wq->mutex);
324          /* Call user's routine here */
325          Dmsg0(1400, "Calling user engine.\n");
326          wq->engine(we->data);
327          Dmsg0(1400, "Back from user engine.\n");
328          free(we);                    /* release work entry */
329          Dmsg0(1400, "relock mutex\n");
330          P(wq->mutex);
331          Dmsg0(1400, "Done lock mutex\n");
332       }
333       /*
334        * If no more work request, and we are asked to quit, then do it
335        */
336       if (wq->first == NULL && wq->quit) {
337          wq->num_workers--;
338          if (wq->num_workers == 0) {
339             Dmsg0(1400, "Wake up destroy routine\n");
340             /* Wake up destroy routine if he is waiting */
341             pthread_cond_broadcast(&wq->work);
342          }
343          Dmsg0(1400, "Unlock mutex\n");
344          V(wq->mutex);
345          Dmsg0(1400, "Return from workq_server\n");
346          return NULL;
347       }
348       Dmsg0(1400, "Check for work request\n");
349       /*
350        * If no more work requests, and we waited long enough, quit
351        */
352       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
353       Dmsg1(1400, "timedout=%d\n", timedout);
354       if (wq->first == NULL && timedout) {
355          Dmsg0(1400, "break big loop\n");
356          wq->num_workers--;
357          break;
358       }
359       Dmsg0(1400, "Loop again\n");
360    } /* end of big for loop */
361
362    Dmsg0(1400, "unlock mutex\n");
363    V(wq->mutex);
364    Dmsg0(1400, "End workq_server\n");
365    return NULL;
366 }