X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fdird%2Fscheduler.c;h=edd9afe853e5ed124ae11da79d0c98fd49e1b5fd;hb=da8584df92d56dd3cbf3c9a7563b22d8335ba219;hp=65ac9ada47881cfb9284a6b2943cb85bdea1eb05;hpb=576122e4af8fec3a308edca0727a92333992d432;p=bacula%2Fbacula diff --git a/bacula/src/dird/scheduler.c b/bacula/src/dird/scheduler.c index 65ac9ada47..edd9afe853 100644 --- a/bacula/src/dird/scheduler.c +++ b/bacula/src/dird/scheduler.c @@ -2,15 +2,15 @@ * * Bacula scheduler * It looks at what jobs are to be run and when - * and waits around until it is time to + * and waits around until it is time to * fire them up. * - * Kern Sibbald, May MM, revised December MMIII + * Kern Sibbald, May MM, major revision December MMIII * * Version $Id$ */ /* - Copyright (C) 2000-2003 Kern Sibbald and John Walker + Copyright (C) 2000-2005 Kern Sibbald This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -32,28 +32,33 @@ #include "bacula.h" #include "dird.h" +/* #define SCHED_DEBUG */ -/* Forward referenced subroutines */ -static void find_runs(); -static void add_job(JOB *job, RUN *run, time_t now, time_t runtime); - -/* Imported subroutines */ - -/* Imported variables */ /* Local variables */ -struct job_item { +struct job_item { RUN *run; JOB *job; time_t runtime; + int Priority; dlink link; /* link for list */ -}; +}; /* List of jobs to be run. They were scheduled in this hour or the next */ static dlist *jobs_to_run; /* list of jobs to be run */ /* Time interval in secs to sleep if nothing to be run */ -#define NEXT_CHECK_SECS 60 +static int const NEXT_CHECK_SECS = 60; + +/* Forward referenced subroutines */ +static void find_runs(); +static void add_job(JOB *job, RUN *run, time_t now, time_t runtime); +static void dump_job(job_item *ji, const char *msg); + +/* Imported subroutines */ + +/* Imported variables */ + /********************************************************************* * @@ -65,16 +70,15 @@ JCR *wait_for_next_job(char *one_shot_job_to_run) JCR *jcr; JOB *job; RUN *run; - time_t now, runtime, nexttime; + time_t now; static bool first = true; - char dt[MAX_TIME_LENGTH]; job_item *next_job = NULL; Dmsg0(200, "Enter wait_for_next_job\n"); if (first) { first = false; /* Create scheduled jobs list */ - jobs_to_run = new dlist(next_job, &next_job->link); + jobs_to_run = New(dlist(next_job, &next_job->link)); if (one_shot_job_to_run) { /* one shot */ job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run); if (!job) { @@ -89,40 +93,29 @@ JCR *wait_for_next_job(char *one_shot_job_to_run) /* Wait until we have something in the * next hour or so. */ - while (jobs_to_run->size() == 0) { + while (jobs_to_run->empty()) { find_runs(); - if (jobs_to_run->size() > 0) { + if (!jobs_to_run->empty()) { break; } bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */ } - /* - * Sort through what is to be run in the next - * two hours to find the first job to be run, - * then wait around until it is time. - * +#ifdef list_chain + job_item *je; + foreach_dlist(je, jobs_to_run) { + dump_job(je, "Walk queue"); + } +#endif + /* + * Pull the first job to run (already sorted by runtime and + * Priority, then wait around until it is time to run it. */ - time(&now); - nexttime = now + 60 * 60 * 24; /* a much later time */ - bstrftime(dt, sizeof(dt), now); - Dmsg2(400, "jobs=%d. Now is %s\n", jobs_to_run->size(), dt); - next_job = NULL; - for (job_item *je=NULL; (je=(job_item *)jobs_to_run->next(je)); ) { - runtime = je->runtime; - if (runtime > 0 && runtime < nexttime) { /* find minimum time job */ - nexttime = runtime; - next_job = je; - } + next_job = (job_item *)jobs_to_run->first(); + jobs_to_run->remove(next_job); + + dump_job(next_job, "Dequeued job"); -#define xxxx_debug -#ifdef xxxx_debug - if (runtime > 0) { - bstrftime(dt, sizeof(dt), next_job->runtime); - Dmsg2(100, " %s run %s\n", dt, next_job->job->hdr.name); - } -#endif - } if (!next_job) { /* we really should have something now */ Emsg0(M_ABORT, 0, _("Scheduler logic error\n")); } @@ -131,7 +124,7 @@ JCR *wait_for_next_job(char *one_shot_job_to_run) for (;;) { time_t twait; now = time(NULL); - twait = nexttime - now; + twait = next_job->runtime - now; if (twait <= 0) { /* time to run it */ break; } @@ -139,20 +132,32 @@ JCR *wait_for_next_job(char *one_shot_job_to_run) } run = next_job->run; /* pick up needed values */ job = next_job->job; - jobs_to_run->remove(next_job); /* remove from list */ - run->last_run = now; /* mark as run */ + run->last_run = now; /* mark as run now */ + + dump_job(next_job, "Run job"); + + free(next_job); jcr = new_jcr(sizeof(JCR), dird_free_jcr); ASSERT(job); set_jcr_defaults(jcr, job); if (run->level) { - jcr->JobLevel = run->level; /* override run level */ + jcr->JobLevel = run->level; /* override run level */ } if (run->pool) { jcr->pool = run->pool; /* override pool */ } + if (run->full_pool) { + jcr->full_pool = run->full_pool; /* override full pool */ + } + if (run->inc_pool) { + jcr->inc_pool = run->inc_pool; /* override inc pool */ + } + if (run->dif_pool) { + jcr->dif_pool = run->dif_pool; /* override dif pool */ + } if (run->storage) { - jcr->store = run->storage; /* override storage */ + set_storage(jcr, run->storage); /* override storage */ } if (run->msgs) { jcr->messages = run->msgs; /* override messages */ @@ -160,24 +165,33 @@ JCR *wait_for_next_job(char *one_shot_job_to_run) if (run->Priority) { jcr->JobPriority = run->Priority; } + if (run->spool_data_set) { + jcr->spool_data = run->spool_data; + } + if (run->write_part_after_job_set) { + jcr->write_part_after_job = run->write_part_after_job; + } Dmsg0(200, "Leave wait_for_next_job()\n"); return jcr; } /* - * Shutdown the scheduler + * Shutdown the scheduler */ void term_scheduler() { - /* Release all queued job entries to be run */ - for (void *je=NULL; (je=jobs_to_run->next(je)); ) { - free(je); + if (jobs_to_run) { + job_item *je; + /* Release all queued job entries to be run */ + foreach_dlist(je, jobs_to_run) { + free(je); + } + delete jobs_to_run; } - delete jobs_to_run; } -/* +/* * Find all jobs to be run this hour and the next hour. */ static void find_runs() @@ -192,9 +206,9 @@ static void find_runs() /* Items corresponding to above at the next hour */ int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year; - Dmsg0(200, "enter find_runs()\n"); + Dmsg0(1200, "enter find_runs()\n"); + - /* compute values for time now */ now = time(NULL); localtime_r(&now, &tm); @@ -203,50 +217,76 @@ static void find_runs() mday = tm.tm_mday - 1; wday = tm.tm_wday; month = tm.tm_mon; - wom = tm_wom(tm.tm_mday, tm.tm_wday); /* get week of month */ + wom = mday / 7; woy = tm_woy(now); /* get week of year */ - /* + /* * Compute values for next hour from now. * We do this to be sure we don't miss a job while * sleeping. */ - next_hour = now + 3600; + next_hour = now + 3600; localtime_r(&next_hour, &tm); nh_hour = tm.tm_hour; - minute = tm.tm_min; nh_mday = tm.tm_mday - 1; nh_wday = tm.tm_wday; nh_month = tm.tm_mon; nh_year = tm.tm_year; - nh_wom = tm_wom(tm.tm_mday, tm.tm_wday); /* get week of month */ + nh_wom = nh_mday / 7; nh_woy = tm_woy(now); /* get week of year */ /* Loop through all jobs */ LockRes(); - for (job=NULL; (job=(JOB *)GetNextRes(R_JOB, (RES *)job)); ) { + foreach_res(job, R_JOB) { sched = job->schedule; if (sched == NULL) { /* scheduled? */ continue; /* no, skip this job */ } + Dmsg1(1200, "Got job: %s\n", job->hdr.name); for (run=sched->run; run; run=run->next) { bool run_now, run_nh; - - /* Find runs scheduled between now and the next - * check time (typically 60 seconds) + /* + * Find runs scheduled between now and the next hour. */ +#ifdef xxxx + Dmsg0(000, "\n"); + Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", + hour, month, mday, wday, wom, woy); + Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", + bit_is_set(hour, run->hour), + bit_is_set(month, run->month), + bit_is_set(mday, run->mday), + bit_is_set(wday, run->wday), + bit_is_set(wom, run->wom), + bit_is_set(woy, run->woy)); + + Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n", + nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy); + Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n", + bit_is_set(nh_hour, run->hour), + bit_is_set(nh_month, run->month), + bit_is_set(nh_mday, run->mday), + bit_is_set(nh_wday, run->wday), + bit_is_set(nh_wom, run->wom), + bit_is_set(nh_woy, run->woy)); +#endif + run_now = bit_is_set(hour, run->hour) && - (bit_is_set(mday, run->mday) || bit_is_set(wday, run->wday)) && + bit_is_set(mday, run->mday) && + bit_is_set(wday, run->wday) && bit_is_set(month, run->month) && bit_is_set(wom, run->wom) && bit_is_set(woy, run->woy); run_nh = bit_is_set(nh_hour, run->hour) && - (bit_is_set(nh_mday, run->mday) || bit_is_set(nh_wday, run->wday)) && + bit_is_set(nh_mday, run->mday) && + bit_is_set(nh_wday, run->wday) && bit_is_set(nh_month, run->month) && bit_is_set(nh_wom, run->wom) && bit_is_set(nh_woy, run->woy); + Dmsg2(1200, "run_now=%d run_nh=%d\n", run_now, run_nh); + /* find time (time_t) job is to be run */ localtime_r(&now, &tm); /* reset tm structure */ tm.tm_min = run->minute; /* set run minute */ @@ -259,25 +299,36 @@ static void find_runs() if (run_nh) { /* Set correct values */ tm.tm_hour = nh_hour; - tm.tm_mday = nh_mday; + tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */ tm.tm_mon = nh_month; tm.tm_year = nh_year; runtime = mktime(&tm); add_job(job, run, now, runtime); } - } + } } UnlockRes(); - Dmsg0(200, "Leave find_runs()\n"); + Dmsg0(1200, "Leave find_runs()\n"); } static void add_job(JOB *job, RUN *run, time_t now, time_t runtime) { + job_item *ji; + bool inserted = false; /* * Don't run any job that ran less than a minute ago, but * do run any job scheduled less than a minute ago. */ - if ((runtime - run->last_run < 61) || (runtime+59 < now)) { + if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) { +#ifdef SCHED_DEBUG + char dt[50], dt1[50], dt2[50]; + bstrftime_nc(dt, sizeof(dt), runtime); + bstrftime_nc(dt1, sizeof(dt1), run->last_run); + bstrftime_nc(dt2, sizeof(dt2), now); + Dmsg4(000, "Drop: Job=\"%s\" run=%s. last_run=%s. now=%s\n", job->hdr.name, + dt, dt1, dt2); + fflush(stdout); +#endif return; } /* accept to run this job */ @@ -285,6 +336,48 @@ static void add_job(JOB *job, RUN *run, time_t now, time_t runtime) je->run = run; je->job = job; je->runtime = runtime; - /* ***FIXME**** (enhancement) insert by sorted runtime */ - jobs_to_run->append(je); + if (run->Priority) { + je->Priority = run->Priority; + } else { + je->Priority = job->Priority; + } + + /* Add this job to the wait queue in runtime, priority sorted order */ + foreach_dlist(ji, jobs_to_run) { + if (ji->runtime > je->runtime || + (ji->runtime == je->runtime && ji->Priority > je->Priority)) { + jobs_to_run->insert_before(je, ji); + dump_job(je, "Inserted job"); + inserted = true; + break; + } + } + /* If place not found in queue, append it */ + if (!inserted) { + jobs_to_run->append(je); + dump_job(je, "Appended job"); + } +#ifdef SCHED_DEBUG + foreach_dlist(ji, jobs_to_run) { + dump_job(ji, "Run queue"); + } + Dmsg0(000, "End run queue\n"); +#endif +} + +static void dump_job(job_item *ji, const char *msg) +{ +#ifdef SCHED_DEBUG + char dt[MAX_TIME_LENGTH]; + int save_debug = debug_level; + debug_level = 200; + if (debug_level < 200) { + return; + } + bstrftime_nc(dt, sizeof(dt), ji->runtime); + Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name, + ji->Priority, dt); + fflush(stdout); + debug_level = save_debug; +#endif }