]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
83bdcdea0eeb6af6618e4f63bfcd1ccd301bd092
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2006 Kern Sibbald
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License
17    version 2 as amended with additional clauses defined in the
18    file LICENSE in the main source directory.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
23    the file LICENSE for additional details.
24
25  */
26
27 #include "bacula.h"
28 #include "dird.h"
29
30 #if 0
31 #define SCHED_DEBUG
32 #define DBGLVL 0
33 #else
34 #undef SCHED_DEBUG
35 #define DBGLVL 200
36 #endif
37
38 const int dbglvl = DBGLVL;
39
40 /* Local variables */
41 struct job_item {
42    RUN *run;
43    JOB *job;
44    time_t runtime;
45    int Priority;
46    dlink link;                        /* link for list */
47 };
48
49 /* List of jobs to be run. They were scheduled in this hour or the next */
50 static dlist *jobs_to_run;               /* list of jobs to be run */
51
52 /* Time interval in secs to sleep if nothing to be run */
53 static int const next_check_secs = 60;
54
55 /* Forward referenced subroutines */
56 static void find_runs();
57 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
58 static void dump_job(job_item *ji, const char *msg);
59
60 /* Imported subroutines */
61
62 /* Imported variables */
63
64 /**
65  * called by reload_config to tell us that the schedules
66  * we may have based our next jobs to run queues have been
67  * invalidated.  In fact the schedules may not have changed
68  * but the run object that we have recorded the last_run time
69  * on are new and no longer have a valid last_run time which
70  * causes us to double run schedules that get put into the list
71  * because run_nh = 1.
72  */   
73 static bool schedules_invalidated = false;
74 void invalidate_schedules(void) {
75     schedules_invalidated = true;
76 }
77
78 /*********************************************************************
79  *
80  *         Main Bacula Scheduler
81  *
82  */
83 JCR *wait_for_next_job(char *one_shot_job_to_run)
84 {
85    JCR *jcr;
86    JOB *job;
87    RUN *run;
88    time_t now, prev;
89    static bool first = true;
90    job_item *next_job = NULL;
91
92    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
93    if (first) {
94       first = false;
95       /* Create scheduled jobs list */
96       jobs_to_run = New(dlist(next_job, &next_job->link));
97       if (one_shot_job_to_run) {            /* one shot */
98          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
99          if (!job) {
100             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
101          }
102          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
103          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
104          set_jcr_defaults(jcr, job);
105          return jcr;
106       }
107    }
108    /* Wait until we have something in the
109     * next hour or so.
110     */
111 again:
112    while (jobs_to_run->empty()) {
113       find_runs();
114       if (!jobs_to_run->empty()) {
115          break;
116       }
117       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
118    }
119
120 #ifdef  list_chain
121    job_item *je;
122    foreach_dlist(je, jobs_to_run) {
123       dump_job(je, _("Walk queue"));
124    }
125 #endif
126    /*
127     * Pull the first job to run (already sorted by runtime and
128     *  Priority, then wait around until it is time to run it.
129     */
130    next_job = (job_item *)jobs_to_run->first();
131    jobs_to_run->remove(next_job);
132
133    dump_job(next_job, _("Dequeued job"));
134
135    if (!next_job) {                /* we really should have something now */
136       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
137    }
138
139    /* Now wait for the time to run the job */
140    for (;;) {
141       time_t twait;
142       /** discard scheduled queue and rebuild with new schedule objects. **/
143       lock_jobs();
144       if (schedules_invalidated) { 
145           dump_job(next_job, "Invalidated job");
146           free(next_job);
147           while (!jobs_to_run->empty()) {
148               next_job = (job_item *)jobs_to_run->first();
149               jobs_to_run->remove(next_job);
150               dump_job(next_job, "Invalidated job");
151               free(next_job);
152           }
153           schedules_invalidated = false;
154           unlock_jobs();
155           goto again;
156       }
157       unlock_jobs();
158       prev = now = time(NULL);
159       twait = next_job->runtime - now;
160       if (twait <= 0) {               /* time to run it */
161          break;
162       }
163       /* Recheck at least once per minute */
164       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
165       /* Attempt to handle clock shift from/to daylight savings time
166        * we allow a skew of 10 seconds before invalidating everything.
167        */
168       now = time(NULL);
169       if (now < prev+10 || now > (prev+next_check_secs+10)) {
170          schedules_invalidated = true;
171       }
172    }
173    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
174    run = next_job->run;               /* pick up needed values */
175    job = next_job->job;
176    if (job->enabled) {
177       dump_job(next_job, _("Run job"));
178    }
179    free(next_job);
180    if (!job->enabled) {
181       free_jcr(jcr);
182       goto again;                     /* ignore this job */
183    }
184    run->last_run = now;               /* mark as run now */
185
186    ASSERT(job);
187    set_jcr_defaults(jcr, job);
188    if (run->level) {
189       jcr->JobLevel = run->level;     /* override run level */
190    }
191    if (run->pool) {
192       jcr->pool = run->pool;          /* override pool */
193    }
194    if (run->full_pool) {
195       jcr->full_pool = run->full_pool; /* override full pool */
196    }
197    if (run->inc_pool) {
198       jcr->inc_pool = run->inc_pool;  /* override inc pool */
199    }
200    if (run->dif_pool) {
201       jcr->dif_pool = run->dif_pool;  /* override dif pool */
202    }
203    if (run->storage) {
204       set_storage(jcr, run->storage); /* override storage */
205    }
206    if (run->msgs) {
207       jcr->messages = run->msgs;      /* override messages */
208    }
209    if (run->Priority) {
210       jcr->JobPriority = run->Priority;
211    }
212    if (run->spool_data_set) {
213       jcr->spool_data = run->spool_data;
214    }
215    if (run->write_part_after_job_set) {
216       jcr->write_part_after_job = run->write_part_after_job;
217    }
218    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
219    return jcr;
220 }
221
222
223 /*
224  * Shutdown the scheduler
225  */
226 void term_scheduler()
227 {
228    if (jobs_to_run) {
229       job_item *je;
230       /* Release all queued job entries to be run */
231       foreach_dlist(je, jobs_to_run) {
232          free(je);
233       }
234       delete jobs_to_run;
235    }
236 }
237
238 /*
239  * Find all jobs to be run this hour and the next hour.
240  */
241 static void find_runs()
242 {
243    time_t now, next_hour, runtime;
244    RUN *run;
245    JOB *job;
246    SCHED *sched;
247    struct tm tm;
248    int minute;
249    int hour, mday, wday, month, wom, woy;
250    /* Items corresponding to above at the next hour */
251    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
252
253    Dmsg0(dbglvl, "enter find_runs()\n");
254
255
256    /* compute values for time now */
257    now = time(NULL);
258    localtime_r(&now, &tm);
259    hour = tm.tm_hour;
260    minute = tm.tm_min;
261    mday = tm.tm_mday - 1;
262    wday = tm.tm_wday;
263    month = tm.tm_mon;
264    wom = mday / 7;
265    woy = tm_woy(now);                     /* get week of year */
266
267    /*
268     * Compute values for next hour from now.
269     * We do this to be sure we don't miss a job while
270     * sleeping.
271     */
272    next_hour = now + 3600;
273    localtime_r(&next_hour, &tm);
274    nh_hour = tm.tm_hour;
275    nh_mday = tm.tm_mday - 1;
276    nh_wday = tm.tm_wday;
277    nh_month = tm.tm_mon;
278    nh_year  = tm.tm_year;
279    nh_wom = nh_mday / 7;
280    nh_woy = tm_woy(now);                     /* get week of year */
281
282    /* Loop through all jobs */
283    LockRes();
284    foreach_res(job, R_JOB) {
285       sched = job->schedule;
286       if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */
287          continue;                    /* no, skip this job */
288       }
289       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
290       for (run=sched->run; run; run=run->next) {
291          bool run_now, run_nh;
292          /*
293           * Find runs scheduled between now and the next hour.
294           */
295 #ifdef xxxx
296          Dmsg0(000, "\n");
297          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
298             hour, month, mday, wday, wom, woy);
299          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
300             bit_is_set(hour, run->hour),
301             bit_is_set(month, run->month),
302             bit_is_set(mday, run->mday),
303             bit_is_set(wday, run->wday),
304             bit_is_set(wom, run->wom),
305             bit_is_set(woy, run->woy));
306
307          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
308             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
309          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
310             bit_is_set(nh_hour, run->hour),
311             bit_is_set(nh_month, run->month),
312             bit_is_set(nh_mday, run->mday),
313             bit_is_set(nh_wday, run->wday),
314             bit_is_set(nh_wom, run->wom),
315             bit_is_set(nh_woy, run->woy));
316 #endif
317
318          run_now = bit_is_set(hour, run->hour) &&
319             bit_is_set(mday, run->mday) &&
320             bit_is_set(wday, run->wday) &&
321             bit_is_set(month, run->month) &&
322             bit_is_set(wom, run->wom) &&
323             bit_is_set(woy, run->woy);
324
325          run_nh = bit_is_set(nh_hour, run->hour) &&
326             bit_is_set(nh_mday, run->mday) &&
327             bit_is_set(nh_wday, run->wday) &&
328             bit_is_set(nh_month, run->month) &&
329             bit_is_set(nh_wom, run->wom) &&
330             bit_is_set(nh_woy, run->woy);
331
332          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
333
334          /* find time (time_t) job is to be run */
335          localtime_r(&now, &tm);      /* reset tm structure */
336          tm.tm_min = run->minute;     /* set run minute */
337          tm.tm_sec = 0;               /* zero secs */
338          if (run_now) {
339             runtime = mktime(&tm);
340             add_job(job, run, now, runtime);
341          }
342          /* If job is to be run in the next hour schedule it */
343          if (run_nh) {
344             /* Set correct values */
345             tm.tm_hour = nh_hour;
346             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
347             tm.tm_mon = nh_month;
348             tm.tm_year = nh_year;
349             runtime = mktime(&tm);
350             add_job(job, run, now, runtime);
351          }
352       }
353    }
354    UnlockRes();
355    Dmsg0(dbglvl, "Leave find_runs()\n");
356 }
357
358 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
359 {
360    job_item *ji;
361    bool inserted = false;
362    /*
363     * Don't run any job that ran less than a minute ago, but
364     *  do run any job scheduled less than a minute ago.
365     */
366    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
367 #ifdef SCHED_DEBUG
368       char dt[50], dt1[50], dt2[50];
369       bstrftime_nc(dt, sizeof(dt), runtime);
370       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
371       bstrftime_nc(dt2, sizeof(dt2), now);
372       Dmsg7(000, "Drop: Job=\"%s\" run=%s(%x). last_run=%s(%x). now=%s(%x)\n", job->hdr.name, 
373             dt, runtime, dt1, run->last_run, dt2, now);
374       fflush(stdout);
375 #endif
376       return;
377    }
378 #ifdef SCHED_DEBUG
379    Dmsg4(000, "Add: Job=\"%s\" run=%x last_run=%x now=%x\n", job->hdr.name, 
380             runtime, run->last_run, now);
381 #endif
382    /* accept to run this job */
383    job_item *je = (job_item *)malloc(sizeof(job_item));
384    je->run = run;
385    je->job = job;
386    je->runtime = runtime;
387    if (run->Priority) {
388       je->Priority = run->Priority;
389    } else {
390       je->Priority = job->Priority;
391    }
392
393    /* Add this job to the wait queue in runtime, priority sorted order */
394    foreach_dlist(ji, jobs_to_run) {
395       if (ji->runtime > je->runtime ||
396           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
397          jobs_to_run->insert_before(je, ji);
398          dump_job(je, _("Inserted job"));
399          inserted = true;
400          break;
401       }
402    }
403    /* If place not found in queue, append it */
404    if (!inserted) {
405       jobs_to_run->append(je);
406       dump_job(je, _("Appended job"));
407    }
408 #ifdef SCHED_DEBUG
409    foreach_dlist(ji, jobs_to_run) {
410       dump_job(ji, _("Run queue"));
411    }
412    Dmsg0(000, "End run queue\n");
413 #endif
414 }
415
416 static void dump_job(job_item *ji, const char *msg)
417 {
418 #ifdef SCHED_DEBUG
419    char dt[MAX_TIME_LENGTH];
420    int save_debug = debug_level;
421    if (debug_level < dbglvl) {
422       return;
423    }
424    bstrftime_nc(dt, sizeof(dt), ji->runtime);
425    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, 
426       ji->Priority, dt);
427    fflush(stdout);
428    debug_level = save_debug;
429 #endif
430 }