2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula job queue routines.
31 * This code consists of three queues, the waiting_jobs
32 * queue, where jobs are initially queued, the ready_jobs
33 * queue, where jobs are placed when all the resources are
34 * allocated and they can immediately be run, and the
35 * running queue where jobs are placed when they are
38 * Kern Sibbald, July MMIII
42 * This code was adapted from the Bacula workq, which was
43 * adapted from "Programming with POSIX Threads", by
53 /* Forward referenced functions */
/* Thread entry points: both are handed to pthread_create() further down in this file. */
54 extern "C" void *jobq_server(void *arg);
55 extern "C" void *sched_wait(void *arg);
/* File-local helpers defined later in this file. */
57 static int start_server(jobq_t *jq);
58 static bool acquire_resources(JCR *jcr);
59 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
60 static void dec_read_store(JCR *jcr);
61 static void dec_write_store(JCR *jcr);
64 * Initialize a job queue
66 * Returns: 0 on success
/*
 * jobq_init: prepare *jq for use.
 *
 * Initializes jq->attr (detached threads), jq->mutex and jq->work in that
 * order. When a later step fails, the pthread objects that were already
 * initialized are destroyed again. Afterwards the worker limit and the
 * engine callback are recorded, the queue is marked JOBQ_VALID and the
 * waiting, running and ready lists are created.
 *
 *   jq       queue structure to initialize
 *   threads  stored as jq->max_workers (upper bound on worker threads)
 *   engine   stored as jq->engine
 *
 * NOTE(review): this listing omits short lines (the embedded numbering
 * jumps, e.g. 69 to 72), so braces, some declarations and the return
 * statements are not shown. The header comment states 0 on success.
 */
69 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
72 jobq_item_t *item = NULL;
74 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
76 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
/* Threads later created from jq->attr will be detached. */
79 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80 pthread_attr_destroy(&jq->attr);
83 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
85 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
/* Undo the attribute init done above. */
86 pthread_attr_destroy(&jq->attr);
89 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
91 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
/* Undo mutex and attribute init, newest first. */
92 pthread_mutex_destroy(&jq->mutex);
93 pthread_attr_destroy(&jq->attr);
97 jq->max_workers = threads; /* max threads to create */
98 jq->num_workers = 0; /* no threads yet */
99 jq->idle_workers = 0; /* no idle threads */
100 jq->engine = engine; /* routine to run */
101 jq->valid = JOBQ_VALID;
102 /* Initialize the job queues */
/*
 * item is NULL and is only used to form &item->link for the dlist
 * constructor; presumably that yields the position of the link member
 * inside jobq_item_t -- TODO confirm against the dlist class.
 */
103 jq->waiting_jobs = New(dlist(item, &item->link));
104 jq->running_jobs = New(dlist(item, &item->link));
105 jq->ready_jobs = New(dlist(item, &item->link));
110 * Destroy the job queue
112 * Returns: 0 on success
/*
 * jobq_destroy: shut the queue down and release its resources.
 *
 * A queue that is not JOBQ_VALID is rejected. Otherwise valid is cleared,
 * idle workers are woken with a broadcast on jq->work, and the caller then
 * waits on jq->work until num_workers reaches zero. Finally the mutex,
 * condition variable and thread attribute are destroyed and the three job
 * lists are deleted.
 *
 * Returns the first non-zero status among the three pthread destroy calls,
 * or 0 when all of them succeeded.
 *
 * NOTE(review): short lines are missing from this listing, so the lock and
 * unlock calls, the early returns and anything that sets a quit flag are
 * not shown here.
 */
115 int jobq_destroy(jobq_t *jq)
117 int stat, stat1, stat2;
119 if (jq->valid != JOBQ_VALID) {
123 jq->valid = 0; /* prevent any more operations */
126 * If any threads are active, wake them
128 if (jq->num_workers > 0) {
/* Only sleeping workers need the wake-up. */
130 if (jq->idle_workers) {
131 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
133 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
/*
 * Sleep until every worker has gone. jobq_server broadcasts on jq->work
 * when it sees num_workers == 0 while quitting. pthread_cond_wait needs
 * jq->mutex held by the caller; presumably it is taken on an omitted line
 * -- TODO confirm.
 */
138 while (jq->num_workers > 0) {
139 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
141 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
/* All three destroy calls are attempted even if an earlier one fails. */
148 stat = pthread_mutex_destroy(&jq->mutex);
149 stat1 = pthread_cond_destroy(&jq->work);
150 stat2 = pthread_attr_destroy(&jq->attr);
151 delete jq->waiting_jobs;
152 delete jq->running_jobs;
153 delete jq->ready_jobs;
154 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
163 * Wait until schedule time arrives before starting. Normally
164 * this routine is only used for jobs started from the console
165 * for which the user explicitly specified a start time. Otherwise
166 * most jobs are put into the job queue only when their
167 * scheduled time arrives.
/*
 * sched_wait: thread routine that holds a job back until its start time.
 *
 *   arg  points to a wait_pkt carrying the JCR and the job queue; jobq_add
 *        allocates it and passes it to pthread_create.
 *
 * The remaining wait is jcr->sched_time minus the current time. The job is
 * put into JS_WaitStartTime, the thread sleeps with bmicrosleep, checks for
 * cancellation and recomputes the remaining time. At the end the routine
 * drops its JCR reference with free_jcr.
 *
 * NOTE(review): short lines are missing from this listing, so the loop
 * header, the cap on a single sleep, the release of arg, the call that
 * queues the job again and the return are not shown.
 */
170 void *sched_wait(void *arg)
172 JCR *jcr = ((wait_pkt *)arg)->jcr;
173 jobq_t *jq = ((wait_pkt *)arg)->jq;
176 Dmsg0(2300, "Enter sched_wait.\n");
/* Seconds still to wait; zero or negative when the start time has passed. */
178 time_t wtime = jcr->sched_time - time(NULL);
179 set_jcr_job_status(jcr, JS_WaitStartTime);
180 /* Wait until scheduled time arrives */
182 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
185 /* Check every 30 seconds if canceled */
/*
 * NOTE(review): wtime is a time_t but is formatted with %d below; if Dmsg3
 * is printf-style this mismatches where time_t is wider than int -- confirm.
 */
187 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
188 jcr->JobId, wtime, jcr->use_count());
192 bmicrosleep(wtime, 0);
193 if (job_canceled(jcr)) {
/* Recompute from the clock rather than subtracting the time slept. */
196 wtime = jcr->sched_time - time(NULL);
198 Dmsg1(200, "resched use=%d\n", jcr->use_count());
/* Presumably balances the inc_use_count() jobq_add did before starting this thread -- confirm. */
200 free_jcr(jcr); /* we are done with jcr */
201 Dmsg0(2300, "Exit sched_wait\n");
206 * Add a job to the queue
207 * jq is a queue that was created with jobq_init
/*
 * jobq_add: hand a job to the queue.
 *
 *   jq   queue created with jobq_init (must be JOBQ_VALID)
 *   jcr  job to add; its use count is incremented here
 *
 * Steps visible in this listing:
 *   1. Lazily initialize jcr->term_wait (once per JCR).
 *   2. If the job is not canceled and its start time lies in the future,
 *      start a detached sched_wait thread with a heap allocated wait_pkt
 *      instead of queueing now.
 *   3. Otherwise allocate a jobq_item_t. A canceled job goes to the front
 *      of the ready queue; any other job is inserted into the waiting
 *      queue ahead of the first entry with a numerically larger
 *      JobPriority, so equal priorities keep arrival order.
 *   4. Call start_server so that a worker looks at the queue.
 *
 * NOTE(review): short lines are missing from this listing, so the lock and
 * unlock calls, the return statements and some assignments are not shown.
 */
209 int jobq_add(jobq_t *jq, JCR *jcr)
212 jobq_item_t *item, *li;
213 bool inserted = false;
214 time_t wtime = jcr->sched_time - time(NULL);
218 if (!jcr->term_wait_inited) {
219 /* Initialize termination condition variable */
220 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
222 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
225 jcr->term_wait_inited = true;
/* NOTE(review): jcr is a pointer but is formatted with %x here and below -- confirm Dmsg3 tolerates that on 64-bit builds. */
228 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
229 if (jq->valid != JOBQ_VALID) {
230 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
234 jcr->inc_use_count(); /* mark jcr in use by us */
235 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/* Start time still ahead: let a sched_wait thread sleep on it. */
236 if (!job_canceled(jcr) && wtime > 0) {
237 set_thread_concurrency(jq->max_workers + 2);
/* NOTE(review): the malloc result is dereferenced on the next line without a NULL check. */
238 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
239 sched_pkt->jcr = jcr;
241 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
/*
 * NOTE(review): on failure no release of sched_pkt and no matching drop of
 * the use count taken above is visible; they may sit on omitted lines --
 * confirm.
 */
242 if (stat != 0) { /* thread not created */
244 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
/* Out of memory for the queue entry: give back the reference taken above. */
251 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
252 free_jcr(jcr); /* release jcr */
257 /* While waiting in a queue this job is not attached to a thread */
258 set_jcr_in_tsd(INVALID_JCR);
259 if (job_canceled(jcr)) {
260 /* Add job to ready queue so that it is canceled quickly */
261 jq->ready_jobs->prepend(item);
262 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
264 /* Add this job to the wait queue in priority sorted order */
265 foreach_dlist(li, jq->waiting_jobs) {
266 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
267 li->jcr->JobId, li->jcr->JobPriority);
/* Strict comparison: the new job lands after waiting jobs of equal priority. */
268 if (li->jcr->JobPriority > jcr->JobPriority) {
269 jq->waiting_jobs->insert_before(item, li);
/* NOTE(review): the message reads "jobid ... before waiting job" but the waiting job id is passed first and the new job id second. */
270 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
271 li->jcr->JobId, jcr->JobId);
276 /* If no jobs in wait queue, append it */
278 jq->waiting_jobs->append(item);
279 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
283 /* Ensure that at least one server looks at the queue. */
284 stat = start_server(jq);
287 Dmsg0(2300, "Return jobq_add\n");
292 * Remove a job from the job queue. Used only by cancel_job().
293 * jq is a queue that was created with jobq_init
294 * jcr is the job control record of the job to remove
296 * Note, it is "removed" from the job queue.
297 * If you want to cancel it, you need to provide some external means
298 * of doing so (e.g. pthread_kill()).
/*
 * jobq_remove: take a job out of the waiting queue.
 *
 *   jq   queue created with jobq_init (must be JOBQ_VALID)
 *   jcr  job to look for
 *
 * The waiting queue is searched for the entry whose jcr pointer equals the
 * argument. In the visible code the entry is not discarded: it is unlinked
 * from the waiting queue and prepended to the ready queue, and start_server
 * is called so that a worker picks it up.
 *
 * NOTE(review): short lines are missing from this listing, so the lock and
 * unlock calls, the found/not-found bookkeeping and the return statements
 * are not shown.
 */
300 int jobq_remove(jobq_t *jq, JCR *jcr)
306 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
307 if (jq->valid != JOBQ_VALID) {
/* Entries are matched by JCR pointer identity. */
312 foreach_dlist(item, jq->waiting_jobs) {
313 if (jcr == item->jcr) {
320 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
324 /* Move item to be the first on the list */
325 jq->waiting_jobs->remove(item);
326 jq->ready_jobs->prepend(item);
327 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
329 stat = start_server(jq);
332 Dmsg0(2300, "Return jobq_remove\n");
338 * Start the server thread if it isn't already running
/*
 * start_server: make sure some worker will look at the queue.
 *
 * If idle workers exist, all of them are woken with a broadcast on
 * jq->work. Otherwise, if the worker limit has not been reached, a new
 * detached thread running jobq_server is created with jq as its argument.
 * When neither condition holds the visible code does nothing.
 *
 * NOTE(review): short lines are missing from this listing, so the worker
 * count bookkeeping and the return statements are not shown.
 */
340 static int start_server(jobq_t *jq)
346 * if any threads are idle, wake one.
347 * Actually we do a broadcast because on /lib/tls
348 * these signals seem to get lost from time to time.
350 if (jq->idle_workers > 0) {
351 Dmsg0(2300, "Signal worker to wake up\n");
352 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
/* NOTE(review): the message names pthread_cond_signal although the failing call is pthread_cond_broadcast. */
354 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
357 } else if (jq->num_workers < jq->max_workers) {
358 Dmsg0(2300, "Create worker thread\n");
359 /* No idle threads so create a new one */
360 set_thread_concurrency(jq->max_workers + 1);
362 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
365 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
374 * This is the worker thread that serves the job queue.
375 * When all the resources are acquired for the job,
376 * it will call the user's engine.
/*
 * jobq_server: body of a worker thread.
 *
 *   arg  the jobq_t this worker serves
 *
 * Each pass of the main loop does the following, as far as visible here:
 *   1. With no known work and no quit request, wait on jq->work with a
 *      deadline of now + 4 seconds.
 *   2. Drain the ready queue: move the first entry to the running list,
 *      run the job, take it off the running list, give back the resource
 *      counters, then either reschedule the job or free the queue entry.
 *   3. Walk the waiting queue and move every job that has an acceptable
 *      priority and whose resources can be acquired to the ready queue.
 *   4. On a quit request with an empty ready queue, wake the destroy
 *      routine once the last worker is leaving.
 *   5. Leave the loop after a timed-out wait with nothing ready.
 *   6. Otherwise recompute the work flag, pausing 2 seconds in between.
 *
 * NOTE(review): short lines are missing from this listing, so the loop
 * headers, lock and unlock calls, worker counters, the engine call itself,
 * break statements and the return are not shown. The comments below
 * describe the visible lines only.
 */
379 void *jobq_server(void *arg)
381 struct timespec timeout;
382 jobq_t *jq = (jobq_t *)arg;
383 jobq_item_t *je; /* job entry in queue */
385 bool timedout = false;
388 set_jcr_in_tsd(INVALID_JCR);
389 Dmsg0(2300, "Start jobq_server\n");
396 Dmsg0(2300, "Top of for loop\n");
/* Sleep only when the previous pass found nothing queued and nobody asked to quit. */
397 if (!work && !jq->quit) {
398 gettimeofday(&tv, &tz);
/* Absolute deadline, as pthread_cond_timedwait expects. */
400 timeout.tv_sec = tv.tv_sec + 4;
404 * Wait 4 seconds, then if no more work, exit
406 Dmsg0(2300, "pthread_cond_timedwait()\n");
407 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
408 if (stat == ETIMEDOUT) {
409 Dmsg0(2300, "timedwait timedout.\n");
412 } else if (stat != 0) {
413 /* This shouldn't happen */
414 Dmsg0(2300, "This shouldn't happen\n");
423 * If anything is in the ready queue, run it
425 Dmsg0(2300, "Checking ready queue.\n");
426 while (!jq->ready_jobs->empty() && !jq->quit) {
428 je = (jobq_item_t *)jq->ready_jobs->first();
430 jq->ready_jobs->remove(je);
/* More ready jobs remain: ask for another worker so they can run in parallel. */
431 if (!jq->ready_jobs->empty()) {
432 Dmsg0(2300, "ready queue not empty start server\n");
433 if (start_server(jq) != 0) {
439 jq->running_jobs->append(je);
441 /* Attach jcr to this thread while we run the job */
443 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
445 /* Release job queue lock */
448 /* Call user's routine here */
449 Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
453 /* Job finished detach from thread */
454 set_jcr_in_tsd(INVALID_JCR);
456 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
459 /* Reacquire job queue lock */
461 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
462 jq->running_jobs->remove(je);
464 * Release locks if acquired. Note, they will not have
465 * been acquired for jobs canceled before they were
466 * put into the ready queue.
/* Mirrors the increments done in acquire_resources; a read-store release may sit on an omitted line -- confirm. */
468 if (jcr->acquired_resource_locks) {
470 dec_write_store(jcr);
471 jcr->client->NumConcurrentJobs--;
472 jcr->job->NumConcurrentJobs--;
473 jcr->acquired_resource_locks = false;
/* A true result means reschedule_job has already released the jcr reference and freed je. */
476 if (reschedule_job(jcr, jq, je)) {
477 continue; /* go look for more work */
480 /* Clean up and release old jcr */
481 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
482 jcr->SDJobStatus = 0;
/* The queue lock is dropped around the cleanup between these two calls. */
483 V(jq->mutex); /* release internal lock */
485 free(je); /* release job entry */
486 P(jq->mutex); /* reacquire job queue lock */
489 * If any job in the wait queue can be run,
490 * move it to the ready queue
492 Dmsg0(2300, "Done check ready, now check wait queue.\n");
493 if (!jq->waiting_jobs->empty() && !jq->quit) {
495 bool running_allow_mix = false;
496 je = (jobq_item_t *)jq->waiting_jobs->first();
497 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
/*
 * With jobs running, the wanted priority is that of the first running job.
 * running_allow_mix starts true and is cleared as soon as one running job
 * has allow_mixed_priority unset.
 */
499 Priority = re->jcr->JobPriority;
500 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
501 re->jcr->JobId, Priority);
502 running_allow_mix = true;
504 Dmsg2(2300, "JobId %d is also running with %s\n",
506 re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
507 if (!re->jcr->job->allow_mixed_priority) {
508 running_allow_mix = false;
511 re = (jobq_item_t *)jq->running_jobs->next(re);
513 Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
514 running_allow_mix ? "allow" : "don't allow");
/* Nothing running: the first waiting job sets the wanted priority. */
516 Priority = je->jcr->JobPriority;
517 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
520 * Walk down the list of waiting jobs and attempt
521 * to acquire the resources it needs.
524 /* je is current job item on the queue, jn is the next one */
/* The successor is fetched first because je may be unlinked from the waiting queue below. */
526 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
528 Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
529 jcr->JobId, jcr->JobPriority, Priority,
530 jcr->job->allow_mixed_priority ? "mix" : "no mix");
532 /* Take only jobs of correct Priority */
/*
 * Accepted: the same priority, or a numerically lower priority when both
 * this job and every running job allow mixing. Anything else is marked
 * JS_WaitPriority.
 */
533 if (!(jcr->JobPriority == Priority
534 || (jcr->JobPriority < Priority &&
535 jcr->job->allow_mixed_priority && running_allow_mix))) {
536 set_jcr_job_status(jcr, JS_WaitPriority);
540 if (!acquire_resources(jcr)) {
541 /* If resource conflict, job is canceled */
/* A job that is still alive stays in the waiting queue; the walk moves on. */
542 if (!job_canceled(jcr)) {
543 je = jn; /* point to next waiting job */
549 * Got all locks, now remove it from wait queue and append it
550 * to the ready queue. Note, we may also get here if the
551 * job was canceled. Once it is "run", it will quickly
554 jq->waiting_jobs->remove(je);
555 jq->ready_jobs->append(je);
556 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
557 je = jn; /* Point to next waiting job */
562 Dmsg0(2300, "Done checking wait queue.\n");
564 * If no more ready work and we are asked to quit, then do it
566 if (jq->ready_jobs->empty() && jq->quit) {
/* The last worker to leave wakes jobq_destroy, which waits on jq->work. */
568 if (jq->num_workers == 0) {
569 Dmsg0(2300, "Wake up destroy routine\n");
570 /* Wake up destroy routine if he is waiting */
571 pthread_cond_broadcast(&jq->work);
575 Dmsg0(2300, "Check for work request\n");
577 * If no more work requests, and we waited long enough, quit
579 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
580 jq->ready_jobs->empty());
581 if (jq->ready_jobs->empty() && timedout) {
582 Dmsg0(2300, "break big loop\n");
/* Waiting jobs count as work too, so the next pass skips the timed wait. */
587 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
590 * If a job is waiting on a Resource, don't consume all
591 * the CPU time looping looking for work, and even more
592 * important, release the lock so that a job that has
593 * terminated can give us the resource.
596 bmicrosleep(2, 0); /* pause for 2 seconds */
598 /* Recompute work as something may have changed in last 2 secs */
599 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
601 Dmsg1(2300, "Loop again. work=%d\n", work);
602 } /* end of big for loop */
604 Dmsg0(200, "unlock mutex\n");
606 Dmsg0(2300, "End jobq_server\n");
611 * Returns true if cleanup done and we should look for more work
/*
 * reschedule_job: decide whether a finished job must run again and, if so,
 * set that up.
 *
 *   jcr  job that has just finished
 *   jq   queue the job came from
 *   je   queue entry of the job
 *
 * A job is rescheduled only when its Job resource has RescheduleOnError
 * set, its status is neither JS_Terminated nor JS_Canceled, it is a
 * backup, and either RescheduleTimes is 0 or fewer reschedules than
 * RescheduleTimes have happened. The new start time is the current time
 * plus RescheduleInterval.
 *
 * If no bytes were written, the same JCR is queued again through jobq_add,
 * the reference held here is released, je is freed and true is returned.
 * Otherwise a new JCR is built from the old one and started with run_job.
 *
 * NOTE(review): short lines are missing from this listing, so the lock
 * handling around jobq_add and run_job, the action taken when
 * allow_duplicate_job fails and the final return are not shown.
 */
613 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
616 * Reschedule the job if necessary and requested
618 if (jcr->job->RescheduleOnError &&
619 jcr->JobStatus != JS_Terminated &&
620 jcr->JobStatus != JS_Canceled &&
621 jcr->get_JobType() == JT_BACKUP &&
622 (jcr->job->RescheduleTimes == 0 ||
623 jcr->reschedule_count < jcr->job->RescheduleTimes)) {
624 char dt[50], dt2[50];
627 * Reschedule this job by cleaning it up, but
628 * reuse the same JobId if possible.
630 time_t now = time(NULL);
631 jcr->reschedule_count++;
632 jcr->sched_time = now + jcr->job->RescheduleInterval;
633 bstrftime(dt, sizeof(dt), now);
634 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
/*
 * NOTE(review): now and jcr->sched_time are formatted with %u below; now is
 * a time_t, so if Dmsg4 is printf-style this mismatches where time_t is
 * wider than unsigned int -- confirm.
 */
635 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
636 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
637 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
638 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
639 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
641 set_jcr_job_status(jcr, JS_WaitStartTime);
642 jcr->SDJobStatus = 0;
643 if (!allow_duplicate_job(jcr)) {
/* Nothing was written, so the same JCR can go round again. */
646 if (jcr->JobBytes == 0) {
647 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
/* jobq_add increments the use count itself, so the reference held here is released right after. */
649 jobq_add(jq, jcr); /* queue the job to run again */
651 free_jcr(jcr); /* release jcr */
652 free(je); /* free the job entry */
653 return true; /* we already cleaned up */
656 * Something was actually backed up, so we cannot reuse
657 * the old JobId or there will be database record
658 * conflicts. We now create a new job, copying the
659 * appropriate fields.
661 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
662 set_jcr_defaults(njcr, jcr->job);
/* Carry over the reschedule state, level, pool choices and their override flags. */
663 njcr->reschedule_count = jcr->reschedule_count;
664 njcr->sched_time = jcr->sched_time;
665 njcr->set_JobLevel(jcr->get_JobLevel());
666 njcr->pool = jcr->pool;
667 njcr->run_pool_override = jcr->run_pool_override;
668 njcr->full_pool = jcr->full_pool;
669 njcr->run_full_pool_override = jcr->run_full_pool_override;
670 njcr->inc_pool = jcr->inc_pool;
671 njcr->run_inc_pool_override = jcr->run_inc_pool_override;
672 njcr->diff_pool = jcr->diff_pool;
/* Presumably the -1 makes set_jcr_job_status accept the copied status unconditionally -- TODO confirm. */
673 njcr->JobStatus = -1;
674 set_jcr_job_status(njcr, jcr->JobStatus);
676 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
681 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
685 njcr->messages = jcr->messages;
686 njcr->spool_data = jcr->spool_data;
687 njcr->write_part_after_job = jcr->write_part_after_job;
688 Dmsg0(2300, "Call to run new job\n");
690 run_job(njcr); /* This creates a "new" job */
691 free_jcr(njcr); /* release "new" jcr */
693 Dmsg0(2300, "Back from running new job.\n");
699 * See if we can acquire all the necessary resources for the job (JCR)
701 * Returns: true if successful
702 * false if resource failure
/*
 * acquire_resources: try to reserve every concurrency slot a job needs.
 *
 * Slots are taken in this order: read storage, write storage, client, job.
 * Each is taken only while its NumConcurrentJobs is below its
 * MaxConcurrentJobs. When one is full the job status is set to the
 * matching wait state (JS_WaitStoreRes, JS_WaitClientRes or JS_WaitJobRes)
 * and the slots taken so far are given back. On full success
 * jcr->acquired_resource_locks is set to true.
 *
 * NOTE(review): short lines are missing from this listing, so the null
 * checks on rstore and wstore, the failure returns, some back-out calls
 * and any preprocessor guard around the same-device check are not shown.
 */
704 static bool acquire_resources(JCR *jcr)
706 bool skip_this_jcr = false;
708 jcr->acquired_resource_locks = false;
710 * Turning this code off is likely to cause some deadlocks,
711 * but we do not really have enough information here to
712 * know if this is really a deadlock (it may be a dual drive
713 * autochanger), and in principle, the SD reservation system
714 * should detect these deadlocks, so push the work off on is.
/* NOTE(review): the text above speaks of this check being turned off; whatever disables it is not visible here -- confirm whether the block below is compiled. */
717 if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
718 Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
719 " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
720 jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
721 set_jcr_job_status(jcr, JS_Canceled);
/* Read storage: a read job counts in both the read counter and the overall counter. */
726 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
727 if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
728 jcr->rstore->NumConcurrentReadJobs++;
729 jcr->rstore->NumConcurrentJobs++;
730 Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
732 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
733 set_jcr_job_status(jcr, JS_WaitStoreRes);
/* Write storage: only the overall counter is used. */
739 Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
740 if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
741 jcr->wstore->NumConcurrentJobs++;
742 Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
/* Write storage is full and a read storage exists; presumably its slot is given back on an omitted line -- confirm. */
743 } else if (jcr->rstore) {
745 skip_this_jcr = true;
747 Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
748 skip_this_jcr = true;
752 set_jcr_job_status(jcr, JS_WaitStoreRes);
756 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
757 jcr->client->NumConcurrentJobs++;
759 /* Back out previous locks */
760 dec_write_store(jcr);
762 set_jcr_job_status(jcr, JS_WaitClientRes);
765 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
766 jcr->job->NumConcurrentJobs++;
768 /* Back out previous locks */
769 dec_write_store(jcr);
771 jcr->client->NumConcurrentJobs--;
772 set_jcr_job_status(jcr, JS_WaitJobRes);
/* jobq_server checks this flag before giving the slots back after the job has run. */
776 jcr->acquired_resource_locks = true;
/*
 * dec_read_store: give back the read-storage slot of a job.
 *
 * Decrements both the read counter and the overall counter of jcr->rstore
 * and asserts that neither went negative.
 *
 * NOTE(review): the embedded numbering skips lines between the signature
 * and the first decrement, so a guard for a missing rstore may exist there
 * -- confirm.
 */
780 static void dec_read_store(JCR *jcr)
783 jcr->rstore->NumConcurrentReadJobs--; /* back out rstore */
784 jcr->rstore->NumConcurrentJobs--; /* back out rstore */
785 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
786 ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
787 ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
/*
 * dec_write_store: give back the write-storage slot of a job.
 *
 * Decrements the overall counter of jcr->wstore and asserts that it did
 * not go negative.
 *
 * NOTE(review): the embedded numbering skips lines between the signature
 * and the decrement, so a guard for a missing wstore may exist there --
 * confirm.
 */
791 static void dec_write_store(JCR *jcr)
794 jcr->wstore->NumConcurrentJobs--;
795 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
796 ASSERT(jcr->wstore->NumConcurrentJobs >= 0);