]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2006 Kern Sibbald
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License
17    version 2 as amended with additional clauses defined in the
18    file LICENSE in the main source directory.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
23    the file LICENSE for additional details.
24
25  */
26
27 #include "bacula.h"
28 #include "dird.h"
29
30 #if 0
31 #define SCHED_DEBUG
32 #define DBGLVL 0
33 #else
34 #undef SCHED_DEBUG
35 #define DBGLVL 200
36 #endif
37
38 const int dbglvl = DBGLVL;
39
40 /* Local variables */
41 struct job_item {
42    RUN *run;
43    JOB *job;
44    time_t runtime;
45    int Priority;
46    dlink link;                        /* link for list */
47 };
48
49 /* List of jobs to be run. They were scheduled in this hour or the next */
50 static dlist *jobs_to_run;               /* list of jobs to be run */
51
52 /* Time interval in secs to sleep if nothing to be run */
53 static int const next_check_secs = 60;
54
55 /* Forward referenced subroutines */
56 static void find_runs();
57 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
58 static void dump_job(job_item *ji, const char *msg);
59
60 /* Imported subroutines */
61
62 /* Imported variables */
63
64 /**
65  * called by reload_config to tell us that the schedules
66  * we may have based our next jobs to run queues have been
67  * invalidated.  In fact the schedules may not have changed
68  * but the run object that we have recorded the last_run time
69  * on are new and no longer have a valid last_run time which
70  * causes us to double run schedules that get put into the list
71  * because run_nh = 1.
72  */   
73 static bool schedules_invalidated = false;
74 void invalidate_schedules(void) {
75     schedules_invalidated = true;
76 }
77
78 /*********************************************************************
79  *
80  *         Main Bacula Scheduler
81  *
82  */
83 JCR *wait_for_next_job(char *one_shot_job_to_run)
84 {
85    JCR *jcr;
86    JOB *job;
87    RUN *run;
88    time_t now, prev;
89    static bool first = true;
90    job_item *next_job = NULL;
91
92    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
93    if (first) {
94       first = false;
95       /* Create scheduled jobs list */
96       jobs_to_run = New(dlist(next_job, &next_job->link));
97       if (one_shot_job_to_run) {            /* one shot */
98          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
99          if (!job) {
100             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
101          }
102          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
103          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
104          set_jcr_defaults(jcr, job);
105          return jcr;
106       }
107    }
108    /* Wait until we have something in the
109     * next hour or so.
110     */
111 again:
112    while (jobs_to_run->empty()) {
113       find_runs();
114       if (!jobs_to_run->empty()) {
115          break;
116       }
117       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
118    }
119
120 #ifdef  list_chain
121    job_item *je;
122    foreach_dlist(je, jobs_to_run) {
123       dump_job(je, _("Walk queue"));
124    }
125 #endif
126    /*
127     * Pull the first job to run (already sorted by runtime and
128     *  Priority, then wait around until it is time to run it.
129     */
130    next_job = (job_item *)jobs_to_run->first();
131    jobs_to_run->remove(next_job);
132
133    dump_job(next_job, _("Dequeued job"));
134
135    if (!next_job) {                /* we really should have something now */
136       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
137    }
138
139    /* Now wait for the time to run the job */
140    for (;;) {
141       time_t twait;
142       /** discard scheduled queue and rebuild with new schedule objects. **/
143       lock_jobs();
144       if (schedules_invalidated) { 
145           dump_job(next_job, "Invalidated job");
146           free(next_job);
147           while (!jobs_to_run->empty()) {
148               next_job = (job_item *)jobs_to_run->first();
149               jobs_to_run->remove(next_job);
150               dump_job(next_job, "Invalidated job");
151               free(next_job);
152           }
153           schedules_invalidated = false;
154           goto again;
155       }
156       unlock_jobs();
157       prev = now = time(NULL);
158       twait = next_job->runtime - now;
159       if (twait <= 0) {               /* time to run it */
160          break;
161       }
162       /* Recheck at least once per minute */
163       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
164       /* Attempt to handle clock shift from/to daylight savings time
165        * we allow a skew of 10 seconds before invalidating everything.
166        */
167       now = time(NULL);
168       if (now < prev+10 || now > (prev+next_check_secs+10)) {
169          schedules_invalidated = true;
170       }
171    }
172    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
173    run = next_job->run;               /* pick up needed values */
174    job = next_job->job;
175    if (job->enabled) {
176       dump_job(next_job, _("Run job"));
177    }
178    free(next_job);
179    if (!job->enabled) {
180       free_jcr(jcr);
181       goto again;                     /* ignore this job */
182    }
183    run->last_run = now;               /* mark as run now */
184
185    ASSERT(job);
186    set_jcr_defaults(jcr, job);
187    if (run->level) {
188       jcr->JobLevel = run->level;     /* override run level */
189    }
190    if (run->pool) {
191       jcr->pool = run->pool;          /* override pool */
192    }
193    if (run->full_pool) {
194       jcr->full_pool = run->full_pool; /* override full pool */
195    }
196    if (run->inc_pool) {
197       jcr->inc_pool = run->inc_pool;  /* override inc pool */
198    }
199    if (run->dif_pool) {
200       jcr->dif_pool = run->dif_pool;  /* override dif pool */
201    }
202    if (run->storage) {
203       set_storage(jcr, run->storage); /* override storage */
204    }
205    if (run->msgs) {
206       jcr->messages = run->msgs;      /* override messages */
207    }
208    if (run->Priority) {
209       jcr->JobPriority = run->Priority;
210    }
211    if (run->spool_data_set) {
212       jcr->spool_data = run->spool_data;
213    }
214    if (run->write_part_after_job_set) {
215       jcr->write_part_after_job = run->write_part_after_job;
216    }
217    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
218    return jcr;
219 }
220
221
222 /*
223  * Shutdown the scheduler
224  */
225 void term_scheduler()
226 {
227    if (jobs_to_run) {
228       job_item *je;
229       /* Release all queued job entries to be run */
230       foreach_dlist(je, jobs_to_run) {
231          free(je);
232       }
233       delete jobs_to_run;
234    }
235 }
236
237 /*
238  * Find all jobs to be run this hour and the next hour.
239  */
240 static void find_runs()
241 {
242    time_t now, next_hour, runtime;
243    RUN *run;
244    JOB *job;
245    SCHED *sched;
246    struct tm tm;
247    int minute;
248    int hour, mday, wday, month, wom, woy;
249    /* Items corresponding to above at the next hour */
250    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
251
252    Dmsg0(dbglvl, "enter find_runs()\n");
253
254
255    /* compute values for time now */
256    now = time(NULL);
257    localtime_r(&now, &tm);
258    hour = tm.tm_hour;
259    minute = tm.tm_min;
260    mday = tm.tm_mday - 1;
261    wday = tm.tm_wday;
262    month = tm.tm_mon;
263    wom = mday / 7;
264    woy = tm_woy(now);                     /* get week of year */
265
266    /*
267     * Compute values for next hour from now.
268     * We do this to be sure we don't miss a job while
269     * sleeping.
270     */
271    next_hour = now + 3600;
272    localtime_r(&next_hour, &tm);
273    nh_hour = tm.tm_hour;
274    nh_mday = tm.tm_mday - 1;
275    nh_wday = tm.tm_wday;
276    nh_month = tm.tm_mon;
277    nh_year  = tm.tm_year;
278    nh_wom = nh_mday / 7;
279    nh_woy = tm_woy(now);                     /* get week of year */
280
281    /* Loop through all jobs */
282    LockRes();
283    foreach_res(job, R_JOB) {
284       sched = job->schedule;
285       if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */
286          continue;                    /* no, skip this job */
287       }
288       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
289       for (run=sched->run; run; run=run->next) {
290          bool run_now, run_nh;
291          /*
292           * Find runs scheduled between now and the next hour.
293           */
294 #ifdef xxxx
295          Dmsg0(000, "\n");
296          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
297             hour, month, mday, wday, wom, woy);
298          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
299             bit_is_set(hour, run->hour),
300             bit_is_set(month, run->month),
301             bit_is_set(mday, run->mday),
302             bit_is_set(wday, run->wday),
303             bit_is_set(wom, run->wom),
304             bit_is_set(woy, run->woy));
305
306          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
307             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
308          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
309             bit_is_set(nh_hour, run->hour),
310             bit_is_set(nh_month, run->month),
311             bit_is_set(nh_mday, run->mday),
312             bit_is_set(nh_wday, run->wday),
313             bit_is_set(nh_wom, run->wom),
314             bit_is_set(nh_woy, run->woy));
315 #endif
316
317          run_now = bit_is_set(hour, run->hour) &&
318             bit_is_set(mday, run->mday) &&
319             bit_is_set(wday, run->wday) &&
320             bit_is_set(month, run->month) &&
321             bit_is_set(wom, run->wom) &&
322             bit_is_set(woy, run->woy);
323
324          run_nh = bit_is_set(nh_hour, run->hour) &&
325             bit_is_set(nh_mday, run->mday) &&
326             bit_is_set(nh_wday, run->wday) &&
327             bit_is_set(nh_month, run->month) &&
328             bit_is_set(nh_wom, run->wom) &&
329             bit_is_set(nh_woy, run->woy);
330
331          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
332
333          /* find time (time_t) job is to be run */
334          localtime_r(&now, &tm);      /* reset tm structure */
335          tm.tm_min = run->minute;     /* set run minute */
336          tm.tm_sec = 0;               /* zero secs */
337          if (run_now) {
338             runtime = mktime(&tm);
339             add_job(job, run, now, runtime);
340          }
341          /* If job is to be run in the next hour schedule it */
342          if (run_nh) {
343             /* Set correct values */
344             tm.tm_hour = nh_hour;
345             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
346             tm.tm_mon = nh_month;
347             tm.tm_year = nh_year;
348             runtime = mktime(&tm);
349             add_job(job, run, now, runtime);
350          }
351       }
352    }
353    UnlockRes();
354    Dmsg0(dbglvl, "Leave find_runs()\n");
355 }
356
357 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
358 {
359    job_item *ji;
360    bool inserted = false;
361    /*
362     * Don't run any job that ran less than a minute ago, but
363     *  do run any job scheduled less than a minute ago.
364     */
365    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
366 #ifdef SCHED_DEBUG
367       char dt[50], dt1[50], dt2[50];
368       bstrftime_nc(dt, sizeof(dt), runtime);
369       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
370       bstrftime_nc(dt2, sizeof(dt2), now);
371       Dmsg7(000, "Drop: Job=\"%s\" run=%s(%x). last_run=%s(%x). now=%s(%x)\n", job->hdr.name, 
372             dt, runtime, dt1, run->last_run, dt2, now);
373       fflush(stdout);
374 #endif
375       return;
376    }
377 #ifdef SCHED_DEBUG
378    Dmsg4(000, "Add: Job=\"%s\" run=%x last_run=%x now=%x\n", job->hdr.name, 
379             runtime, run->last_run, now);
380 #endif
381    /* accept to run this job */
382    job_item *je = (job_item *)malloc(sizeof(job_item));
383    je->run = run;
384    je->job = job;
385    je->runtime = runtime;
386    if (run->Priority) {
387       je->Priority = run->Priority;
388    } else {
389       je->Priority = job->Priority;
390    }
391
392    /* Add this job to the wait queue in runtime, priority sorted order */
393    foreach_dlist(ji, jobs_to_run) {
394       if (ji->runtime > je->runtime ||
395           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
396          jobs_to_run->insert_before(je, ji);
397          dump_job(je, _("Inserted job"));
398          inserted = true;
399          break;
400       }
401    }
402    /* If place not found in queue, append it */
403    if (!inserted) {
404       jobs_to_run->append(je);
405       dump_job(je, _("Appended job"));
406    }
407 #ifdef SCHED_DEBUG
408    foreach_dlist(ji, jobs_to_run) {
409       dump_job(ji, _("Run queue"));
410    }
411    Dmsg0(000, "End run queue\n");
412 #endif
413 }
414
415 static void dump_job(job_item *ji, const char *msg)
416 {
417 #ifdef SCHED_DEBUG
418    char dt[MAX_TIME_LENGTH];
419    int save_debug = debug_level;
420    if (debug_level < dbglvl) {
421       return;
422    }
423    bstrftime_nc(dt, sizeof(dt), ji->runtime);
424    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, 
425       ji->Priority, dt);
426    fflush(stdout);
427    debug_level = save_debug;
428 #endif
429 }