]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
Backport from BEE
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  *
18  *   Bacula scheduler
19  *     It looks at what jobs are to be run and when
20  *     and waits around until it is time to
21  *     fire them up.
22  *
23  *     Kern Sibbald, May MM, major revision December MMIII
24  *
25  */
26
27 #include "bacula.h"
28 #include "dird.h"
29
30 #if 0
31 #define SCHED_DEBUG
32 #define DBGLVL 0
33 #else
34 #undef SCHED_DEBUG
35 #define DBGLVL DT_SCHEDULER|200
36 #endif
37
38 const int dbglvl = DBGLVL;
39
40 /* Local variables */
41 struct job_item {
42    RUN *run;
43    JOB *job;
44    time_t runtime;
45    int Priority;
46    dlink link;                        /* link for list */
47 };
48
49 /* List of jobs to be run. They were scheduled in this hour or the next */
50 static dlist *jobs_to_run;               /* list of jobs to be run */
51
52 /* Time interval in secs to sleep if nothing to be run */
53 static int const next_check_secs = 60;
54
55 /* Forward referenced subroutines */
56 static void find_runs();
57 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
58 static void dump_job(job_item *ji, const char *msg);
59
60 /* Imported subroutines */
61
62 /* Imported variables */
63
64 /*
65  * called by reload_config to tell us that the schedules
66  * we may have based our next jobs to run queues have been
67  * invalidated.  In fact the schedules may not have changed
68  * but the run object that we have recorded the last_run time
69  * on are new and no longer have a valid last_run time which
70  * causes us to double run schedules that get put into the list
71  * because run_nh = 1.
72  */
73 static bool schedules_invalidated = false;
74 void invalidate_schedules(void) {
75     schedules_invalidated = true;
76 }
77
78 /*********************************************************************
79  *
80  *         Main Bacula Scheduler
81  *
82  */
83 JCR *wait_for_next_job(char *one_shot_job_to_run)
84 {
85    JCR *jcr;
86    JOB *job;
87    RUN *run;
88    time_t now, prev;
89    static bool first = true;
90    job_item *next_job = NULL;
91
92    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
93    if (first) {
94       first = false;
95       /* Create scheduled jobs list */
96       jobs_to_run = New(dlist(next_job, &next_job->link));
97       if (one_shot_job_to_run) {            /* one shot */
98          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
99          if (!job) {
100             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
101          }
102          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
103          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
104          set_jcr_defaults(jcr, job);
105          return jcr;
106       }
107    }
108
109    /* Wait until we have something in the
110     * next hour or so.
111     */
112 again:
113    while (jobs_to_run->empty()) {
114       find_runs();
115       if (!jobs_to_run->empty()) {
116          break;
117       }
118       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
119    }
120
121 #ifdef  list_chain
122    job_item *je;
123    foreach_dlist(je, jobs_to_run) {
124       dump_job(je, _("Walk queue"));
125    }
126 #endif
127    /*
128     * Pull the first job to run (already sorted by runtime and
129     *  Priority, then wait around until it is time to run it.
130     */
131    next_job = (job_item *)jobs_to_run->first();
132    jobs_to_run->remove(next_job);
133
134    dump_job(next_job, _("Dequeued job"));
135
136    if (!next_job) {                /* we really should have something now */
137       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
138    }
139
140    /* Now wait for the time to run the job */
141    for (;;) {
142       time_t twait;
143       /** discard scheduled queue and rebuild with new schedule objects. **/
144       lock_jobs();
145       if (schedules_invalidated) {
146           dump_job(next_job, "Invalidated job");
147           free(next_job);
148           while (!jobs_to_run->empty()) {
149               next_job = (job_item *)jobs_to_run->first();
150               jobs_to_run->remove(next_job);
151               dump_job(next_job, "Invalidated job");
152               free(next_job);
153           }
154           schedules_invalidated = false;
155           unlock_jobs();
156           goto again;
157       }
158       unlock_jobs();
159       prev = now = time(NULL);
160       twait = next_job->runtime - now;
161       if (twait <= 0) {               /* time to run it */
162          break;
163       }
164       /* Recheck at least once per minute */
165       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
166       /* Attempt to handle clock shift (but not daylight savings time changes)
167        * we allow a skew of 10 seconds before invalidating everything.
168        */
169       now = time(NULL);
170       if (now < prev-10 || now > (prev+next_check_secs+10)) {
171          schedules_invalidated = true;
172       }
173    }
174    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
175    run = next_job->run;               /* pick up needed values */
176    job = next_job->job;
177    if (job->enabled) {
178       dump_job(next_job, _("Run job"));
179    }
180    free(next_job);
181    if (!job->enabled) {
182       free_jcr(jcr);
183       goto again;                     /* ignore this job */
184    }
185    run->last_run = now;               /* mark as run now */
186
187    ASSERT(job);
188    set_jcr_defaults(jcr, job);
189    if (run->level) {
190       jcr->setJobLevel(run->level);  /* override run level */
191    }
192    if (run->pool) {
193       jcr->pool = run->pool;          /* override pool */
194       jcr->run_pool_override = true;
195    }
196    if (run->next_pool) {
197       jcr->next_pool = run->next_pool; /* override next pool */
198       jcr->run_next_pool_override = true;
199    }
200    if (run->full_pool) {
201       jcr->full_pool = run->full_pool; /* override full pool */
202       jcr->run_full_pool_override = true;
203    }
204    if (run->inc_pool) {
205       jcr->inc_pool = run->inc_pool;  /* override inc pool */
206       jcr->run_inc_pool_override = true;
207    }
208    if (run->diff_pool) {
209       jcr->diff_pool = run->diff_pool;  /* override dif pool */
210       jcr->run_diff_pool_override = true;
211    }
212    if (run->storage) {
213       USTORE store;
214       store.store = run->storage;
215       pm_strcpy(store.store_source, _("run override"));
216       set_rwstorage(jcr, &store);     /* override storage */
217    }
218    if (run->msgs) {
219       jcr->messages = run->msgs;      /* override messages */
220    }
221    if (run->Priority) {
222       jcr->JobPriority = run->Priority;
223    }
224    if (run->spool_data_set) {
225       jcr->spool_data = run->spool_data;
226    }
227    if (run->accurate_set) {     /* overwrite accurate mode */
228       jcr->accurate = run->accurate;
229    }
230    if (run->write_part_after_job_set) {
231       jcr->write_part_after_job = run->write_part_after_job;
232    }
233    if (run->MaxRunSchedTime_set) {
234       jcr->MaxRunSchedTime = run->MaxRunSchedTime;
235    }
236    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
237    return jcr;
238 }
239
240
241 /*
242  * Shutdown the scheduler
243  */
244 void term_scheduler()
245 {
246    if (jobs_to_run) {
247       delete jobs_to_run;
248    }
249 }
250
251 /*
252  * Find all jobs to be run this hour and the next hour.
253  */
254 static void find_runs()
255 {
256    time_t now, next_hour, runtime;
257    RUN *run;
258    JOB *job;
259    SCHED *sched;
260    struct tm tm;
261    int hour, mday, wday, month, wom, woy, ldom;
262    /* Items corresponding to above at the next hour */
263    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_ldom;
264
265    Dmsg0(dbglvl, "enter find_runs()\n");
266
267    /* compute values for time now */
268    now = time(NULL);
269    (void)localtime_r(&now, &tm);
270    hour = tm.tm_hour;
271    mday = tm.tm_mday - 1;
272    wday = tm.tm_wday;
273    month = tm.tm_mon;
274    wom = mday / 7;
275    woy = tm_woy(now);                     /* get week of year */
276    ldom = tm_ldom(month, tm.tm_year + 1900);
277
278    Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
279          now, hour, month, mday, wday, wom, woy);
280
281    /*
282     * Compute values for next hour from now.
283     * We do this to be sure we don't miss a job while
284     * sleeping.
285     */
286    next_hour = now + 3600;
287    (void)localtime_r(&next_hour, &tm);
288    nh_hour = tm.tm_hour;
289    nh_mday = tm.tm_mday - 1;
290    nh_wday = tm.tm_wday;
291    nh_month = tm.tm_mon;
292    nh_wom = nh_mday / 7;
293    nh_woy = tm_woy(next_hour);              /* get week of year */
294    nh_ldom = tm_ldom(nh_month, tm.tm_year + 1900);
295
296    Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
297          next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
298
299    /* Loop through all jobs */
300    LockRes();
301    foreach_res(job, R_JOB) {
302       sched = job->schedule;
303       if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */
304          continue;                    /* no, skip this job */
305       }
306       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
307       for (run=sched->run; run; run=run->next) {
308          bool run_now, run_nh;
309          /*
310           * Find runs scheduled between now and the next hour.
311           */
312 #ifdef xxxx
313          Dmsg0(000, "\n");
314          Dmsg7(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
315             hour, month, mday, wday, wom, woy, ldom);
316          Dmsg7(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
317             bit_is_set(hour, run->hour),
318             bit_is_set(month, run->month),
319             bit_is_set(mday, run->mday),
320             bit_is_set(wday, run->wday),
321             bit_is_set(wom, run->wom),
322             bit_is_set(woy, run->woy),
323             bit_is_set(31, run->mday));
324
325
326          Dmsg7(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
327             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy, nh_ldom);
328          Dmsg7(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
329             bit_is_set(nh_hour, run->hour),
330             bit_is_set(nh_month, run->month),
331             bit_is_set(nh_mday, run->mday),
332             bit_is_set(nh_wday, run->wday),
333             bit_is_set(nh_wom, run->wom),
334             bit_is_set(nh_woy, run->woy),
335             bit_is_set(31, run->mday));
336 #endif
337
338          run_now = bit_is_set(hour, run->hour) &&
339             ((bit_is_set(mday, run->mday) &&
340               bit_is_set(wday, run->wday) &&
341               bit_is_set(month, run->month) &&
342               bit_is_set(wom, run->wom) &&
343               bit_is_set(woy, run->woy)) ||
344              (bit_is_set(month, run->month) &&
345               bit_is_set(31, run->mday) && mday == ldom));
346
347          run_nh = bit_is_set(nh_hour, run->hour) &&
348             ((bit_is_set(nh_mday, run->mday) &&
349               bit_is_set(nh_wday, run->wday) &&
350               bit_is_set(nh_month, run->month) &&
351               bit_is_set(nh_wom, run->wom) &&
352               bit_is_set(nh_woy, run->woy)) ||
353              (bit_is_set(nh_month, run->month) &&
354               bit_is_set(31, run->mday) && nh_mday == nh_ldom));
355
356          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
357
358          if (run_now || run_nh) {
359            /* find time (time_t) job is to be run */
360            (void)localtime_r(&now, &tm);      /* reset tm structure */
361            tm.tm_min = run->minute;     /* set run minute */
362            tm.tm_sec = 0;               /* zero secs */
363            runtime = mktime(&tm);
364            if (run_now) {
365              add_job(job, run, now, runtime);
366            }
367            /* If job is to be run in the next hour schedule it */
368            if (run_nh) {
369              add_job(job, run, now, runtime + 3600);
370            }
371          }
372       }
373    }
374    UnlockRes();
375    Dmsg0(dbglvl, "Leave find_runs()\n");
376 }
377
378 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
379 {
380    job_item *ji;
381    bool inserted = false;
382    /*
383     * Don't run any job that ran less than a minute ago, but
384     *  do run any job scheduled less than a minute ago.
385     */
386    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
387 #ifdef SCHED_DEBUG
388       Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name,
389             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
390       fflush(stdout);
391 #endif
392       return;
393    }
394 #ifdef SCHED_DEBUG
395    Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name,
396             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
397 #endif
398    /* accept to run this job */
399    job_item *je = (job_item *)malloc(sizeof(job_item));
400    je->run = run;
401    je->job = job;
402    je->runtime = runtime;
403    if (run->Priority) {
404       je->Priority = run->Priority;
405    } else {
406       je->Priority = job->Priority;
407    }
408
409    /* Add this job to the wait queue in runtime, priority sorted order */
410    foreach_dlist(ji, jobs_to_run) {
411       if (ji->runtime > je->runtime ||
412           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
413          jobs_to_run->insert_before(je, ji);
414          dump_job(je, _("Inserted job"));
415          inserted = true;
416          break;
417       }
418    }
419    /* If place not found in queue, append it */
420    if (!inserted) {
421       jobs_to_run->append(je);
422       dump_job(je, _("Appended job"));
423    }
424 #ifdef SCHED_DEBUG
425    foreach_dlist(ji, jobs_to_run) {
426       dump_job(ji, _("Run queue"));
427    }
428    Dmsg0(000, "End run queue\n");
429 #endif
430 }
431
432 static void dump_job(job_item *ji, const char *msg)
433 {
434 #ifdef SCHED_DEBUG
435    char dt[MAX_TIME_LENGTH];
436    int64_t save_debug = debug_level;
437    if (!chk_dbglvl(dbglvl)) {
438       return;
439    }
440    bstrftime_nc(dt, sizeof(dt), ji->runtime);
441    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
442       ji->Priority, dt);
443    fflush(stdout);
444    debug_level = save_debug;
445 #endif
446 }