2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2015 Kern Sibbald
5 Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
7 The original author of Bacula is Kern Sibbald, with contributions
8 from many others, a complete list can be found in the file AUTHORS.
10 You may use this file and others of this release according to the
11 license defined in the LICENSE file, which includes the Affero General
12 Public License, v3.0 ("AGPLv3") and some additional permissions and
13 terms pursuant to its AGPLv3 Section 7.
15 This notice must be preserved when any source code is
16 conveyed and/or propagated.
18 Bacula(R) is a registered trademark of Kern Sibbald.
21 * Bacula job queue routines.
23 * This code consists of three queues, the waiting_jobs
24 * queue, where jobs are initially queued, the ready_jobs
25 * queue, where jobs are placed when all the resources are
26 * allocated and they can immediately be run, and the
27 * running queue where jobs are placed when they are running.
30 * Kern Sibbald, July MMIII
33 * This code was adapted from the Bacula workq, which was
34 * adapted from "Programming with POSIX Threads", by David R. Butenhof.
44 /* Forward referenced functions */
/* Thread start routines: both are passed to pthread_create() later in this file. */
45 extern "C" void *jobq_server(void *arg);
46 extern "C" void *sched_wait(void *arg);
/* File-local helpers defined further down. */
48 static int start_server(jobq_t *jq);
49 static bool acquire_resources(JCR *jcr);
50 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
51 static void dec_write_store(JCR *jcr);
54 * Initialize a job queue
56 * Returns: 0 on success
/*
 * jobq_init() prepares the queue structure jq for use.
 *   jq      - queue to set up; its attr, mutex, work condition, counters and
 *             three job lists are all initialized here
 *   threads - stored as jq->max_workers, the cap checked before a new worker
 *             thread is created
 *   engine  - stored as jq->engine, the routine to run for a job
 * Each pthread init step is checked; when a later step fails, the objects
 * that were already initialized are destroyed again.
 */
59 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
/* item stays NULL; it is only handed to the dlist constructors below together with &item->link (presumably to convey the link member offset - TODO confirm). */
62 jobq_item_t *item = NULL;
64 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
66 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
/* jq->attr is the attribute set later given to pthread_create(), so threads made from it are detached. */
69 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
70 pthread_attr_destroy(&jq->attr);
73 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
75 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
76 pthread_attr_destroy(&jq->attr);
79 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
81 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
/* Unwind in reverse order of creation: mutex first, then the attribute object. */
82 pthread_mutex_destroy(&jq->mutex);
83 pthread_attr_destroy(&jq->attr);
87 jq->max_workers = threads; /* max threads to create */
88 jq->num_workers = 0; /* no threads yet */
89 jq->idle_workers = 0; /* no idle threads */
90 jq->engine = engine; /* routine to run */
/* JOBQ_VALID is the marker the other entry points test before touching the queue. */
91 jq->valid = JOBQ_VALID;
92 /* Initialize the job queues */
93 jq->waiting_jobs = New(dlist(item, &item->link));
94 jq->running_jobs = New(dlist(item, &item->link));
95 jq->ready_jobs = New(dlist(item, &item->link));
100 * Destroy the job queue
102 * Returns: 0 on success
/*
 * jobq_destroy() shuts the queue down and releases what jobq_init() created.
 *   jq - queue to tear down; must carry the JOBQ_VALID marker
 * Visible sequence: clear jq->valid, wake idle workers, wait on jq->work
 * until num_workers drops to 0, destroy the mutex, condition and attribute
 * objects, then delete the three job lists.
 * Returns the first nonzero status among the three destroy calls; when the
 * first two are 0 the attribute destroy status is returned.
 */
105 int jobq_destroy(jobq_t *jq)
107 int stat, stat1, stat2;
109 if (jq->valid != JOBQ_VALID) {
113 jq->valid = 0; /* prevent any more operations */
116 * If any threads are active, wake them
118 if (jq->num_workers > 0) {
120 if (jq->idle_workers) {
121 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
123 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
/* The last exiting worker broadcasts jq->work once num_workers reaches 0 (see jobq_server), which ends this wait. */
128 while (jq->num_workers > 0) {
129 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
131 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
/* Tear down in the same order as the status variables so the first failure is the one reported. */
138 stat = pthread_mutex_destroy(&jq->mutex);
139 stat1 = pthread_cond_destroy(&jq->work);
140 stat2 = pthread_attr_destroy(&jq->attr);
141 delete jq->waiting_jobs;
142 delete jq->running_jobs;
143 delete jq->ready_jobs;
144 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
153 * Wait until schedule time arrives before starting. Normally
154 * this routine is only used for jobs started from the console
155 * for which the user explicitly specified a start time. Otherwise
156 * most jobs are put into the job queue only when their
157 * scheduled time arrives.
/*
 * sched_wait() is a thread start routine (created by jobq_add) that holds a
 * job until its scheduled start time.
 *   arg - pointer to a wait_pkt carrying the jcr and the jobq_t
 * It marks the job JS_WaitStartTime, sleeps in a loop while rechecking for
 * cancellation and recomputing the remaining time, and finally releases its
 * reference on the jcr.
 */
160 void *sched_wait(void *arg)
162 JCR *jcr = ((wait_pkt *)arg)->jcr;
163 jobq_t *jq = ((wait_pkt *)arg)->jq;
/* This thread is not attached to any job while it only waits. */
165 set_jcr_in_tsd(INVALID_JCR);
166 Dmsg0(2300, "Enter sched_wait.\n");
/* Remaining seconds until the scheduled start; recomputed after every sleep below. */
168 time_t wtime = jcr->sched_time - time(NULL);
169 jcr->setJobStatus(JS_WaitStartTime);
170 /* Wait until scheduled time arrives */
172 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
175 /* Check every 30 seconds if canceled */
/* NOTE(review): wtime is a time_t but is formatted with %d here; confirm this is safe where time_t is wider than int. */
177 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
178 jcr->JobId, wtime, jcr->use_count());
182 bmicrosleep(wtime, 0);
183 if (job_canceled(jcr)) {
186 wtime = jcr->sched_time - time(NULL);
188 Dmsg1(200, "resched use=%d\n", jcr->use_count());
/* jobq_add() calls inc_use_count() before creating this thread; presumably this is the matching release - TODO confirm. */
190 free_jcr(jcr); /* we are done with jcr */
191 Dmsg0(2300, "Exit sched_wait\n");
195 /* Procedure to update the Client->NumConcurrentJobs */
/*
 * Adds val to jcr->client->NumConcurrentJobs; whether the update happens
 * depends on the job type and on jcr->no_client_used().
 *   jcr - job whose client counter is adjusted
 *   val - signed delta; callers in this file pass 1 to claim a slot and -1
 *         to release it
 */
196 static void update_client_numconcurrentjobs(JCR *jcr, int val)
202 switch (jcr->getJobType())
/* Jobs that report no client in use are handled separately from the counter update below. */
209 if (jcr->no_client_used()) {
212 /* Failback wanted */
214 jcr->client->NumConcurrentJobs += val;
220 * Add a job to the queue
221 * jq is a queue that was created with jobq_init
/*
 * jobq_add() queues a job on jq.
 *   jq  - queue created with jobq_init; must carry the JOBQ_VALID marker
 *   jcr - job to queue; its use count is incremented while the queue holds it
 * Visible flow:
 *   - the first call for a jcr initializes its term_wait condition variable
 *   - a job whose sched_time is still in the future (and is not canceled) is
 *     handed to a sched_wait thread instead of being queued now
 *   - a canceled job is prepended to the ready queue
 *   - otherwise the job is placed in the waiting queue ahead of the first
 *     entry with a numerically larger JobPriority, or appended
 *   - start_server() is called so that a worker examines the queue
 */
223 int jobq_add(jobq_t *jq, JCR *jcr)
226 jobq_item_t *item, *li;
227 bool inserted = false;
/* Seconds until the scheduled start; a positive value means the job must wait first. */
228 time_t wtime = jcr->sched_time - time(NULL);
232 if (!jcr->term_wait_inited) {
233 /* Initialize termination condition variable */
234 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
236 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
239 jcr->term_wait_inited = true;
/* NOTE(review): jcr is a pointer but is formatted with 0x%x in these debug calls; confirm Dmsg copes with that on 64-bit builds. */
242 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
243 if (jq->valid != JOBQ_VALID) {
244 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
248 jcr->inc_use_count(); /* mark jcr in use by us */
249 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
250 if (!job_canceled(jcr) && wtime > 0) {
251 set_thread_concurrency(jq->max_workers + 2);
/* NOTE(review): this malloc result is dereferenced on the next line without a NULL check, unlike the jobq_item_t allocation further down. */
252 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
253 sched_pkt->jcr = jcr;
/* The packet is passed to the sched_wait thread, which reads the jcr and jq out of it. */
255 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
256 if (stat != 0) { /* thread not created */
258 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
265 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
266 free_jcr(jcr); /* release jcr */
271 /* While waiting in a queue this job is not attached to a thread */
272 set_jcr_in_tsd(INVALID_JCR);
273 if (job_canceled(jcr)) {
274 /* Add job to ready queue so that it is canceled quickly */
275 jq->ready_jobs->prepend(item);
276 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
278 /* Add this job to the wait queue in priority sorted order */
279 foreach_dlist(li, jq->waiting_jobs) {
280 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
281 li->jcr->JobId, li->jcr->JobPriority);
/* A numerically smaller JobPriority sorts earlier: insert ahead of the first waiting entry with a larger value. */
282 if (li->jcr->JobPriority > jcr->JobPriority) {
283 jq->waiting_jobs->insert_before(item, li);
/* NOTE(review): the new item belongs to jcr and li is the waiting entry, so the two JobId arguments look swapped relative to the message text. */
284 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
285 li->jcr->JobId, jcr->JobId);
290 /* If no jobs in wait queue, append it */
292 jq->waiting_jobs->append(item);
293 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
297 /* Ensure that at least one server looks at the queue. */
298 stat = start_server(jq);
301 Dmsg0(2300, "Return jobq_add\n");
306 * Remove a job from the job queue. Used only by cancel_job().
307 * jq is a queue that was created with jobq_init
308 * work_item is an element of work
310 * Note, it is "removed" from the job queue.
311 * If you want to cancel it, you need to provide some external means
312 * of doing so (e.g. pthread_kill()).
/*
 * jobq_remove() takes a job out of the waiting queue.
 *   jq  - queue created with jobq_init; must carry the JOBQ_VALID marker
 *   jcr - job to look for in jq->waiting_jobs
 * The matching item is not freed here: it is moved to the head of the ready
 * queue and start_server() is called, so a worker takes it next.
 */
314 int jobq_remove(jobq_t *jq, JCR *jcr)
320 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
321 if (jq->valid != JOBQ_VALID) {
/* Linear search of the waiting queue for the entry that carries this jcr. */
326 foreach_dlist(item, jq->waiting_jobs) {
327 if (jcr == item->jcr) {
334 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
338 /* Move item to be the first on the list */
339 jq->waiting_jobs->remove(item);
340 jq->ready_jobs->prepend(item);
341 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
/* Make sure a worker is awake or created to process the moved entry. */
343 stat = start_server(jq);
346 Dmsg0(2300, "Return jobq_remove\n");
352 * Start the server thread if it isn't already running
/*
 * start_server() makes sure some worker thread will look at the queue.
 *   jq - the job queue
 * If a worker is idle, every waiter on jq->work is woken; otherwise a new
 * jobq_server thread is created from jq->attr, provided num_workers is still
 * below max_workers. When neither branch applies nothing is done here.
 */
354 static int start_server(jobq_t *jq)
360 * if any threads are idle, wake one.
361 * Actually we do a broadcast because on /lib/tls
362 * these signals seem to get lost from time to time.
364 if (jq->idle_workers > 0) {
365 Dmsg0(2300, "Signal worker to wake up\n");
366 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
/* NOTE(review): the message names pthread_cond_signal although the failing call above is pthread_cond_broadcast. */
368 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
371 } else if (jq->num_workers < jq->max_workers) {
372 Dmsg0(2300, "Create worker thread\n");
373 /* No idle threads so create a new one */
374 set_thread_concurrency(jq->max_workers + 1);
/* The queue itself is the thread argument; jobq_server casts it back to jobq_t. */
376 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
379 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
388 * This is the worker thread that serves the job queue.
389 * When all the resources are acquired for the job,
390 * it will call the user's engine.
/*
 * jobq_server() is the worker thread body.
 *   arg - the jobq_t this worker serves
 * Each pass of the main loop:
 *   1. with no pending work, waits on jq->work with a timeout derived from
 *      the current time plus 4 seconds
 *   2. drains the ready queue: moves an entry to the running list, hands it
 *      to the user engine, releases the resource counters it held, then
 *      either reschedules the job or frees the entry
 *   3. scans the waiting queue and moves each job of acceptable priority
 *      whose resources can be acquired to the ready queue
 *   4. leaves the loop when asked to quit with nothing ready, or when the
 *      timed wait expired and nothing is ready
 */
393 void *jobq_server(void *arg)
395 struct timespec timeout;
396 jobq_t *jq = (jobq_t *)arg;
397 jobq_item_t *je; /* job entry in queue */
398 bool timedout = false;
402 set_jcr_in_tsd(INVALID_JCR);
403 Dmsg0(2300, "Start jobq_server\n");
410 Dmsg0(2300, "Top of for loop\n");
/* Only block on the condition variable when the previous pass found nothing to do. */
411 if (!work && !jq->quit) {
412 gettimeofday(&tv, &tz);
414 timeout.tv_sec = tv.tv_sec + 4;
418 * Wait 4 seconds, then if no more work, exit
420 Dmsg0(2300, "pthread_cond_timedwait()\n");
421 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
422 if (stat == ETIMEDOUT) {
423 Dmsg0(2300, "timedwait timedout.\n");
426 } else if (stat != 0) {
427 /* This shouldn't happen */
428 Dmsg0(2300, "This shouldn't happen\n");
437 * If anything is in the ready queue, run it
439 Dmsg0(2300, "Checking ready queue.\n");
440 while (!jq->ready_jobs->empty() && !jq->quit) {
442 je = (jobq_item_t *)jq->ready_jobs->first();
444 jq->ready_jobs->remove(je);
/* More ready entries remain, so make sure another worker is awake or created to take them. */
445 if (!jq->ready_jobs->empty()) {
446 Dmsg0(2300, "ready queue not empty start server\n");
447 if (start_server(jq) != 0) {
453 jq->running_jobs->append(je);
455 /* Attach jcr to this thread while we run the job */
456 jcr->my_thread_id = pthread_self();
457 jcr->set_killable(true);
459 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
461 /* Release job queue lock */
464 /* Call user's routine here */
465 Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
466 jcr->use_count(), jcr->JobStatus);
469 /* Job finished detach from thread */
470 remove_jcr_from_tsd(je->jcr);
471 je->jcr->set_killable(false);
473 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
476 /* Reacquire job queue lock */
478 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
479 jq->running_jobs->remove(je);
481 * Release locks if acquired. Note, they will not have
482 * been acquired for jobs canceled before they were
483 * put into the ready queue.
/* Undo the counter increments made by acquire_resources() for this job. */
485 if (jcr->acquired_resource_locks) {
487 dec_write_store(jcr);
488 update_client_numconcurrentjobs(jcr, -1);
489 jcr->job->NumConcurrentJobs--;
490 jcr->acquired_resource_locks = false;
/* A true return means reschedule_job() already disposed of the jcr reference and of je. */
493 if (reschedule_job(jcr, jq, je)) {
494 continue; /* go look for more work */
497 /* Clean up and release old jcr */
498 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
499 jcr->SDJobStatus = 0;
500 V(jq->mutex); /* release internal lock */
502 free(je); /* release job entry */
503 P(jq->mutex); /* reacquire job queue lock */
506 * If any job in the wait queue can be run,
507 * move it to the ready queue
509 Dmsg0(2300, "Done check ready, now check wait queue.\n");
510 if (!jq->waiting_jobs->empty() && !jq->quit) {
512 bool running_allow_mix = false;
513 je = (jobq_item_t *)jq->waiting_jobs->first();
514 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
/* The first running job supplies the reference priority; running_allow_mix is cleared as soon as any running job forbids mixed priorities. */
516 Priority = re->jcr->JobPriority;
517 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
518 re->jcr->JobId, Priority);
519 running_allow_mix = true;
521 Dmsg2(2300, "JobId %d is also running with %s\n",
523 re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
524 if (!re->jcr->job->allow_mixed_priority) {
525 running_allow_mix = false;
528 re = (jobq_item_t *)jq->running_jobs->next(re);
530 Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
531 running_allow_mix ? "allow" : "don't allow");
/* With nothing running, the first waiting job sets the reference priority. */
533 Priority = je->jcr->JobPriority;
534 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
537 * Walk down the list of waiting jobs and attempt
538 * to acquire the resources it needs.
541 /* je is current job item on the queue, jn is the next one */
/* jn is fetched before je can be unlinked from the waiting queue below. */
543 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
545 Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
546 jcr->JobId, jcr->JobPriority, Priority,
547 jcr->job->allow_mixed_priority ? "mix" : "no mix");
549 /* Take only jobs of correct Priority */
/* Acceptable: same priority as the reference, or a numerically lower one when this job and all running jobs allow mixing. */
550 if (!(jcr->JobPriority == Priority
551 || (jcr->JobPriority < Priority &&
552 jcr->job->allow_mixed_priority && running_allow_mix))) {
553 jcr->setJobStatus(JS_WaitPriority);
557 if (!acquire_resources(jcr)) {
558 /* If resource conflict, job is canceled */
/* Resources unavailable and the job is not canceled: leave it waiting and look at the next entry. */
559 if (!job_canceled(jcr)) {
560 je = jn; /* point to next waiting job */
566 * Got all locks, now remove it from wait queue and append it
567 * to the ready queue. Note, we may also get here if the
568 * job was canceled. Once it is "run", it will quickly
571 jq->waiting_jobs->remove(je);
572 jq->ready_jobs->append(je);
573 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
574 je = jn; /* Point to next waiting job */
579 Dmsg0(2300, "Done checking wait queue.\n");
581 * If no more ready work and we are asked to quit, then do it
583 if (jq->ready_jobs->empty() && jq->quit) {
585 if (jq->num_workers == 0) {
586 Dmsg0(2300, "Wake up destroy routine\n");
587 /* Wake up destroy routine if he is waiting */
588 pthread_cond_broadcast(&jq->work);
592 Dmsg0(2300, "Check for work request\n");
594 * If no more work requests, and we waited long enough, quit
596 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
597 jq->ready_jobs->empty());
598 if (jq->ready_jobs->empty() && timedout) {
599 Dmsg0(2300, "break big loop\n");
604 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
607 * If a job is waiting on a Resource, don't consume all
608 * the CPU time looping looking for work, and even more
609 * important, release the lock so that a job that has
610 * terminated can give us the resource.
613 bmicrosleep(2, 0); /* pause for 2 seconds */
615 /* Recompute work as something may have changed in last 2 secs */
616 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
618 Dmsg1(2300, "Loop again. work=%d\n", work);
619 } /* end of big for loop */
621 Dmsg0(200, "unlock mutex\n");
623 Dmsg0(2300, "End jobq_server\n");
628 * Returns true if cleanup done and we should look for more work
/*
 * reschedule_job() decides whether a finished job must run again and, if so,
 * requeues it.
 *   jcr - the job that just finished
 *   jq  - queue to put it back on
 *   je  - the queue entry that carried jcr
 * Conditions visible here: RescheduleTimes is 0 or reschedule_count is still
 * below it, and either the job is an incomplete backup (with
 * RescheduleIncompleteJobs set and no Base job involved) or
 * RescheduleOnError is set and a backup ended neither Terminated nor
 * Canceled.
 * A job that wrote no bytes, or an incomplete one, is requeued on the same
 * jcr and je is freed; otherwise a new jcr is built from the old one and
 * started with run_job().
 * Returns true if cleanup was done and the caller should look for more work.
 */
630 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
632 bool resched = false;
634 * Reschedule the job if requested and possible
636 /* Basic condition is that more reschedule times remain */
637 if (jcr->job->RescheduleTimes == 0 ||
638 jcr->reschedule_count < jcr->job->RescheduleTimes) {
640 /* Check for incomplete jobs */
641 (jcr->RescheduleIncompleteJobs &&
642 jcr->is_incomplete() && jcr->is_JobType(JT_BACKUP) &&
643 !(jcr->HasBase||jcr->is_JobLevel(L_BASE))) ||
644 /* Check for failed jobs */
645 (jcr->job->RescheduleOnError &&
646 !jcr->is_JobStatus(JS_Terminated) &&
647 !jcr->is_JobStatus(JS_Canceled) &&
648 jcr->is_JobType(JT_BACKUP));
651 char dt[50], dt2[50];
654 * Reschedule this job by cleaning it up, but
655 * reuse the same JobId if possible.
657 jcr->rerunning = jcr->is_incomplete(); /* save incomplete status */
658 time_t now = time(NULL);
659 jcr->reschedule_count++;
/* The new start time is RescheduleInterval seconds from now; dt and dt2 hold both times as text for the job message. */
660 jcr->sched_time = now + jcr->job->RescheduleInterval;
661 bstrftime(dt, sizeof(dt), now);
662 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
663 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
664 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
665 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
666 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
667 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
669 jcr->setJobStatus(JS_WaitStartTime);
670 jcr->SDJobStatus = 0;
672 if (!allow_duplicate_job(jcr)) {
675 /* Only jobs with no output or Incomplete jobs can run on same JCR */
676 if (jcr->JobBytes == 0 || jcr->rerunning) {
677 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
680 * Special test here since a Virtual Full gets marked
681 * as a Full, so we look at the resource record
683 if (jcr->wasVirtualFull) {
684 jcr->setJobLevel(L_VIRTUAL_FULL);
687 * When we are using the same jcr then make sure to reset
688 * RealEndTime back to zero.
690 jcr->jr.RealEndTime = 0;
/* jobq_add() increments the use count itself, so the reference held for the finished run is dropped right after. */
691 jobq_add(jq, jcr); /* queue the job to run again */
693 free_jcr(jcr); /* release jcr */
694 free(je); /* free the job entry */
695 return true; /* we already cleaned up */
698 * Something was actually backed up, so we cannot reuse
699 * the old JobId or there will be database record
700 * conflicts. We now create a new job, copying the
701 * appropriate fields.
703 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
704 set_jcr_defaults(njcr, jcr->job);
706 * Eliminate the new job_end_push, then copy the one from
707 * the old job, and set the old one to be empty.
710 lock_jobs(); /* protect ourself from reload_config() */
/* Entries are moved, not duplicated: the old list is destroyed and re-initialized empty after the copy. */
712 foreach_alist(v, (&jcr->job_end_push)) {
713 njcr->job_end_push.append(v);
715 jcr->job_end_push.destroy();
716 jcr->job_end_push.init(1, false);
720 njcr->reschedule_count = jcr->reschedule_count;
721 njcr->sched_time = jcr->sched_time;
722 njcr->initial_sched_time = jcr->initial_sched_time;
724 * Special test here since a Virtual Full gets marked
725 * as a Full, so we look at the resource record
727 if (jcr->wasVirtualFull) {
728 njcr->setJobLevel(L_VIRTUAL_FULL);
730 njcr->setJobLevel(jcr->getJobLevel());
/* Carry the pool selections and their run-time override flags over to the new jcr. */
732 njcr->pool = jcr->pool;
733 njcr->run_pool_override = jcr->run_pool_override;
734 njcr->next_pool = jcr->next_pool;
735 njcr->run_next_pool_override = jcr->run_next_pool_override;
736 njcr->full_pool = jcr->full_pool;
737 njcr->run_full_pool_override = jcr->run_full_pool_override;
738 njcr->inc_pool = jcr->inc_pool;
739 njcr->run_inc_pool_override = jcr->run_inc_pool_override;
/* NOTE(review): every other pool above is copied together with its run_*_override flag; no such flag is copied for diff_pool - confirm whether one exists and should be copied. */
740 njcr->diff_pool = jcr->diff_pool;
/* JobStatus is forced to -1 first, presumably so that setJobStatus() accepts the old status unconditionally - TODO confirm. */
741 njcr->JobStatus = -1;
742 njcr->setJobStatus(jcr->JobStatus);
744 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
749 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
753 njcr->messages = jcr->messages;
754 njcr->spool_data = jcr->spool_data;
755 njcr->write_part_after_job = jcr->write_part_after_job;
756 Dmsg0(2300, "Call to run new job\n");
758 run_job(njcr); /* This creates a "new" job */
759 free_jcr(njcr); /* release "new" jcr */
761 Dmsg0(2300, "Back from running new job.\n");
767 * See if we can acquire all the necessary resources for the job (JCR)
769 * Returns: true if successful
770 * false if resource failure
/*
 * acquire_resources() tries to reserve every counted resource the job needs.
 *   jcr - job to reserve for
 * Visible order: read storage (inc_read_store), write storage, client, job.
 * When a later step fails, counters taken by earlier steps are backed out
 * and the job status is set to the matching JS_Wait*Res value.
 * On success jcr->acquired_resource_locks is set to true.
 */
772 static bool acquire_resources(JCR *jcr)
774 bool skip_this_jcr = false;
/* Start from a known state; the flag is only set again at the very end, after every step succeeded. */
776 jcr->acquired_resource_locks = false;
778 * Turning this code off is likely to cause some deadlocks,
779 * but we do not really have enough information here to
780 * know if this is really a deadlock (it may be a dual drive
781 * autochanger), and in principle, the SD reservation system
782 * should detect these deadlocks, so push the work off on it.
/* NOTE(review): the text above talks about this check being turned off; any preprocessor guard around it is not visible here - confirm whether it is compiled in. */
785 if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
786 Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
787 " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
788 jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
789 jcr->setJobStatus(JS_Canceled);
794 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
795 if (!inc_read_store(jcr)) {
796 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
797 jcr->setJobStatus(JS_WaitStoreRes);
803 Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
/* The write storage has a single counter checked against MaxConcurrentJobs. */
804 if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
805 jcr->wstore->NumConcurrentJobs++;
806 Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
/* Write slot unavailable: both remaining branches mark the job to be skipped. */
807 } else if (jcr->rstore) {
809 skip_this_jcr = true;
811 Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
812 skip_this_jcr = true;
816 jcr->setJobStatus(JS_WaitStoreRes);
821 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
822 update_client_numconcurrentjobs(jcr, 1);
824 /* Back out previous locks */
825 dec_write_store(jcr);
827 jcr->setJobStatus(JS_WaitClientRes);
831 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
832 jcr->job->NumConcurrentJobs++;
834 /* Back out previous locks */
835 dec_write_store(jcr);
837 update_client_numconcurrentjobs(jcr, -1);
838 jcr->setJobStatus(JS_WaitJobRes);
842 jcr->acquired_resource_locks = true;
/* File-scope mutex; presumably serializes the read-storage counter updates in inc_read_store() and dec_read_store() - the lock calls are not visible here, TODO confirm. */
846 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
849 * Note: inc_read_store() and dec_read_store() are
850 * called from select_rstore() in src/dird/restore.c
/*
 * inc_read_store() tries to claim one slot on the read storage of jcr.
 * The claim succeeds when rstore->NumConcurrentJobs is below
 * MaxConcurrentJobs and, in addition, one of these holds: the job is a
 * restore, MaxConcurrentReadJobs is 0, or NumConcurrentReadJobs is below
 * MaxConcurrentReadJobs. On success both counters are incremented.
 * The caller in this file treats a false return as no slot available.
 */
852 bool inc_read_store(JCR *jcr)
855 if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs &&
856 (jcr->getJobType() == JT_RESTORE ||
857 jcr->rstore->MaxConcurrentReadJobs == 0 ||
858 jcr->rstore->NumConcurrentReadJobs < jcr->rstore->MaxConcurrentReadJobs)) {
/* The read counter is bumped even on the restore and zero-limit paths, so dec_read_store() can always decrement both. */
859 jcr->rstore->NumConcurrentReadJobs++;
860 jcr->rstore->NumConcurrentJobs++;
861 Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
/*
 * dec_read_store() gives back a slot taken by inc_read_store(): both rstore
 * counters are decremented and then asserted to be non-negative.
 */
869 void dec_read_store(JCR *jcr)
873 jcr->rstore->NumConcurrentReadJobs--; /* back out rstore */
874 jcr->rstore->NumConcurrentJobs--; /* back out rstore */
875 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
/* A negative counter would mean a release without a matching claim. */
877 ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
878 ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
/*
 * dec_write_store() decrements wstore->NumConcurrentJobs for the job and
 * asserts that the counter did not go negative.
 */
882 static void dec_write_store(JCR *jcr)
885 jcr->wstore->NumConcurrentJobs--;
886 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
887 ASSERT(jcr->wstore->NumConcurrentJobs >= 0);