git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
Add more ingres code
[bacula/bacula] / bacula / src / dird / jobq.c
index c4a4c8cb8b53b922c6a3f87ea20759d5422a373c..aec1bd5724da97fd2a4682f623acc7cecbaf9b80 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
+   Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
 
-   Bacula® is a registered trademark of John Walker.
+   Bacula® is a registered trademark of Kern Sibbald.
    The licensor of Bacula is the Free Software Foundation Europe
    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
    Switzerland, email:ftf@fsfeurope.org.
@@ -37,7 +37,6 @@
  *
  *  Kern Sibbald, July MMIII
  *
- *   Version $Id$
  *
  *  This code was adapted from the Bacula workq, which was
  *    adapted from "Programming with POSIX Threads", by
@@ -56,8 +55,8 @@ extern "C" void *sched_wait(void *arg);
 
 static int  start_server(jobq_t *jq);
 static bool acquire_resources(JCR *jcr);
-
-
+static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
+static void dec_write_store(JCR *jcr);
 
 /*
  * Initialize a job queue
@@ -118,11 +117,7 @@ int jobq_destroy(jobq_t *jq)
    if (jq->valid != JOBQ_VALID) {
       return EINVAL;
    }
-   if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
-      berrno be;
-      Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
-      return stat;
-   }
+   P(jq->mutex);
    jq->valid = 0;                      /* prevent any more operations */
 
    /* 
@@ -134,7 +129,7 @@ int jobq_destroy(jobq_t *jq)
          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
             berrno be;
             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
-            pthread_mutex_unlock(&jq->mutex);
+            V(jq->mutex);
             return stat;
          }
       }
@@ -142,16 +137,12 @@ int jobq_destroy(jobq_t *jq)
          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
             berrno be;
             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
-            pthread_mutex_unlock(&jq->mutex);
+            V(jq->mutex);
             return stat;
          }
       }
    }
-   if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
-      berrno be;
-      Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat));
-      return stat;
-   }
+   V(jq->mutex);
    stat  = pthread_mutex_destroy(&jq->mutex);
    stat1 = pthread_cond_destroy(&jq->work);
    stat2 = pthread_attr_destroy(&jq->attr);
@@ -179,6 +170,7 @@ void *sched_wait(void *arg)
    JCR *jcr = ((wait_pkt *)arg)->jcr;
    jobq_t *jq = ((wait_pkt *)arg)->jq;
 
+   set_jcr_in_tsd(INVALID_JCR);
    Dmsg0(2300, "Enter sched_wait.\n");
    free(arg);
    time_t wtime = jcr->sched_time - time(NULL);
@@ -252,12 +244,7 @@ int jobq_add(jobq_t *jq, JCR *jcr)
       return stat;
    }
 
-   if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
-      berrno be;
-      Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
-      free_jcr(jcr);                    /* release jcr */
-      return stat;
-   }
+   P(jq->mutex);
 
    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
       free_jcr(jcr);                    /* release jcr */
@@ -265,6 +252,8 @@ int jobq_add(jobq_t *jq, JCR *jcr)
    }
    item->jcr = jcr;
 
+   /* While waiting in a queue this job is not attached to a thread */
+   set_jcr_in_tsd(INVALID_JCR);
    if (job_canceled(jcr)) {
       /* Add job to ready queue so that it is canceled quickly */
       jq->ready_jobs->prepend(item);
@@ -292,7 +281,7 @@ int jobq_add(jobq_t *jq, JCR *jcr)
    /* Ensure that at least one server looks at the queue. */
    stat = start_server(jq);
 
-   pthread_mutex_unlock(&jq->mutex);
+   V(jq->mutex);
    Dmsg0(2300, "Return jobq_add\n");
    return stat;
 }
@@ -317,12 +306,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
       return EINVAL;
    }
 
-   if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
-      berrno be;
-      Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
-      return stat;
-   }
-
+   P(jq->mutex);
    foreach_dlist(item, jq->waiting_jobs) {
       if (jcr == item->jcr) {
          found = true;
@@ -330,7 +314,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
       }
    }
    if (!found) {
-      pthread_mutex_unlock(&jq->mutex);
+      V(jq->mutex);
       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
       return EINVAL;
    }
@@ -342,7 +326,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
 
    stat = start_server(jq);
 
-   pthread_mutex_unlock(&jq->mutex);
+   V(jq->mutex);
    Dmsg0(2300, "Return jobq_remove\n");
    return stat;
 }
@@ -357,8 +341,8 @@ static int start_server(jobq_t *jq)
    pthread_t id;
 
    /*
-    * if any threads are idle, wake one --                
-    *   actually we do a broadcast because on /lib/tls 
+    * if any threads are idle, wake one.
+    *   Actually we do a broadcast because on /lib/tls 
     *   these signals seem to get lost from time to time.
     */
    if (jq->idle_workers > 0) {
@@ -372,8 +356,10 @@ static int start_server(jobq_t *jq)
       Dmsg0(2300, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(jq->max_workers + 1);
+      jq->num_workers++;
       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
          berrno be;
+         jq->num_workers--;
          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
          return stat;
       }
@@ -397,13 +383,9 @@ void *jobq_server(void *arg)
    bool timedout = false;
    bool work = true;
 
+   set_jcr_in_tsd(INVALID_JCR);
    Dmsg0(2300, "Start jobq_server\n");
-   if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
-      berrno be;
-      Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
-      return NULL;
-   }
-   jq->num_workers++;
+   P(jq->mutex);
 
    for (;;) {
       struct timeval tv;
@@ -429,7 +411,7 @@ void *jobq_server(void *arg)
                /* This shouldn't happen */
                Dmsg0(2300, "This shouldn't happen\n");
                jq->num_workers--;
-               pthread_mutex_unlock(&jq->mutex);
+               V(jq->mutex);
                return NULL;
             }
             break;
@@ -448,11 +430,14 @@ void *jobq_server(void *arg)
             Dmsg0(2300, "ready queue not empty start server\n");
             if (start_server(jq) != 0) {
                jq->num_workers--;
-               pthread_mutex_unlock(&jq->mutex);
+               V(jq->mutex);
                return NULL;
             }
          }
          jq->running_jobs->append(je);
+
+         /* Attach jcr to this thread while we run the job */
+         jcr->set_killable(true);
          set_jcr_in_tsd(jcr);
          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
 
@@ -460,10 +445,14 @@ void *jobq_server(void *arg)
          V(jq->mutex);
 
          /* Call user's routine here */
-         Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
-            jcr->use_count());
+         Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
+            jcr->use_count(), jcr->JobStatus);
          jq->engine(je->jcr);
 
+         /* Job finished detach from thread */
+         remove_jcr_from_tsd(je->jcr);
+         je->jcr->set_killable(false);
+
          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
             jcr->use_count());
 
@@ -477,87 +466,17 @@ void *jobq_server(void *arg)
           *  put into the ready queue.
           */
          if (jcr->acquired_resource_locks) {
-            if (jcr->rstore) {
-               jcr->rstore->NumConcurrentJobs = 0;
-               Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-            }
-            if (jcr->wstore) {
-               jcr->wstore->NumConcurrentJobs--;
-               Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-            }
+            dec_read_store(jcr);
+            dec_write_store(jcr);
             jcr->client->NumConcurrentJobs--;
             jcr->job->NumConcurrentJobs--;
             jcr->acquired_resource_locks = false;
          }
 
-         /*
-          * Reschedule the job if necessary and requested
-          */
-         if (jcr->job->RescheduleOnError &&
-             jcr->JobStatus != JS_Terminated &&
-             jcr->JobStatus != JS_Canceled &&
-             jcr->JobType == JT_BACKUP &&
-             (jcr->job->RescheduleTimes == 0 ||
-              jcr->reschedule_count < jcr->job->RescheduleTimes)) {
-             char dt[50], dt2[50];
-
-             /*
-              * Reschedule this job by cleaning it up, but
-              *  reuse the same JobId if possible.
-              */
-            time_t now = time(NULL);
-            jcr->reschedule_count++;
-            jcr->sched_time = now + jcr->job->RescheduleInterval;
-            bstrftime(dt, sizeof(dt), now);
-            bstrftime(dt2, sizeof(dt2), jcr->sched_time);
-            Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
-                  (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
-            Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
-                 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
-            dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
-            jcr->JobStatus = -1;
-            set_jcr_job_status(jcr, JS_WaitStartTime);
-            jcr->SDJobStatus = 0;
-            if (jcr->JobBytes == 0) {
-               Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
-               V(jq->mutex);
-               jobq_add(jq, jcr);     /* queue the job to run again */
-               P(jq->mutex);
-               free_jcr(jcr);         /* release jcr */
-               free(je);              /* free the job entry */
-               continue;              /* look for another job to run */
-            }
-            /*
-             * Something was actually backed up, so we cannot reuse
-             *   the old JobId or there will be database record
-             *   conflicts.  We now create a new job, copying the
-             *   appropriate fields.
-             */           
-            JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
-            set_jcr_defaults(njcr, jcr->job);
-            njcr->reschedule_count = jcr->reschedule_count;
-            njcr->sched_time = jcr->sched_time;
-            njcr->JobLevel = jcr->JobLevel;
-            njcr->JobStatus = -1;
-            set_jcr_job_status(njcr, jcr->JobStatus);
-            if (jcr->rstore) {
-               copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
-            } else {
-               free_rstorage(njcr);
-            }
-            if (jcr->wstore) {
-               copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
-            } else {
-               free_wstorage(njcr);
-            }
-            njcr->messages = jcr->messages;
-            Dmsg0(2300, "Call to run new job\n");
-            V(jq->mutex);
-            run_job(njcr);            /* This creates a "new" job */
-            free_jcr(njcr);           /* release "new" jcr */
-            P(jq->mutex);
-            Dmsg0(2300, "Back from running new job.\n");
+         if (reschedule_job(jcr, jq, je)) {
+            continue;              /* go look for more work */
          }
+
          /* Clean up and release old jcr */
          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
          jcr->SDJobStatus = 0;
@@ -573,11 +492,26 @@ void *jobq_server(void *arg)
       Dmsg0(2300, "Done check ready, now check wait queue.\n");
       if (!jq->waiting_jobs->empty() && !jq->quit) {
          int Priority;
+         bool running_allow_mix = false;
          je = (jobq_item_t *)jq->waiting_jobs->first();
          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
          if (re) {
             Priority = re->jcr->JobPriority;
-            Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
+            Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
+                  re->jcr->JobId, Priority);
+            running_allow_mix = true;
+            for ( ; re; ) {
+               Dmsg2(2300, "JobId %d is also running with %s\n",
+                     re->jcr->JobId, 
+                     re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
+               if (!re->jcr->job->allow_mixed_priority) {
+                  running_allow_mix = false;
+                  break;
+               }
+               re = (jobq_item_t *)jq->running_jobs->next(re);
+            }
+            Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
+                  running_allow_mix ? "allow" : "don't allow");
          } else {
             Priority = je->jcr->JobPriority;
             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
@@ -591,11 +525,14 @@ void *jobq_server(void *arg)
             JCR *jcr = je->jcr;
             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
 
-            Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
-               jcr->JobId, jcr->JobPriority, Priority);
+            Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
+                  jcr->JobId, jcr->JobPriority, Priority,
+                  jcr->job->allow_mixed_priority ? "mix" : "no mix");
 
             /* Take only jobs of correct Priority */
-            if (jcr->JobPriority != Priority) {
+            if (!(jcr->JobPriority == Priority
+                  || (jcr->JobPriority < Priority &&
+                      jcr->job->allow_mixed_priority && running_allow_mix))) {
                set_jcr_job_status(jcr, JS_WaitPriority);
                break;
             }
@@ -670,6 +607,94 @@ void *jobq_server(void *arg)
    return NULL;
 }
 
+/*
+ * Decide whether a job that just finished must be run again and, if so,
+ *   arrange for the rerun.
+ *
+ *  jcr - the job that came back from the engine
+ *  jq  - the job queue; jq->mutex is released and re-acquired in here
+ *        around jobq_add() and run_job(), so the caller is presumably
+ *        holding it on entry -- TODO confirm against the caller
+ *  je  - the queue entry of jcr; freed here only on the requeue path
+ *
+ * Returns true if cleanup done and we should look for more work
+ *   (jcr was requeued with jobq_add(), released with free_jcr() and
+ *   je was freed).
+ * Returns false in every other case: no reschedule wanted, the
+ *   duplicate check refused it, or a new job was started with run_job().
+ *   jcr and je are then left for the caller to clean up.
+ */
+static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
+{
+   /*
+    * Reschedule the job if necessary and requested
+    *
+    * All of the following must hold: the Job resource has
+    *   RescheduleOnError set, the job did not end as JS_Terminated
+    *   or JS_Canceled, it is a JT_BACKUP job, and RescheduleTimes
+    *   is either 0 (no limit is applied) or still greater than
+    *   the number of reschedules already done.
+    */
+   if (jcr->job->RescheduleOnError &&
+       jcr->JobStatus != JS_Terminated &&
+       jcr->JobStatus != JS_Canceled &&
+       jcr->getJobType() == JT_BACKUP &&
+       (jcr->job->RescheduleTimes == 0 ||
+        jcr->reschedule_count < jcr->job->RescheduleTimes)) {
+       char dt[50], dt2[50];
+
+       /*
+        * Reschedule this job by cleaning it up, but
+        *  reuse the same JobId if possible.
+        *
+        * The new start time is now + RescheduleInterval; both times
+        *  are formatted into dt/dt2 for the M_INFO job message.
+        * JobStatus is set to -1 before set_jcr_job_status() is
+        *  called; presumably so that helper does not reject the
+        *  new status -- TODO confirm.
+        * NOTE(review): now and sched_time are time_t but are printed
+        *  with %u in the Dmsg4 below -- confirm this is safe where
+        *  time_t is 64 bits.
+        */
+      time_t now = time(NULL);
+      jcr->reschedule_count++;         /* count this attempt */
+      jcr->sched_time = now + jcr->job->RescheduleInterval;
+      bstrftime(dt, sizeof(dt), now);
+      bstrftime(dt2, sizeof(dt2), jcr->sched_time);
+      Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
+            (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
+      Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
+           jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
+      dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
+      jcr->JobStatus = -1;
+      set_jcr_job_status(jcr, JS_WaitStartTime);
+      jcr->SDJobStatus = 0;
+      if (!allow_duplicate_job(jcr)) { /* duplicate check refused the rerun */
+         return false;                 /* caller does the normal cleanup */
+      }
+      if (jcr->JobBytes == 0) {        /* nothing backed up: requeue same jcr */
+         Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
+         V(jq->mutex);          /* jobq_add() locks jq->mutex itself */
+         jobq_add(jq, jcr);     /* queue the job to run again */
+         P(jq->mutex);
+         free_jcr(jcr);         /* release jcr */
+         free(je);              /* free the job entry */
+         return true;           /* we already cleaned up */
+      }
+      /*
+       * Something was actually backed up, so we cannot reuse
+       *   the old JobId or there will be database record
+       *   conflicts.  We now create a new job, copying the
+       *   appropriate fields.
+       *
+       * Copied from the old jcr: reschedule count, scheduled time,
+       *   job level, the pool pointers and their run-override flags,
+       *   the job status, read/write storage (or none when the old
+       *   job had none), messages, spool_data and
+       *   write_part_after_job.
+       */
+      JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
+      set_jcr_defaults(njcr, jcr->job);
+      njcr->reschedule_count = jcr->reschedule_count;
+      njcr->sched_time = jcr->sched_time;
+      njcr->set_JobLevel(jcr->getJobLevel());
+      njcr->pool = jcr->pool;
+      njcr->run_pool_override = jcr->run_pool_override;
+      njcr->full_pool = jcr->full_pool;
+      njcr->run_full_pool_override = jcr->run_full_pool_override;
+      njcr->inc_pool = jcr->inc_pool;
+      njcr->run_inc_pool_override = jcr->run_inc_pool_override;
+      njcr->diff_pool = jcr->diff_pool;
+      njcr->JobStatus = -1;
+      set_jcr_job_status(njcr, jcr->JobStatus);
+      if (jcr->rstore) {
+         copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
+      } else {
+         free_rstorage(njcr);
+      }
+      if (jcr->wstore) {
+         copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
+      } else {
+         free_wstorage(njcr);
+      }
+      njcr->messages = jcr->messages;
+      njcr->spool_data = jcr->spool_data;
+      njcr->write_part_after_job = jcr->write_part_after_job;
+      Dmsg0(2300, "Call to run new job\n");
+      V(jq->mutex);             /* queue lock is not held while the new job starts */
+      run_job(njcr);            /* This creates a "new" job */
+      free_jcr(njcr);           /* release "new" jcr */
+      P(jq->mutex);
+      Dmsg0(2300, "Back from running new job.\n");
+   }
+   return false;                /* old jcr and je still need the caller's cleanup */
+}
+
 /*
  * See if we can acquire all the necessary resources for the job (JCR)
  *
@@ -681,16 +706,25 @@ static bool acquire_resources(JCR *jcr)
    bool skip_this_jcr = false;
 
    jcr->acquired_resource_locks = false;
+/*
+ * Turning this code off is likely to cause some deadlocks,
+ *   but we do not really have enough information here to
+ *   know if this is really a deadlock (it may be a dual drive
+ *   autochanger), and in principle, the SD reservation system
+ *   should detect these deadlocks, so push the work off on it.
+ */
+#ifdef xxx
+   if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
+      Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
+         "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
+         jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
+      set_jcr_job_status(jcr, JS_Canceled);
+      return false;
+   }
+#endif
    if (jcr->rstore) {
       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
-      /*
-       * Let only one Restore/Verify job run at a time regardless
-       *  of MaxConcurrentjobs.
-       */
-      if (jcr->rstore->NumConcurrentJobs == 0) {
-         jcr->rstore->NumConcurrentJobs = 1;
-         Dmsg0(200, "Set rncj=1\n");
-      } else {
+      if (!inc_read_store(jcr)) {
          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
          set_jcr_job_status(jcr, JS_WaitStoreRes);
          return false;
@@ -699,25 +733,11 @@ static bool acquire_resources(JCR *jcr)
    
    if (jcr->wstore) {
       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
-      if (jcr->rstore == jcr->wstore) {           /* deadlock */
-         jcr->rstore->NumConcurrentJobs = 0;      /* back out rstore */
-         Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
-            "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
-            jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
-         set_jcr_job_status(jcr, JS_Canceled);
-         return false;
-      }
-      if (jcr->wstore->NumConcurrentJobs == 0 &&
-          jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
-         /* Simple case, first job */
-         jcr->wstore->NumConcurrentJobs = 1;
-         Dmsg0(200, "Set wncj=1\n");
-      } else if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
+      if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
          jcr->wstore->NumConcurrentJobs++;
          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
       } else if (jcr->rstore) {
-         jcr->rstore->NumConcurrentJobs = 0;      /* back out rstore */
-         Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
+         dec_read_store(jcr);
          skip_this_jcr = true;
       } else {
          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
@@ -733,14 +753,8 @@ static bool acquire_resources(JCR *jcr)
       jcr->client->NumConcurrentJobs++;
    } else {
       /* Back out previous locks */
-      if (jcr->wstore) {
-         jcr->wstore->NumConcurrentJobs--;
-         Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-      }
-      if (jcr->rstore) {
-         jcr->rstore->NumConcurrentJobs = 0;
-         Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-      }
+      dec_write_store(jcr);
+      dec_read_store(jcr);
       set_jcr_job_status(jcr, JS_WaitClientRes);
       return false;
    }
@@ -748,14 +762,8 @@ static bool acquire_resources(JCR *jcr)
       jcr->job->NumConcurrentJobs++;
    } else {
       /* Back out previous locks */
-      if (jcr->wstore) {
-         jcr->wstore->NumConcurrentJobs--;
-         Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-      }
-      if (jcr->rstore) {
-         jcr->rstore->NumConcurrentJobs = 0;
-         Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-      }
+      dec_write_store(jcr);
+      dec_read_store(jcr);
       jcr->client->NumConcurrentJobs--;
       set_jcr_job_status(jcr, JS_WaitJobRes);
       return false;
@@ -764,3 +772,45 @@ static bool acquire_resources(JCR *jcr)
    jcr->acquired_resource_locks = true;
    return true;
 }
+
+static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Try to reserve a slot on the job's read storage.
+ *   The slot is granted only while the storage's NumConcurrentJobs
+ *   is below its MaxConcurrentJobs; when granted, both
+ *   NumConcurrentReadJobs and NumConcurrentJobs are incremented
+ *   with rstore_mutex held.
+ *
+ * Note: inc_read_store() and dec_read_store() are
+ *   called from select_rstore() in src/dird/restore.c
+ *
+ * Returns: true  if the slot was taken
+ *          false if the storage is already at its limit
+ */
+bool inc_read_store(JCR *jcr)
+{
+   bool got_slot = false;
+
+   P(rstore_mutex);
+   if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
+      jcr->rstore->NumConcurrentReadJobs++;
+      jcr->rstore->NumConcurrentJobs++;
+      Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      got_slot = true;
+   }
+   V(rstore_mutex);
+   return got_slot;
+}
+
+/*
+ * Give back the read-storage slot taken by inc_read_store().
+ *
+ * Does nothing when the job has no read storage (jcr->rstore is NULL).
+ * Otherwise NumConcurrentReadJobs and NumConcurrentJobs of jcr->rstore
+ *   are decremented with rstore_mutex held.  The sanity checks are made
+ *   before the mutex is released, so they test the values produced by
+ *   this call and not values another thread may have changed since.
+ */
+void dec_read_store(JCR *jcr)
+{
+   if (jcr->rstore) {
+      P(rstore_mutex);
+      jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
+      jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
+      Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      /* A negative count means an unbalanced inc/dec pair */
+      ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
+      ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
+      V(rstore_mutex);
+   }
+}
+
+/*
+ * Give back the write-storage slot held by this job.
+ *   A job without write storage (jcr->wstore is NULL) is left untouched.
+ */
+static void dec_write_store(JCR *jcr)
+{
+   if (!jcr->wstore) {
+      return;
+   }
+   jcr->wstore->NumConcurrentJobs--;
+   Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
+   ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
+}