]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
856fdf84b6d6592f8de032b871c18b700dbc22db
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  *
21  *   Bacula scheduler
22  *     It looks at what jobs are to be run and when
23  *     and waits around until it is time to
24  *     fire them up.
25  *
26  *     Kern Sibbald, May MM, major revision December MMIII
27  *
28  */
29
30 #include "bacula.h"
31 #include "dird.h"
32
33 #if 0
34 #define SCHED_DEBUG
35 #define DBGLVL 0
36 #else
37 #undef SCHED_DEBUG
38 #define DBGLVL DT_SCHEDULER|200
39 #endif
40
41 const int dbglvl = DBGLVL;
42
43 /* Local variables */
44 struct job_item {
45    RUN *run;
46    JOB *job;
47    time_t runtime;
48    int Priority;
49    dlink link;                        /* link for list */
50 };
51
52 /* List of jobs to be run. They were scheduled in this hour or the next */
53 static dlist *jobs_to_run;               /* list of jobs to be run */
54
55 /* Time interval in secs to sleep if nothing to be run */
56 static int const next_check_secs = 60;
57
58 /* Forward referenced subroutines */
59 static void find_runs();
60 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
61 static void dump_job(job_item *ji, const char *msg);
62
63 /* Imported subroutines */
64
65 /* Imported variables */
66
67 /*
68  * called by reload_config to tell us that the schedules
69  * we may have based our next jobs to run queues have been
70  * invalidated.  In fact the schedules may not have changed
71  * but the run object that we have recorded the last_run time
72  * on are new and no longer have a valid last_run time which
73  * causes us to double run schedules that get put into the list
74  * because run_nh = 1.
75  */
76 static bool schedules_invalidated = false;
77 void invalidate_schedules(void) {
78     schedules_invalidated = true;
79 }
80
81 /*********************************************************************
82  *
83  *         Main Bacula Scheduler
84  *
85  */
86 JCR *wait_for_next_job(char *one_shot_job_to_run)
87 {
88    JCR *jcr;
89    JOB *job;
90    RUN *run;
91    time_t now, prev;
92    static bool first = true;
93    job_item *next_job = NULL;
94
95    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
96    if (first) {
97       first = false;
98       /* Create scheduled jobs list */
99       jobs_to_run = New(dlist(next_job, &next_job->link));
100       if (one_shot_job_to_run) {            /* one shot */
101          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
102          if (!job) {
103             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
104          }
105          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
106          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
107          set_jcr_defaults(jcr, job);
108          return jcr;
109       }
110    }
111
112    /* Wait until we have something in the
113     * next hour or so.
114     */
115 again:
116    while (jobs_to_run->empty()) {
117       find_runs();
118       if (!jobs_to_run->empty()) {
119          break;
120       }
121       bmicrosleep(next_check_secs, 0); /* recheck once per minute */
122    }
123
124 #ifdef  list_chain
125    job_item *je;
126    foreach_dlist(je, jobs_to_run) {
127       dump_job(je, _("Walk queue"));
128    }
129 #endif
130    /*
131     * Pull the first job to run (already sorted by runtime and
132     *  Priority, then wait around until it is time to run it.
133     */
134    next_job = (job_item *)jobs_to_run->first();
135    jobs_to_run->remove(next_job);
136
137    dump_job(next_job, _("Dequeued job"));
138
139    if (!next_job) {                /* we really should have something now */
140       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
141    }
142
143    /* Now wait for the time to run the job */
144    for (;;) {
145       time_t twait;
146       /** discard scheduled queue and rebuild with new schedule objects. **/
147       lock_jobs();
148       if (schedules_invalidated) {
149           dump_job(next_job, "Invalidated job");
150           free(next_job);
151           while (!jobs_to_run->empty()) {
152               next_job = (job_item *)jobs_to_run->first();
153               jobs_to_run->remove(next_job);
154               dump_job(next_job, "Invalidated job");
155               free(next_job);
156           }
157           schedules_invalidated = false;
158           unlock_jobs();
159           goto again;
160       }
161       unlock_jobs();
162       prev = now = time(NULL);
163       twait = next_job->runtime - now;
164       if (twait <= 0) {               /* time to run it */
165          break;
166       }
167       /* Recheck at least once per minute */
168       bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
169       /* Attempt to handle clock shift (but not daylight savings time changes)
170        * we allow a skew of 10 seconds before invalidating everything.
171        */
172       now = time(NULL);
173       if (now < prev-10 || now > (prev+next_check_secs+10)) {
174          schedules_invalidated = true;
175       }
176    }
177    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
178    run = next_job->run;               /* pick up needed values */
179    job = next_job->job;
180    if (job->enabled && (!job->client || job->client->enabled)) {
181       dump_job(next_job, _("Run job"));  /* no client and job enabled */
182    }
183    free(next_job);
184    if (!job->enabled || (job->client && !job->client->enabled)) {
185       free_jcr(jcr);
186       goto again;                     /* ignore this job */
187    }
188    run->last_run = now;               /* mark as run now */
189
190    ASSERT(job);
191    set_jcr_defaults(jcr, job);
192    if (run->level) {
193       jcr->setJobLevel(run->level);  /* override run level */
194    }
195    if (run->pool) {
196       jcr->pool = run->pool;          /* override pool */
197       jcr->run_pool_override = true;
198    }
199    if (run->next_pool) {
200       jcr->next_pool = run->next_pool; /* override next pool */
201       jcr->run_next_pool_override = true;
202    }
203    if (run->full_pool) {
204       jcr->full_pool = run->full_pool; /* override full pool */
205       jcr->run_full_pool_override = true;
206    }
207    if (run->inc_pool) {
208       jcr->inc_pool = run->inc_pool;  /* override inc pool */
209       jcr->run_inc_pool_override = true;
210    }
211    if (run->diff_pool) {
212       jcr->diff_pool = run->diff_pool;  /* override dif pool */
213       jcr->run_diff_pool_override = true;
214    }
215    if (run->storage) {
216       USTORE store;
217       store.store = run->storage;
218       pm_strcpy(store.store_source, _("run override"));
219       set_rwstorage(jcr, &store);     /* override storage */
220    }
221    if (run->msgs) {
222       jcr->messages = run->msgs;      /* override messages */
223    }
224    if (run->Priority) {
225       jcr->JobPriority = run->Priority;
226    }
227    if (run->spool_data_set) {
228       jcr->spool_data = run->spool_data;
229    }
230    if (run->accurate_set) {     /* overwrite accurate mode */
231       jcr->accurate = run->accurate;
232    }
233    if (run->write_part_after_job_set) {
234       jcr->write_part_after_job = run->write_part_after_job;
235    }
236    if (run->MaxRunSchedTime_set) {
237       jcr->MaxRunSchedTime = run->MaxRunSchedTime;
238    }
239    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
240    return jcr;
241 }
242
243
244 /*
245  * Shutdown the scheduler
246  */
247 void term_scheduler()
248 {
249    if (jobs_to_run) {
250       delete jobs_to_run;
251    }
252 }
253
254 /*
255  * Find all jobs to be run this hour and the next hour.
256  */
257 static void find_runs()
258 {
259    time_t now, next_hour, runtime;
260    RUN *run;
261    JOB *job;
262    SCHED *sched;
263    struct tm tm;
264    int hour, mday, wday, month, wom, woy, ldom;
265    /* Items corresponding to above at the next hour */
266    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_ldom;
267
268    Dmsg0(dbglvl, "enter find_runs()\n");
269
270    /* compute values for time now */
271    now = time(NULL);
272    (void)localtime_r(&now, &tm);
273    hour = tm.tm_hour;
274    mday = tm.tm_mday - 1;
275    wday = tm.tm_wday;
276    month = tm.tm_mon;
277    wom = mday / 7;
278    woy = tm_woy(now);                     /* get week of year */
279    ldom = tm_ldom(month, tm.tm_year + 1900);
280
281    Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
282          now, hour, month, mday, wday, wom, woy);
283
284    /*
285     * Compute values for next hour from now.
286     * We do this to be sure we don't miss a job while
287     * sleeping.
288     */
289    next_hour = now + 3600;
290    (void)localtime_r(&next_hour, &tm);
291    nh_hour = tm.tm_hour;
292    nh_mday = tm.tm_mday - 1;
293    nh_wday = tm.tm_wday;
294    nh_month = tm.tm_mon;
295    nh_wom = nh_mday / 7;
296    nh_woy = tm_woy(next_hour);              /* get week of year */
297    nh_ldom = tm_ldom(nh_month, tm.tm_year + 1900);
298
299    Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
300          next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
301
302    /* Loop through all jobs */
303    LockRes();
304    foreach_res(job, R_JOB) {
305       sched = job->schedule;
306       if (!sched || !job->enabled || (sched && !sched->enabled) ||
307          (job->client && !job->client->enabled)) {
308          continue;                    /* no, skip this job */
309       }
310       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
311       for (run=sched->run; run; run=run->next) {
312          bool run_now, run_nh;
313          /*
314           * Find runs scheduled between now and the next hour.
315           */
316 #ifdef xxxx
317          Dmsg0(000, "\n");
318          Dmsg7(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
319             hour, month, mday, wday, wom, woy, ldom);
320          Dmsg7(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
321             bit_is_set(hour, run->hour),
322             bit_is_set(month, run->month),
323             bit_is_set(mday, run->mday),
324             bit_is_set(wday, run->wday),
325             bit_is_set(wom, run->wom),
326             bit_is_set(woy, run->woy),
327             bit_is_set(31, run->mday));
328
329
330          Dmsg7(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
331             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy, nh_ldom);
332          Dmsg7(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
333             bit_is_set(nh_hour, run->hour),
334             bit_is_set(nh_month, run->month),
335             bit_is_set(nh_mday, run->mday),
336             bit_is_set(nh_wday, run->wday),
337             bit_is_set(nh_wom, run->wom),
338             bit_is_set(nh_woy, run->woy),
339             bit_is_set(31, run->mday));
340 #endif
341
342          run_now = bit_is_set(hour, run->hour) &&
343             ((bit_is_set(mday, run->mday) &&
344               bit_is_set(wday, run->wday) &&
345               bit_is_set(month, run->month) &&
346               bit_is_set(wom, run->wom) &&
347               bit_is_set(woy, run->woy)) ||
348              (bit_is_set(month, run->month) &&
349               bit_is_set(31, run->mday) && mday == ldom));
350
351          run_nh = bit_is_set(nh_hour, run->hour) &&
352             ((bit_is_set(nh_mday, run->mday) &&
353               bit_is_set(nh_wday, run->wday) &&
354               bit_is_set(nh_month, run->month) &&
355               bit_is_set(nh_wom, run->wom) &&
356               bit_is_set(nh_woy, run->woy)) ||
357              (bit_is_set(nh_month, run->month) &&
358               bit_is_set(31, run->mday) && nh_mday == nh_ldom));
359
360          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
361
362          if (run_now || run_nh) {
363            /* find time (time_t) job is to be run */
364            (void)localtime_r(&now, &tm);      /* reset tm structure */
365            tm.tm_min = run->minute;     /* set run minute */
366            tm.tm_sec = 0;               /* zero secs */
367            runtime = mktime(&tm);
368            if (run_now) {
369              add_job(job, run, now, runtime);
370            }
371            /* If job is to be run in the next hour schedule it */
372            if (run_nh) {
373              add_job(job, run, now, runtime + 3600);
374            }
375          }
376       }
377    }
378    UnlockRes();
379    Dmsg0(dbglvl, "Leave find_runs()\n");
380 }
381
382 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
383 {
384    job_item *ji;
385    bool inserted = false;
386    /*
387     * Don't run any job that ran less than a minute ago, but
388     *  do run any job scheduled less than a minute ago.
389     */
390    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
391 #ifdef SCHED_DEBUG
392       Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name,
393             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
394       fflush(stdout);
395 #endif
396       return;
397    }
398 #ifdef SCHED_DEBUG
399    Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name,
400             (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
401 #endif
402    /* accept to run this job */
403    job_item *je = (job_item *)malloc(sizeof(job_item));
404    je->run = run;
405    je->job = job;
406    je->runtime = runtime;
407    if (run->Priority) {
408       je->Priority = run->Priority;
409    } else {
410       je->Priority = job->Priority;
411    }
412
413    /* Add this job to the wait queue in runtime, priority sorted order */
414    foreach_dlist(ji, jobs_to_run) {
415       if (ji->runtime > je->runtime ||
416           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
417          jobs_to_run->insert_before(je, ji);
418          dump_job(je, _("Inserted job"));
419          inserted = true;
420          break;
421       }
422    }
423    /* If place not found in queue, append it */
424    if (!inserted) {
425       jobs_to_run->append(je);
426       dump_job(je, _("Appended job"));
427    }
428 #ifdef SCHED_DEBUG
429    foreach_dlist(ji, jobs_to_run) {
430       dump_job(ji, _("Run queue"));
431    }
432    Dmsg0(000, "End run queue\n");
433 #endif
434 }
435
436 static void dump_job(job_item *ji, const char *msg)
437 {
438 #ifdef SCHED_DEBUG
439    char dt[MAX_TIME_LENGTH];
440    int64_t save_debug = debug_level;
441
442    if (!chk_dbglvl(dbglvl)) {
443       return;
444    }
445    bstrftime_nc(dt, sizeof(dt), ji->runtime);
446    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
447       ji->Priority, dt);
448    fflush(stdout);
449    debug_level = save_debug;
450 #endif
451 }