]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
- Apply patch from Christopher Hull
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2006 Kern Sibbald
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License
17    version 2 as amended with additional clauses defined in the
18    file LICENSE in the main source directory.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
23    the file LICENSE for additional details.
24
25  */
26
27 #include "bacula.h"
28 #include "dird.h"
29
30 #if 0
31 #define SCHED_DEBUG
32 #define DBGLVL 0
33 #else
34 #undef SCHED_DEBUG
35 #define DBGLVL 200
36 #endif
37
38 const int dbglvl = DBGLVL;
39
40 /* Local variables */
41 struct job_item {
42    RUN *run;
43    JOB *job;
44    time_t runtime;
45    int Priority;
46    dlink link;                        /* link for list */
47 };
48
49 /* List of jobs to be run. They were scheduled in this hour or the next */
50 static dlist *jobs_to_run;               /* list of jobs to be run */
51
52 /* Time interval in secs to sleep if nothing to be run */
53 static int const NEXT_CHECK_SECS = 60;
54
55 /* Forward referenced subroutines */
56 static void find_runs();
57 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
58 static void dump_job(job_item *ji, const char *msg);
59
60 /* Imported subroutines */
61
62 /* Imported variables */
63
64 /**
65  * called by reload_config to tell us that the schedules
66  * we may have based our next jobs to run queues have been
67  * invalidated.  In fact the schedules may not have changed
68  * but the run object that we have recorded the last_run time
69  * on are new and no longer have a valid last_run time which
70  * causes us to double run schedules that get put into the list
71  * because run_nh = 1.
72  */   
73 static bool schedules_invalidated = false;
74 void invalidate_schedules(void) {
75     schedules_invalidated = true;
76 }
77
78 /*********************************************************************
79  *
80  *         Main Bacula Scheduler
81  *
82  */
83 JCR *wait_for_next_job(char *one_shot_job_to_run)
84 {
85    JCR *jcr;
86    JOB *job;
87    RUN *run;
88    time_t now;
89    static bool first = true;
90    job_item *next_job = NULL;
91
92    Dmsg0(dbglvl, "Enter wait_for_next_job\n");
93    if (first) {
94       first = false;
95       /* Create scheduled jobs list */
96       jobs_to_run = New(dlist(next_job, &next_job->link));
97       if (one_shot_job_to_run) {            /* one shot */
98          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
99          if (!job) {
100             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
101          }
102          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
103          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
104          set_jcr_defaults(jcr, job);
105          return jcr;
106       }
107    }
108    /* Wait until we have something in the
109     * next hour or so.
110     */
111 again:
112    while (jobs_to_run->empty()) {
113       find_runs();
114       if (!jobs_to_run->empty()) {
115          break;
116       }
117       bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
118    }
119
120 #ifdef  list_chain
121    job_item *je;
122    foreach_dlist(je, jobs_to_run) {
123       dump_job(je, _("Walk queue"));
124    }
125 #endif
126    /*
127     * Pull the first job to run (already sorted by runtime and
128     *  Priority, then wait around until it is time to run it.
129     */
130    next_job = (job_item *)jobs_to_run->first();
131    jobs_to_run->remove(next_job);
132
133    dump_job(next_job, _("Dequeued job"));
134
135    if (!next_job) {                /* we really should have something now */
136       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
137    }
138
139    /* Now wait for the time to run the job */
140    for (;;) {
141       time_t twait;
142       if (schedules_invalidated) { /** discard scheduled queue and rebuild with new schedule objects. **/
143           dump_job(next_job, "Invalidated job");
144           free(next_job);
145           while (!jobs_to_run->empty()) {
146               next_job = (job_item *)jobs_to_run->first();
147               jobs_to_run->remove(next_job);
148               dump_job(next_job, "Invalidated job");
149               free(next_job);
150           }
151           schedules_invalidated = false;
152           goto again;
153       }
154       now = time(NULL);
155       twait = next_job->runtime - now;
156       if (twait <= 0) {               /* time to run it */
157          break;
158       }
159       bmicrosleep((NEXT_CHECK_SECS<twait)?NEXT_CHECK_SECS:twait, 0); /* recheck at least once per minute */      
160    }
161    run = next_job->run;               /* pick up needed values */
162    job = next_job->job;
163    if (job->enabled) {
164       dump_job(next_job, _("Run job"));
165    }
166    free(next_job);
167    if (!job->enabled) {
168       goto again;                     /* ignore this job */
169    }
170    run->last_run = now;               /* mark as run now */
171
172    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
173    ASSERT(job);
174    set_jcr_defaults(jcr, job);
175    if (run->level) {
176       jcr->JobLevel = run->level;     /* override run level */
177    }
178    if (run->pool) {
179       jcr->pool = run->pool;          /* override pool */
180    }
181    if (run->full_pool) {
182       jcr->full_pool = run->full_pool; /* override full pool */
183    }
184    if (run->inc_pool) {
185       jcr->inc_pool = run->inc_pool;  /* override inc pool */
186    }
187    if (run->dif_pool) {
188       jcr->dif_pool = run->dif_pool;  /* override dif pool */
189    }
190    if (run->storage) {
191       set_storage(jcr, run->storage); /* override storage */
192    }
193    if (run->msgs) {
194       jcr->messages = run->msgs;      /* override messages */
195    }
196    if (run->Priority) {
197       jcr->JobPriority = run->Priority;
198    }
199    if (run->spool_data_set) {
200       jcr->spool_data = run->spool_data;
201    }
202    if (run->write_part_after_job_set) {
203       jcr->write_part_after_job = run->write_part_after_job;
204    }
205    Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
206    return jcr;
207 }
208
209
210 /*
211  * Shutdown the scheduler
212  */
213 void term_scheduler()
214 {
215    if (jobs_to_run) {
216       job_item *je;
217       /* Release all queued job entries to be run */
218       foreach_dlist(je, jobs_to_run) {
219          free(je);
220       }
221       delete jobs_to_run;
222    }
223 }
224
225 /*
226  * Find all jobs to be run this hour and the next hour.
227  */
228 static void find_runs()
229 {
230    time_t now, next_hour, runtime;
231    RUN *run;
232    JOB *job;
233    SCHED *sched;
234    struct tm tm;
235    int minute;
236    int hour, mday, wday, month, wom, woy;
237    /* Items corresponding to above at the next hour */
238    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
239
240    Dmsg0(dbglvl, "enter find_runs()\n");
241
242
243    /* compute values for time now */
244    now = time(NULL);
245    localtime_r(&now, &tm);
246    hour = tm.tm_hour;
247    minute = tm.tm_min;
248    mday = tm.tm_mday - 1;
249    wday = tm.tm_wday;
250    month = tm.tm_mon;
251    wom = mday / 7;
252    woy = tm_woy(now);                     /* get week of year */
253
254    /*
255     * Compute values for next hour from now.
256     * We do this to be sure we don't miss a job while
257     * sleeping.
258     */
259    next_hour = now + 3600;
260    localtime_r(&next_hour, &tm);
261    nh_hour = tm.tm_hour;
262    nh_mday = tm.tm_mday - 1;
263    nh_wday = tm.tm_wday;
264    nh_month = tm.tm_mon;
265    nh_year  = tm.tm_year;
266    nh_wom = nh_mday / 7;
267    nh_woy = tm_woy(now);                     /* get week of year */
268
269    /* Loop through all jobs */
270    LockRes();
271    foreach_res(job, R_JOB) {
272       sched = job->schedule;
273       if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */
274          continue;                    /* no, skip this job */
275       }
276       Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
277       for (run=sched->run; run; run=run->next) {
278          bool run_now, run_nh;
279          /*
280           * Find runs scheduled between now and the next hour.
281           */
282 #ifdef xxxx
283          Dmsg0(000, "\n");
284          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
285             hour, month, mday, wday, wom, woy);
286          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
287             bit_is_set(hour, run->hour),
288             bit_is_set(month, run->month),
289             bit_is_set(mday, run->mday),
290             bit_is_set(wday, run->wday),
291             bit_is_set(wom, run->wom),
292             bit_is_set(woy, run->woy));
293
294          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
295             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
296          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
297             bit_is_set(nh_hour, run->hour),
298             bit_is_set(nh_month, run->month),
299             bit_is_set(nh_mday, run->mday),
300             bit_is_set(nh_wday, run->wday),
301             bit_is_set(nh_wom, run->wom),
302             bit_is_set(nh_woy, run->woy));
303 #endif
304
305          run_now = bit_is_set(hour, run->hour) &&
306             bit_is_set(mday, run->mday) &&
307             bit_is_set(wday, run->wday) &&
308             bit_is_set(month, run->month) &&
309             bit_is_set(wom, run->wom) &&
310             bit_is_set(woy, run->woy);
311
312          run_nh = bit_is_set(nh_hour, run->hour) &&
313             bit_is_set(nh_mday, run->mday) &&
314             bit_is_set(nh_wday, run->wday) &&
315             bit_is_set(nh_month, run->month) &&
316             bit_is_set(nh_wom, run->wom) &&
317             bit_is_set(nh_woy, run->woy);
318
319          Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
320
321          /* find time (time_t) job is to be run */
322          localtime_r(&now, &tm);      /* reset tm structure */
323          tm.tm_min = run->minute;     /* set run minute */
324          tm.tm_sec = 0;               /* zero secs */
325          if (run_now) {
326             runtime = mktime(&tm);
327             add_job(job, run, now, runtime);
328          }
329          /* If job is to be run in the next hour schedule it */
330          if (run_nh) {
331             /* Set correct values */
332             tm.tm_hour = nh_hour;
333             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
334             tm.tm_mon = nh_month;
335             tm.tm_year = nh_year;
336             runtime = mktime(&tm);
337             add_job(job, run, now, runtime);
338          }
339       }
340    }
341    UnlockRes();
342    Dmsg0(dbglvl, "Leave find_runs()\n");
343 }
344
345 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
346 {
347    job_item *ji;
348    bool inserted = false;
349    /*
350     * Don't run any job that ran less than a minute ago, but
351     *  do run any job scheduled less than a minute ago.
352     */
353    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
354 #ifdef SCHED_DEBUG
355       char dt[50], dt1[50], dt2[50];
356       bstrftime_nc(dt, sizeof(dt), runtime);
357       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
358       bstrftime_nc(dt2, sizeof(dt2), now);
359       Dmsg7(000, "Drop: Job=\"%s\" run=%s(%x). last_run=%s(%x). now=%s(%x)\n", job->hdr.name, 
360             dt, runtime, dt1, run->last_run, dt2, now);
361       fflush(stdout);
362 #endif
363       return;
364    }
365 #ifdef SCHED_DEBUG
366    Dmsg4(000, "Add: Job=\"%s\" run=%x last_run=%x now=%x\n", job->hdr.name, 
367             runtime, run->last_run, now);
368 #endif
369    /* accept to run this job */
370    job_item *je = (job_item *)malloc(sizeof(job_item));
371    je->run = run;
372    je->job = job;
373    je->runtime = runtime;
374    if (run->Priority) {
375       je->Priority = run->Priority;
376    } else {
377       je->Priority = job->Priority;
378    }
379
380    /* Add this job to the wait queue in runtime, priority sorted order */
381    foreach_dlist(ji, jobs_to_run) {
382       if (ji->runtime > je->runtime ||
383           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
384          jobs_to_run->insert_before(je, ji);
385          dump_job(je, _("Inserted job"));
386          inserted = true;
387          break;
388       }
389    }
390    /* If place not found in queue, append it */
391    if (!inserted) {
392       jobs_to_run->append(je);
393       dump_job(je, _("Appended job"));
394    }
395 #ifdef SCHED_DEBUG
396    foreach_dlist(ji, jobs_to_run) {
397       dump_job(ji, _("Run queue"));
398    }
399    Dmsg0(000, "End run queue\n");
400 #endif
401 }
402
403 static void dump_job(job_item *ji, const char *msg)
404 {
405 #ifdef SCHED_DEBUG
406    char dt[MAX_TIME_LENGTH];
407    int save_debug = debug_level;
408    if (debug_level < dbglvl) {
409       return;
410    }
411    bstrftime_nc(dt, sizeof(dt), ji->runtime);
412    Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, 
413       ji->Priority, dt);
414    fflush(stdout);
415    debug_level = save_debug;
416 #endif
417 }