2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula job queue routines.
31 * This code consists of three queues, the waiting_jobs
32 * queue, where jobs are initially queued, the ready_jobs
33 * queue, where jobs are placed when all the resources are
34 * allocated and they can immediately be run, and the
35 * running queue where jobs are placed when they are
38 * Kern Sibbald, July MMIII
42 * This code was adapted from the Bacula workq, which was
43 * adapted from "Programming with POSIX Threads", by
53 /* Forward referenced functions */
/*
 * Thread start routines: both are passed to pthread_create further
 * down in this file and are declared with C linkage.
 */
54 extern "C" void *jobq_server(void *arg);
55 extern "C" void *sched_wait(void *arg);
/* File-local helpers whose definitions appear later in this file. */
57 static int start_server(jobq_t *jq);
58 static bool acquire_resources(JCR *jcr);
59 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
60 static void dec_read_store(JCR *jcr);
61 static void dec_write_store(JCR *jcr);
64 * Initialize a job queue
66 * Returns: 0 on success
/*
 * jobq_init -- prepare the job queue object pointed to by jq.
 *
 *   jq       queue structure to set up
 *   threads  stored in jq->max_workers (upper bound on worker threads)
 *   engine   stored in jq->engine (routine to run; presumably called by
 *            the worker thread for each job -- confirm)
 *
 * Initializes, in this order, the thread attribute object (set to the
 * detached state), the queue mutex and the work condition variable.
 * When a later step fails, the objects initialized by the earlier
 * steps are destroyed again.  On success the counters are zeroed, the
 * queue is marked JOBQ_VALID and the three job lists are created.
 *
 * NOTE(review): presumably the failing pthread status is returned on
 * error and 0 on success -- confirm against the return statements.
 */
69 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
/* item stays NULL; it is only used in the dlist constructor calls below */
72 jobq_item_t *item = NULL;
74 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
76 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
/* Workers are created detached, so nothing ever joins them */
79 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80 pthread_attr_destroy(&jq->attr);
83 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
85 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
86 pthread_attr_destroy(&jq->attr);
89 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
91 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
/* Undo the two earlier initializations before giving up */
92 pthread_mutex_destroy(&jq->mutex);
93 pthread_attr_destroy(&jq->attr);
97 jq->max_workers = threads; /* max threads to create */
98 jq->num_workers = 0; /* no threads yet */
99 jq->idle_workers = 0; /* no idle threads */
100 jq->engine = engine; /* routine to run */
101 jq->valid = JOBQ_VALID;
102 /* Initialize the job queues */
/*
 * NOTE(review): &item->link with item == NULL presumably tells dlist
 * where the link member sits inside jobq_item_t -- confirm.
 */
103 jq->waiting_jobs = New(dlist(item, &item->link));
104 jq->running_jobs = New(dlist(item, &item->link));
105 jq->ready_jobs = New(dlist(item, &item->link));
110 * Destroy the job queue
112 * Returns: 0 on success
/*
 * jobq_destroy -- stop using a job queue and release what it owns.
 *
 *   jq  queue previously marked JOBQ_VALID
 *
 * With the queue mutex held, jq->valid is cleared so that later
 * operations see an invalid queue.  If worker threads exist, idle ones
 * are woken with a broadcast on jq->work and this routine then waits
 * on jq->work until jq->num_workers reaches 0.  After the mutex is
 * released, the mutex, the condition variable and the thread
 * attributes are destroyed and the three job lists are deleted.
 *
 * Returns the first non-zero status among the three destroy calls
 * (mutex, then cond, then attr), which is 0 when all succeed.
 * NOTE(review): the values returned on the earlier error paths are
 * presumably the failing pthread status -- confirm.
 */
115 int jobq_destroy(jobq_t *jq)
117 int stat, stat1, stat2;
119 if (jq->valid != JOBQ_VALID) {
122 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
124 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
127 jq->valid = 0; /* prevent any more operations */
130 * If any threads are active, wake them
132 if (jq->num_workers > 0) {
134 if (jq->idle_workers) {
135 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
137 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
138 pthread_mutex_unlock(&jq->mutex);
/* The last worker to leave broadcasts on jq->work (see jobq_server) */
142 while (jq->num_workers > 0) {
143 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
145 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
146 pthread_mutex_unlock(&jq->mutex);
151 if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
153 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat));
/* All three destroy calls run even if an earlier one reports an error */
156 stat = pthread_mutex_destroy(&jq->mutex);
157 stat1 = pthread_cond_destroy(&jq->work);
158 stat2 = pthread_attr_destroy(&jq->attr);
159 delete jq->waiting_jobs;
160 delete jq->running_jobs;
161 delete jq->ready_jobs;
162 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
171 * Wait until schedule time arrives before starting. Normally
172 * this routine is only used for jobs started from the console
173 * for which the user explicitly specified a start time. Otherwise
174 * most jobs are put into the job queue only when their
175 * scheduled time arrives.
/*
 * sched_wait -- thread routine that holds a job back until its
 * scheduled start time.
 *
 *   arg  pointer to a wait_pkt holding the job (jcr) and the queue (jq)
 *
 * Sets the job status to JS_WaitStartTime, then sleeps with
 * bmicrosleep and recomputes the remaining time from jcr->sched_time
 * after each sleep, checking job_canceled in between.  Before exiting
 * it drops its reference on the jcr with free_jcr.
 *
 * NOTE(review): presumably the job is handed back to the queue once
 * the wait is over -- confirm against the call next to the resched
 * debug message.
 */
178 void *sched_wait(void *arg)
180 JCR *jcr = ((wait_pkt *)arg)->jcr;
181 jobq_t *jq = ((wait_pkt *)arg)->jq;
184 Dmsg0(2300, "Enter sched_wait.\n");
/* Seconds left until the scheduled start; not positive if already due */
186 time_t wtime = jcr->sched_time - time(NULL);
187 set_jcr_job_status(jcr, JS_WaitStartTime);
188 /* Wait until scheduled time arrives */
190 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
193 /* Check every 30 seconds if canceled */
/*
 * NOTE(review): wtime is a time_t but is passed for a %d conversion
 * below; confirm the types match on all supported platforms.
 */
195 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
196 jcr->JobId, wtime, jcr->use_count());
200 bmicrosleep(wtime, 0);
201 if (job_canceled(jcr)) {
/* Recompute from the clock rather than subtracting the time slept */
204 wtime = jcr->sched_time - time(NULL);
206 Dmsg1(200, "resched use=%d\n", jcr->use_count());
208 free_jcr(jcr); /* we are done with jcr */
209 Dmsg0(2300, "Exit sched_wait\n");
214 * Add a job to the queue
215 * jq is a queue that was created with jobq_init
/*
 * jobq_add -- hand a job to the queue.
 *
 *   jq   queue that must be marked JOBQ_VALID
 *   jcr  job to queue; its use count is incremented here
 *
 * What the code below does:
 *   - initializes jcr->term_wait once (guarded by term_wait_inited);
 *   - refuses the job when the queue is not valid;
 *   - if the job is not canceled and its sched_time lies in the
 *     future, starts a sched_wait thread with a malloc-ed wait_pkt
 *     (presumably returning right after -- confirm);
 *   - otherwise, with the queue mutex held, allocates a jobq_item_t
 *     and queues it: a canceled job is prepended to the ready queue,
 *     any other job is inserted into the waiting queue ahead of the
 *     first entry whose JobPriority is numerically greater, or
 *     appended;
 *   - calls start_server so that a worker looks at the queue.
 *
 * NOTE(review): the return value is presumably the last pthread or
 * start_server status -- confirm.
 */
217 int jobq_add(jobq_t *jq, JCR *jcr)
220 jobq_item_t *item, *li;
221 bool inserted = false;
222 time_t wtime = jcr->sched_time - time(NULL);
226 if (!jcr->term_wait_inited) {
227 /* Initialize termination condition variable */
228 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
230 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
233 jcr->term_wait_inited = true;
/* NOTE(review): jcr is a pointer but is printed with %x here and below */
236 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
237 if (jq->valid != JOBQ_VALID) {
238 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
242 jcr->inc_use_count(); /* mark jcr in use by us */
243 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/* Start time still in the future: let a sched_wait thread do the waiting */
244 if (!job_canceled(jcr) && wtime > 0) {
245 set_thread_concurrency(jq->max_workers + 2);
/*
 * NOTE(review): the malloc result is dereferenced on the next line
 * without a NULL check.
 */
246 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
247 sched_pkt->jcr = jcr;
249 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
250 if (stat != 0) { /* thread not created */
252 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
257 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
259 Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
260 free_jcr(jcr); /* release jcr */
264 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
265 free_jcr(jcr); /* release jcr */
270 /* While waiting in a queue this job is not attached to a thread */
271 set_jcr_in_tsd(INVALID_JCR);
272 if (job_canceled(jcr)) {
273 /* Add job to ready queue so that it is canceled quickly */
274 jq->ready_jobs->prepend(item);
275 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
277 /* Add this job to the wait queue in priority sorted order */
278 foreach_dlist(li, jq->waiting_jobs) {
279 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
280 li->jcr->JobId, li->jcr->JobPriority);
/* Strict comparison: the new job goes behind entries of equal priority */
281 if (li->jcr->JobPriority > jcr->JobPriority) {
282 jq->waiting_jobs->insert_before(item, li);
283 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
284 li->jcr->JobId, jcr->JobId);
289 /* If not jobs in wait queue, append it */
291 jq->waiting_jobs->append(item);
292 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
296 /* Ensure that at least one server looks at the queue. */
297 stat = start_server(jq);
299 pthread_mutex_unlock(&jq->mutex);
300 Dmsg0(2300, "Return jobq_add\n");
305 * Remove a job from the job queue. Used only by cancel_job().
306 * jq is a queue that was created with jobq_init
307 * jcr is the job to take out of the waiting queue
309 * Note, it is "removed" from the job queue.
310 * If you want to cancel it, you need to provide some external means
311 * of doing so (e.g. pthread_kill()).
/*
 * jobq_remove -- take a job out of the waiting queue so that it is
 * picked up next.
 *
 *   jq   queue that must be marked JOBQ_VALID
 *   jcr  job to look for in jq->waiting_jobs
 *
 * With the queue mutex held, the waiting queue is searched for the
 * entry whose jcr matches.  If none matches, the mutex is released and
 * a debug message is logged.  Otherwise the entry is moved to the
 * front of the ready queue and start_server is called.
 *
 * NOTE(review): the return values are presumably an error code when
 * the queue is invalid or the job is not found, else the status of
 * start_server -- confirm.
 */
313 int jobq_remove(jobq_t *jq, JCR *jcr)
319 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
320 if (jq->valid != JOBQ_VALID) {
324 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
326 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
/* Linear search of the waiting queue for the entry that owns jcr */
330 foreach_dlist(item, jq->waiting_jobs) {
331 if (jcr == item->jcr) {
337 pthread_mutex_unlock(&jq->mutex);
338 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
342 /* Move item to be the first on the list */
343 jq->waiting_jobs->remove(item);
344 jq->ready_jobs->prepend(item);
345 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
347 stat = start_server(jq);
349 pthread_mutex_unlock(&jq->mutex);
350 Dmsg0(2300, "Return jobq_remove\n");
356 * Start the server thread if it isn't already running
/*
 * start_server -- make sure some worker thread will look at the queue.
 *
 *   jq  queue to serve; jobq_add and jobq_remove call this while
 *       holding jq->mutex
 *
 * If any worker is idle, all waiters on jq->work are woken.  Otherwise,
 * if fewer than jq->max_workers threads exist, a new jobq_server thread
 * is created with the attributes stored in jq->attr.  When neither
 * condition holds, nothing is done.
 *
 * NOTE(review): presumably returns 0 or the failing pthread status --
 * confirm.
 */
358 static int start_server(jobq_t *jq)
364 * if any threads are idle, wake one.
365 * Actually we do a broadcast because on /lib/tls
366 * these signals seem to get lost from time to time.
368 if (jq->idle_workers > 0) {
369 Dmsg0(2300, "Signal worker to wake up\n");
370 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
/*
 * NOTE(review): the message below names pthread_cond_signal although
 * the failing call is pthread_cond_broadcast.
 */
372 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
375 } else if (jq->num_workers < jq->max_workers) {
376 Dmsg0(2300, "Create worker thread\n");
377 /* No idle threads so create a new one */
378 set_thread_concurrency(jq->max_workers + 1);
379 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
381 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
390 * This is the worker thread that serves the job queue.
391 * When all the resources are acquired for the job,
392 * it will call the user's engine.
/*
 * jobq_server -- body of a worker thread serving one job queue.
 *
 *   arg  the jobq_t this worker serves, passed as void *
 *
 * Each pass of the main loop does the following, as far as the code
 * below shows:
 *   1. With no work and no quit request, wait on jq->work with a
 *      deadline about 4 seconds ahead.
 *   2. Drain the ready queue: move each entry to the running list, run
 *      the job through the user engine, then give back the storage,
 *      client and job concurrency counts it held, and either
 *      reschedule the job or free its queue entry.
 *   3. Scan the waiting queue for jobs of an eligible priority and
 *      move those whose resources can be acquired to the ready queue.
 *   4. Leave the loop when asked to quit with nothing ready, or when
 *      the timed wait expired with nothing ready; otherwise recompute
 *      whether work is pending, pause 2 seconds and loop again.
 *
 * The thread detaches its jcr from thread-specific data on entry and
 * after every job.
 */
395 void *jobq_server(void *arg)
397 struct timespec timeout;
398 jobq_t *jq = (jobq_t *)arg;
399 jobq_item_t *je; /* job entry in queue */
401 bool timedout = false;
404 set_jcr_in_tsd(INVALID_JCR);
405 Dmsg0(2300, "Start jobq_server\n");
406 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
408 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
417 Dmsg0(2300, "Top of for loop\n");
/* Idle path: nothing queued and no quit request, so block with a deadline */
418 if (!work && !jq->quit) {
419 gettimeofday(&tv, &tz);
421 timeout.tv_sec = tv.tv_sec + 4;
425 * Wait 4 seconds, then if no more work, exit
427 Dmsg0(2300, "pthread_cond_timedwait()\n");
428 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
429 if (stat == ETIMEDOUT) {
430 Dmsg0(2300, "timedwait timedout.\n");
433 } else if (stat != 0) {
434 /* This shouldn't happen */
435 Dmsg0(2300, "This shouldn't happen\n");
437 pthread_mutex_unlock(&jq->mutex);
444 * If anything is in the ready queue, run it
446 Dmsg0(2300, "Checking ready queue.\n");
447 while (!jq->ready_jobs->empty() && !jq->quit) {
449 je = (jobq_item_t *)jq->ready_jobs->first();
451 jq->ready_jobs->remove(je);
/* More ready jobs remain: get another worker going before running this one */
452 if (!jq->ready_jobs->empty()) {
453 Dmsg0(2300, "ready queue not empty start server\n");
454 if (start_server(jq) != 0) {
456 pthread_mutex_unlock(&jq->mutex);
460 jq->running_jobs->append(je);
462 /* Attach jcr to this thread while we run the job */
464 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
466 /* Release job queue lock */
469 /* Call user's routine here */
470 Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
474 /* Job finished detach from thread */
475 set_jcr_in_tsd(INVALID_JCR);
477 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
480 /* Reacquire job queue lock */
482 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
483 jq->running_jobs->remove(je);
485 * Release locks if acquired. Note, they will not have
486 * been acquired for jobs canceled before they were
487 * put into the ready queue.
/* Mirror of acquire_resources: give back every count taken there */
489 if (jcr->acquired_resource_locks) {
491 dec_write_store(jcr);
492 jcr->client->NumConcurrentJobs--;
493 jcr->job->NumConcurrentJobs--;
494 jcr->acquired_resource_locks = false;
/* A true result means reschedule_job already cleaned up this entry */
497 if (reschedule_job(jcr, jq, je)) {
498 continue; /* go look for more work */
501 /* Clean up and release old jcr */
502 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
503 jcr->SDJobStatus = 0;
504 V(jq->mutex); /* release internal lock */
506 free(je); /* release job entry */
507 P(jq->mutex); /* reacquire job queue lock */
510 * If any job in the wait queue can be run,
511 * move it to the ready queue
513 Dmsg0(2300, "Done check ready, now check wait queue.\n");
514 if (!jq->waiting_jobs->empty() && !jq->quit) {
/*
 * Pick the priority to look for: that of the first running job, or
 * that of the first waiting job in the no-job-running case below.
 * running_allow_mix ends up true only if no running job examined
 * below forbids mixed priorities.
 */
516 bool running_allow_mix = false;
517 je = (jobq_item_t *)jq->waiting_jobs->first();
518 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
520 Priority = re->jcr->JobPriority;
521 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
522 re->jcr->JobId, Priority);
523 running_allow_mix = true;
525 Dmsg2(2300, "JobId %d is also running with %s\n",
527 re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
528 if (!re->jcr->job->allow_mixed_priority) {
529 running_allow_mix = false;
532 re = (jobq_item_t *)jq->running_jobs->next(re);
534 Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
535 running_allow_mix ? "allow" : "don't allow");
537 Priority = je->jcr->JobPriority;
538 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
541 * Walk down the list of waiting jobs and attempt
542 * to acquire the resources it needs.
545 /* je is current job item on the queue, jn is the next one */
/* jn is fetched up front because je may be unlinked from the list below */
547 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
549 Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
550 jcr->JobId, jcr->JobPriority, Priority,
551 jcr->job->allow_mixed_priority ? "mix" : "no mix");
553 /* Take only jobs of correct Priority */
/*
 * Eligible: priority equal to Priority, or numerically lower while
 * both this job and the running jobs allow mixed priorities.
 */
554 if (!(jcr->JobPriority == Priority
555 || (jcr->JobPriority < Priority &&
556 jcr->job->allow_mixed_priority && running_allow_mix))) {
557 set_jcr_job_status(jcr, JS_WaitPriority);
561 if (!acquire_resources(jcr)) {
562 /* If resource conflict, job is canceled */
563 if (!job_canceled(jcr)) {
564 je = jn; /* point to next waiting job */
570 * Got all locks, now remove it from wait queue and append it
571 * to the ready queue. Note, we may also get here if the
572 * job was canceled. Once it is "run", it will quickly
575 jq->waiting_jobs->remove(je);
576 jq->ready_jobs->append(je);
577 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
578 je = jn; /* Point to next waiting job */
583 Dmsg0(2300, "Done checking wait queue.\n");
585 * If no more ready work and we are asked to quit, then do it
587 if (jq->ready_jobs->empty() && jq->quit) {
/* Last worker out wakes jobq_destroy, which waits on the same condition */
589 if (jq->num_workers == 0) {
590 Dmsg0(2300, "Wake up destroy routine\n");
591 /* Wake up destroy routine if he is waiting */
592 pthread_cond_broadcast(&jq->work);
596 Dmsg0(2300, "Check for work request\n");
598 * If no more work requests, and we waited long enough, quit
600 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
601 jq->ready_jobs->empty());
602 if (jq->ready_jobs->empty() && timedout) {
603 Dmsg0(2300, "break big loop\n");
608 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
611 * If a job is waiting on a Resource, don't consume all
612 * the CPU time looping looking for work, and even more
613 * important, release the lock so that a job that has
614 * terminated can give us the resource.
617 bmicrosleep(2, 0); /* pause for 2 seconds */
619 /* Recompute work as something may have changed in last 2 secs */
620 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
622 Dmsg1(2300, "Loop again. work=%d\n", work);
623 } /* end of big for loop */
625 Dmsg0(200, "unlock mutex\n");
627 Dmsg0(2300, "End jobq_server\n");
632 * Returns true if cleanup done and we should look for more work
/*
 * reschedule_job -- decide whether a finished job has to run again
 * and, if so, queue the rerun.
 *
 *   jcr  job that has just come back from the engine
 *   jq   queue the job is put back on
 *   je   queue entry of the job; freed here on the requeue path
 *
 * A rerun is requested only when the Job resource has
 * RescheduleOnError set, the status is neither JS_Terminated nor
 * JS_Canceled, the job type is JT_BACKUP, and RescheduleTimes is 0 or
 * reschedule_count is still below it.  The new sched_time is the
 * current time plus RescheduleInterval.
 *
 * When JobBytes is 0 the same jcr is queued again with jobq_add, its
 * reference and je are released, and true is returned.  Otherwise a
 * new jcr is created, selected fields are copied over from the old
 * one, and it is started with run_job.
 *
 * NOTE(review): the return value of the remaining paths is presumably
 * false -- confirm.
 */
634 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
637 * Reschedule the job if necessary and requested
639 if (jcr->job->RescheduleOnError &&
640 jcr->JobStatus != JS_Terminated &&
641 jcr->JobStatus != JS_Canceled &&
642 jcr->get_JobType() == JT_BACKUP &&
643 (jcr->job->RescheduleTimes == 0 ||
644 jcr->reschedule_count < jcr->job->RescheduleTimes)) {
645 char dt[50], dt2[50];
648 * Reschedule this job by cleaning it up, but
649 * reuse the same JobId if possible.
651 time_t now = time(NULL);
652 jcr->reschedule_count++;
653 jcr->sched_time = now + jcr->job->RescheduleInterval;
654 bstrftime(dt, sizeof(dt), now);
655 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
/*
 * NOTE(review): now and sched_time are time_t values passed for %u
 * conversions; confirm the types match on all supported platforms.
 */
656 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
657 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
658 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
659 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
660 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
662 set_jcr_job_status(jcr, JS_WaitStartTime);
663 jcr->SDJobStatus = 0;
664 if (!allow_duplicate_job(jcr)) {
/* Nothing was backed up, so the existing jcr can simply be queued again */
667 if (jcr->JobBytes == 0) {
668 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
670 jobq_add(jq, jcr); /* queue the job to run again */
672 free_jcr(jcr); /* release jcr */
673 free(je); /* free the job entry */
674 return true; /* we already cleaned up */
677 * Something was actually backed up, so we cannot reuse
678 * the old JobId or there will be database record
679 * conflicts. We now create a new job, copying the
680 * appropriate fields.
682 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
683 set_jcr_defaults(njcr, jcr->job);
684 njcr->reschedule_count = jcr->reschedule_count;
685 njcr->sched_time = jcr->sched_time;
686 njcr->set_JobLevel(jcr->get_JobLevel());
687 njcr->pool = jcr->pool;
688 njcr->run_pool_override = jcr->run_pool_override;
689 njcr->full_pool = jcr->full_pool;
690 njcr->run_full_pool_override = jcr->run_full_pool_override;
691 njcr->inc_pool = jcr->inc_pool;
692 njcr->run_inc_pool_override = jcr->run_inc_pool_override;
693 njcr->diff_pool = jcr->diff_pool;
/* JobStatus is set to -1 first, then set through set_jcr_job_status */
694 njcr->JobStatus = -1;
695 set_jcr_job_status(njcr, jcr->JobStatus);
697 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
702 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
706 njcr->messages = jcr->messages;
707 njcr->spool_data = jcr->spool_data;
708 njcr->write_part_after_job = jcr->write_part_after_job;
709 Dmsg0(2300, "Call to run new job\n");
711 run_job(njcr); /* This creates a "new" job */
712 free_jcr(njcr); /* release "new" jcr */
714 Dmsg0(2300, "Back from running new job.\n");
720 * See if we can acquire all the necessary resources for the job (JCR)
722 * Returns: true if successful
723 * false if resource failure
/*
 * acquire_resources -- try to reserve every concurrency slot a job
 * needs before it may run.
 *
 *   jcr  job whose read storage, write storage, client and job
 *        resources are checked
 *
 * Clears jcr->acquired_resource_locks, then increments, in this order,
 * the NumConcurrentJobs counter of the read storage, the write
 * storage, the client and the job resource, each only while it is
 * below the matching MaxConcurrentJobs.  The read storage also has its
 * NumConcurrentReadJobs counter incremented.  When a counter is full
 * the job status is set to the matching wait state (JS_WaitStoreRes,
 * JS_WaitClientRes or JS_WaitJobRes) and increments made earlier are
 * backed out.  Once every counter has been taken,
 * acquired_resource_locks is set to true.
 *
 * NOTE(review): the comment ahead of the rstore == wstore check talks
 * about that code being turned off; confirm whether the check is
 * compiled in.
 */
725 static bool acquire_resources(JCR *jcr)
727 bool skip_this_jcr = false;
729 jcr->acquired_resource_locks = false;
731 * Turning this code off is likely to cause some deadlocks,
732 * but we do not really have enough information here to
733 * know if this is really a deadlock (it may be a dual drive
734 * autochanger), and in principle, the SD reservation system
735 * should detect these deadlocks, so push the work off on is.
738 if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
739 Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
740 " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
741 jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
742 set_jcr_job_status(jcr, JS_Canceled);
/* Read storage slot */
747 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
748 if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
749 jcr->rstore->NumConcurrentReadJobs++;
750 jcr->rstore->NumConcurrentJobs++;
751 Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
753 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
754 set_jcr_job_status(jcr, JS_WaitStoreRes);
/* Write storage slot */
760 Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
761 if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
762 jcr->wstore->NumConcurrentJobs++;
763 Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
764 } else if (jcr->rstore) {
766 skip_this_jcr = true;
768 Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
769 skip_this_jcr = true;
773 set_jcr_job_status(jcr, JS_WaitStoreRes);
/* Client slot */
777 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
778 jcr->client->NumConcurrentJobs++;
780 /* Back out previous locks */
781 dec_write_store(jcr);
783 set_jcr_job_status(jcr, JS_WaitClientRes);
/* Job resource slot */
786 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
787 jcr->job->NumConcurrentJobs++;
789 /* Back out previous locks */
790 dec_write_store(jcr);
792 jcr->client->NumConcurrentJobs--;
793 set_jcr_job_status(jcr, JS_WaitJobRes);
797 jcr->acquired_resource_locks = true;
/*
 * dec_read_store -- give back the read storage slot of a job.
 *
 * Decrements NumConcurrentReadJobs and NumConcurrentJobs of
 * jcr->rstore and asserts that neither counter has gone negative.
 */
801 static void dec_read_store(JCR *jcr)
804 jcr->rstore->NumConcurrentReadJobs--; /* back out rstore */
805 jcr->rstore->NumConcurrentJobs--; /* back out rstore */
806 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
807 ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
808 ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
/*
 * dec_write_store -- give back the write storage slot of a job.
 *
 * Decrements NumConcurrentJobs of jcr->wstore and asserts that the
 * counter has not gone negative.
 */
812 static void dec_write_store(JCR *jcr)
815 jcr->wstore->NumConcurrentJobs--;
816 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
817 ASSERT(jcr->wstore->NumConcurrentJobs >= 0);