]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
- Add index file to JobId field of File records for PostgreSQL.
[bacula/bacula] / bacula / src / dird / jobq.c
index 112584fcd48be856f4802547e4242842f9b13762..35438cfcf7b3d940954ba5f400c9ba5dee20d47a 100755 (executable)
@@ -42,7 +42,9 @@
 
 
 /* Forward referenced functions */
-static void *jobq_server(void *arg);
+extern "C" void *jobq_server(void *arg);
+extern "C" void *sched_wait(void *arg);
+
 static int   start_server(jobq_t *jq);
 
 /*   
@@ -82,9 +84,9 @@ int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
    jq->engine = engine;              /* routine to run */
    jq->valid = JOBQ_VALID; 
    /* Initialize the job queues */
-   jq->waiting_jobs = new dlist(item, &item->link);
-   jq->running_jobs = new dlist(item, &item->link);
-   jq->ready_jobs = new dlist(item, &item->link);
+   jq->waiting_jobs = New(dlist(item, &item->link));
+   jq->running_jobs = New(dlist(item, &item->link));
+   jq->ready_jobs = New(dlist(item, &item->link));
    return 0;
 }
 
@@ -152,7 +154,8 @@ struct wait_pkt {
  *  most jobs are put into the job queue only when their
  *  scheduled time arives.
  */
-static void *sched_wait(void *arg)
+extern "C"  
+void *sched_wait(void *arg)
 {
    JCR *jcr = ((wait_pkt *)arg)->jcr;
    jobq_t *jq = ((wait_pkt *)arg)->jq;
@@ -160,11 +163,11 @@ static void *sched_wait(void *arg)
    Dmsg0(300, "Enter sched_wait.\n");
    free(arg);
    time_t wtime = jcr->sched_time - time(NULL);
+   set_jcr_job_status(jcr, JS_WaitStartTime);
    /* Wait until scheduled time arrives */
-   if (wtime > 0 && verbose) {
+   if (wtime > 0) {
       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"), 
         jcr->Job, wtime);
-      set_jcr_job_status(jcr, JS_WaitStartTime);
    }
    /* Check every 30 seconds if canceled */ 
    while (wtime > 0) {
@@ -186,7 +189,6 @@ static void *sched_wait(void *arg)
    return NULL;
 }
 
-
 /*
  *  Add a job to the queue
  *    jq is a queue that was created with jobq_init
@@ -210,17 +212,16 @@ int jobq_add(jobq_t *jq, JCR *jcr)
    }
 
    jcr->use_count++;                 /* mark jcr in use by us */
-
    Dmsg3(300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count);
    if (!job_canceled(jcr) && wtime > 0) {
       set_thread_concurrency(jq->max_workers + 2);
       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
       sched_pkt->jcr = jcr;
       sched_pkt->jq = jq;
+      jcr->use_count--;           /* release our use of jcr */
       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);       
       if (stat != 0) {               /* thread not created */
          Jmsg1(jcr, M_ERROR, 0, "pthread_thread_create: ERR=%s\n", strerror(stat));
-        jcr->use_count--;            /* release jcr */
       }
       return stat;
    }
@@ -352,7 +353,8 @@ static int start_server(jobq_t *jq)
  * When all the resources are acquired for the job, 
  *  it will call the user's engine.
  */
-static void *jobq_server(void *arg)
+extern "C"  
+void *jobq_server(void *arg)
 {
    struct timespec timeout;
    jobq_t *jq = (jobq_t *)arg;
@@ -417,6 +419,8 @@ static void *jobq_server(void *arg)
         }
         jq->running_jobs->append(je);
          Dmsg1(300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
+
+        /* Release job queue lock */
         if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
             Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat));
            jq->num_workers--;
@@ -428,6 +432,8 @@ static void *jobq_server(void *arg)
         jq->engine(je->jcr);
 
          Dmsg1(300, "Back from user engine jobid=%d.\n", jcr->JobId);
+
+        /* Reacquire job queue lock */
         if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
             Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
            jq->num_workers--;
@@ -443,18 +449,22 @@ static void *jobq_server(void *arg)
          */
         if (jcr->acquired_resource_locks) {
            jcr->store->NumConcurrentJobs--;
-           if (jcr->JobType == JT_RESTORE) {
+           if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
               jcr->store->MaxConcurrentJobs = jcr->saveMaxConcurrentJobs;  
            }
            jcr->client->NumConcurrentJobs--;
            jcr->job->NumConcurrentJobs--;
         }
 
+        /*
+         * Reschedule the job if necessary and requested
+         */
         if (jcr->job->RescheduleOnError && 
             jcr->JobStatus != JS_Terminated &&
             jcr->JobStatus != JS_Canceled && 
             jcr->job->RescheduleTimes > 0 && 
             jcr->reschedule_count < jcr->job->RescheduleTimes) {
+            char dt[50];
 
             /*
              * Reschedule this job by cleaning it up, but
@@ -464,10 +474,15 @@ static void *jobq_server(void *arg)
            jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval;
             Dmsg2(300, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job,
               (int)jcr->job->RescheduleInterval);
-           jcr->JobStatus = JS_Created; /* force new status */
+           bstrftime(dt, sizeof(dt), time(NULL));
+            Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds.\n"),
+              jcr->Job, dt, (int)jcr->job->RescheduleInterval);
            dird_free_jcr(jcr);          /* partial cleanup old stuff */
+           jcr->JobStatus = JS_WaitStartTime;
+           jcr->SDJobStatus = 0;
            if (jcr->JobBytes == 0) {
                Dmsg1(300, "Requeue job=%d\n", jcr->JobId);
+              jcr->JobStatus = JS_WaitStartTime;
               V(jq->mutex);
               jobq_add(jq, jcr);     /* queue the job to run again */
               P(jq->mutex);
@@ -500,9 +515,12 @@ static void *jobq_server(void *arg)
            db_close_database(jcr, jcr->db);
            jcr->db = NULL;
         }
-         Dmsg1(300, "====== Termination job=%d\n", jcr->JobId);
+         Dmsg2(300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count);
+        jcr->SDJobStatus = 0;
+        V(jq->mutex);                /* release internal lock */
         free_jcr(jcr);
         free(je);                    /* release job entry */
+        P(jq->mutex);                /* reacquire job queue lock */
       }
       /*
        * If any job in the wait queue can be run,
@@ -535,8 +553,8 @@ static void *jobq_server(void *arg)
               set_jcr_job_status(jcr, JS_WaitPriority);
               break;
            }
-           if (jcr->JobType == JT_RESTORE) {
-              /* Let only one Restore job run at a time regardless of MaxConcurrentJobs */
+           if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
+              /* Let only one Restore/verify job run at a time regardless of MaxConcurrentJobs */
               if (jcr->store->NumConcurrentJobs == 0) {
                  jcr->store->NumConcurrentJobs++;
                  jcr->saveMaxConcurrentJobs = jcr->store->MaxConcurrentJobs;
@@ -559,7 +577,7 @@ static void *jobq_server(void *arg)
            } else {
               /* Back out previous locks */
               jcr->store->NumConcurrentJobs--;
-              if (jcr->JobType == JT_RESTORE) {
+              if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
                  jcr->store->MaxConcurrentJobs = jcr->saveMaxConcurrentJobs;  
               }
               set_jcr_job_status(jcr, JS_WaitClientRes);
@@ -571,7 +589,7 @@ static void *jobq_server(void *arg)
            } else {
               /* Back out previous locks */
               jcr->store->NumConcurrentJobs--;
-              if (jcr->JobType == JT_RESTORE) {
+              if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
                  jcr->store->MaxConcurrentJobs = jcr->saveMaxConcurrentJobs;  
               }
               jcr->client->NumConcurrentJobs--;