]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
- Move test for MaxStartDelay as suggested by Peter.
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2005 Kern Sibbald
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License as
17    published by the Free Software Foundation; either version 2 of
18    the License, or (at your option) any later version.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23    General Public License for more details.
24
25    You should have received a copy of the GNU General Public
26    License along with this program; if not, write to the Free
27    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
28    MA 02111-1307, USA.
29
30  */
31
32 #include "bacula.h"
33 #include "dird.h"
34
35 /* #define SCHED_DEBUG */
36
37
38 /* Local variables */
39 struct job_item {
40    RUN *run;
41    JOB *job;
42    time_t runtime;
43    int Priority;
44    dlink link;                        /* link for list */
45 };
46
47 /* List of jobs to be run. They were scheduled in this hour or the next */
48 static dlist *jobs_to_run;               /* list of jobs to be run */
49
50 /* Time interval in secs to sleep if nothing to be run */
51 static int const NEXT_CHECK_SECS = 60;
52
53 /* Forward referenced subroutines */
54 static void find_runs();
55 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
56 static void dump_job(job_item *ji, const char *msg);
57
58 /* Imported subroutines */
59
60 /* Imported variables */
61
62
63 /*********************************************************************
64  *
65  *         Main Bacula Scheduler
66  *
67  */
68 JCR *wait_for_next_job(char *one_shot_job_to_run)
69 {
70    JCR *jcr;
71    JOB *job;
72    RUN *run;
73    time_t now;
74    static bool first = true;
75    job_item *next_job = NULL;
76
77    Dmsg0(200, "Enter wait_for_next_job\n");
78    if (first) {
79       first = false;
80       /* Create scheduled jobs list */
81       jobs_to_run = New(dlist(next_job, &next_job->link));
82       if (one_shot_job_to_run) {            /* one shot */
83          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
84          if (!job) {
85             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
86          }
87          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
88          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
89          set_jcr_defaults(jcr, job);
90          return jcr;
91       }
92    }
93    /* Wait until we have something in the
94     * next hour or so.
95     */
96    while (jobs_to_run->empty()) {
97       find_runs();
98       if (!jobs_to_run->empty()) {
99          break;
100       }
101       bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
102    }
103
104 #ifdef  list_chain
105    job_item *je;
106    foreach_dlist(je, jobs_to_run) {
107       dump_job(je, "Walk queue");
108    }
109 #endif
110    /*
111     * Pull the first job to run (already sorted by runtime and
112     *  Priority, then wait around until it is time to run it.
113     */
114    next_job = (job_item *)jobs_to_run->first();
115    jobs_to_run->remove(next_job);
116
117    dump_job(next_job, "Dequeued job");
118
119    if (!next_job) {                /* we really should have something now */
120       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
121    }
122
123    /* Now wait for the time to run the job */
124    for (;;) {
125       time_t twait;
126       now = time(NULL);
127       twait = next_job->runtime - now;
128       if (twait <= 0) {               /* time to run it */
129          break;
130       }
131       bmicrosleep(twait, 0);
132    }
133    run = next_job->run;               /* pick up needed values */
134    job = next_job->job;
135    run->last_run = now;               /* mark as run now */
136
137    dump_job(next_job, "Run job");
138
139    free(next_job);
140
141    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
142    ASSERT(job);
143    set_jcr_defaults(jcr, job);
144    if (run->level) {
145       jcr->JobLevel = run->level;     /* override run level */
146    }
147    if (run->pool) {
148       jcr->pool = run->pool;          /* override pool */
149    }
150    if (run->full_pool) {
151       jcr->full_pool = run->full_pool; /* override full pool */
152    }
153    if (run->inc_pool) {
154       jcr->inc_pool = run->inc_pool;  /* override inc pool */
155    }
156    if (run->dif_pool) {
157       jcr->dif_pool = run->dif_pool;  /* override dif pool */
158    }
159    if (run->storage) {
160       set_storage(jcr, run->storage); /* override storage */
161    }
162    if (run->msgs) {
163       jcr->messages = run->msgs;      /* override messages */
164    }
165    if (run->Priority) {
166       jcr->JobPriority = run->Priority;
167    }
168    if (run->spool_data_set) {
169       jcr->spool_data = run->spool_data;
170    }
171    if (run->write_part_after_job_set) {
172       jcr->write_part_after_job = run->write_part_after_job;
173    }
174    Dmsg0(200, "Leave wait_for_next_job()\n");
175    return jcr;
176 }
177
178
179 /*
180  * Shutdown the scheduler
181  */
182 void term_scheduler()
183 {
184    if (jobs_to_run) {
185       job_item *je;
186       /* Release all queued job entries to be run */
187       foreach_dlist(je, jobs_to_run) {
188          free(je);
189       }
190       delete jobs_to_run;
191    }
192 }
193
194 /*
195  * Find all jobs to be run this hour and the next hour.
196  */
197 static void find_runs()
198 {
199    time_t now, next_hour, runtime;
200    RUN *run;
201    JOB *job;
202    SCHED *sched;
203    struct tm tm;
204    int minute;
205    int hour, mday, wday, month, wom, woy;
206    /* Items corresponding to above at the next hour */
207    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
208
209    Dmsg0(1200, "enter find_runs()\n");
210
211
212    /* compute values for time now */
213    now = time(NULL);
214    localtime_r(&now, &tm);
215    hour = tm.tm_hour;
216    minute = tm.tm_min;
217    mday = tm.tm_mday - 1;
218    wday = tm.tm_wday;
219    month = tm.tm_mon;
220    wom = mday / 7;
221    woy = tm_woy(now);                     /* get week of year */
222
223    /*
224     * Compute values for next hour from now.
225     * We do this to be sure we don't miss a job while
226     * sleeping.
227     */
228    next_hour = now + 3600;
229    localtime_r(&next_hour, &tm);
230    nh_hour = tm.tm_hour;
231    nh_mday = tm.tm_mday - 1;
232    nh_wday = tm.tm_wday;
233    nh_month = tm.tm_mon;
234    nh_year  = tm.tm_year;
235    nh_wom = nh_mday / 7;
236    nh_woy = tm_woy(now);                     /* get week of year */
237
238    /* Loop through all jobs */
239    LockRes();
240    foreach_res(job, R_JOB) {
241       sched = job->schedule;
242       if (sched == NULL) {            /* scheduled? */
243          continue;                    /* no, skip this job */
244       }
245       Dmsg1(1200, "Got job: %s\n", job->hdr.name);
246       for (run=sched->run; run; run=run->next) {
247          bool run_now, run_nh;
248          /*
249           * Find runs scheduled between now and the next hour.
250           */
251 #ifdef xxxx
252          Dmsg0(000, "\n");
253          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
254             hour, month, mday, wday, wom, woy);
255          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
256             bit_is_set(hour, run->hour),
257             bit_is_set(month, run->month),
258             bit_is_set(mday, run->mday),
259             bit_is_set(wday, run->wday),
260             bit_is_set(wom, run->wom),
261             bit_is_set(woy, run->woy));
262
263          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
264             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
265          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
266             bit_is_set(nh_hour, run->hour),
267             bit_is_set(nh_month, run->month),
268             bit_is_set(nh_mday, run->mday),
269             bit_is_set(nh_wday, run->wday),
270             bit_is_set(nh_wom, run->wom),
271             bit_is_set(nh_woy, run->woy));
272 #endif
273
274          run_now = bit_is_set(hour, run->hour) &&
275             bit_is_set(mday, run->mday) &&
276             bit_is_set(wday, run->wday) &&
277             bit_is_set(month, run->month) &&
278             bit_is_set(wom, run->wom) &&
279             bit_is_set(woy, run->woy);
280
281          run_nh = bit_is_set(nh_hour, run->hour) &&
282             bit_is_set(nh_mday, run->mday) &&
283             bit_is_set(nh_wday, run->wday) &&
284             bit_is_set(nh_month, run->month) &&
285             bit_is_set(nh_wom, run->wom) &&
286             bit_is_set(nh_woy, run->woy);
287
288          Dmsg2(1200, "run_now=%d run_nh=%d\n", run_now, run_nh);
289
290          /* find time (time_t) job is to be run */
291          localtime_r(&now, &tm);      /* reset tm structure */
292          tm.tm_min = run->minute;     /* set run minute */
293          tm.tm_sec = 0;               /* zero secs */
294          if (run_now) {
295             runtime = mktime(&tm);
296             add_job(job, run, now, runtime);
297          }
298          /* If job is to be run in the next hour schedule it */
299          if (run_nh) {
300             /* Set correct values */
301             tm.tm_hour = nh_hour;
302             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
303             tm.tm_mon = nh_month;
304             tm.tm_year = nh_year;
305             runtime = mktime(&tm);
306             add_job(job, run, now, runtime);
307          }
308       }
309    }
310    UnlockRes();
311    Dmsg0(1200, "Leave find_runs()\n");
312 }
313
314 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
315 {
316    job_item *ji;
317    bool inserted = false;
318    /*
319     * Don't run any job that ran less than a minute ago, but
320     *  do run any job scheduled less than a minute ago.
321     */
322    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
323 #ifdef SCHED_DEBUG
324       char dt[50], dt1[50], dt2[50];
325       bstrftime_nc(dt, sizeof(dt), runtime);
326       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
327       bstrftime_nc(dt2, sizeof(dt2), now);
328       Dmsg4(000, "Drop: Job=\"%s\" run=%s. last_run=%s. now=%s\n", job->hdr.name,
329             dt, dt1, dt2);
330       fflush(stdout);
331 #endif
332       return;
333    }
334    /* accept to run this job */
335    job_item *je = (job_item *)malloc(sizeof(job_item));
336    je->run = run;
337    je->job = job;
338    je->runtime = runtime;
339    if (run->Priority) {
340       je->Priority = run->Priority;
341    } else {
342       je->Priority = job->Priority;
343    }
344
345    /* Add this job to the wait queue in runtime, priority sorted order */
346    foreach_dlist(ji, jobs_to_run) {
347       if (ji->runtime > je->runtime ||
348           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
349          jobs_to_run->insert_before(je, ji);
350          dump_job(je, "Inserted job");
351          inserted = true;
352          break;
353       }
354    }
355    /* If place not found in queue, append it */
356    if (!inserted) {
357       jobs_to_run->append(je);
358       dump_job(je, "Appended job");
359    }
360 #ifdef SCHED_DEBUG
361    foreach_dlist(ji, jobs_to_run) {
362       dump_job(ji, "Run queue");
363    }
364    Dmsg0(000, "End run queue\n");
365 #endif
366 }
367
368 static void dump_job(job_item *ji, const char *msg)
369 {
370 #ifdef SCHED_DEBUG
371    char dt[MAX_TIME_LENGTH];
372    int save_debug = debug_level;
373    debug_level = 200;
374    if (debug_level < 200) {
375       return;
376    }
377    bstrftime_nc(dt, sizeof(dt), ji->runtime);
378    Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
379       ji->Priority, dt);
380    fflush(stdout);
381    debug_level = save_debug;
382 #endif
383 }