]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
Backport from Bacula Enterprise
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  *
22  *   Bacula scheduler
23  *     It looks at what jobs are to be run and when
24  *     and waits around until it is time to
25  *     fire them up.
26  *
27  *     Kern Sibbald, May MM, major revision December MMIII
28  *
29  */
30
31 #include "bacula.h"
32 #include "dird.h"
33
34 #if 0
35 #define SCHED_DEBUG
36 #define DBGLVL 0
37 #else
38 #undef SCHED_DEBUG
39 #define DBGLVL DT_SCHEDULER|200
40 #endif
41
42 const int dbglvl = DBGLVL;
43
44 /* Local variables */
45 struct job_item {
46    RUN *run;
47    JOB *job;
48    time_t runtime;
49    int Priority;
50    dlink link;                        /* link for list */
51 };
52
53 /* List of jobs to be run. They were scheduled in this hour or the next */
54 static dlist *jobs_to_run;               /* list of jobs to be run */
55
56 /* Time interval in secs to sleep if nothing to be run */
57 static int const next_check_secs = 60;
58
59 /* Forward referenced subroutines */
60 static void find_runs();
61 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
62 static void dump_job(job_item *ji, const char *msg);
63
64 /* Imported subroutines */
65
66 /* Imported variables */
67
68 /*
69  * called by reload_config to tell us that the schedules
70  * we may have based our next jobs to run queues have been
71  * invalidated.  In fact the schedules may not have changed
72  * but the run object that we have recorded the last_run time
73  * on are new and no longer have a valid last_run time which
74  * causes us to double run schedules that get put into the list
75  * because run_nh = 1.
76  */
77 static bool schedules_invalidated = false;
78 void invalidate_schedules(void) {
79     schedules_invalidated = true;
80 }
81
82 /*********************************************************************
83  *
84  *         Main Bacula Scheduler
85  *
86  */
87 JCR *wait_for_next_job(char *one_shot_job_to_run)
88 {
89    JCR *jcr;
90    JOB *job;
91    RUN *run;
92    time_t now, prev;
93    static bool first = true;
94    job_item *next_job = NULL;
95
96    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
97    if (first) {
98       first = false;
99       /* Create scheduled jobs list */
100       jobs_to_run = New(dlist(next_job, &next_job->link));
101       if (one_shot_job_to_run) {            /* one shot */
102          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
103          if (!job) {
104             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
105          }
106          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
107          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
108          set_jcr_defaults(jcr, job);
109          return jcr;
110       }
111    }
112
113    /* Wait until we have something in the
114     * next hour or so.
115     */
116 again:
117    while (jobs_to_run->empty()) {
118       find_runs();
119       if (!jobs_to_run->empty()) {
120          break;
121       }
122       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
123    }
124
125 #ifdef  list_chain
126    job_item *je;
127    foreach_dlist(je, jobs_to_run) {
128       dump_job(je, _("Walk queue"));
129    }
130 #endif
131    /*
132     * Pull the first job to run (already sorted by runtime and
133     *  Priority, then wait around until it is time to run it.
134     */
135    next_job = (job_item *)jobs_to_run->first();
136    jobs_to_run->remove(next_job);
137
138    dump_job(next_job, _("Dequeued job"));
139
140    if (!next_job) {                /* we really should have something now */
141       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
142    }
143
144    /* Now wait for the time to run the job */
145    for (;;) {
146       time_t twait;
147       /** discard scheduled queue and rebuild with new schedule objects. **/
148       lock_jobs();
149       if (schedules_invalidated) {
150           dump_job(next_job, "Invalidated job");
151           free(next_job);
152           while (!jobs_to_run->empty()) {
153               next_job = (job_item *)jobs_to_run->first();
154               jobs_to_run->remove(next_job);
155               dump_job(next_job, "Invalidated job");
156               free(next_job);
157           }
158           schedules_invalidated = false;
159           unlock_jobs();
160           goto again;
161       }
162       unlock_jobs();
163       prev = now = time(NULL);
164       twait = next_job->runtime - now;
165       if (twait <= 0) {               /* time to run it */
166          break;
167       }
168       /* Recheck at least once per minute */
169       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
170       /* Attempt to handle clock shift (but not daylight savings time changes)
171        * we allow a skew of 10 seconds before invalidating everything.
172        */
173       now = time(NULL);
174       if (now < prev-10 || now > (prev+next_check_secs+10)) {
175          schedules_invalidated = true;
176       }
177    }
178    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
179    run = next_job->run;               /* pick up needed values */
180    job = next_job->job;
181    if (job->enabled && (!job->client || job->client->enabled)) {
182       dump_job(next_job, _("Run job"));  /* no client and job enabled */
183    }
184    free(next_job);
185    if (!job->enabled || (job->client && !job->client->enabled)) {
186       free_jcr(jcr);
187       goto again;                     /* ignore this job */
188    }
189    run->last_run = now;               /* mark as run now */
190
191    ASSERT(job);
192    set_jcr_defaults(jcr, job);
193    if (run->level) {
194       jcr->setJobLevel(run->level);  /* override run level */
195    }
196    if (run->pool) {
197       jcr->pool = run->pool;          /* override pool */
198       jcr->run_pool_override = true;
199    }
200    if (run->next_pool) {
201       jcr->next_pool = run->next_pool; /* override next pool */
202       jcr->run_next_pool_override = true;
203    }
204    if (run->full_pool) {
205       jcr->full_pool = run->full_pool; /* override full pool */
206       jcr->run_full_pool_override = true;
207    }
208    if (run->inc_pool) {
209       jcr->inc_pool = run->inc_pool;  /* override inc pool */
210       jcr->run_inc_pool_override = true;
211    }
212    if (run->diff_pool) {
213       jcr->diff_pool = run->diff_pool;  /* override dif pool */
214       jcr->run_diff_pool_override = true;
215    }
216    if (run->storage) {
217       USTORE store;
218       store.store = run->storage;
219       pm_strcpy(store.store_source, _("run override"));
220       set_rwstorage(jcr, &store);     /* override storage */
221    }
222    if (run->msgs) {
223       jcr->messages = run->msgs;      /* override messages */
224    }
225    if (run->Priority) {
226       jcr->JobPriority = run->Priority;
227    }
228    if (run->spool_data_set) {
229       jcr->spool_data = run->spool_data;
230    }
231    if (run->accurate_set) {     /* overwrite accurate mode */
232       jcr->accurate = run->accurate;
233    }
234    if (run->write_part_after_job_set) {
235       jcr->write_part_after_job = run->write_part_after_job;
236    }
237    if (run->MaxRunSchedTime_set) {
238       jcr->MaxRunSchedTime = run->MaxRunSchedTime;
239    }
240    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
241    return jcr;
242 }
243
244
245 /*
246  * Shutdown the scheduler
247  */
248 void term_scheduler()
249 {
250    if (jobs_to_run) {
251       delete jobs_to_run;
252    }
253 }
254
255 /*
256  * Find all jobs to be run this hour and the next hour.
257  */
258 static void find_runs()
259 {
260    time_t now, next_hour, runtime;
261    RUN *run;
262    JOB *job;
263    SCHED *sched;
264    struct tm tm;
265    int hour, mday, wday, month, wom, woy, ldom;
266    /* Items corresponding to above at the next hour */
267    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_ldom;
268
269    Dmsg0(dbglvl, "enter find_runs()\n");
270
271    /* compute values for time now */
272    now = time(NULL);
273    (void)localtime_r(&now, &tm);
274    hour = tm.tm_hour;
275    mday = tm.tm_mday - 1;
276    wday = tm.tm_wday;
277    month = tm.tm_mon;
278    wom = mday / 7;
279    woy = tm_woy(now);                     /* get week of year */
280    ldom = tm_ldom(month, tm.tm_year + 1900);
281
282    Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
283          now, hour, month, mday, wday, wom, woy);
284
285    /*
286     * Compute values for next hour from now.
287     * We do this to be sure we don't miss a job while
288     * sleeping.
289     */
290    next_hour = now + 3600;
291    (void)localtime_r(&next_hour, &tm);
292    nh_hour = tm.tm_hour;
293    nh_mday = tm.tm_mday - 1;
294    nh_wday = tm.tm_wday;
295    nh_month = tm.tm_mon;
296    nh_wom = nh_mday / 7;
297    nh_woy = tm_woy(next_hour);              /* get week of year */
298    nh_ldom = tm_ldom(nh_month, tm.tm_year + 1900);
299
300    Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
301          next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
302
303    /* Loop through all jobs */
304    LockRes();
305    foreach_res(job, R_JOB) {
306       sched = job->schedule;
307       if (!sched || !job->enabled || (sched && !sched->enabled) ||
308          (job->client && !job->client->enabled)) {
309          continue;                    /* no, skip this job */
310       }
311       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
312       for (run=sched->run; run; run=run->next) {
313          bool run_now, run_nh;
314          /*
315           * Find runs scheduled between now and the next hour.
316           */
317 #ifdef xxxx
318          Dmsg0(000, "\n");
319          Dmsg7(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
320             hour, month, mday, wday, wom, woy, ldom);
321          Dmsg7(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
322             bit_is_set(hour, run->hour),
323             bit_is_set(month, run->month),
324             bit_is_set(mday, run->mday),
325             bit_is_set(wday, run->wday),
326             bit_is_set(wom, run->wom),
327             bit_is_set(woy, run->woy),
328             bit_is_set(31, run->mday));
329
330
331          Dmsg7(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
332             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy, nh_ldom);
333          Dmsg7(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
334             bit_is_set(nh_hour, run->hour),
335             bit_is_set(nh_month, run->month),
336             bit_is_set(nh_mday, run->mday),
337             bit_is_set(nh_wday, run->wday),
338             bit_is_set(nh_wom, run->wom),
339             bit_is_set(nh_woy, run->woy),
340             bit_is_set(31, run->mday));
341 #endif
342
343          run_now = bit_is_set(hour, run->hour) &&
344             ((bit_is_set(mday, run->mday) &&
345               bit_is_set(wday, run->wday) &&
346               bit_is_set(month, run->month) &&
347               bit_is_set(wom, run->wom) &&
348               bit_is_set(woy, run->woy)) ||
349              (bit_is_set(month, run->month) &&
350               bit_is_set(31, run->mday) && mday == ldom));
351
352          run_nh = bit_is_set(nh_hour, run->hour) &&
353             ((bit_is_set(nh_mday, run->mday) &&
354               bit_is_set(nh_wday, run->wday) &&
355               bit_is_set(nh_month, run->month) &&
356               bit_is_set(nh_wom, run->wom) &&
357               bit_is_set(nh_woy, run->woy)) ||
358              (bit_is_set(nh_month, run->month) &&
359               bit_is_set(31, run->mday) && nh_mday == nh_ldom));
360
361          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
362
363          if (run_now || run_nh) {
364            /* find time (time_t) job is to be run */
365            (void)localtime_r(&now, &tm);      /* reset tm structure */
366            tm.tm_min = run->minute;     /* set run minute */
367            tm.tm_sec = 0;               /* zero secs */
368            runtime = mktime(&tm);
369            if (run_now) {
370              add_job(job, run, now, runtime);
371            }
372            /* If job is to be run in the next hour schedule it */
373            if (run_nh) {
374              add_job(job, run, now, runtime + 3600);
375            }
376          }
377       }
378    }
379    UnlockRes();
380    Dmsg0(dbglvl, "Leave find_runs()\n");
381 }
382
383 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
384 {
385    job_item *ji;
386    bool inserted = false;
387    /*
388     * Don't run any job that ran less than a minute ago, but
389     *  do run any job scheduled less than a minute ago.
390     */
391    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
392 #ifdef SCHED_DEBUG
393       Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name,
394             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
395       fflush(stdout);
396 #endif
397       return;
398    }
399 #ifdef SCHED_DEBUG
400    Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name,
401             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
402 #endif
403    /* accept to run this job */
404    job_item *je = (job_item *)malloc(sizeof(job_item));
405    je->run = run;
406    je->job = job;
407    je->runtime = runtime;
408    if (run->Priority) {
409       je->Priority = run->Priority;
410    } else {
411       je->Priority = job->Priority;
412    }
413
414    /* Add this job to the wait queue in runtime, priority sorted order */
415    foreach_dlist(ji, jobs_to_run) {
416       if (ji->runtime > je->runtime ||
417           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
418          jobs_to_run->insert_before(je, ji);
419          dump_job(je, _("Inserted job"));
420          inserted = true;
421          break;
422       }
423    }
424    /* If place not found in queue, append it */
425    if (!inserted) {
426       jobs_to_run->append(je);
427       dump_job(je, _("Appended job"));
428    }
429 #ifdef SCHED_DEBUG
430    foreach_dlist(ji, jobs_to_run) {
431       dump_job(ji, _("Run queue"));
432    }
433    Dmsg0(000, "End run queue\n");
434 #endif
435 }
436
437 static void dump_job(job_item *ji, const char *msg)
438 {
439 #ifdef SCHED_DEBUG
440    char dt[MAX_TIME_LENGTH];
441    int64_t save_debug = debug_level;
442
443    if (!chk_dbglvl(dbglvl)) {
444       return;
445    }
446    bstrftime_nc(dt, sizeof(dt), ji->runtime);
447    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
448       ji->Priority, dt);
449    fflush(stdout);
450    debug_level = save_debug;
451 #endif
452 }