* and waits around until it is time to
* fire them up.
*
- * Kern Sibbald, May MM, revised December MMIII
+ * Kern Sibbald, May MM, major revision December MMIII
*
* Version $Id$
*/
#include "dird.h"
-/* Forward referenced subroutines */
-static void find_runs();
-static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
-
-/* Imported subroutines */
-
-/* Imported variables */
-
/* Local variables */
struct job_item {
RUN *run;
JOB *job;
time_t runtime;
+ int Priority;
dlink link; /* link for list */
};
static dlist *jobs_to_run; /* list of jobs to be run */
/* Time interval in secs to sleep if nothing to be run */
-#define NEXT_CHECK_SECS 60
+static int const NEXT_CHECK_SECS = 60;
+
+/* Forward referenced subroutines */
+static void find_runs();
+static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
+static void dump_job(job_item *ji, char *msg);
+
+/* Imported subroutines */
+
+/* Imported variables */
+
/*********************************************************************
*
JCR *jcr;
JOB *job;
RUN *run;
- time_t now, runtime, nexttime;
+ time_t now;
static bool first = true;
- char dt[MAX_TIME_LENGTH];
job_item *next_job = NULL;
Dmsg0(200, "Enter wait_for_next_job\n");
/* Wait until we have something in the
* next hour or so.
*/
- while (jobs_to_run->size() == 0) {
+ while (jobs_to_run->empty()) {
find_runs();
- if (jobs_to_run->size() > 0) {
+ if (!jobs_to_run->empty()) {
break;
}
bmicrosleep(NEXT_CHECK_SECS, 0); /* recheck once per minute */
}
+#define list_chain
+#ifdef list_chain
+ for (job_item *je=NULL; (je=(job_item *)jobs_to_run->next(je)); ) {
+ dump_job(je, "Walk queue");
+ }
+#endif
/*
- * Sort through what is to be run in the next
- * two hours to find the first job to be run,
- * then wait around until it is time.
- *
+ * Pull the first job to run (already sorted by runtime and
+ * Priority, then wait around until it is time to run it.
*/
- time(&now);
- nexttime = now + 60 * 60 * 24; /* a much later time */
- bstrftime(dt, sizeof(dt), now);
- Dmsg2(400, "jobs=%d. Now is %s\n", jobs_to_run->size(), dt);
- next_job = NULL;
- for (job_item *je=NULL; (je=(job_item *)jobs_to_run->next(je)); ) {
- runtime = je->runtime;
- if (runtime > 0 && runtime < nexttime) { /* find minimum time job */
- nexttime = runtime;
- next_job = je;
- }
+ next_job = (job_item *)jobs_to_run->first();
+ jobs_to_run->remove(next_job);
+
+ dump_job(next_job, "Dequeued job");
-#define xxxx_debug
-#ifdef xxxx_debug
- if (runtime > 0) {
- bstrftime(dt, sizeof(dt), next_job->runtime);
- Dmsg2(100, " %s run %s\n", dt, next_job->job->hdr.name);
- }
-#endif
- }
if (!next_job) { /* we really should have something now */
Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
}
for (;;) {
time_t twait;
now = time(NULL);
- twait = nexttime - now;
+ twait = next_job->runtime - now;
if (twait <= 0) { /* time to run it */
break;
}
}
run = next_job->run; /* pick up needed values */
job = next_job->job;
- jobs_to_run->remove(next_job); /* remove from list */
- run->last_run = now; /* mark as run */
+ run->last_run = now; /* mark as run now */
+
+ dump_job(next_job, "Run job");
+
+ free(next_job);
jcr = new_jcr(sizeof(JCR), dird_free_jcr);
ASSERT(job);
next_hour = now + 3600;
localtime_r(&next_hour, &tm);
nh_hour = tm.tm_hour;
- minute = tm.tm_min;
nh_mday = tm.tm_mday - 1;
nh_wday = tm.tm_wday;
nh_month = tm.tm_mon;
if (sched == NULL) { /* scheduled? */
continue; /* no, skip this job */
}
+ Dmsg1(200, "Got job: %s\n", job->hdr.name);
for (run=sched->run; run; run=run->next) {
bool run_now, run_nh;
-
- /* Find runs scheduled between now and the next
- * check time (typically 60 seconds)
+ /*
+ * Find runs scheduled between now and the next hour.
*/
+#ifdef xxxx
+ Dmsg0(000, "\n");
+ Dmsg6(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
+ hour, month, mday, wday, wom, woy);
+ Dmsg6(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
+ bit_is_set(hour, run->hour),
+ bit_is_set(month, run->month),
+ bit_is_set(mday, run->mday),
+ bit_is_set(wday, run->wday),
+ bit_is_set(wom, run->wom),
+ bit_is_set(woy, run->woy));
+
+ Dmsg6(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
+ nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
+ Dmsg6(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d\n",
+ bit_is_set(nh_hour, run->hour),
+ bit_is_set(nh_month, run->month),
+ bit_is_set(nh_mday, run->mday),
+ bit_is_set(nh_wday, run->wday),
+ bit_is_set(nh_wom, run->wom),
+ bit_is_set(nh_woy, run->woy));
+#endif
+
run_now = bit_is_set(hour, run->hour) &&
(bit_is_set(mday, run->mday) || bit_is_set(wday, run->wday)) &&
bit_is_set(month, run->month) &&
bit_is_set(nh_wom, run->wom) &&
bit_is_set(nh_woy, run->woy);
+ Dmsg2(200, "run_now=%d run_nh=%d\n", run_now, run_nh);
+
/* find time (time_t) job is to be run */
localtime_r(&now, &tm); /* reset tm structure */
tm.tm_min = run->minute; /* set run minute */
if (run_nh) {
/* Set correct values */
tm.tm_hour = nh_hour;
- tm.tm_mday = nh_mday;
+ tm.tm_mday = nh_mday + 1; /* fixup because we biased for tests above */
tm.tm_mon = nh_month;
tm.tm_year = nh_year;
runtime = mktime(&tm);
static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
{
+ job_item *ji = NULL;
+ bool inserted = false;
/*
* Don't run any job that ran less than a minute ago, but
* do run any job scheduled less than a minute ago.
*/
- if ((runtime - run->last_run < 61) || (runtime+59 < now)) {
+ if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
+#ifdef xxx
+ char dt[50], dt1[50];
+ bstrftime(dt, sizeof(dt), runtime);
+ strcpy(dt+7, dt+9); /* cut century */
+ bstrftime(dt1, sizeof(dt1), now);
+ strcpy(dt1+7, dt1+9); /* cut century */
+ Dmsg2(000, "runtime=%s now=%s\n", dt, dt1);
+#endif
return;
}
/* accept to run this job */
je->run = run;
je->job = job;
je->runtime = runtime;
- /* ***FIXME**** (enhancement) insert by sorted runtime */
- jobs_to_run->append(je);
+ if (run->Priority) {
+ je->Priority = run->Priority;
+ } else {
+ je->Priority = job->Priority;
+ }
+
+ /* Add this job to the wait queue in runtime, priority sorted order */
+ while ( (ji=(job_item *)jobs_to_run->next(ji)) ) {
+ if (ji->runtime > je->runtime ||
+ (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
+ jobs_to_run->insert_before(je, ji);
+ dump_job(je, "Inserted job");
+ inserted = true;
+ break;
+ }
+ }
+ /* If place not found in queue, append it */
+ if (!inserted) {
+ jobs_to_run->append(je);
+ dump_job(je, "Appended job");
+ }
+}
+
+static void dump_job(job_item *ji, char *msg)
+{
+ char dt[MAX_TIME_LENGTH];
+ if (debug_level < 200) {
+ return;
+ }
+ bstrftime(dt, sizeof(dt), ji->runtime);
+ strcpy(dt+7, dt+9); /* cut century */
+ Dmsg4(200, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
+ ji->Priority, dt);
}