]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
======================= Warning ==========================
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2006 Kern Sibbald
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License
17    version 2 as amended with additional clauses defined in the
18    file LICENSE in the main source directory.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
23    the file LICENSE for additional details.
24
25  */
26
27 #include "bacula.h"
28 #include "dird.h"
29
30 #if 0
31 #define SCHED_DEBUG
32 #define DBGLVL 0
33 #else
34 #undef SCHED_DEBUG
35 #define DBGLVL 200
36 #endif
37
38 const int dbglvl = DBGLVL;
39
40 /* Local variables */
41 struct job_item {
42    RUN *run;
43    JOB *job;
44    time_t runtime;
45    int Priority;
46    dlink link;                        /* link for list */
47 };
48
49 /* List of jobs to be run. They were scheduled in this hour or the next */
50 static dlist *jobs_to_run;               /* list of jobs to be run */
51
52 /* Time interval in secs to sleep if nothing to be run */
53 static int const next_check_secs = 60;
54
55 /* Forward referenced subroutines */
56 static void find_runs();
57 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
58 static void dump_job(job_item *ji, const char *msg);
59
60 /* Imported subroutines */
61
62 /* Imported variables */
63
64 /**
65  * called by reload_config to tell us that the schedules
66  * we may have based our next jobs to run queues have been
67  * invalidated.  In fact the schedules may not have changed
68  * but the run object that we have recorded the last_run time
69  * on are new and no longer have a valid last_run time which
70  * causes us to double run schedules that get put into the list
71  * because run_nh = 1.
72  */   
73 static bool schedules_invalidated = false;
74 void invalidate_schedules(void) {
75     schedules_invalidated = true;
76 }
77
78 /*********************************************************************
79  *
80  *         Main Bacula Scheduler
81  *
82  */
83 JCR *wait_for_next_job(char *one_shot_job_to_run)
84 {
85    JCR *jcr;
86    JOB *job;
87    RUN *run;
88    time_t now, prev;
89    static bool first = true;
90    job_item *next_job = NULL;
91
92    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
93    if (first) {
94       first = false;
95       /* Create scheduled jobs list */
96       jobs_to_run = New(dlist(next_job, &next_job->link));
97       if (one_shot_job_to_run) {            /* one shot */
98          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
99          if (!job) {
100             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
101          }
102          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
103          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
104          set_jcr_defaults(jcr, job);
105          return jcr;
106       }
107    }
108    /* Wait until we have something in the
109     * next hour or so.
110     */
111 again:
112    while (jobs_to_run->empty()) {
113       find_runs();
114       if (!jobs_to_run->empty()) {
115          break;
116       }
117       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
118    }
119
120 #ifdef  list_chain
121    job_item *je;
122    foreach_dlist(je, jobs_to_run) {
123       dump_job(je, _("Walk queue"));
124    }
125 #endif
126    /*
127     * Pull the first job to run (already sorted by runtime and
128     *  Priority, then wait around until it is time to run it.
129     */
130    next_job = (job_item *)jobs_to_run->first();
131    jobs_to_run->remove(next_job);
132
133    dump_job(next_job, _("Dequeued job"));
134
135    if (!next_job) {                /* we really should have something now */
136       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
137    }
138
139    /* Now wait for the time to run the job */
140    for (;;) {
141       time_t twait;
142       /** discard scheduled queue and rebuild with new schedule objects. **/
143       lock_jobs();
144       if (schedules_invalidated) { 
145           dump_job(next_job, "Invalidated job");
146           free(next_job);
147           while (!jobs_to_run->empty()) {
148               next_job = (job_item *)jobs_to_run->first();
149               jobs_to_run->remove(next_job);
150               dump_job(next_job, "Invalidated job");
151               free(next_job);
152           }
153           schedules_invalidated = false;
154           unlock_jobs();
155           goto again;
156       }
157       unlock_jobs();
158       prev = now = time(NULL);
159       twait = next_job->runtime - now;
160       if (twait <= 0) {               /* time to run it */
161          break;
162       }
163       /* Recheck at least once per minute */
164       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
165       /* Attempt to handle clock shift from/to daylight savings time
166        * we allow a skew of 10 seconds before invalidating everything.
167        */
168       now = time(NULL);
169       if (now < prev+10 || now > (prev+next_check_secs+10)) {
170          schedules_invalidated = true;
171       }
172    }
173    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
174    run = next_job->run;               /* pick up needed values */
175    job = next_job->job;
176    if (job->enabled) {
177       dump_job(next_job, _("Run job"));
178    }
179    free(next_job);
180    if (!job->enabled) {
181       free_jcr(jcr);
182       goto again;                     /* ignore this job */
183    }
184    run->last_run = now;               /* mark as run now */
185
186    ASSERT(job);
187    set_jcr_defaults(jcr, job);
188    if (run->level) {
189       jcr->JobLevel = run->level;     /* override run level */
190    }
191    if (run->pool) {
192       jcr->pool = run->pool;          /* override pool */
193       jcr->run_pool_override = true;
194    }
195    if (run->full_pool) {
196       jcr->full_pool = run->full_pool; /* override full pool */
197       jcr->run_full_pool_override = true;
198    }
199    if (run->inc_pool) {
200       jcr->inc_pool = run->inc_pool;  /* override inc pool */
201       jcr->run_inc_pool_override = true;
202    }
203    if (run->diff_pool) {
204       jcr->diff_pool = run->diff_pool;  /* override dif pool */
205       jcr->run_diff_pool_override = true;
206    }
207    if (run->storage) {
208       set_rwstorage(jcr, run->storage); /* override storage */
209    }
210    if (run->msgs) {
211       jcr->messages = run->msgs;      /* override messages */
212    }
213    if (run->Priority) {
214       jcr->JobPriority = run->Priority;
215    }
216    if (run->spool_data_set) {
217       jcr->spool_data = run->spool_data;
218    }
219    if (run->write_part_after_job_set) {
220       jcr->write_part_after_job = run->write_part_after_job;
221    }
222    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
223    return jcr;
224 }
225
226
227 /*
228  * Shutdown the scheduler
229  */
230 void term_scheduler()
231 {
232    if (jobs_to_run) {
233       job_item *je;
234       /* Release all queued job entries to be run */
235       foreach_dlist(je, jobs_to_run) {
236          free(je);
237       }
238       delete jobs_to_run;
239    }
240 }
241
242 /*
243  * Find all jobs to be run this hour and the next hour.
244  */
245 static void find_runs()
246 {
247    time_t now, next_hour, runtime;
248    RUN *run;
249    JOB *job;
250    SCHED *sched;
251    struct tm tm;
252    int minute;
253    int hour, mday, wday, month, wom, woy;
254    /* Items corresponding to above at the next hour */
255    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
256
257    Dmsg0(dbglvl, "enter find_runs()\n");
258
259
260    /* compute values for time now */
261    now = time(NULL);
262    (void)localtime_r(&now, &tm);
263    hour = tm.tm_hour;
264    minute = tm.tm_min;
265    mday = tm.tm_mday - 1;
266    wday = tm.tm_wday;
267    month = tm.tm_mon;
268    wom = mday / 7;
269    woy = tm_woy(now);                     /* get week of year */
270
271    /*
272     * Compute values for next hour from now.
273     * We do this to be sure we don't miss a job while
274     * sleeping.
275     */
276    next_hour = now + 3600;
277    (void)localtime_r(&next_hour, &tm);
278    nh_hour = tm.tm_hour;
279    nh_mday = tm.tm_mday - 1;
280    nh_wday = tm.tm_wday;
281    nh_month = tm.tm_mon;
282    nh_year  = tm.tm_year;
283    nh_wom = nh_mday / 7;
284    nh_woy = tm_woy(now);                     /* get week of year */
285
286    /* Loop through all jobs */
287    LockRes();
288    foreach_res(job, R_JOB) {
289       sched = job->schedule;
290       if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */
291          continue;                    /* no, skip this job */
292       }
293       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
294       for (run=sched->run; run; run=run->next) {
295          bool run_now, run_nh;
296          /*
297           * Find runs scheduled between now and the next hour.
298           */
299 #ifdef xxxx
300          Dmsg0(000, "\n");
301          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
302             hour, month, mday, wday, wom, woy);
303          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
304             bit_is_set(hour, run->hour),
305             bit_is_set(month, run->month),
306             bit_is_set(mday, run->mday),
307             bit_is_set(wday, run->wday),
308             bit_is_set(wom, run->wom),
309             bit_is_set(woy, run->woy));
310
311          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
312             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
313          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
314             bit_is_set(nh_hour, run->hour),
315             bit_is_set(nh_month, run->month),
316             bit_is_set(nh_mday, run->mday),
317             bit_is_set(nh_wday, run->wday),
318             bit_is_set(nh_wom, run->wom),
319             bit_is_set(nh_woy, run->woy));
320 #endif
321
322          run_now = bit_is_set(hour, run->hour) &&
323             bit_is_set(mday, run->mday) &&
324             bit_is_set(wday, run->wday) &&
325             bit_is_set(month, run->month) &&
326             bit_is_set(wom, run->wom) &&
327             bit_is_set(woy, run->woy);
328
329          run_nh = bit_is_set(nh_hour, run->hour) &&
330             bit_is_set(nh_mday, run->mday) &&
331             bit_is_set(nh_wday, run->wday) &&
332             bit_is_set(nh_month, run->month) &&
333             bit_is_set(nh_wom, run->wom) &&
334             bit_is_set(nh_woy, run->woy);
335
336          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
337
338          /* find time (time_t) job is to be run */
339          (void)localtime_r(&now, &tm);      /* reset tm structure */
340          tm.tm_min = run->minute;     /* set run minute */
341          tm.tm_sec = 0;               /* zero secs */
342          if (run_now) {
343             runtime = mktime(&tm);
344             add_job(job, run, now, runtime);
345          }
346          /* If job is to be run in the next hour schedule it */
347          if (run_nh) {
348             /* Set correct values */
349             tm.tm_hour = nh_hour;
350             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
351             tm.tm_mon = nh_month;
352             tm.tm_year = nh_year;
353             runtime = mktime(&tm);
354             add_job(job, run, now, runtime);
355          }
356       }
357    }
358    UnlockRes();
359    Dmsg0(dbglvl, "Leave find_runs()\n");
360 }
361
362 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
363 {
364    job_item *ji;
365    bool inserted = false;
366    /*
367     * Don't run any job that ran less than a minute ago, but
368     *  do run any job scheduled less than a minute ago.
369     */
370    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
371 #ifdef SCHED_DEBUG
372       char dt[50], dt1[50], dt2[50];
373       bstrftime_nc(dt, sizeof(dt), runtime);
374       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
375       bstrftime_nc(dt2, sizeof(dt2), now);
376       Dmsg7(000, "Drop: Job=\"%s\" run=%s(%x). last_run=%s(%x). now=%s(%x)\n", job->hdr.name, 
377             dt, runtime, dt1, run->last_run, dt2, now);
378       fflush(stdout);
379 #endif
380       return;
381    }
382 #ifdef SCHED_DEBUG
383    Dmsg4(000, "Add: Job=\"%s\" run=%x last_run=%x now=%x\n", job->hdr.name, 
384             runtime, run->last_run, now);
385 #endif
386    /* accept to run this job */
387    job_item *je = (job_item *)malloc(sizeof(job_item));
388    je->run = run;
389    je->job = job;
390    je->runtime = runtime;
391    if (run->Priority) {
392       je->Priority = run->Priority;
393    } else {
394       je->Priority = job->Priority;
395    }
396
397    /* Add this job to the wait queue in runtime, priority sorted order */
398    foreach_dlist(ji, jobs_to_run) {
399       if (ji->runtime > je->runtime ||
400           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
401          jobs_to_run->insert_before(je, ji);
402          dump_job(je, _("Inserted job"));
403          inserted = true;
404          break;
405       }
406    }
407    /* If place not found in queue, append it */
408    if (!inserted) {
409       jobs_to_run->append(je);
410       dump_job(je, _("Appended job"));
411    }
412 #ifdef SCHED_DEBUG
413    foreach_dlist(ji, jobs_to_run) {
414       dump_job(ji, _("Run queue"));
415    }
416    Dmsg0(000, "End run queue\n");
417 #endif
418 }
419
420 static void dump_job(job_item *ji, const char *msg)
421 {
422 #ifdef SCHED_DEBUG
423    char dt[MAX_TIME_LENGTH];
424    int save_debug = debug_level;
425    if (debug_level < dbglvl) {
426       return;
427    }
428    bstrftime_nc(dt, sizeof(dt), ji->runtime);
429    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, 
430       ji->Priority, dt);
431    fflush(stdout);
432    debug_level = save_debug;
433 #endif
434 }