]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
migrate
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2006 Kern Sibbald
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License
17    version 2 as amended with additional clauses defined in the
18    file LICENSE in the main source directory.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
23    the file LICENSE for additional details.
24
25  */
26
27 #include "bacula.h"
28 #include "dird.h"
29
30 /* #define SCHED_DEBUG */
31
32
33 /* Local variables */
34 struct job_item {
35    RUN *run;
36    JOB *job;
37    time_t runtime;
38    int Priority;
39    dlink link;                        /* link for list */
40 };
41
42 /* List of jobs to be run. They were scheduled in this hour or the next */
43 static dlist *jobs_to_run;               /* list of jobs to be run */
44
45 /* Time interval in secs to sleep if nothing to be run */
46 static int const NEXT_CHECK_SECS = 60;
47
48 /* Forward referenced subroutines */
49 static void find_runs();
50 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
51 static void dump_job(job_item *ji, const char *msg);
52
53 /* Imported subroutines */
54
55 /* Imported variables */
56
57
58 /*********************************************************************
59  *
60  *         Main Bacula Scheduler
61  *
62  */
63 JCR *wait_for_next_job(char *one_shot_job_to_run)
64 {
65    JCR *jcr;
66    JOB *job;
67    RUN *run;
68    time_t now;
69    static bool first = true;
70    job_item *next_job = NULL;
71
72    Dmsg0(200, "Enter wait_for_next_job\n");
73    if (first) {
74       first = false;
75       /* Create scheduled jobs list */
76       jobs_to_run = New(dlist(next_job, &next_job->link));
77       if (one_shot_job_to_run) {            /* one shot */
78          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
79          if (!job) {
80             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
81          }
82          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
83          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
84          set_jcr_defaults(jcr, job);
85          return jcr;
86       }
87    }
88    /* Wait until we have something in the
89     * next hour or so.
90     */
91 again:
92    while (jobs_to_run->empty()) {
93       find_runs();
94       if (!jobs_to_run->empty()) {
95          break;
96       }
97       bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
98    }
99
100 #ifdef  list_chain
101    job_item *je;
102    foreach_dlist(je, jobs_to_run) {
103       dump_job(je, _("Walk queue"));
104    }
105 #endif
106    /*
107     * Pull the first job to run (already sorted by runtime and
108     *  Priority, then wait around until it is time to run it.
109     */
110    next_job = (job_item *)jobs_to_run->first();
111    jobs_to_run->remove(next_job);
112
113    dump_job(next_job, _("Dequeued job"));
114
115    if (!next_job) {                /* we really should have something now */
116       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
117    }
118
119    /* Now wait for the time to run the job */
120    for (;;) {
121       time_t twait;
122       now = time(NULL);
123       twait = next_job->runtime - now;
124       if (twait <= 0) {               /* time to run it */
125          break;
126       }
127       bmicrosleep(twait, 0);
128    }
129    run = next_job->run;               /* pick up needed values */
130    job = next_job->job;
131    if (job->enabled) {
132       dump_job(next_job, _("Run job"));
133    }
134    free(next_job);
135    if (!job->enabled) {
136       goto again;                     /* ignore this job */
137    }
138    run->last_run = now;               /* mark as run now */
139
140    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
141    ASSERT(job);
142    set_jcr_defaults(jcr, job);
143    if (run->level) {
144       jcr->JobLevel = run->level;     /* override run level */
145    }
146    if (run->pool) {
147       jcr->pool = run->pool;          /* override pool */
148    }
149    if (run->full_pool) {
150       jcr->full_pool = run->full_pool; /* override full pool */
151    }
152    if (run->inc_pool) {
153       jcr->inc_pool = run->inc_pool;  /* override inc pool */
154    }
155    if (run->dif_pool) {
156       jcr->dif_pool = run->dif_pool;  /* override dif pool */
157    }
158    if (run->storage) {
159       set_storage(jcr, run->storage); /* override storage */
160    }
161    if (run->msgs) {
162       jcr->messages = run->msgs;      /* override messages */
163    }
164    if (run->Priority) {
165       jcr->JobPriority = run->Priority;
166    }
167    if (run->spool_data_set) {
168       jcr->spool_data = run->spool_data;
169    }
170    if (run->write_part_after_job_set) {
171       jcr->write_part_after_job = run->write_part_after_job;
172    }
173    Dmsg0(200, "Leave wait_for_next_job()\n");
174    return jcr;
175 }
176
177
178 /*
179  * Shutdown the scheduler
180  */
181 void term_scheduler()
182 {
183    if (jobs_to_run) {
184       job_item *je;
185       /* Release all queued job entries to be run */
186       foreach_dlist(je, jobs_to_run) {
187          free(je);
188       }
189       delete jobs_to_run;
190    }
191 }
192
193 /*
194  * Find all jobs to be run this hour and the next hour.
195  */
196 static void find_runs()
197 {
198    time_t now, next_hour, runtime;
199    RUN *run;
200    JOB *job;
201    SCHED *sched;
202    struct tm tm;
203    int minute;
204    int hour, mday, wday, month, wom, woy;
205    /* Items corresponding to above at the next hour */
206    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
207
208    Dmsg0(1200, "enter find_runs()\n");
209
210
211    /* compute values for time now */
212    now = time(NULL);
213    localtime_r(&now, &tm);
214    hour = tm.tm_hour;
215    minute = tm.tm_min;
216    mday = tm.tm_mday - 1;
217    wday = tm.tm_wday;
218    month = tm.tm_mon;
219    wom = mday / 7;
220    woy = tm_woy(now);                     /* get week of year */
221
222    /*
223     * Compute values for next hour from now.
224     * We do this to be sure we don't miss a job while
225     * sleeping.
226     */
227    next_hour = now + 3600;
228    localtime_r(&next_hour, &tm);
229    nh_hour = tm.tm_hour;
230    nh_mday = tm.tm_mday - 1;
231    nh_wday = tm.tm_wday;
232    nh_month = tm.tm_mon;
233    nh_year  = tm.tm_year;
234    nh_wom = nh_mday / 7;
235    nh_woy = tm_woy(now);                     /* get week of year */
236
237    /* Loop through all jobs */
238    LockRes();
239    foreach_res(job, R_JOB) {
240       sched = job->schedule;
241       if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */
242          continue;                    /* no, skip this job */
243       }
244       Dmsg1(1200, "Got job: %s\n", job->hdr.name);
245       for (run=sched->run; run; run=run->next) {
246          bool run_now, run_nh;
247          /*
248           * Find runs scheduled between now and the next hour.
249           */
250 #ifdef xxxx
251          Dmsg0(000, "\n");
252          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
253             hour, month, mday, wday, wom, woy);
254          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
255             bit_is_set(hour, run->hour),
256             bit_is_set(month, run->month),
257             bit_is_set(mday, run->mday),
258             bit_is_set(wday, run->wday),
259             bit_is_set(wom, run->wom),
260             bit_is_set(woy, run->woy));
261
262          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
263             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
264          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
265             bit_is_set(nh_hour, run->hour),
266             bit_is_set(nh_month, run->month),
267             bit_is_set(nh_mday, run->mday),
268             bit_is_set(nh_wday, run->wday),
269             bit_is_set(nh_wom, run->wom),
270             bit_is_set(nh_woy, run->woy));
271 #endif
272
273          run_now = bit_is_set(hour, run->hour) &&
274             bit_is_set(mday, run->mday) &&
275             bit_is_set(wday, run->wday) &&
276             bit_is_set(month, run->month) &&
277             bit_is_set(wom, run->wom) &&
278             bit_is_set(woy, run->woy);
279
280          run_nh = bit_is_set(nh_hour, run->hour) &&
281             bit_is_set(nh_mday, run->mday) &&
282             bit_is_set(nh_wday, run->wday) &&
283             bit_is_set(nh_month, run->month) &&
284             bit_is_set(nh_wom, run->wom) &&
285             bit_is_set(nh_woy, run->woy);
286
287          Dmsg2(1200, "run_now=%d run_nh=%d\n", run_now, run_nh);
288
289          /* find time (time_t) job is to be run */
290          localtime_r(&now, &tm);      /* reset tm structure */
291          tm.tm_min = run->minute;     /* set run minute */
292          tm.tm_sec = 0;               /* zero secs */
293          if (run_now) {
294             runtime = mktime(&tm);
295             add_job(job, run, now, runtime);
296          }
297          /* If job is to be run in the next hour schedule it */
298          if (run_nh) {
299             /* Set correct values */
300             tm.tm_hour = nh_hour;
301             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
302             tm.tm_mon = nh_month;
303             tm.tm_year = nh_year;
304             runtime = mktime(&tm);
305             add_job(job, run, now, runtime);
306          }
307       }
308    }
309    UnlockRes();
310    Dmsg0(1200, "Leave find_runs()\n");
311 }
312
313 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
314 {
315    job_item *ji;
316    bool inserted = false;
317    /*
318     * Don't run any job that ran less than a minute ago, but
319     *  do run any job scheduled less than a minute ago.
320     */
321    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
322 #ifdef SCHED_DEBUG
323       char dt[50], dt1[50], dt2[50];
324       bstrftime_nc(dt, sizeof(dt), runtime);
325       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
326       bstrftime_nc(dt2, sizeof(dt2), now);
327       Dmsg4(000, "Drop: Job=\"%s\" run=%s. last_run=%s. now=%s\n", job->hdr.name,
328             dt, dt1, dt2);
329       fflush(stdout);
330 #endif
331       return;
332    }
333    /* accept to run this job */
334    job_item *je = (job_item *)malloc(sizeof(job_item));
335    je->run = run;
336    je->job = job;
337    je->runtime = runtime;
338    if (run->Priority) {
339       je->Priority = run->Priority;
340    } else {
341       je->Priority = job->Priority;
342    }
343
344    /* Add this job to the wait queue in runtime, priority sorted order */
345    foreach_dlist(ji, jobs_to_run) {
346       if (ji->runtime > je->runtime ||
347           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
348          jobs_to_run->insert_before(je, ji);
349          dump_job(je, _("Inserted job"));
350          inserted = true;
351          break;
352       }
353    }
354    /* If place not found in queue, append it */
355    if (!inserted) {
356       jobs_to_run->append(je);
357       dump_job(je, _("Appended job"));
358    }
359 #ifdef SCHED_DEBUG
360    foreach_dlist(ji, jobs_to_run) {
361       dump_job(ji, _("Run queue"));
362    }
363    Dmsg0(000, "End run queue\n");
364 #endif
365 }
366
367 static void dump_job(job_item *ji, const char *msg)
368 {
369 #ifdef SCHED_DEBUG
370    char dt[MAX_TIME_LENGTH];
371    int save_debug = debug_level;
372    debug_level = 200;
373    if (debug_level < 200) {
374       return;
375    }
376    bstrftime_nc(dt, sizeof(dt), ji->runtime);
377    Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
378       ji->Priority, dt);
379    fflush(stdout);
380    debug_level = save_debug;
381 #endif
382 }