]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
add jobq+serial.h+priorities+recycling
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2  * Bacula job queue routines.
3  *
4  *  Kern Sibbald, July MMIII
5  *
6  *   Version $Id$
7  *
8  *  This code was adapted from the Bacula workq, which was
9  *    adapted from "Programming with POSIX Threads", by
10  *    David R. Butenhof
11  *
12  * Example:
13  *
14  * static jobq_t jq;    define job queue
15  *
16  *  Initialize queue
17  *  if ((stat = jobq_init(&jq, max_workers, job_thread)) != 0) {
18  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
19  *   }
20  *
21  *  Add an item to the queue
22  *  if ((stat = jobq_add(&jq, jcr)) != 0) {
23  *      Emsg1(M_ABORT, 0, "Could not add job to queue: ERR=%s\n", strerror(errno));
24  *   }
25  *
26  *  Terminate the queue
27  *  jobq_destroy(jobq_t *jq);
28  *
29  */
30 /*
31    Copyright (C) 2000-2003 Kern Sibbald and John Walker
32
33    This program is free software; you can redistribute it and/or
34    modify it under the terms of the GNU General Public License as
35    published by the Free Software Foundation; either version 2 of
36    the License, or (at your option) any later version.
37
38    This program is distributed in the hope that it will be useful,
39    but WITHOUT ANY WARRANTY; without even the implied warranty of
40    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
41    General Public License for more details.
42
43    You should have received a copy of the GNU General Public
44    License along with this program; if not, write to the Free
45    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
46    MA 02111-1307, USA.
47
48  */
49
50 #include "bacula.h"
51 #include "dird.h"
52
53 /* Forward referenced functions */
54 static void *jobq_server(void *arg);
55
56 /*   
57  * Initialize a job queue
58  *
59  *  Returns: 0 on success
60  *           errno on failure
61  */
62 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
63 {
64    int stat;
65    jobq_item_t *item = NULL;
66                         
67    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
68       return stat;
69    }
70    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
71       pthread_attr_destroy(&jq->attr);
72       return stat;
73    }
74    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
75       pthread_attr_destroy(&jq->attr);
76       return stat;
77    }
78    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
79       pthread_mutex_destroy(&jq->mutex);
80       pthread_attr_destroy(&jq->attr);
81       return stat;
82    }
83    jq->quit = false;
84    jq->max_workers = threads;         /* max threads to create */
85    jq->num_workers = 0;               /* no threads yet */
86    jq->idle_workers = 0;              /* no idle threads */
87    jq->engine = engine;               /* routine to run */
88    jq->valid = JOBQ_VALID; 
89    jq->list.init(item, &item->link);
90    return 0;
91 }
92
93 /*
94  * Destroy the job queue
95  *
96  * Returns: 0 on success
97  *          errno on failure
98  */
99 int jobq_destroy(jobq_t *jq)
100 {
101    int stat, stat1, stat2;
102
103   if (jq->valid != JOBQ_VALID) {
104      return EINVAL;
105   }
106   if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
107      return stat;
108   }
109   jq->valid = 0;                      /* prevent any more operations */
110
111   /* 
112    * If any threads are active, wake them 
113    */
114   if (jq->num_workers > 0) {
115      jq->quit = true;
116      if (jq->idle_workers) {
117         if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
118            pthread_mutex_unlock(&jq->mutex);
119            return stat;
120         }
121      }
122      while (jq->num_workers > 0) {
123         if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
124            pthread_mutex_unlock(&jq->mutex);
125            return stat;
126         }
127      }
128   }
129   if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
130      return stat;
131   }
132   stat  = pthread_mutex_destroy(&jq->mutex);
133   stat1 = pthread_cond_destroy(&jq->work);
134   stat2 = pthread_attr_destroy(&jq->attr);
135   jq->list.destroy();
136   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
137 }
138
139
140 /*
141  *  Add a job to the queue
142  *    jq is a queue that was created with jobq_init
143  *   
144  */
145 int jobq_add(jobq_t *jq, JCR *jcr)
146 {
147    int stat;
148    jobq_item_t *item, *li;
149    pthread_t id;
150    bool inserted = false;
151     
152    Dmsg0(200, "jobq_add\n");
153    if (jq->valid != JOBQ_VALID) {
154       return EINVAL;
155    }
156
157    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
158       return ENOMEM;
159    }
160    item->jcr = jcr;
161    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
162       free(item);
163       return stat;
164    }
165
166    Dmsg0(200, "add item to queue\n");
167    for (li=NULL; (li=(jobq_item_t *)jq->list.next(li)); ) {
168       if (li->jcr->JobPriority < jcr->JobPriority) {
169          jq->list.insert_before(item, li);
170          inserted = true;
171       }
172    }
173    if (!inserted) {
174       jq->list.append(item);
175    }
176
177    /* if any threads are idle, wake one */
178    if (jq->idle_workers > 0) {
179       Dmsg0(200, "Signal worker\n");
180       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
181          pthread_mutex_unlock(&jq->mutex);
182          return stat;
183       }
184    } else if (jq->num_workers < jq->max_workers) {
185       Dmsg0(200, "Create worker thread\n");
186       /* No idle threads so create a new one */
187       set_thread_concurrency(jq->max_workers + 1);
188       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
189          pthread_mutex_unlock(&jq->mutex);
190          return stat;
191       }
192       jq->num_workers++;
193    }
194    pthread_mutex_unlock(&jq->mutex);
195    Dmsg0(200, "Return jobq_add\n");
196    return stat;
197 }
198
199 /*
200  *  Remove a job from the job queue
201  *    jq is a queue that was created with jobq_init
202  *    work_item is an element of work
203  *
204  *   Note, it is "removed" by immediately calling a processing routine.
205  *    if you want to cancel it, you need to provide some external means
206  *    of doing so.
207  */
208 int jobq_remove(jobq_t *jq, JCR *jcr)
209 {
210    int stat;
211    bool found = false;
212    pthread_t id;
213    jobq_item_t *item;
214     
215    Dmsg0(200, "jobq_remove\n");
216    if (jq->valid != JOBQ_VALID) {
217       return EINVAL;
218    }
219
220    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
221       return stat;
222    }
223
224    for (item=NULL; (item=(jobq_item_t *)jq->list.next(item)); ) {
225       if (jcr == item->jcr) {
226          found = true;
227          break;
228       }
229    }
230    if (!found) {
231       return EINVAL;
232    }
233
234    /* Move item to be the first on the list */
235    jq->list.remove(item);
236    jq->list.prepend(item);
237    
238    /* if any threads are idle, wake one */
239    if (jq->idle_workers > 0) {
240       Dmsg0(200, "Signal worker\n");
241       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
242          pthread_mutex_unlock(&jq->mutex);
243          return stat;
244       }
245    } else {
246       Dmsg0(200, "Create worker thread\n");
247       /* No idle threads so create a new one */
248       set_thread_concurrency(jq->max_workers + 1);
249       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
250          pthread_mutex_unlock(&jq->mutex);
251          return stat;
252       }
253       jq->num_workers++;
254    }
255    pthread_mutex_unlock(&jq->mutex);
256    Dmsg0(200, "Return jobq_remove\n");
257    return stat;
258 }
259
260
261 /* 
262  * This is the worker thread that serves the job queue.
263  * When all the resources are acquired for the job, 
264  *  it will call the user's engine.
265  */
266 static void *jobq_server(void *arg)
267 {
268    struct timespec timeout;
269    jobq_t *jq = (jobq_t *)arg;
270    jobq_item_t *je;                   /* job entry in queue */
271    int stat;
272    bool timedout;
273
274    Dmsg0(200, "Start jobq_server\n");
275    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
276       return NULL;
277    }
278
279    for (;;) {
280       struct timeval tv;
281       struct timezone tz;
282
283       Dmsg0(200, "Top of for loop\n");
284       timedout = false;
285       Dmsg0(200, "gettimeofday()\n");
286       gettimeofday(&tv, &tz);
287       timeout.tv_nsec = 0;
288       timeout.tv_sec = tv.tv_sec + 2;
289
290       while (jq->list.empty() && !jq->quit) {
291          /*
292           * Wait 2 seconds, then if no more work, exit
293           */
294          Dmsg0(200, "pthread_cond_timedwait()\n");
295          stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
296          Dmsg1(200, "timedwait=%d\n", stat);
297          if (stat == ETIMEDOUT) {
298             timedout = true;
299             break;
300          } else if (stat != 0) {
301             /* This shouldn't happen */
302             Dmsg0(200, "This shouldn't happen\n");
303             jq->num_workers--;
304             pthread_mutex_unlock(&jq->mutex);
305             return NULL;
306          }
307       } 
308       je = (jobq_item_t *)jq->list.first();
309       if (je != NULL) {
310          jq->list.remove(je);
311          if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
312             return NULL;
313          }
314          /* Call user's routine here */
315          Dmsg0(200, "Calling user engine.\n");
316          jq->engine(je->jcr);
317          Dmsg0(200, "Back from user engine.\n");
318          free(je);                    /* release job entry */
319          Dmsg0(200, "relock mutex\n"); 
320          if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
321             return NULL;
322          }
323          Dmsg0(200, "Done lock mutex\n");
324       }
325       /*
326        * If no more work request, and we are asked to quit, then do it
327        */
328       if (jq->list.empty() && jq->quit) {
329          jq->num_workers--;
330          if (jq->num_workers == 0) {
331             Dmsg0(200, "Wake up destroy routine\n");
332             /* Wake up destroy routine if he is waiting */
333             pthread_cond_broadcast(&jq->work);
334          }
335          Dmsg0(200, "Unlock mutex\n");
336          pthread_mutex_unlock(&jq->mutex);
337          Dmsg0(200, "Return from jobq_server\n");
338          return NULL;
339       }
340       Dmsg0(200, "Check for work request\n");
341       /* 
342        * If no more work requests, and we waited long enough, quit
343        */
344       Dmsg1(200, "jq empty = %d\n", jq->list.empty());
345       Dmsg1(200, "timedout=%d\n", timedout);
346       if (jq->list.empty() && timedout) {
347          Dmsg0(200, "break big loop\n");
348          jq->num_workers--;
349          break;
350       }
351       Dmsg0(200, "Loop again\n");
352    } /* end of big for loop */
353
354    Dmsg0(200, "unlock mutex\n");
355    pthread_mutex_unlock(&jq->mutex);
356    Dmsg0(200, "End jobq_server\n");
357    return NULL;
358 }