]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/scheduler.c
Misc -- see techlog
[bacula/bacula] / bacula / src / dird / scheduler.c
1 /*
2  *
3  *   Bacula scheduler
4  *     It looks at what jobs are to be run and when
5  *     and waits around until it is time to 
6  *     fire them up.
7  *
8  *     Kern Sibbald, May MM, major revision December MMIII
9  *
10  *   Version $Id$
11  */
12 /*
13    Copyright (C) 2000-2004 Kern Sibbald and John Walker
14
15    This program is free software; you can redistribute it and/or
16    modify it under the terms of the GNU General Public License as
17    published by the Free Software Foundation; either version 2 of
18    the License, or (at your option) any later version.
19
20    This program is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23    General Public License for more details.
24
25    You should have received a copy of the GNU General Public
26    License along with this program; if not, write to the Free
27    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
28    MA 02111-1307, USA.
29
30  */
31
32 #include "bacula.h"
33 #include "dird.h"
34
35 /* #define SCHED_DEBUG */
36
37
38 /* Local variables */
39 struct job_item {  
40    RUN *run;
41    JOB *job;
42    time_t runtime;
43    int Priority;
44    dlink link;                        /* link for list */
45 };              
46
47 /* List of jobs to be run. They were scheduled in this hour or the next */
48 static dlist *jobs_to_run;               /* list of jobs to be run */
49
50 /* Time interval in secs to sleep if nothing to be run */
51 static int const NEXT_CHECK_SECS = 60;
52
53 /* Forward referenced subroutines */
54 static void find_runs();
55 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
56 static void dump_job(job_item *ji, const char *msg);
57
58 /* Imported subroutines */
59
60 /* Imported variables */
61
62
63 /*********************************************************************
64  *
65  *         Main Bacula Scheduler
66  *
67  */
68 JCR *wait_for_next_job(char *one_shot_job_to_run)
69 {
70    JCR *jcr;
71    JOB *job;
72    RUN *run;
73    time_t now;
74    static bool first = true;
75    job_item *next_job = NULL;
76
77    Dmsg0(200, "Enter wait_for_next_job\n");
78    if (first) {
79       first = false;
80       /* Create scheduled jobs list */
81       jobs_to_run = New(dlist(next_job, &next_job->link));
82       if (one_shot_job_to_run) {            /* one shot */
83          job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
84          if (!job) {
85             Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
86          }
87          Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
88          jcr = new_jcr(sizeof(JCR), dird_free_jcr);
89          set_jcr_defaults(jcr, job);
90          return jcr;
91       }
92    }
93    /* Wait until we have something in the
94     * next hour or so.
95     */
96    while (jobs_to_run->empty()) {
97       find_runs();
98       if (!jobs_to_run->empty()) {
99          break;
100       }
101       bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
102    }
103
104 #ifdef  list_chain
105    job_item *je;
106    foreach_dlist(je, jobs_to_run) {
107       dump_job(je, "Walk queue");
108    }
109 #endif
110    /* 
111     * Pull the first job to run (already sorted by runtime and
112     *  Priority, then wait around until it is time to run it.
113     */
114    next_job = (job_item *)jobs_to_run->first();
115    jobs_to_run->remove(next_job);
116
117    dump_job(next_job, "Dequeued job");
118
119    if (!next_job) {                /* we really should have something now */
120       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
121    }
122
123    /* Now wait for the time to run the job */
124    for (;;) {
125       time_t twait;
126       now = time(NULL);
127       twait = next_job->runtime - now;
128       if (twait <= 0) {               /* time to run it */
129          break;
130       }
131       bmicrosleep(twait, 0);
132    }
133    run = next_job->run;               /* pick up needed values */
134    job = next_job->job;
135    run->last_run = now;               /* mark as run now */
136
137    dump_job(next_job, "Run job");
138
139    free(next_job);
140
141    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
142    ASSERT(job);
143    set_jcr_defaults(jcr, job);
144    if (run->level) {
145       jcr->JobLevel = run->level;     /* override run level */
146    }
147    if (run->pool) {
148       jcr->pool = run->pool;          /* override pool */
149    }
150    if (run->storage) {
151       jcr->store = run->storage;      /* override storage */
152    }
153    if (run->msgs) {
154       jcr->messages = run->msgs;      /* override messages */
155    }
156    if (run->Priority) {
157       jcr->JobPriority = run->Priority;
158    }
159    if (run->spool_data_set) {
160       jcr->spool_data = run->spool_data;
161    }
162    Dmsg0(200, "Leave wait_for_next_job()\n");
163    return jcr;
164 }
165
166
167 /*
168  * Shutdown the scheduler  
169  */
170 void term_scheduler()
171 {
172    if (jobs_to_run) {
173       job_item *je;
174       /* Release all queued job entries to be run */
175       foreach_dlist(je, jobs_to_run) {
176          free(je);
177       }
178       delete jobs_to_run;
179    }
180 }
181
182 /*          
183  * Find all jobs to be run this hour and the next hour.
184  */
185 static void find_runs()
186 {
187    time_t now, next_hour, runtime;
188    RUN *run;
189    JOB *job;
190    SCHED *sched;
191    struct tm tm;
192    int minute;
193    int hour, mday, wday, month, wom, woy;
194    /* Items corresponding to above at the next hour */
195    int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
196
197    Dmsg0(200, "enter find_runs()\n");
198
199    
200    /* compute values for time now */
201    now = time(NULL);
202    localtime_r(&now, &tm);
203    hour = tm.tm_hour;
204    minute = tm.tm_min;
205    mday = tm.tm_mday - 1;
206    wday = tm.tm_wday;
207    month = tm.tm_mon;
208    wom = mday / 7;
209    woy = tm_woy(now);                     /* get week of year */
210
211    /* 
212     * Compute values for next hour from now.
213     * We do this to be sure we don't miss a job while
214     * sleeping.
215     */
216    next_hour = now + 3600;  
217    localtime_r(&next_hour, &tm);
218    nh_hour = tm.tm_hour;
219    nh_mday = tm.tm_mday - 1;
220    nh_wday = tm.tm_wday;
221    nh_month = tm.tm_mon;
222    nh_year  = tm.tm_year;
223    nh_wom = nh_mday / 7;
224    nh_woy = tm_woy(now);                     /* get week of year */
225
226    /* Loop through all jobs */
227    LockRes();
228    foreach_res(job, R_JOB) {
229       sched = job->schedule;
230       if (sched == NULL) {            /* scheduled? */
231          continue;                    /* no, skip this job */
232       }
233       Dmsg1(200, "Got job: %s\n", job->hdr.name);
234       for (run=sched->run; run; run=run->next) {
235          bool run_now, run_nh;
236          /* 
237           * Find runs scheduled between now and the next hour.
238           */
239 #ifdef xxxx
240          Dmsg0(000, "\n");
241          Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", 
242             hour, month, mday, wday, wom, woy);
243          Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", 
244             bit_is_set(hour, run->hour),
245             bit_is_set(month, run->month),
246             bit_is_set(mday, run->mday),
247             bit_is_set(wday, run->wday),
248             bit_is_set(wom, run->wom),
249             bit_is_set(woy, run->woy));
250
251          Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", 
252             nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
253          Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", 
254             bit_is_set(nh_hour, run->hour),
255             bit_is_set(nh_month, run->month),
256             bit_is_set(nh_mday, run->mday),
257             bit_is_set(nh_wday, run->wday),
258             bit_is_set(nh_wom, run->wom),
259             bit_is_set(nh_woy, run->woy));
260 #endif
261
262          run_now = bit_is_set(hour, run->hour) &&
263             bit_is_set(mday, run->mday) && 
264             bit_is_set(wday, run->wday) &&
265             bit_is_set(month, run->month) &&
266             bit_is_set(wom, run->wom) &&
267             bit_is_set(woy, run->woy);
268
269          run_nh = bit_is_set(nh_hour, run->hour) &&
270             bit_is_set(nh_mday, run->mday) && 
271             bit_is_set(nh_wday, run->wday) &&
272             bit_is_set(nh_month, run->month) &&
273             bit_is_set(nh_wom, run->wom) &&
274             bit_is_set(nh_woy, run->woy);
275
276          Dmsg2(200, "run_now=%d run_nh=%d\n", run_now, run_nh);
277
278          /* find time (time_t) job is to be run */
279          localtime_r(&now, &tm);      /* reset tm structure */
280          tm.tm_min = run->minute;     /* set run minute */
281          tm.tm_sec = 0;               /* zero secs */
282          if (run_now) {
283             runtime = mktime(&tm);
284             add_job(job, run, now, runtime);
285          }
286          /* If job is to be run in the next hour schedule it */
287          if (run_nh) {
288             /* Set correct values */
289             tm.tm_hour = nh_hour;
290             tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
291             tm.tm_mon = nh_month;
292             tm.tm_year = nh_year;
293             runtime = mktime(&tm);
294             add_job(job, run, now, runtime);
295          }
296       }  
297    }
298    UnlockRes();
299    Dmsg0(200, "Leave find_runs()\n");
300 }
301
302 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
303 {
304    job_item *ji;
305    bool inserted = false;
306    /*
307     * Don't run any job that ran less than a minute ago, but
308     *  do run any job scheduled less than a minute ago.
309     */
310    if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
311 #ifdef SCHED_DEBUG
312       char dt[50], dt1[50], dt2[50];
313       bstrftime_nc(dt, sizeof(dt), runtime);  
314       bstrftime_nc(dt1, sizeof(dt1), run->last_run);
315       bstrftime_nc(dt2, sizeof(dt2), now);
316       Dmsg4(000, "Drop: Job=\"%s\" run=%s. last_run=%s. now=%s\n", job->hdr.name, 
317             dt, dt1, dt2);
318       fflush(stdout);
319 #endif
320       return;
321    }
322    /* accept to run this job */
323    job_item *je = (job_item *)malloc(sizeof(job_item));
324    je->run = run;
325    je->job = job;
326    je->runtime = runtime;
327    if (run->Priority) {
328       je->Priority = run->Priority;
329    } else {
330       je->Priority = job->Priority;
331    }
332
333    /* Add this job to the wait queue in runtime, priority sorted order */
334    foreach_dlist(ji, jobs_to_run) {
335       if (ji->runtime > je->runtime || 
336           (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
337          jobs_to_run->insert_before(je, ji);
338          dump_job(je, "Inserted job");
339          inserted = true;
340          break;
341       }
342    }
343    /* If place not found in queue, append it */
344    if (!inserted) {
345       jobs_to_run->append(je);
346       dump_job(je, "Appended job");
347    }
348 #ifdef SCHED_DEBUG
349    foreach_dlist(ji, jobs_to_run) {
350       dump_job(ji, "Run queue");
351    }
352    Dmsg0(000, "End run queue\n");
353 #endif
354 }
355
356 static void dump_job(job_item *ji, const char *msg) 
357 {
358 #ifdef SCHED_DEBUG
359    char dt[MAX_TIME_LENGTH];
360    int save_debug = debug_level;
361    debug_level = 200;
362    if (debug_level < 200) {
363       return;
364    }
365    bstrftime_nc(dt, sizeof(dt), ji->runtime);  
366    Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, 
367       ji->Priority, dt);
368    fflush(stdout);
369    debug_level = save_debug;
370 #endif
371 }