]> git.sur5r.net Git - bacula/bacula/commitdiff
Fix rescheduling on error for new job queue code
authorKern Sibbald <kern@sibbald.com>
Mon, 21 Jul 2003 13:51:21 +0000 (13:51 +0000)
committerKern Sibbald <kern@sibbald.com>
Mon, 21 Jul 2003 13:51:21 +0000 (13:51 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@640 91ce42f0-d328-0410-95d8-f526ca767f89

bacula/src/dird/dird.c
bacula/src/dird/job.c
bacula/src/dird/jobq.c
bacula/src/dird/protos.h
bacula/src/dird/ua_cmds.c
bacula/src/dird/ua_run.c

index f2960894db4c6f2af0dedbd729c0c9872eba49be..f6f0213c71e661840c1878e3dae49dde855e2499 100644 (file)
@@ -44,7 +44,6 @@ extern void term_ua_server();
 extern int do_backup(JCR *jcr);
 extern void backup_cleanup(void);
 extern void start_UA_server(char *addr, int port);
-extern void run_job(JCR *jcr);
 extern void init_job_server(int max_workers);
 
 static char *configfile = NULL;
index e026dffbc69f8d88807d7e401f9eb537026f437d..75ea929ffbe15befc397d4216e52955f74a05b18 100644 (file)
 /* Forward referenced subroutines */
 static void *job_thread(void *arg);
 static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg);
-static void release_resource_locks(JCR *jcr);
 static int acquire_resource_locks(JCR *jcr);
 #ifdef USE_SEMAPHORE
 static void backoff_resource_locks(JCR *jcr, int count);
+static void release_resource_locks(JCR *jcr);
 #endif
 
 /* Exported subroutines */
-void run_job(JCR *jcr);
 
 
 /* Imported subroutines */
@@ -58,9 +57,6 @@ static int waiting = 0;             /* count of waiting threads */
 #else
 #ifdef JOB_QUEUE  
 jobq_t job_queue;
-#else
-/* Queue of jobs to be run */
-workq_t job_wq;                  /* our job work queue */
 #endif
 #endif
 
@@ -83,11 +79,6 @@ void init_job_server(int max_workers)
    if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) {
       Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat));
    }
-#else
-   /* This is the OLD work queue code to go away */
-   if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
-      Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat));
-   }
 #endif
 #endif
    return;
@@ -103,10 +94,6 @@ void run_job(JCR *jcr)
    int stat, errstat;
 #ifdef USE_SEMAPHORE
    pthread_t tid;
-#else
-#ifndef JOB_QUEUE
-   workq_ele_t *work_item;
-#endif
 #endif
 
    sm_check(__FILE__, __LINE__, True);
@@ -171,12 +158,6 @@ void run_job(JCR *jcr)
    if ((stat = jobq_add(&job_queue, jcr)) != 0) {
       Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat));
    }
-#else
-   /* Queue the job to be run */
-   if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
-      Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
-   }
-   jcr->work_item = work_item;
 #endif
 #endif
    Dmsg0(200, "Done run_job()\n");
@@ -285,6 +266,7 @@ static void *job_thread(void *arg)
         }
       }
 bail_out:
+#ifndef JOB_QUEUE
       release_resource_locks(jcr);
       if (jcr->job->RescheduleOnError && 
          jcr->JobStatus != JS_Terminated &&
@@ -321,15 +303,16 @@ bail_out:
         njcr->messages = jcr->messages;
         run_job(njcr);
       }
+#endif
       break;
    }
 
+#ifndef JOB_QUEUE
    if (jcr->db) {
       Dmsg0(200, "Close DB\n");
       db_close_database(jcr, jcr->db);
       jcr->db = NULL;
    }
-#ifndef JOB_QUEUE
    free_jcr(jcr);
 #endif
    Dmsg0(50, "======== End Job ==========\n");
@@ -482,12 +465,12 @@ static void backoff_resource_locks(JCR *jcr, int count)
  *   there are any other jobs waiting, we wake them
  *   up so that they can try again.
  */
+#ifdef USE_SEMAPHORE
 static void release_resource_locks(JCR *jcr)
 {
    if (!jcr->acquired_resource_locks) {
       return;                        /* Job canceled, no locks acquired */
    }
-#ifdef USE_SEMAPHORE
    P(mutex);
    sem_unlock(&jcr->store->sem);
    sem_unlock(&jcr->client->sem);
@@ -498,8 +481,8 @@ static void release_resource_locks(JCR *jcr)
    }
    jcr->acquired_resource_locks = false;
    V(mutex);
-#endif
 }
+#endif
 
 /*
  * Get or create a Client record for this Job
index a16f6b48fafddbeab40e9f7b9156d020b90b8659..4496446b00dac3c494675060e7c71e6cf6e2d729 100755 (executable)
@@ -375,7 +375,9 @@ static void *jobq_server(void *arg)
        */
       Dmsg0(100, "Checking ready queue.\n");
       while (!jq->ready_jobs->empty() && !jq->quit) {
+        JCR *jcr;
         je = (jobq_item_t *)jq->ready_jobs->first(); 
+        jcr = je->jcr;
         jq->ready_jobs->remove(je);
         if (!jq->ready_jobs->empty()) {
             Dmsg0(100, "ready queue not empty start server\n");
@@ -384,14 +386,14 @@ static void *jobq_server(void *arg)
            }
         }
         jq->running_jobs->append(je);
-         Dmsg1(100, "Took jobid=%d from ready and appended to run\n", je->jcr->JobId);
+         Dmsg1(100, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
         if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
            return NULL;
         }
          /* Call user's routine here */
-         Dmsg1(100, "Calling user engine for jobid=%d\n", je->jcr->JobId);
+         Dmsg1(100, "Calling user engine for jobid=%d\n", jcr->JobId);
         jq->engine(je->jcr);
-         Dmsg1(100, "Back from user engine jobid=%d.\n", je->jcr->JobId);
+         Dmsg1(100, "Back from user engine jobid=%d.\n", jcr->JobId);
         if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
            free(je);                 /* release job entry */
            return NULL;
@@ -403,12 +405,56 @@ static void *jobq_server(void *arg)
          *  been acquired for jobs canceled before they were
          *  put into the ready queue.
          */
-        if (je->jcr->acquired_resource_locks) {
-           je->jcr->store->NumConcurrentJobs--;
-           je->jcr->client->NumConcurrentJobs--;
-           je->jcr->job->NumConcurrentJobs--;
+        if (jcr->acquired_resource_locks) {
+           jcr->store->NumConcurrentJobs--;
+           jcr->client->NumConcurrentJobs--;
+           jcr->job->NumConcurrentJobs--;
         }
-        free_jcr(je->jcr);
+
+        if (jcr->job->RescheduleOnError && 
+            jcr->JobStatus != JS_Terminated &&
+            jcr->JobStatus != JS_Canceled && 
+            jcr->job->RescheduleTimes > 0 && 
+            jcr->reschedule_count < jcr->job->RescheduleTimes) {
+
+            /*
+             * Reschedule this job by cleaning it up, but
+             *  reuse the same JobId if possible.
+             */
+           jcr->reschedule_count++;
+           jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval;
+            Dmsg2(100, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job,
+              (int)jcr->job->RescheduleInterval);
+           jcr->JobStatus = JS_Created; /* force new status */
+           dird_free_jcr(jcr);          /* partial cleanup old stuff */
+           if (jcr->JobBytes == 0) {
+              jobq_add(jq, jcr);     /* queue the job to run again */
+              free(je);              /* free the job entry */
+              continue;
+           }
+           /* 
+            * Something was actually backed up, so we cannot reuse
+            *   the old JobId or there will be database record
+            *   conflicts.  We now create a new job, copying the
+            *   appropriate fields.
+            */
+           JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
+           set_jcr_defaults(njcr, jcr->job);
+           njcr->reschedule_count = jcr->reschedule_count;
+           njcr->JobLevel = jcr->JobLevel;
+           njcr->JobStatus = jcr->JobStatus;
+           njcr->pool = jcr->pool;
+           njcr->store = jcr->store;
+           njcr->messages = jcr->messages;
+           run_job(njcr);
+        }
+        /* Clean up and release old jcr */
+        if (jcr->db) {
+            Dmsg0(200, "Close DB\n");
+           db_close_database(jcr, jcr->db);
+           jcr->db = NULL;
+        }
+        free_jcr(jcr);
         free(je);                    /* release job entry */
       }
       /*
index 6614050f806d542aa9e8610aa38d0a84247a15eb..e0dc391bf95d16950e5335781b15e22059cef7a4 100644 (file)
@@ -85,6 +85,7 @@ extern void set_jcr_defaults(JCR *jcr, JOB *job);
 extern void create_unique_job_name(JCR *jcr, char *base_name);
 extern void update_job_end_record(JCR *jcr);
 extern int get_or_create_client_record(JCR *jcr);
+extern void run_job(JCR *jcr);
 
 /* mountreq.c */
 extern void mount_request(JCR *jcr, BSOCK *bs, char *buf);
index 01099a10606ed9f72b227d53c9047cf82c689176..e8dfaaf34faf438987851eff7f64d0b29a96a125 100644 (file)
@@ -37,12 +37,8 @@ extern int r_first;
 extern int r_last;
 extern struct s_res resources[];
 extern char my_name[];
-#ifndef USE_SEMAPHORE 
 #ifdef JOB_QUEUE
 extern jobq_t job_queue;             /* job queue */
-#else
-extern workq_t job_wq;               /* work queue */
-#endif
 #endif
 
 extern char *list_pool;
@@ -435,14 +431,10 @@ static int cancelcmd(UAContext *ua, char *cmd)
       set_jcr_job_status(jcr, JS_Canceled);
       bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"),
              jcr->JobId, jcr->Job);
-#ifndef USE_SEMAPHORE
 #ifdef JOB_QUEUE
       jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */
-#else
-      workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */
-#endif
 #endif
-      free_jcr(jcr);
+      free_jcr(jcr);                 /* this decrements the use count only */
       return 1;
         
    default:
index 0144c45bd79c22828d66e0824b68367434192285..9a4a559dce2ef671ab54774c901de0202409900d 100644 (file)
@@ -31,7 +31,6 @@
 #include "dird.h"
 
 /* Imported subroutines */
-extern void run_job(JCR *jcr);
 
 /* Imported variables */
 extern struct s_jl joblevels[];
@@ -71,9 +70,11 @@ int runcmd(UAContext *ua, char *cmd)
       N_("replace"),
       N_("when"),
       N_("priority"),
-      N_("yes"),          /* 12 -- see below */
+      N_("yes"),          /* 12 -- if you change this change YES_POS too */
       NULL};
 
+#define YES_POS 12
+
    if (!open_db(ua)) {
       return 1;
    }
@@ -96,116 +97,116 @@ int runcmd(UAContext *ua, char *cmd)
       for (j=0; !found && kw[j]; j++) {
         if (strcasecmp(ua->argk[i], _(kw[j])) == 0) {
            /* Note, yes has no value, so do not err */
-           if (!ua->argv[i] && j != 11 /*yes*/) {
+           if (!ua->argv[i] && j != YES_POS /*yes*/) {
                bsendmsg(ua, _("Value missing for keyword %s\n"), ua->argk[i]);
               return 1;
            }
             Dmsg1(200, "Got keyword=%s\n", kw[j]);
            switch (j) {
-              case 0: /* job */
-                 if (job_name) {
-                     bsendmsg(ua, _("Job name specified twice.\n"));
-                    return 1;
-                 }
-                 job_name = ua->argv[i];
-                 found = True;
-                 break;
-              case 1: /* JobId */
-                 if (jid) {
-                     bsendmsg(ua, _("JobId specified twice.\n"));
-                    return 1;
-                 }
-                 jid = ua->argv[i];
-                 found = True;
-                 break;
-              case 2: /* client */
-                 if (client_name) {
-                     bsendmsg(ua, _("Client specified twice.\n"));
-                    return 1;
-                 }
-                 client_name = ua->argv[i];
-                 found = True;
-                 break;
-              case 3: /* fileset */
-                 if (fileset_name) {
-                     bsendmsg(ua, _("FileSet specified twice.\n"));
-                    return 1;
-                 }
-                 fileset_name = ua->argv[i];
-                 found = True;
-                 break;
-              case 4: /* level */
-                 if (level_name) {
-                     bsendmsg(ua, _("Level specified twice.\n"));
-                    return 1;
-                 }
-                 level_name = ua->argv[i];
-                 found = True;
-                 break;
-              case 5: /* storage */
-                 if (store_name) {
-                     bsendmsg(ua, _("Storage specified twice.\n"));
-                    return 1;
-                 }
-                 store_name = ua->argv[i];
-                 found = True;
-                 break;
-              case 6: /* pool */
-                 if (pool_name) {
-                     bsendmsg(ua, _("Pool specified twice.\n"));
-                    return 1;
-                 }
-                 pool_name = ua->argv[i];
-                 found = True;
-                 break;
-              case 7: /* where */
-                 if (where) {
-                     bsendmsg(ua, _("Where specified twice.\n"));
-                    return 1;
-                 }
-                 where = ua->argv[i];
-                 found = True;
-                 break;
-              case 8: /* bootstrap */
-                 if (bootstrap) {
-                     bsendmsg(ua, _("Bootstrap specified twice.\n"));
-                    return 1;
-                 }
-                 bootstrap = ua->argv[i];
-                 found = True;
-                 break;
-              case 9: /* replace */
-                 if (replace) {
-                     bsendmsg(ua, _("Replace specified twice.\n"));
-                    return 1;
-                 }
-                 replace = ua->argv[i];
-                 found = True;
-                 break;
-              case 10: /* When */
-                 if (when) {
-                     bsendmsg(ua, _("When specified twice.\n"));
-                    return 1;
-                 }
-                 when = ua->argv[i];
-                 found = True;
-                 break;
-              case 11:  /* Priority */
-                 if (Priority) {
-                     bsendmsg(ua, _("Priority specified twice.\n"));
-                    return 1;
-                 }
-                 Priority = atoi(ua->argv[i]);
-                 if (Priority <= 0) {
-                     bsendmsg(ua, _("Priority must be positive nonzero setting it to 10.\n"));
-                    Priority = 10;
-                 }
-                 break;
-              case 12: /* yes */
-                 found = True;
-                 break;
-              default:
-                 break;
+           case 0: /* job */
+              if (job_name) {
+                  bsendmsg(ua, _("Job name specified twice.\n"));
+                 return 1;
+              }
+              job_name = ua->argv[i];
+              found = True;
+              break;
+           case 1: /* JobId */
+              if (jid) {
+                  bsendmsg(ua, _("JobId specified twice.\n"));
+                 return 1;
+              }
+              jid = ua->argv[i];
+              found = True;
+              break;
+           case 2: /* client */
+              if (client_name) {
+                  bsendmsg(ua, _("Client specified twice.\n"));
+                 return 1;
+              }
+              client_name = ua->argv[i];
+              found = True;
+              break;
+           case 3: /* fileset */
+              if (fileset_name) {
+                  bsendmsg(ua, _("FileSet specified twice.\n"));
+                 return 1;
+              }
+              fileset_name = ua->argv[i];
+              found = True;
+              break;
+           case 4: /* level */
+              if (level_name) {
+                  bsendmsg(ua, _("Level specified twice.\n"));
+                 return 1;
+              }
+              level_name = ua->argv[i];
+              found = True;
+              break;
+           case 5: /* storage */
+              if (store_name) {
+                  bsendmsg(ua, _("Storage specified twice.\n"));
+                 return 1;
+              }
+              store_name = ua->argv[i];
+              found = True;
+              break;
+           case 6: /* pool */
+              if (pool_name) {
+                  bsendmsg(ua, _("Pool specified twice.\n"));
+                 return 1;
+              }
+              pool_name = ua->argv[i];
+              found = True;
+              break;
+           case 7: /* where */
+              if (where) {
+                  bsendmsg(ua, _("Where specified twice.\n"));
+                 return 1;
+              }
+              where = ua->argv[i];
+              found = True;
+              break;
+           case 8: /* bootstrap */
+              if (bootstrap) {
+                  bsendmsg(ua, _("Bootstrap specified twice.\n"));
+                 return 1;
+              }
+              bootstrap = ua->argv[i];
+              found = True;
+              break;
+           case 9: /* replace */
+              if (replace) {
+                  bsendmsg(ua, _("Replace specified twice.\n"));
+                 return 1;
+              }
+              replace = ua->argv[i];
+              found = True;
+              break;
+           case 10: /* When */
+              if (when) {
+                  bsendmsg(ua, _("When specified twice.\n"));
+                 return 1;
+              }
+              when = ua->argv[i];
+              found = True;
+              break;
+           case 11:  /* Priority */
+              if (Priority) {
+                  bsendmsg(ua, _("Priority specified twice.\n"));
+                 return 1;
+              }
+              Priority = atoi(ua->argv[i]);
+              if (Priority <= 0) {
+                  bsendmsg(ua, _("Priority must be positive nonzero setting it to 10.\n"));
+                 Priority = 10;
+              }
+              break;
+           case 12: /* yes */
+              found = True;
+              break;
+           default:
+              break;
            }
         } /* end strcase compare */
       } /* end keyword loop */