* and waits around until it is time to
* fire them up.
*
- * Kern Sibbald, May MM
+ * Kern Sibbald, May MM, major revision December MMIII
*
* Version $Id$
*/
/*
- Copyright (C) 2000-2003 Kern Sibbald and John Walker
+ Copyright (C) 2000-2004 Kern Sibbald and John Walker
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
#include "bacula.h"
#include "dird.h"
+/* #define PHIL */
+
+
+/* Local variables */
+struct job_item {
+ RUN *run;
+ JOB *job;
+ time_t runtime;
+ int Priority;
+ dlink link; /* link for list */
+};
+
+/* List of jobs to be run. They were scheduled in this hour or the next */
+static dlist *jobs_to_run; /* list of jobs to be run */
+
+/* Time interval in secs to sleep if nothing to be run */
+static int const NEXT_CHECK_SECS = 60;
/* Forward referenced subroutines */
static void find_runs();
static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
+static void dump_job(job_item *ji, char *msg);
/* Imported subroutines */
/* Imported variables */
-/* Local variables */
-typedef struct {
- RUN *run;
- JOB *job;
- time_t runtime;
-} RUNJOB;
-
-static int num_runjobs; /* total jobs found by find_runs() */
-static int rem_runjobs; /* jobs remaining to be processed */
-static int max_runjobs; /* max jobs in runjobs array */
-static RUNJOB *runjobs; /* array of jobs to be run */
-
/*********************************************************************
*
* Main Bacula Scheduler
*
*/
-JCR *wait_for_next_job(char *job_to_run)
+JCR *wait_for_next_job(char *one_shot_job_to_run)
{
JCR *jcr;
JOB *job;
RUN *run;
- time_t now, runtime, nexttime;
- int jobindex, i;
- static int first = TRUE;
- char dt[MAX_TIME_LENGTH];
+ time_t now;
+ static bool first = true;
+ job_item *next_job = NULL;
Dmsg0(200, "Enter wait_for_next_job\n");
if (first) {
- first = FALSE;
- max_runjobs = 10;
- runjobs = (RUNJOB *) malloc(sizeof(RUNJOB) * max_runjobs);
- num_runjobs = 0;
- rem_runjobs = 0;
- if (job_to_run) { /* one shot */
- job = (JOB *)GetResWithName(R_JOB, job_to_run);
+ first = false;
+ /* Create scheduled jobs list */
+ jobs_to_run = new dlist(next_job, &next_job->link);
+ if (one_shot_job_to_run) { /* one shot */
+ job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
if (!job) {
- Emsg1(M_ABORT, 0, _("Job %s not found\n"), job_to_run);
+ Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
}
- Dmsg1(5, "Found job_to_run %s\n", job_to_run);
+ Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
jcr = new_jcr(sizeof(JCR), dird_free_jcr);
set_jcr_defaults(jcr, job);
return jcr;
/* Wait until we have something in the
* next hour or so.
*/
- while (rem_runjobs == 0) {
+ while (jobs_to_run->empty()) {
find_runs();
- if (rem_runjobs > 0) {
+ if (!jobs_to_run->empty()) {
break;
}
- bmicrosleep(60, 0); /* recheck once per minute */
+ bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
}
+#ifdef list_chain
+ job_item *je;
+ foreach_dlist(je, jobs_to_run) {
+ dump_job(je, "Walk queue");
+ }
+#endif
/*
- * Sort through what is to be run in the next
- * two hours to find the first job to be run,
- * then wait around until it is time.
- *
+ * Pull the first job to run (already sorted by runtime and
+ * Priority, then wait around until it is time to run it.
*/
- time(&now);
- nexttime = now + 60 * 60 * 24; /* a much later time */
- jobindex = -1;
- bstrftime(dt, sizeof(dt), now);
- Dmsg2(400, "jobs=%d. Now is %s\n", rem_runjobs, dt);
- for (i=0; i<num_runjobs; i++) {
- runtime = runjobs[i].runtime;
- if (runtime > 0 && runtime < nexttime) { /* find minimum time job */
- nexttime = runtime;
- jobindex = i;
- }
-#ifdef xxxx_debug
- if (runtime > 0) {
- bstrftime(dt, sizeof(dt), runjobs[i].runtime);
- Dmsg2(100, " %s run %s\n", dt, runjobs[i].job->hdr.name);
- }
-#endif
- }
- if (jobindex < 0) { /* we really should have something now */
+ next_job = (job_item *)jobs_to_run->first();
+ jobs_to_run->remove(next_job);
+
+ dump_job(next_job, "Dequeued job");
+
+ if (!next_job) { /* we really should have something now */
Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
}
for (;;) {
time_t twait;
now = time(NULL);
- twait = nexttime - now;
+ twait = next_job->runtime - now;
if (twait <= 0) { /* time to run it */
break;
}
bmicrosleep(twait, 0);
}
- run = runjobs[jobindex].run;
- job = runjobs[jobindex].job;
- runjobs[jobindex].runtime = 0; /* remove from list */
- run->last_run = now; /* mark as run */
- rem_runjobs--; /* decrement count of remaining jobs */
+ run = next_job->run; /* pick up needed values */
+ job = next_job->job;
+ run->last_run = now; /* mark as run now */
+
+ dump_job(next_job, "Run job");
+
+ free(next_job);
jcr = new_jcr(sizeof(JCR), dird_free_jcr);
ASSERT(job);
if (run->pool) {
jcr->pool = run->pool; /* override pool */
}
+ if (run->full_pool) {
+ jcr->pool = run->full_pool; /* override full pool */
+ }
+ if (run->inc_pool) {
+ jcr->pool = run->inc_pool; /* override inc pool */
+ }
+ if (run->dif_pool) {
+ jcr->pool = run->dif_pool; /* override dif pool */
+ }
if (run->storage) {
jcr->store = run->storage; /* override storage */
}
*/
void term_scheduler()
{
- if (runjobs) { /* free allocated memory */
- free(runjobs);
- runjobs = NULL;
- max_runjobs = 0;
+ if (jobs_to_run) {
+ job_item *je;
+ /* Release all queued job entries to be run */
+ foreach_dlist(je, jobs_to_run) {
+ free(je);
+ }
+ delete jobs_to_run;
}
}
-
/*
- * Find all jobs to be run this hour
- * and the next hour.
+ * Find all jobs to be run this hour and the next hour.
*/
static void find_runs()
{
- time_t now, runtime;
+ time_t now, next_hour, runtime;
RUN *run;
JOB *job;
SCHED *sched;
struct tm tm;
- int hour, next_hour, minute, mday, wday, month, wpos;
+ int minute;
+ int hour, mday, wday, month, wom, woy;
+ /* Items corresponding to above at the next hour */
+ int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_year;
Dmsg0(200, "enter find_runs()\n");
- num_runjobs = 0;
+
+ /* compute values for time now */
now = time(NULL);
localtime_r(&now, &tm);
-
hour = tm.tm_hour;
- next_hour = hour + 1;
- if (next_hour > 23)
- next_hour -= 24;
minute = tm.tm_min;
mday = tm.tm_mday - 1;
wday = tm.tm_wday;
month = tm.tm_mon;
- wpos = (tm.tm_mday - 1) / 7;
+ wom = mday / 7;
+ woy = tm_woy(now); /* get week of year */
+
+ /*
+ * Compute values for next hour from now.
+ * We do this to be sure we don't miss a job while
+ * sleeping.
+ */
+ next_hour = now + 3600;
+ localtime_r(&next_hour, &tm);
+ nh_hour = tm.tm_hour;
+ nh_mday = tm.tm_mday - 1;
+ nh_wday = tm.tm_wday;
+ nh_month = tm.tm_mon;
+ nh_year = tm.tm_year;
+ nh_wom = nh_mday / 7;
+ nh_woy = tm_woy(now); /* get week of year */
/* Loop through all jobs */
LockRes();
- for (job=NULL; (job=(JOB *)GetNextRes(R_JOB, (RES *)job)); ) {
+ foreach_res(job, R_JOB) {
sched = job->schedule;
if (sched == NULL) { /* scheduled? */
continue; /* no, skip this job */
}
+ Dmsg1(200, "Got job: %s\n", job->hdr.name);
for (run=sched->run; run; run=run->next) {
-
- /* Find runs scheduled in this our or in the
- * next hour (we may be one second before the next hour).
+ bool run_now, run_nh;
+ /*
+ * Find runs scheduled between now and the next hour.
*/
- if ((bit_is_set(hour, run->hour) || bit_is_set(next_hour, run->hour)) &&
- (bit_is_set(mday, run->mday) || bit_is_set(wday, run->wday)) &&
- bit_is_set(month, run->month) && bit_is_set(wpos, run->wpos)) {
-
- /* find time (time_t) job is to be run */
- localtime_r(&now, &tm);
- tm.tm_min = run->minute;
- tm.tm_sec = 0;
- if (bit_is_set(hour, run->hour)) {
- runtime = mktime(&tm);
- add_job(job, run, now, runtime);
- }
- if (bit_is_set(next_hour, run->hour)) {
- tm.tm_hour++;
- if (tm.tm_hour > 23) {
- tm.tm_hour = 0;
- }
- runtime = mktime(&tm);
- add_job(job, run, now, runtime);
- }
+#ifdef xxxx
+ Dmsg0(000, "\n");
+ Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
+ hour, month, mday, wday, wom, woy);
+ Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
+ bit_is_set(hour, run->hour),
+ bit_is_set(month, run->month),
+ bit_is_set(mday, run->mday),
+ bit_is_set(wday, run->wday),
+ bit_is_set(wom, run->wom),
+ bit_is_set(woy, run->woy));
+
+ Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
+ nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
+ Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
+ bit_is_set(nh_hour, run->hour),
+ bit_is_set(nh_month, run->month),
+ bit_is_set(nh_mday, run->mday),
+ bit_is_set(nh_wday, run->wday),
+ bit_is_set(nh_wom, run->wom),
+ bit_is_set(nh_woy, run->woy));
+#endif
+
+ run_now = bit_is_set(hour, run->hour) &&
+ (bit_is_set(mday, run->mday) || bit_is_set(wday, run->wday)) &&
+ bit_is_set(month, run->month) &&
+ bit_is_set(wom, run->wom) &&
+ bit_is_set(woy, run->woy);
+
+ run_nh = bit_is_set(nh_hour, run->hour) &&
+ (bit_is_set(nh_mday, run->mday) || bit_is_set(nh_wday, run->wday)) &&
+ bit_is_set(nh_month, run->month) &&
+ bit_is_set(nh_wom, run->wom) &&
+ bit_is_set(nh_woy, run->woy);
+
+ Dmsg2(200, "run_now=%d run_nh=%d\n", run_now, run_nh);
+
+ /* find time (time_t) job is to be run */
+ localtime_r(&now, &tm); /* reset tm structure */
+ tm.tm_min = run->minute; /* set run minute */
+ tm.tm_sec = 0; /* zero secs */
+ if (run_now) {
+ runtime = mktime(&tm);
+ add_job(job, run, now, runtime);
+ }
+ /* If job is to be run in the next hour schedule it */
+ if (run_nh) {
+ /* Set correct values */
+ tm.tm_hour = nh_hour;
+ tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
+ tm.tm_mon = nh_month;
+ tm.tm_year = nh_year;
+ runtime = mktime(&tm);
+ add_job(job, run, now, runtime);
}
}
}
-
UnlockRes();
- rem_runjobs = num_runjobs;
Dmsg0(200, "Leave find_runs()\n");
}
static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
{
+ job_item *ji;
+ bool inserted = false;
/*
* Don't run any job that ran less than a minute ago, but
* do run any job scheduled less than a minute ago.
*/
- if ((runtime - run->last_run < 61) || (runtime+59 < now)) {
+ if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
+#ifdef PHIL
+ char dt[50], dt1[50], dt2[50];
+ bstrftime_nc(dt, sizeof(dt), runtime);
+ bstrftime_nc(dt1, sizeof(dt1), run->last_run);
+ bstrftime_nc(dt2, sizeof(dt2), now);
+ Dmsg4(000, "Drop: Job=\"%s\" run=%s. last_run=%s. now=%s\n", job->hdr.name,
+ dt, dt1, dt2);
+ fflush(stdout);
+#endif
return;
}
-
- /* Make sure array is big enough */
- if (num_runjobs == max_runjobs) {
- max_runjobs += 10;
- runjobs = (RUNJOB *)realloc(runjobs, sizeof(RUNJOB) * max_runjobs);
- if (!runjobs)
- Emsg0(M_ABORT, 0, _("Out of memory\n"));
- }
/* accept to run this job */
- runjobs[num_runjobs].run = run;
- runjobs[num_runjobs].job = job;
- runjobs[num_runjobs++].runtime = runtime; /* when to run it */
+ job_item *je = (job_item *)malloc(sizeof(job_item));
+ je->run = run;
+ je->job = job;
+ je->runtime = runtime;
+ if (run->Priority) {
+ je->Priority = run->Priority;
+ } else {
+ je->Priority = job->Priority;
+ }
+
+ /* Add this job to the wait queue in runtime, priority sorted order */
+ foreach_dlist(ji, jobs_to_run) {
+ if (ji->runtime > je->runtime ||
+ (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
+ jobs_to_run->insert_before(je, ji);
+ dump_job(je, "Inserted job");
+ inserted = true;
+ break;
+ }
+ }
+ /* If place not found in queue, append it */
+ if (!inserted) {
+ jobs_to_run->append(je);
+ dump_job(je, "Appended job");
+ }
+ foreach_dlist(ji, jobs_to_run) {
+ dump_job(ji, "Run queue");
+ }
+ Dmsg0(000, "End run queue\n");
+}
+
+static void dump_job(job_item *ji, char *msg)
+{
+#ifdef PHIL
+ char dt[MAX_TIME_LENGTH];
+ int save_debug = debug_level;
+ debug_level = 200;
+ if (debug_level < 200) {
+ return;
+ }
+ bstrftime_nc(dt, sizeof(dt), ji->runtime);
+ Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
+ ji->Priority, dt);
+ fflush(stdout);
+ debug_level = save_debug;
+#endif
}