]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  *
21  *   Bacula scheduler
22  *     It looks at what jobs are to be run and when
23  *     and waits around until it is time to
24  *     fire them up.
25  *
26  *     Kern Sibbald, May MM, major revision December MMIII
27  *
28  */
29
30 #include "bacula.h"
31 #include "dird.h"
32
33 #if 0
34 #define SCHED_DEBUG
35 #define DBGLVL 0
36 #else
37 #undef SCHED_DEBUG
38 #define DBGLVL DT_SCHEDULER|200
39 #endif
40
41 const int dbglvl = DBGLVL;
42
43 /* Local variables */
44 struct job_item {
45    RUN *run;
46    JOB *job;
47    time_t runtime;
48    int Priority;
49    dlink link;                        /* link for list */
50 };
51
52 /* List of jobs to be run. They were scheduled in this hour or the next */
53 static dlist *jobs_to_run;               /* list of jobs to be run */
54
55 /* Time interval in secs to sleep if nothing to be run */
56 static int const next_check_secs = 60;
57
58 /* Forward referenced subroutines */
59 static void find_runs();
60 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
61 static void dump_job(job_item *ji, const char *msg);
62
63 /* Imported subroutines */
64
65 /* Imported variables */
66
67 /*
68  * called by reload_config to tell us that the schedules
69  * we may have based our next jobs to run queues have been
70  * invalidated.  In fact the schedules may not have changed
71  * but the run object that we have recorded the last_run time
72  * on are new and no longer have a valid last_run time which
73  * causes us to double run schedules that get put into the list
74  * because run_nh = 1.
75  */
76 static bool schedules_invalidated = false;
77 void invalidate_schedules(void) {
78     schedules_invalidated = true;
79 }
80
81 /*********************************************************************
82  *
83  *         Main Bacula Scheduler
84  *
85  */
86 JCR *wait_for_next_job(char *one_shot_job_to_run)
87 {
88    JCR *jcr;
89    JOB *job;
90    RUN *run;
91    time_t now, prev;
92    static bool first = true;
93    job_item *next_job = NULL;
94
95    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
96    if (first) {
97       first = false;
98       /* Create scheduled jobs list */
99       jobs_to_run = New(dlist(next_job, &next_job->link));
100       if (one_shot_job_to_run) {            /* one shot */
101          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
102          if (!job) {
103             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
104          }
105          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
106          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
107          set_jcr_defaults(jcr, job);
108          return jcr;
109       }
110    }
111
112    /* Wait until we have something in the
113     * next hour or so.
114     */
115 again:
116    while (jobs_to_run->empty()) {
117       find_runs();
118       if (!jobs_to_run->empty()) {
119          break;
120       }
121       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
122    }
123
124 #ifdef  list_chain
125    job_item *je;
126    foreach_dlist(je, jobs_to_run) {
127       dump_job(je, _("Walk queue"));
128    }
129 #endif
130    /*
131     * Pull the first job to run (already sorted by runtime and
132     *  Priority, then wait around until it is time to run it.
133     */
134    next_job = (job_item *)jobs_to_run->first();
135    jobs_to_run->remove(next_job);
136
137    dump_job(next_job, _("Dequeued job"));
138
139    if (!next_job) {                /* we really should have something now */
140       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
141    }
142
143    /* Now wait for the time to run the job */
144    for (;;) {
145       time_t twait;
146       /** discard scheduled queue and rebuild with new schedule objects. **/
147       lock_jobs();
148       if (schedules_invalidated) {
149           dump_job(next_job, "Invalidated job");
150           free(next_job);
151           while (!jobs_to_run->empty()) {
152               next_job = (job_item *)jobs_to_run->first();
153               jobs_to_run->remove(next_job);
154               dump_job(next_job, "Invalidated job");
155               free(next_job);
156           }
157           schedules_invalidated = false;
158           unlock_jobs();
159           goto again;
160       }
161       unlock_jobs();
162       prev = now = time(NULL);
163       twait = next_job->runtime - now;
164       if (twait <= 0) {               /* time to run it */
165          break;
166       }
167       /* Recheck at least once per minute */
168       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
169       /* Attempt to handle clock shift (but not daylight savings time changes)
170        * we allow a skew of 10 seconds before invalidating everything.
171        */
172       now = time(NULL);
173       if (now < prev-10 || now > (prev+next_check_secs+10)) {
174          schedules_invalidated = true;
175       }
176    }
177    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
178    run = next_job->run;               /* pick up needed values */
179    job = next_job->job;
180    if (job->is_enabled() && (!job->client || job->client->is_enabled())) {
181       dump_job(next_job, _("Run job"));  /* no client and job enabled */
182    }
183    free(next_job);
184    if (!job->is_enabled() || (job->client && !job->client->is_enabled())) {
185       free_jcr(jcr);
186       goto again;                     /* ignore this job */
187    }
188    run->last_run = now;               /* mark as run now */
189
190    ASSERT(job);
191    set_jcr_defaults(jcr, job);
192    if (run->level) {
193       jcr->setJobLevel(run->level);  /* override run level */
194    }
195    if (run->pool) {
196       jcr->pool = run->pool;          /* override pool */
197       jcr->run_pool_override = true;
198    }
199    if (run->next_pool) {
200       jcr->next_pool = run->next_pool; /* override next pool */
201       jcr->run_next_pool_override = true;
202    }
203    if (run->full_pool) {
204       jcr->full_pool = run->full_pool; /* override full pool */
205       jcr->run_full_pool_override = true;
206    }
207    if (run->vfull_pool) {
208       jcr->vfull_pool = run->vfull_pool; /* override virtual full pool */
209       jcr->run_vfull_pool_override = true;
210    }
211    if (run->inc_pool) {
212       jcr->inc_pool = run->inc_pool;  /* override inc pool */
213       jcr->run_inc_pool_override = true;
214    }
215    if (run->diff_pool) {
216       jcr->diff_pool = run->diff_pool;  /* override dif pool */
217       jcr->run_diff_pool_override = true;
218    }
219    if (run->storage) {
220       USTORE store;
221       store.store = run->storage;
222       pm_strcpy(store.store_source, _("run override"));
223       set_rwstorage(jcr, &store);     /* override storage */
224    }
225    if (run->msgs) {
226       jcr->messages = run->msgs;      /* override messages */
227    }
228    if (run->Priority) {
229       jcr->JobPriority = run->Priority;
230    }
231    if (run->spool_data_set) {
232       jcr->spool_data = run->spool_data;
233    }
234    if (run->accurate_set) {     /* overwrite accurate mode */
235       jcr->accurate = run->accurate;
236    }
237    if (run->write_part_after_job_set) {
238       jcr->write_part_after_job = run->write_part_after_job;
239    }
240    if (run->MaxRunSchedTime_set) {
241       jcr->MaxRunSchedTime = run->MaxRunSchedTime;
242    }
243    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
244    return jcr;
245 }
246
247
248 /*
249  * Shutdown the scheduler
250  */
251 void term_scheduler()
252 {
253    if (jobs_to_run) {
254       delete jobs_to_run;
255    }
256 }
257
258 /*
259  * Find all jobs to be run this hour and the next hour.
260  */
261 static void find_runs()
262 {
263    time_t now, next_hour, runtime;
264    RUN *run;
265    JOB *job;
266    SCHED *sched;
267    struct tm tm;
268    int hour, mday, wday, month, wom, woy, ldom;
269    /* Items corresponding to above at the next hour */
270    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_ldom;
271
272    Dmsg0(dbglvl, "enter find_runs()\n");
273
274    /* compute values for time now */
275    now = time(NULL);
276    (void)localtime_r(&now, &tm);
277    hour = tm.tm_hour;
278    mday = tm.tm_mday - 1;
279    wday = tm.tm_wday;
280    month = tm.tm_mon;
281    wom = mday / 7;
282    woy = tm_woy(now);                     /* get week of year */
283    ldom = tm_ldom(month, tm.tm_year + 1900);
284
285    Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
286          now, hour, month, mday, wday, wom, woy);
287
288    /*
289     * Compute values for next hour from now.
290     * We do this to be sure we don't miss a job while
291     * sleeping.
292     */
293    next_hour = now + 3600;
294    (void)localtime_r(&next_hour, &tm);
295    nh_hour = tm.tm_hour;
296    nh_mday = tm.tm_mday - 1;
297    nh_wday = tm.tm_wday;
298    nh_month = tm.tm_mon;
299    nh_wom = nh_mday / 7;
300    nh_woy = tm_woy(next_hour);              /* get week of year */
301    nh_ldom = tm_ldom(nh_month, tm.tm_year + 1900);
302
303    Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
304          next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
305
306    /* Loop through all jobs */
307    LockRes();
308    foreach_res(job, R_JOB) {
309       sched = job->schedule;
310       if (!sched || !job->is_enabled() || (sched && !sched->is_enabled()) ||
311          (job->client && !job->client->is_enabled())) {
312          continue;                    /* no, skip this job */
313       }
314       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
315       for (run=sched->run; run; run=run->next) {
316          bool run_now, run_nh;
317          /*
318           * Find runs scheduled between now and the next hour.
319           */
320 #ifdef xxxx
321          Dmsg0(000, "\n");
322          Dmsg7(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
323             hour, month, mday, wday, wom, woy, ldom);
324          Dmsg7(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
325             bit_is_set(hour, run->hour),
326             bit_is_set(month, run->month),
327             bit_is_set(mday, run->mday),
328             bit_is_set(wday, run->wday),
329             bit_is_set(wom, run->wom),
330             bit_is_set(woy, run->woy),
331             bit_is_set(31, run->mday));
332
333
334          Dmsg7(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
335             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy, nh_ldom);
336          Dmsg7(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
337             bit_is_set(nh_hour, run->hour),
338             bit_is_set(nh_month, run->month),
339             bit_is_set(nh_mday, run->mday),
340             bit_is_set(nh_wday, run->wday),
341             bit_is_set(nh_wom, run->wom),
342             bit_is_set(nh_woy, run->woy),
343             bit_is_set(31, run->mday));
344 #endif
345
346          run_now = bit_is_set(hour, run->hour) &&
347             ((bit_is_set(mday, run->mday) &&
348               bit_is_set(wday, run->wday) &&
349               bit_is_set(month, run->month) &&
350               bit_is_set(wom, run->wom) &&
351               bit_is_set(woy, run->woy)) ||
352              (bit_is_set(month, run->month) &&
353               bit_is_set(31, run->mday) && mday == ldom));
354
355          run_nh = bit_is_set(nh_hour, run->hour) &&
356             ((bit_is_set(nh_mday, run->mday) &&
357               bit_is_set(nh_wday, run->wday) &&
358               bit_is_set(nh_month, run->month) &&
359               bit_is_set(nh_wom, run->wom) &&
360               bit_is_set(nh_woy, run->woy)) ||
361              (bit_is_set(nh_month, run->month) &&
362               bit_is_set(31, run->mday) && nh_mday == nh_ldom));
363
364          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
365
366          if (run_now || run_nh) {
367            /* find time (time_t) job is to be run */
368            (void)localtime_r(&now, &tm);      /* reset tm structure */
369            tm.tm_min = run->minute;     /* set run minute */
370            tm.tm_sec = 0;               /* zero secs */
371            runtime = mktime(&tm);
372            if (run_now) {
373              add_job(job, run, now, runtime);
374            }
375            /* If job is to be run in the next hour schedule it */
376            if (run_nh) {
377              add_job(job, run, now, runtime + 3600);
378            }
379          }
380       }
381    }
382    UnlockRes();
383    Dmsg0(dbglvl, "Leave find_runs()\n");
384 }
385
386 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
387 {
388    job_item *ji;
389    bool inserted = false;
390    /*
391     * Don't run any job that ran less than a minute ago, but
392     *  do run any job scheduled less than a minute ago.
393     */
394    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
395 #ifdef SCHED_DEBUG
396       Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name,
397             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
398       fflush(stdout);
399 #endif
400       return;
401    }
402 #ifdef SCHED_DEBUG
403    Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name,
404             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
405 #endif
406    /* accept to run this job */
407    job_item *je = (job_item *)malloc(sizeof(job_item));
408    je->run = run;
409    je->job = job;
410    je->runtime = runtime;
411    if (run->Priority) {
412       je->Priority = run->Priority;
413    } else {
414       je->Priority = job->Priority;
415    }
416
417    /* Add this job to the wait queue in runtime, priority sorted order */
418    foreach_dlist(ji, jobs_to_run) {
419       if (ji->runtime > je->runtime ||
420           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
421          jobs_to_run->insert_before(je, ji);
422          dump_job(je, _("Inserted job"));
423          inserted = true;
424          break;
425       }
426    }
427    /* If place not found in queue, append it */
428    if (!inserted) {
429       jobs_to_run->append(je);
430       dump_job(je, _("Appended job"));
431    }
432 #ifdef SCHED_DEBUG
433    foreach_dlist(ji, jobs_to_run) {
434       dump_job(ji, _("Run queue"));
435    }
436    Dmsg0(000, "End run queue\n");
437 #endif
438 }
439
440 static void dump_job(job_item *ji, const char *msg)
441 {
442 #ifdef SCHED_DEBUG
443    char dt[MAX_TIME_LENGTH];
444    int64_t save_debug = debug_level;
445
446    if (!chk_dbglvl(dbglvl)) {
447       return;
448    }
449    bstrftime_nc(dt, sizeof(dt), ji->runtime);
450    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
451       ji->Priority, dt);
452    fflush(stdout);
453    debug_level = save_debug;
454 #endif
455 }