X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fdird%2Fscheduler.c;h=d10abbca64ad8bb3b8b1da3aadf36a87e7e23e10;hb=4bf5214eb31f01ee0af7b05fbf0e9d16a5706dbd;hp=8ec070d77a0e9c34fcaed22b0be979c19df6604a;hpb=c62398d50d0f5afb873192adfef6a330bb260e3e;p=bacula%2Fbacula diff --git a/bacula/src/dird/scheduler.c b/bacula/src/dird/scheduler.c index 8ec070d77a..d10abbca64 100644 --- a/bacula/src/dird/scheduler.c +++ b/bacula/src/dird/scheduler.c @@ -1,269 +1,444 @@ +/* + Bacula® - The Network Backup Solution + + Copyright (C) 2000-2009 Free Software Foundation Europe e.V. + + The main author of Bacula is Kern Sibbald, with contributions from + many others, a complete list can be found in the file AUTHORS. + This program is Free Software; you can redistribute it and/or + modify it under the terms of version two of the GNU General Public + License as published by the Free Software Foundation and included + in the file LICENSE. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + + Bacula® is a registered trademark of Kern Sibbald. + The licensor of Bacula is the Free Software Foundation Europe + (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, + Switzerland, email:ftf@fsfeurope.org. +*/ /* * * Bacula scheduler * It looks at what jobs are to be run and when - * and waits around until it is time to + * and waits around until it is time to * fire them up. * - * Kern Sibbald, May MM + * Kern Sibbald, May MM, major revision December MMIII * * Version $Id$ */ -/* - Copyright (C) 2000-2003 Kern Sibbald and John Walker - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of - the License, or (at your option) any later version. +#include "bacula.h" +#include "dird.h" - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. +#if 0 +#define SCHED_DEBUG +#define DBGLVL 0 +#else +#undef SCHED_DEBUG +#define DBGLVL 200 +#endif - You should have received a copy of the GNU General Public - License along with this program; if not, write to the Free - Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, - MA 02111-1307, USA. +const int dbglvl = DBGLVL; - */ +/* Local variables */ +struct job_item { + RUN *run; + JOB *job; + time_t runtime; + int Priority; + dlink link; /* link for list */ +}; -#include "bacula.h" -#include "dird.h" +/* List of jobs to be run. They were scheduled in this hour or the next */ +static dlist *jobs_to_run; /* list of jobs to be run */ +/* Time interval in secs to sleep if nothing to be run */ +static int const next_check_secs = 60; /* Forward referenced subroutines */ static void find_runs(); static void add_job(JOB *job, RUN *run, time_t now, time_t runtime); +static void dump_job(job_item *ji, const char *msg); /* Imported subroutines */ /* Imported variables */ -/* Local variables */ -typedef struct { - RUN *run; - JOB *job; - time_t runtime; -} RUNJOB; - -static int num_runjobs; /* total jobs found by find_runs() */ -static int rem_runjobs; /* jobs remaining to be processed */ -static int max_runjobs; /* max jobs in runjobs array */ -static RUNJOB *runjobs; /* array of jobs to be run */ - +/** + * called by reload_config to tell us that the schedules + * we may have based our next jobs to run queues have been + * invalidated. In fact the schedules may not have changed + * but the run object that we have recorded the last_run time + * on are new and no longer have a valid last_run time which + * causes us to double run schedules that get put into the list + * because run_nh = 1. + */ +static bool schedules_invalidated = false; +void invalidate_schedules(void) { + schedules_invalidated = true; +} /********************************************************************* * - * Main Bacula Scheduler + * Main Bacula Scheduler * */ -JCR *wait_for_next_job(char *job_to_run) +JCR *wait_for_next_job(char *one_shot_job_to_run) { JCR *jcr; JOB *job; RUN *run; - time_t now, runtime, nexttime; - int jobindex, i; - static int first = TRUE; - char dt[MAX_TIME_LENGTH]; + time_t now, prev; + static bool first = true; + job_item *next_job = NULL; - Dmsg0(200, "Enter wait_for_next_job\n"); + Dmsg0(dbglvl, "Enter wait_for_next_job\n"); if (first) { - first = FALSE; - max_runjobs = 10; - runjobs = (RUNJOB *) malloc(sizeof(RUNJOB) * max_runjobs); - num_runjobs = 0; - rem_runjobs = 0; - if (job_to_run) { /* one shot */ - job = (JOB *)GetResWithName(R_JOB, job_to_run); - if (!job) { - Emsg1(M_ABORT, 0, _("Job %s not found\n"), job_to_run); - } - Dmsg1(5, "Found job_to_run %s\n", job_to_run); - jcr = new_jcr(sizeof(JCR), dird_free_jcr); - set_jcr_defaults(jcr, job); - return jcr; + first = false; + /* Create scheduled jobs list */ + jobs_to_run = New(dlist(next_job, &next_job->link)); + if (one_shot_job_to_run) { /* one shot */ + job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run); + if (!job) { + Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run); + } + Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run); + jcr = new_jcr(sizeof(JCR), dird_free_jcr); + set_jcr_defaults(jcr, job); + return jcr; } } + /* Wait until we have something in the * next hour or so. */ - while (rem_runjobs == 0) { +again: + while (jobs_to_run->empty()) { find_runs(); - if (rem_runjobs > 0) { - break; + if (!jobs_to_run->empty()) { + break; } - bmicrosleep(60, 0); /* recheck once per minute */ + bmicrosleep(next_check_secs, 0); /* recheck once per minute */ } - /* - * Sort through what is to be run in the next - * two hours to find the first job to be run, - * then wait around until it is time. - * - */ - time(&now); - nexttime = now + 60 * 60 * 24; /* a much later time */ - jobindex = -1; - bstrftime(dt, sizeof(dt), now); - Dmsg2(400, "jobs=%d. Now is %s\n", rem_runjobs, dt); - for (i=0; i 0 && runtime < nexttime) { /* find minimum time job */ - nexttime = runtime; - jobindex = i; - } -#ifdef xxxx_debug - if (runtime > 0) { - bstrftime(dt, sizeof(dt), runjobs[i].runtime); - Dmsg2(100, " %s run %s\n", dt, runjobs[i].job->hdr.name); - } -#endif +#ifdef list_chain + job_item *je; + foreach_dlist(je, jobs_to_run) { + dump_job(je, _("Walk queue")); } - if (jobindex < 0) { /* we really should have something now */ +#endif + /* + * Pull the first job to run (already sorted by runtime and + * Priority, then wait around until it is time to run it. + */ + next_job = (job_item *)jobs_to_run->first(); + jobs_to_run->remove(next_job); + + dump_job(next_job, _("Dequeued job")); + + if (!next_job) { /* we really should have something now */ Emsg0(M_ABORT, 0, _("Scheduler logic error\n")); } /* Now wait for the time to run the job */ for (;;) { time_t twait; + /** discard scheduled queue and rebuild with new schedule objects. **/ + lock_jobs(); + if (schedules_invalidated) { + dump_job(next_job, "Invalidated job"); + free(next_job); + while (!jobs_to_run->empty()) { + next_job = (job_item *)jobs_to_run->first(); + jobs_to_run->remove(next_job); + dump_job(next_job, "Invalidated job"); + free(next_job); + } + schedules_invalidated = false; + unlock_jobs(); + goto again; + } + unlock_jobs(); + prev = now = time(NULL); + twait = next_job->runtime - now; + if (twait <= 0) { /* time to run it */ + break; + } + /* Recheck at least once per minute */ + bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0); + /* Attempt to handle clock shift (but not daylight savings time changes) + * we allow a skew of 10 seconds before invalidating everything. + */ now = time(NULL); - twait = nexttime - now; - if (twait <= 0) { /* time to run it */ - break; + if (now < prev-10 || now > (prev+next_check_secs+10)) { + schedules_invalidated = true; } - bmicrosleep(twait, 0); } - run = runjobs[jobindex].run; - job = runjobs[jobindex].job; - runjobs[jobindex].runtime = 0; /* remove from list */ - run->last_run = now; /* mark as run */ - rem_runjobs--; /* decrement count of remaining jobs */ - jcr = new_jcr(sizeof(JCR), dird_free_jcr); + run = next_job->run; /* pick up needed values */ + job = next_job->job; + if (job->enabled) { + dump_job(next_job, _("Run job")); + } + free(next_job); + if (!job->enabled) { + free_jcr(jcr); + goto again; /* ignore this job */ + } + run->last_run = now; /* mark as run now */ + ASSERT(job); set_jcr_defaults(jcr, job); if (run->level) { - jcr->JobLevel = run->level; /* override run level */ + jcr->set_JobLevel(run->level); /* override run level */ } if (run->pool) { - jcr->pool = run->pool; /* override pool */ + jcr->pool = run->pool; /* override pool */ + jcr->run_pool_override = true; + } + if (run->full_pool) { + jcr->full_pool = run->full_pool; /* override full pool */ + jcr->run_full_pool_override = true; + } + if (run->inc_pool) { + jcr->inc_pool = run->inc_pool; /* override inc pool */ + jcr->run_inc_pool_override = true; + } + if (run->diff_pool) { + jcr->diff_pool = run->diff_pool; /* override dif pool */ + jcr->run_diff_pool_override = true; } if (run->storage) { - jcr->store = run->storage; /* override storage */ + USTORE store; + store.store = run->storage; + pm_strcpy(store.store_source, _("run override")); + set_rwstorage(jcr, &store); /* override storage */ } if (run->msgs) { jcr->messages = run->msgs; /* override messages */ } - Dmsg0(200, "Leave wait_for_next_job()\n"); + if (run->Priority) { + jcr->JobPriority = run->Priority; + } + if (run->spool_data_set) { + jcr->spool_data = run->spool_data; + } + if (run->write_part_after_job_set) { + jcr->write_part_after_job = run->write_part_after_job; + } + Dmsg0(dbglvl, "Leave wait_for_next_job()\n"); return jcr; } /* - * Shutdown the scheduler + * Shutdown the scheduler */ void term_scheduler() { - if (runjobs) { /* free allocated memory */ - free(runjobs); - runjobs = NULL; - max_runjobs = 0; + if (jobs_to_run) { + delete jobs_to_run; } } - -/* - * Find all jobs to be run this hour - * and the next hour. +/* + * Find all jobs to be run this hour and the next hour. */ static void find_runs() { - time_t now, runtime; + time_t now, next_hour, runtime; RUN *run; JOB *job; SCHED *sched; struct tm tm; - int hour, next_hour, minute, mday, wday, month, wpos; + int minute; + int hour, mday, wday, month, wom, woy; + /* Items corresponding to above at the next hour */ + int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year; - Dmsg0(200, "enter find_runs()\n"); - num_runjobs = 0; + Dmsg0(dbglvl, "enter find_runs()\n"); + + /* compute values for time now */ now = time(NULL); - localtime_r(&now, &tm); - + (void)localtime_r(&now, &tm); hour = tm.tm_hour; - next_hour = hour + 1; - if (next_hour > 23) - next_hour -= 24; minute = tm.tm_min; mday = tm.tm_mday - 1; wday = tm.tm_wday; month = tm.tm_mon; - wpos = (tm.tm_mday - 1) / 7; + wom = mday / 7; + woy = tm_woy(now); /* get week of year */ + + Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", + now, hour, month, mday, wday, wom, woy); + + /* + * Compute values for next hour from now. + * We do this to be sure we don't miss a job while + * sleeping. + */ + next_hour = now + 3600; + (void)localtime_r(&next_hour, &tm); + nh_hour = tm.tm_hour; + nh_mday = tm.tm_mday - 1; + nh_wday = tm.tm_wday; + nh_month = tm.tm_mon; + nh_year = tm.tm_year; + nh_wom = nh_mday / 7; + nh_woy = tm_woy(now); /* get week of year */ + + Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", + next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy); /* Loop through all jobs */ LockRes(); - for (job=NULL; (job=(JOB *)GetNextRes(R_JOB, (RES *)job)); ) { + foreach_res(job, R_JOB) { sched = job->schedule; - if (sched == NULL) { /* scheduled? */ - continue; /* no, skip this job */ + if (sched == NULL || !job->enabled) { /* scheduled? or enabled? */ + continue; /* no, skip this job */ } + Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name); for (run=sched->run; run; run=run->next) { + bool run_now, run_nh; + /* + * Find runs scheduled between now and the next hour. + */ +#ifdef xxxx + Dmsg0(000, "\n"); + Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", + hour, month, mday, wday, wom, woy); + Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", + bit_is_set(hour, run->hour), + bit_is_set(month, run->month), + bit_is_set(mday, run->mday), + bit_is_set(wday, run->wday), + bit_is_set(wom, run->wom), + bit_is_set(woy, run->woy)); - /* Find runs scheduled in this our or in the - * next hour (we may be one second before the next hour). - */ - if ((bit_is_set(hour, run->hour) || bit_is_set(next_hour, run->hour)) && - (bit_is_set(mday, run->mday) || bit_is_set(wday, run->wday)) && - bit_is_set(month, run->month) && bit_is_set(wpos, run->wpos)) { - - /* find time (time_t) job is to be run */ - localtime_r(&now, &tm); - tm.tm_min = run->minute; - tm.tm_sec = 0; - if (bit_is_set(hour, run->hour)) { - runtime = mktime(&tm); - add_job(job, run, now, runtime); - } - if (bit_is_set(next_hour, run->hour)) { - tm.tm_hour++; - if (tm.tm_hour > 23) { - tm.tm_hour = 0; - } - runtime = mktime(&tm); - add_job(job, run, now, runtime); - } - } - } - } + Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", + nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy); + Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", + bit_is_set(nh_hour, run->hour), + bit_is_set(nh_month, run->month), + bit_is_set(nh_mday, run->mday), + bit_is_set(nh_wday, run->wday), + bit_is_set(nh_wom, run->wom), + bit_is_set(nh_woy, run->woy)); +#endif + + run_now = bit_is_set(hour, run->hour) && + bit_is_set(mday, run->mday) && + bit_is_set(wday, run->wday) && + bit_is_set(month, run->month) && + bit_is_set(wom, run->wom) && + bit_is_set(woy, run->woy); + + run_nh = bit_is_set(nh_hour, run->hour) && + bit_is_set(nh_mday, run->mday) && + bit_is_set(nh_wday, run->wday) && + bit_is_set(nh_month, run->month) && + bit_is_set(nh_wom, run->wom) && + bit_is_set(nh_woy, run->woy); + Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh); + + if (run_now || run_nh) { + /* find time (time_t) job is to be run */ + (void)localtime_r(&now, &tm); /* reset tm structure */ + tm.tm_min = run->minute; /* set run minute */ + tm.tm_sec = 0; /* zero secs */ + runtime = mktime(&tm); + if (run_now) { + add_job(job, run, now, runtime); + } + /* If job is to be run in the next hour schedule it */ + if (run_nh) { + add_job(job, run, now, runtime + 3600); + } + } + } + } UnlockRes(); - rem_runjobs = num_runjobs; - Dmsg0(200, "Leave find_runs()\n"); + Dmsg0(dbglvl, "Leave find_runs()\n"); } static void add_job(JOB *job, RUN *run, time_t now, time_t runtime) { + job_item *ji; + bool inserted = false; /* * Don't run any job that ran less than a minute ago, but * do run any job scheduled less than a minute ago. */ - if ((runtime - run->last_run < 61) || (runtime+59 < now)) { + if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) { +#ifdef SCHED_DEBUG + Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name, + (utime_t)runtime, (utime_t)run->last_run, (utime_t)now); + fflush(stdout); +#endif return; } - - /* Make sure array is big enough */ - if (num_runjobs == max_runjobs) { - max_runjobs += 10; - runjobs = (RUNJOB *)realloc(runjobs, sizeof(RUNJOB) * max_runjobs); - if (!runjobs) - Emsg0(M_ABORT, 0, _("Out of memory\n")); - } +#ifdef SCHED_DEBUG + Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name, + (utime_t)runtime, (utime_t)run->last_run, (utime_t)now); +#endif /* accept to run this job */ - runjobs[num_runjobs].run = run; - runjobs[num_runjobs].job = job; - runjobs[num_runjobs++].runtime = runtime; /* when to run it */ + job_item *je = (job_item *)malloc(sizeof(job_item)); + je->run = run; + je->job = job; + je->runtime = runtime; + if (run->Priority) { + je->Priority = run->Priority; + } else { + je->Priority = job->Priority; + } + + /* Add this job to the wait queue in runtime, priority sorted order */ + foreach_dlist(ji, jobs_to_run) { + if (ji->runtime > je->runtime || + (ji->runtime == je->runtime && ji->Priority > je->Priority)) { + jobs_to_run->insert_before(je, ji); + dump_job(je, _("Inserted job")); + inserted = true; + break; + } + } + /* If place not found in queue, append it */ + if (!inserted) { + jobs_to_run->append(je); + dump_job(je, _("Appended job")); + } +#ifdef SCHED_DEBUG + foreach_dlist(ji, jobs_to_run) { + dump_job(ji, _("Run queue")); + } + Dmsg0(000, "End run queue\n"); +#endif +} + +static void dump_job(job_item *ji, const char *msg) +{ +#ifdef SCHED_DEBUG + char dt[MAX_TIME_LENGTH]; + int save_debug = debug_level; + if (debug_level < dbglvl) { + return; + } + bstrftime_nc(dt, sizeof(dt), ji->runtime); + Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, + ji->Priority, dt); + fflush(stdout); + debug_level = save_debug; +#endif }