]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/scheduler.c
Implement MaxVirtualFullInterval
[bacula/bacula] / bacula / src / dird / scheduler.c
index 5633fb3294da4e9445060eb61f00ad9f39c8a410..5108142a2d3a6b11e174c063588f7ebd40278225 100644 (file)
+/*
+   Bacula(R) - The Network Backup Solution
+
+   Copyright (C) 2000-2015 Kern Sibbald
+
+   The original author of Bacula is Kern Sibbald, with contributions
+   from many others, a complete list can be found in the file AUTHORS.
+
+   You may use this file and others of this release according to the
+   license defined in the LICENSE file, which includes the Affero General
+   Public License, v3.0 ("AGPLv3") and some additional permissions and
+   terms pursuant to its AGPLv3 Section 7.
+
+   This notice must be preserved when any source code is 
+   conveyed and/or propagated.
+
+   Bacula(R) is a registered trademark of Kern Sibbald.
+*/
 /*
  *
  *   Bacula scheduler
  *     It looks at what jobs are to be run and when
- *     and waits around until it is time to 
+ *     and waits around until it is time to
  *     fire them up.
  *
- *     Kern Sibbald, May MM
+ *     Kern Sibbald, May MM, major revision December MMIII
  *
- *   Version $Id$
  */
-/*
-   Copyright (C) 2000-2003 Kern Sibbald and John Walker
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of
-   the License, or (at your option) any later version.
+#include "bacula.h"
+#include "dird.h"
 
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   General Public License for more details.
+#if 0
+#define SCHED_DEBUG
+#define DBGLVL 0
+#else
+#undef SCHED_DEBUG
+#define DBGLVL DT_SCHEDULER|200
+#endif
 
-   You should have received a copy of the GNU General Public
-   License along with this program; if not, write to the Free
-   Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
-   MA 02111-1307, USA.
+const int dbglvl = DBGLVL;
 
- */
+/* Local variables */
+struct job_item {
+   RUN *run;
+   JOB *job;
+   time_t runtime;
+   int Priority;
+   dlink link;                        /* link for list */
+};
 
-#include "bacula.h"
-#include "dird.h"
+/* List of jobs to be run. They were scheduled in this hour or the next */
+static dlist *jobs_to_run;               /* list of jobs to be run */
 
+/* Time interval in secs to sleep if nothing to be run */
+static int const next_check_secs = 60;
 
 /* Forward referenced subroutines */
 static void find_runs();
 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
+static void dump_job(job_item *ji, const char *msg);
 
 /* Imported subroutines */
 
 /* Imported variables */
 
-/* Local variables */
-typedef struct {
-   RUN *run;
-   JOB *job;
-   time_t runtime;
-} RUNJOB;
-
-static int num_runjobs;              /* total jobs found by find_runs() */
-static int rem_runjobs;              /* jobs remaining to be processed */
-static int max_runjobs;              /* max jobs in runjobs array */
-static RUNJOB *runjobs;              /* array of jobs to be run */
-
+/*
+ * called by reload_config to tell us that the schedules
+ * we may have based our next jobs to run queues have been
+ * invalidated.  In fact the schedules may not have changed
+ * but the run object that we have recorded the last_run time
+ * on are new and no longer have a valid last_run time which
+ * causes us to double run schedules that get put into the list
+ * because run_nh = 1.
+ */
+static bool schedules_invalidated = false;
+void invalidate_schedules(void) {
+    schedules_invalidated = true;
+}
 
 /*********************************************************************
  *
- *        Main Bacula Scheduler
+ *         Main Bacula Scheduler
  *
  */
-JCR *wait_for_next_job(char *job_to_run)
+JCR *wait_for_next_job(char *one_shot_job_to_run)
 {
    JCR *jcr;
    JOB *job;
    RUN *run;
-   time_t now, runtime, nexttime;
-   int jobindex, i;
-   static int first = TRUE;
-   char dt[MAX_TIME_LENGTH];
+   time_t now, prev;
+   static bool first = true;
+   job_item *next_job = NULL;
 
-   Dmsg0(200, "Enter wait_for_next_job\n");
+   Dmsg0(dbglvl, "Enter wait_for_next_job\n");
    if (first) {
-      first = FALSE;
-      max_runjobs = 10;
-      runjobs = (RUNJOB *) malloc(sizeof(RUNJOB) * max_runjobs);
-      num_runjobs = 0;
-      rem_runjobs = 0;
-      if (job_to_run) {              /* one shot */
-        job = (JOB *)GetResWithName(R_JOB, job_to_run);
-        if (!job) {
-            Emsg1(M_ABORT, 0, _("Job %s not found\n"), job_to_run);
-        }
-         Dmsg1(5, "Found job_to_run %s\n", job_to_run);
-        jcr = new_jcr(sizeof(JCR), dird_free_jcr);
-        set_jcr_defaults(jcr, job);
-        return jcr;
+      first = false;
+      /* Create scheduled jobs list */
+      jobs_to_run = New(dlist(next_job, &next_job->link));
+      if (one_shot_job_to_run) {            /* one shot */
+         job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
+         if (!job) {
+            Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
+         }
+         Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
+         jcr = new_jcr(sizeof(JCR), dird_free_jcr);
+         set_jcr_defaults(jcr, job);
+         return jcr;
       }
    }
+
    /* Wait until we have something in the
     * next hour or so.
     */
-   while (rem_runjobs == 0) {
+again:
+   while (jobs_to_run->empty()) {
       find_runs();
-      if (rem_runjobs > 0) {
-        break;
+      if (!jobs_to_run->empty()) {
+         break;
       }
-      bmicrosleep(60, 0);            /* recheck once per minute */
+      bmicrosleep(next_check_secs, 0); /* recheck once per minute */
    }
 
-   /* 
-    * Sort through what is to be run in the next
-    * two hours to find the first job to be run,
-    * then wait around until it is time.
-    *
-    */
-   time(&now);
-   nexttime = now + 60 * 60 * 24;     /* a much later time */
-   jobindex = -1;
-   bstrftime(dt, sizeof(dt), now);
-   Dmsg2(400, "jobs=%d. Now is %s\n", rem_runjobs, dt);
-   for (i=0; i<num_runjobs; i++) {
-      runtime = runjobs[i].runtime;
-      if (runtime > 0 && runtime < nexttime) { /* find minimum time job */
-        nexttime = runtime;
-        jobindex = i;
-      }
-#ifdef xxxx_debug
-      if (runtime > 0) {
-        bstrftime(dt, sizeof(dt), runjobs[i].runtime);  
-         Dmsg2(100, "    %s run %s\n", dt, runjobs[i].job->hdr.name);
-      }
-#endif
+#ifdef  list_chain
+   job_item *je;
+   foreach_dlist(je, jobs_to_run) {
+      dump_job(je, _("Walk queue"));
    }
-   if (jobindex < 0) {               /* we really should have something now */
+#endif
+   /*
+    * Pull the first job to run (already sorted by runtime and
+    *  Priority, then wait around until it is time to run it.
+    */
+   next_job = (job_item *)jobs_to_run->first();
+   jobs_to_run->remove(next_job);
+
+   dump_job(next_job, _("Dequeued job"));
+
+   if (!next_job) {                /* we really should have something now */
       Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
    }
 
    /* Now wait for the time to run the job */
    for (;;) {
       time_t twait;
+      /** discard scheduled queue and rebuild with new schedule objects. **/
+      lock_jobs();
+      if (schedules_invalidated) {
+          dump_job(next_job, "Invalidated job");
+          free(next_job);
+          while (!jobs_to_run->empty()) {
+              next_job = (job_item *)jobs_to_run->first();
+              jobs_to_run->remove(next_job);
+              dump_job(next_job, "Invalidated job");
+              free(next_job);
+          }
+          schedules_invalidated = false;
+          unlock_jobs();
+          goto again;
+      }
+      unlock_jobs();
+      prev = now = time(NULL);
+      twait = next_job->runtime - now;
+      if (twait <= 0) {               /* time to run it */
+         break;
+      }
+      /* Recheck at least once per minute */
+      bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
+      /* Attempt to handle clock shift (but not daylight savings time changes)
+       * we allow a skew of 10 seconds before invalidating everything.
+       */
       now = time(NULL);
-      twait = nexttime - now;
-      if (twait <= 0) {              /* time to run it */
-        break;
+      if (now < prev-10 || now > (prev+next_check_secs+10)) {
+         schedules_invalidated = true;
       }
-      bmicrosleep(twait, 0);
    }
-   run = runjobs[jobindex].run;
-   job = runjobs[jobindex].job;
-   runjobs[jobindex].runtime = 0;     /* remove from list */
-   run->last_run = now;              /* mark as run */
-   rem_runjobs--;                    /* decrement count of remaining jobs */
-
    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
+   run = next_job->run;               /* pick up needed values */
+   job = next_job->job;
+   if (job->enabled && (!job->client || job->client->enabled)) {
+      dump_job(next_job, _("Run job"));  /* no client and job enabled */
+   }
+   free(next_job);
+   if (!job->enabled || (job->client && !job->client->enabled)) {
+      free_jcr(jcr);
+      goto again;                     /* ignore this job */
+   }
+   run->last_run = now;               /* mark as run now */
+
    ASSERT(job);
    set_jcr_defaults(jcr, job);
    if (run->level) {
-      jcr->JobLevel = run->level;       /* override run level */
+      jcr->setJobLevel(run->level);  /* override run level */
    }
    if (run->pool) {
-      jcr->pool = run->pool;         /* override pool */
+      jcr->pool = run->pool;          /* override pool */
+      jcr->run_pool_override = true;
+   }
+   if (run->next_pool) {
+      jcr->next_pool = run->next_pool; /* override next pool */
+      jcr->run_next_pool_override = true;
+   }
+   if (run->full_pool) {
+      jcr->full_pool = run->full_pool; /* override full pool */
+      jcr->run_full_pool_override = true;
+   }
+   if (run->vfull_pool) {
+      jcr->vfull_pool = run->vfull_pool; /* override virtual full pool */
+      jcr->run_vfull_pool_override = true;
+   }
+   if (run->inc_pool) {
+      jcr->inc_pool = run->inc_pool;  /* override inc pool */
+      jcr->run_inc_pool_override = true;
+   }
+   if (run->diff_pool) {
+      jcr->diff_pool = run->diff_pool;  /* override dif pool */
+      jcr->run_diff_pool_override = true;
    }
    if (run->storage) {
-      jcr->store = run->storage;      /* override storage */
+      USTORE store;
+      store.store = run->storage;
+      pm_strcpy(store.store_source, _("run override"));
+      set_rwstorage(jcr, &store);     /* override storage */
    }
    if (run->msgs) {
       jcr->messages = run->msgs;      /* override messages */
@@ -160,113 +228,228 @@ JCR *wait_for_next_job(char *job_to_run)
    if (run->Priority) {
       jcr->JobPriority = run->Priority;
    }
-   Dmsg0(200, "Leave wait_for_next_job()\n");
+   if (run->spool_data_set) {
+      jcr->spool_data = run->spool_data;
+   }
+   if (run->accurate_set) {     /* overwrite accurate mode */
+      jcr->accurate = run->accurate;
+   }
+   if (run->write_part_after_job_set) {
+      jcr->write_part_after_job = run->write_part_after_job;
+   }
+   if (run->MaxRunSchedTime_set) {
+      jcr->MaxRunSchedTime = run->MaxRunSchedTime;
+   }
+   Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
    return jcr;
 }
 
 
 /*
- * Shutdown the scheduler  
+ * Shutdown the scheduler
  */
 void term_scheduler()
 {
-   if (runjobs) {                    /* free allocated memory */
-      free(runjobs);
-      runjobs = NULL;
-      max_runjobs = 0;
+   if (jobs_to_run) {
+      delete jobs_to_run;
    }
 }
 
-
-/*         
- * Find all jobs to be run this hour
- * and the next hour.
+/*
+ * Find all jobs to be run this hour and the next hour.
  */
 static void find_runs()
 {
-   time_t now, runtime;
+   time_t now, next_hour, runtime;
    RUN *run;
    JOB *job;
    SCHED *sched;
    struct tm tm;
-   int hour, next_hour, minute, mday, wday, month, wpos;
+   int hour, mday, wday, month, wom, woy, ldom;
+   /* Items corresponding to above at the next hour */
+   int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_ldom;
 
-   Dmsg0(200, "enter find_runs()\n");
-   num_runjobs = 0;
+   Dmsg0(dbglvl, "enter find_runs()\n");
 
+   /* compute values for time now */
    now = time(NULL);
-   localtime_r(&now, &tm);
-   
+   (void)localtime_r(&now, &tm);
    hour = tm.tm_hour;
-   next_hour = hour + 1;
-   if (next_hour > 23)
-      next_hour -= 24;
-   minute = tm.tm_min;
    mday = tm.tm_mday - 1;
    wday = tm.tm_wday;
    month = tm.tm_mon;
-   wpos = (tm.tm_mday - 1) / 7; 
+   wom = mday / 7;
+   woy = tm_woy(now);                     /* get week of year */
+   ldom = tm_ldom(month, tm.tm_year + 1900);
+
+   Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
+         now, hour, month, mday, wday, wom, woy);
+
+   /*
+    * Compute values for next hour from now.
+    * We do this to be sure we don't miss a job while
+    * sleeping.
+    */
+   next_hour = now + 3600;
+   (void)localtime_r(&next_hour, &tm);
+   nh_hour = tm.tm_hour;
+   nh_mday = tm.tm_mday - 1;
+   nh_wday = tm.tm_wday;
+   nh_month = tm.tm_mon;
+   nh_wom = nh_mday / 7;
+   nh_woy = tm_woy(next_hour);              /* get week of year */
+   nh_ldom = tm_ldom(nh_month, tm.tm_year + 1900);
+
+   Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
+         next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
 
    /* Loop through all jobs */
    LockRes();
-   for (job=NULL; (job=(JOB *)GetNextRes(R_JOB, (RES *)job)); ) {
+   foreach_res(job, R_JOB) {
       sched = job->schedule;
-      if (sched == NULL) {           /* scheduled? */
-        continue;                    /* no, skip this job */
+      if (!sched || !job->enabled || (sched && !sched->enabled) ||
+         (job->client && !job->client->enabled)) {
+         continue;                    /* no, skip this job */
       }
+      Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
       for (run=sched->run; run; run=run->next) {
+         bool run_now, run_nh;
+         /*
+          * Find runs scheduled between now and the next hour.
+          */
+#ifdef xxxx
+         Dmsg0(000, "\n");
+         Dmsg7(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
+            hour, month, mday, wday, wom, woy, ldom);
+         Dmsg7(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
+            bit_is_set(hour, run->hour),
+            bit_is_set(month, run->month),
+            bit_is_set(mday, run->mday),
+            bit_is_set(wday, run->wday),
+            bit_is_set(wom, run->wom),
+            bit_is_set(woy, run->woy),
+            bit_is_set(31, run->mday));
 
-        /* Find runs scheduled in this our or in the
-         * next hour (we may be one second before the next hour).
-         */
-        if ((bit_is_set(hour, run->hour) || bit_is_set(next_hour, run->hour)) &&
-            (bit_is_set(mday, run->mday) || bit_is_set(wday, run->wday)) && 
-            bit_is_set(month, run->month) && bit_is_set(wpos, run->wpos)) {
-
-           /* find time (time_t) job is to be run */
-           localtime_r(&now, &tm);
-           tm.tm_min = run->minute;
-           tm.tm_sec = 0;
-           if (bit_is_set(hour, run->hour)) {
-              runtime = mktime(&tm);
-              add_job(job, run, now, runtime);
-           }
-           if (bit_is_set(next_hour, run->hour)) {
-              tm.tm_hour++;
-              if (tm.tm_hour > 23) {
-                 tm.tm_hour = 0;
-              }
-              runtime = mktime(&tm);
-              add_job(job, run, now, runtime);
-           }
-        }
-      }  
-   }
 
+         Dmsg7(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
+            nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy, nh_ldom);
+         Dmsg7(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
+            bit_is_set(nh_hour, run->hour),
+            bit_is_set(nh_month, run->month),
+            bit_is_set(nh_mday, run->mday),
+            bit_is_set(nh_wday, run->wday),
+            bit_is_set(nh_wom, run->wom),
+            bit_is_set(nh_woy, run->woy),
+            bit_is_set(31, run->mday));
+#endif
+
+         run_now = bit_is_set(hour, run->hour) &&
+            ((bit_is_set(mday, run->mday) &&
+              bit_is_set(wday, run->wday) &&
+              bit_is_set(month, run->month) &&
+              bit_is_set(wom, run->wom) &&
+              bit_is_set(woy, run->woy)) ||
+             (bit_is_set(month, run->month) &&
+              bit_is_set(31, run->mday) && mday == ldom));
+
+         run_nh = bit_is_set(nh_hour, run->hour) &&
+            ((bit_is_set(nh_mday, run->mday) &&
+              bit_is_set(nh_wday, run->wday) &&
+              bit_is_set(nh_month, run->month) &&
+              bit_is_set(nh_wom, run->wom) &&
+              bit_is_set(nh_woy, run->woy)) ||
+             (bit_is_set(nh_month, run->month) &&
+              bit_is_set(31, run->mday) && nh_mday == nh_ldom));
+
+         Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
+
+         if (run_now || run_nh) {
+           /* find time (time_t) job is to be run */
+           (void)localtime_r(&now, &tm);      /* reset tm structure */
+           tm.tm_min = run->minute;     /* set run minute */
+           tm.tm_sec = 0;               /* zero secs */
+           runtime = mktime(&tm);
+           if (run_now) {
+             add_job(job, run, now, runtime);
+           }
+           /* If job is to be run in the next hour schedule it */
+           if (run_nh) {
+             add_job(job, run, now, runtime + 3600);
+           }
+         }
+      }
+   }
    UnlockRes();
-   rem_runjobs = num_runjobs;
-   Dmsg0(200, "Leave find_runs()\n");
+   Dmsg0(dbglvl, "Leave find_runs()\n");
 }
 
 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
 {
+   job_item *ji;
+   bool inserted = false;
    /*
     * Don't run any job that ran less than a minute ago, but
     *  do run any job scheduled less than a minute ago.
     */
-   if ((runtime - run->last_run < 61) || (runtime+59 < now)) {
+   if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
+#ifdef SCHED_DEBUG
+      Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name,
+            (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
+      fflush(stdout);
+#endif
       return;
    }
-
-   /* Make sure array is big enough */
-   if (num_runjobs == max_runjobs) {
-      max_runjobs += 10;
-      runjobs = (RUNJOB *)realloc(runjobs, sizeof(RUNJOB) * max_runjobs);
-      if (!runjobs)
-         Emsg0(M_ABORT, 0, _("Out of memory\n"));
-   } 
+#ifdef SCHED_DEBUG
+   Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name,
+            (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
+#endif
    /* accept to run this job */
-   runjobs[num_runjobs].run = run;
-   runjobs[num_runjobs].job = job;
-   runjobs[num_runjobs++].runtime = runtime; /* when to run it */
+   job_item *je = (job_item *)malloc(sizeof(job_item));
+   je->run = run;
+   je->job = job;
+   je->runtime = runtime;
+   if (run->Priority) {
+      je->Priority = run->Priority;
+   } else {
+      je->Priority = job->Priority;
+   }
+
+   /* Add this job to the wait queue in runtime, priority sorted order */
+   foreach_dlist(ji, jobs_to_run) {
+      if (ji->runtime > je->runtime ||
+          (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
+         jobs_to_run->insert_before(je, ji);
+         dump_job(je, _("Inserted job"));
+         inserted = true;
+         break;
+      }
+   }
+   /* If place not found in queue, append it */
+   if (!inserted) {
+      jobs_to_run->append(je);
+      dump_job(je, _("Appended job"));
+   }
+#ifdef SCHED_DEBUG
+   foreach_dlist(ji, jobs_to_run) {
+      dump_job(ji, _("Run queue"));
+   }
+   Dmsg0(000, "End run queue\n");
+#endif
+}
+
+static void dump_job(job_item *ji, const char *msg)
+{
+#ifdef SCHED_DEBUG
+   char dt[MAX_TIME_LENGTH];
+   int64_t save_debug = debug_level;
+
+   if (!chk_dbglvl(dbglvl)) {
+      return;
+   }
+   bstrftime_nc(dt, sizeof(dt), ji->runtime);
+   Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
+      ji->Priority, dt);
+   fflush(stdout);
+   debug_level = save_debug;
+#endif
 }