]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
Eliminate zombie job in SD
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to 
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2004 Kern Sibbald and John Walker
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License as
17    published by the Free Software Foundation; either version 2 of
18    the License, or (at your option) any later version.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23    General Public License for more details.
24
25    You should have received a copy of the GNU General Public
26    License along with this program; if not, write to the Free
27    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
28    MA 02111-1307, USA.
29
30  */
31
32 #include "bacula.h"
33 #include "dird.h"
34
35 /* #define SCHED_DEBUG */
36
37
38 /* Local variables */
39 struct job_item {  
40    RUN *run;
41    JOB *job;
42    time_t runtime;
43    int Priority;
44    dlink link;                        /* link for list */
45 };              
46
47 /* List of jobs to be run. They were scheduled in this hour or the next */
48 static dlist *jobs_to_run;               /* list of jobs to be run */
49
50 /* Time interval in secs to sleep if nothing to be run */
51 static int const NEXT_CHECK_SECS = 60;
52
53 /* Forward referenced subroutines */
54 static void find_runs();
55 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
56 static void dump_job(job_item *ji, char *msg);
57
58 /* Imported subroutines */
59
60 /* Imported variables */
61
62
63 /*********************************************************************
64  *
65  *         Main Bacula Scheduler
66  *
67  */
68 JCR *wait_for_next_job(char *one_shot_job_to_run)
69 {
70    JCR *jcr;
71    JOB *job;
72    RUN *run;
73    time_t now;
74    static bool first = true;
75    job_item *next_job = NULL;
76
77    Dmsg0(200, "Enter wait_for_next_job\n");
78    if (first) {
79       first = false;
80       /* Create scheduled jobs list */
81       jobs_to_run = new dlist(next_job, &next_job->link);
82       if (one_shot_job_to_run) {            /* one shot */
83          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
84          if (!job) {
85             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
86          }
87          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
88          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
89          set_jcr_defaults(jcr, job);
90          return jcr;
91       }
92    }
93    /* Wait until we have something in the
94     * next hour or so.
95     */
96    while (jobs_to_run->empty()) {
97       find_runs();
98       if (!jobs_to_run->empty()) {
99          break;
100       }
101       bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
102    }
103
104 #ifdef  list_chain
105    job_item *je;
106    foreach_dlist(je, jobs_to_run) {
107       dump_job(je, "Walk queue");
108    }
109 #endif
110    /* 
111     * Pull the first job to run (already sorted by runtime and
112     *  Priority, then wait around until it is time to run it.
113     */
114    next_job = (job_item *)jobs_to_run->first();
115    jobs_to_run->remove(next_job);
116
117    dump_job(next_job, "Dequeued job");
118
119    if (!next_job) {                /* we really should have something now */
120       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
121    }
122
123    /* Now wait for the time to run the job */
124    for (;;) {
125       time_t twait;
126       now = time(NULL);
127       twait = next_job->runtime - now;
128       if (twait <= 0) {               /* time to run it */
129          break;
130       }
131       bmicrosleep(twait, 0);
132    }
133    run = next_job->run;               /* pick up needed values */
134    job = next_job->job;
135    run->last_run = now;               /* mark as run now */
136
137    dump_job(next_job, "Run job");
138
139    free(next_job);
140
141    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
142    ASSERT(job);
143    set_jcr_defaults(jcr, job);
144    if (run->level) {
145       jcr->JobLevel = run->level;        /* override run level */
146    }
147    if (run->pool) {
148       jcr->pool = run->pool;          /* override pool */
149    }
150    if (run->full_pool) {
151       jcr->pool = run->full_pool;     /* override full pool */
152    }
153    if (run->inc_pool) {
154       jcr->pool = run->inc_pool;      /* override inc pool */
155    }
156    if (run->dif_pool) {
157       jcr->pool = run->dif_pool;      /* override dif pool */
158    }
159    if (run->storage) {
160       jcr->store = run->storage;      /* override storage */
161    }
162    if (run->msgs) {
163       jcr->messages = run->msgs;      /* override messages */
164    }
165    if (run->Priority) {
166       jcr->JobPriority = run->Priority;
167    }
168    if (run->spool_data_set) {
169       jcr->spool_data = run->spool_data;
170    }
171    Dmsg0(200, "Leave wait_for_next_job()\n");
172    return jcr;
173 }
174
175
176 /*
177  * Shutdown the scheduler  
178  */
179 void term_scheduler()
180 {
181    if (jobs_to_run) {
182       job_item *je;
183       /* Release all queued job entries to be run */
184       foreach_dlist(je, jobs_to_run) {
185          free(je);
186       }
187       delete jobs_to_run;
188    }
189 }
190
191 /*          
192  * Find all jobs to be run this hour and the next hour.
193  */
194 static void find_runs()
195 {
196    time_t now, next_hour, runtime;
197    RUN *run;
198    JOB *job;
199    SCHED *sched;
200    struct tm tm;
201    int minute;
202    int hour, mday, wday, month, wom, woy;
203    /* Items corresponding to above at the next hour */
204    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
205
206    Dmsg0(200, "enter find_runs()\n");
207
208    
209    /* compute values for time now */
210    now = time(NULL);
211    localtime_r(&now, &tm);
212    hour = tm.tm_hour;
213    minute = tm.tm_min;
214    mday = tm.tm_mday - 1;
215    wday = tm.tm_wday;
216    month = tm.tm_mon;
217    wom = mday / 7;
218    woy = tm_woy(now);                     /* get week of year */
219
220    /* 
221     * Compute values for next hour from now.
222     * We do this to be sure we don't miss a job while
223     * sleeping.
224     */
225    next_hour = now + 3600;  
226    localtime_r(&next_hour, &tm);
227    nh_hour = tm.tm_hour;
228    nh_mday = tm.tm_mday - 1;
229    nh_wday = tm.tm_wday;
230    nh_month = tm.tm_mon;
231    nh_year  = tm.tm_year;
232    nh_wom = nh_mday / 7;
233    nh_woy = tm_woy(now);                     /* get week of year */
234
235    /* Loop through all jobs */
236    LockRes();
237    foreach_res(job, R_JOB) {
238       sched = job->schedule;
239       if (sched == NULL) {            /* scheduled? */
240          continue;                    /* no, skip this job */
241       }
242       Dmsg1(200, "Got job: %s\n", job->hdr.name);
243       for (run=sched->run; run; run=run->next) {
244          bool run_now, run_nh;
245          /* 
246           * Find runs scheduled between now and the next hour.
247           */
248 #ifdef xxxx
249          Dmsg0(000, "\n");
250          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", 
251             hour, month, mday, wday, wom, woy);
252          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", 
253             bit_is_set(hour, run->hour),
254             bit_is_set(month, run->month),
255             bit_is_set(mday, run->mday),
256             bit_is_set(wday, run->wday),
257             bit_is_set(wom, run->wom),
258             bit_is_set(woy, run->woy));
259
260          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", 
261             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
262          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", 
263             bit_is_set(nh_hour, run->hour),
264             bit_is_set(nh_month, run->month),
265             bit_is_set(nh_mday, run->mday),
266             bit_is_set(nh_wday, run->wday),
267             bit_is_set(nh_wom, run->wom),
268             bit_is_set(nh_woy, run->woy));
269 #endif
270
271          run_now = bit_is_set(hour, run->hour) &&
272             bit_is_set(mday, run->mday) && 
273             bit_is_set(wday, run->wday) &&
274             bit_is_set(month, run->month) &&
275             bit_is_set(wom, run->wom) &&
276             bit_is_set(woy, run->woy);
277
278          run_nh = bit_is_set(nh_hour, run->hour) &&
279             bit_is_set(nh_mday, run->mday) && 
280             bit_is_set(nh_wday, run->wday) &&
281             bit_is_set(nh_month, run->month) &&
282             bit_is_set(nh_wom, run->wom) &&
283             bit_is_set(nh_woy, run->woy);
284
285          Dmsg2(200, "run_now=%d run_nh=%d\n", run_now, run_nh);
286
287          /* find time (time_t) job is to be run */
288          localtime_r(&now, &tm);      /* reset tm structure */
289          tm.tm_min = run->minute;     /* set run minute */
290          tm.tm_sec = 0;               /* zero secs */
291          if (run_now) {
292             runtime = mktime(&tm);
293             add_job(job, run, now, runtime);
294          }
295          /* If job is to be run in the next hour schedule it */
296          if (run_nh) {
297             /* Set correct values */
298             tm.tm_hour = nh_hour;
299             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
300             tm.tm_mon = nh_month;
301             tm.tm_year = nh_year;
302             runtime = mktime(&tm);
303             add_job(job, run, now, runtime);
304          }
305       }  
306    }
307    UnlockRes();
308    Dmsg0(200, "Leave find_runs()\n");
309 }
310
311 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
312 {
313    job_item *ji;
314    bool inserted = false;
315    /*
316     * Don't run any job that ran less than a minute ago, but
317     *  do run any job scheduled less than a minute ago.
318     */
319    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
320 #ifdef SCHED_DEBUG
321       char dt[50], dt1[50], dt2[50];
322       bstrftime_nc(dt, sizeof(dt), runtime);  
323       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
324       bstrftime_nc(dt2, sizeof(dt2), now);
325       Dmsg4(000, "Drop: Job=\"%s\" run=%s. last_run=%s. now=%s\n", job->hdr.name, 
326             dt, dt1, dt2);
327       fflush(stdout);
328 #endif
329       return;
330    }
331    /* accept to run this job */
332    job_item *je = (job_item *)malloc(sizeof(job_item));
333    je->run = run;
334    je->job = job;
335    je->runtime = runtime;
336    if (run->Priority) {
337       je->Priority = run->Priority;
338    } else {
339       je->Priority = job->Priority;
340    }
341
342    /* Add this job to the wait queue in runtime, priority sorted order */
343    foreach_dlist(ji, jobs_to_run) {
344       if (ji->runtime > je->runtime || 
345           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
346          jobs_to_run->insert_before(je, ji);
347          dump_job(je, "Inserted job");
348          inserted = true;
349          break;
350       }
351    }
352    /* If place not found in queue, append it */
353    if (!inserted) {
354       jobs_to_run->append(je);
355       dump_job(je, "Appended job");
356    }
357 #ifdef SCHED_DEBUG
358    foreach_dlist(ji, jobs_to_run) {
359       dump_job(ji, "Run queue");
360    }
361    Dmsg0(000, "End run queue\n");
362 #endif
363 }
364
365 static void dump_job(job_item *ji, char *msg) 
366 {
367 #ifdef SCHED_DEBUG
368    char dt[MAX_TIME_LENGTH];
369    int save_debug = debug_level;
370    debug_level = 200;
371    if (debug_level < 200) {
372       return;
373    }
374    bstrftime_nc(dt, sizeof(dt), ji->runtime);  
375    Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, 
376       ji->Priority, dt);
377    fflush(stdout);
378    debug_level = save_debug;
379 #endif
380 }