2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from many
7 others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 Bacula® is a registered trademark of Kern Sibbald.
17 * Bacula job queue routines.
19 * This code consists of three queues, the waiting_jobs
20 * queue, where jobs are initially queued, the ready_jobs
21 * queue, where jobs are placed when all the resources are
22 * allocated and they can immediately be run, and the
23 * running queue where jobs are placed when they are running.
26 * Kern Sibbald, July MMIII
29 * This code was adapted from the Bacula workq, which was
30 * adapted from "Programming with POSIX Threads", by David R. Butenhof.
40 /* Forward referenced functions */
/*
 * Thread entry points: declared with C linkage and handed to
 * pthread_create() by start_server() (jobq_server) and jobq_add()
 * (sched_wait).
 */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
/* File-local helpers, defined further down in this file. */
44 static int start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
46 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
47 static void dec_write_store(JCR *jcr);
50 * Initialize a job queue
52 * Returns: 0 on success
/*
 * Set up the job queue *jq: a thread attribute object configured for
 * detached threads, the queue mutex, the "work" condition variable,
 * the worker bookkeeping fields and the three job lists
 * (waiting, running, ready).
 *
 *   jq       queue structure to initialize
 *   threads  stored in jq->max_workers (upper bound used by start_server())
 *   engine   stored in jq->engine
 *
 * Every pthread_*_init step that fails destroys the objects created by
 * the earlier steps.
 * NOTE(review): the return statements are not visible in this listing;
 *   presumably each error path returns the pthread status and the
 *   success path returns 0 -- confirm against the full file.
 */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
/*
 * item is never allocated here; it is only passed to the dlist
 * constructors below (presumably to locate the link member inside
 * jobq_item_t -- TODO confirm against dlist).
 */
58 jobq_item_t *item = NULL;
60 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
62 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
/* Worker threads are created detached, so nobody has to join them */
65 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66 pthread_attr_destroy(&jq->attr);
69 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
71 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
72 pthread_attr_destroy(&jq->attr);
75 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
77 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
/* Undo the two earlier initializations, newest first */
78 pthread_mutex_destroy(&jq->mutex);
79 pthread_attr_destroy(&jq->attr);
83 jq->max_workers = threads; /* max threads to create */
84 jq->num_workers = 0; /* no threads yet */
85 jq->idle_workers = 0; /* no idle threads */
86 jq->engine = engine; /* routine to run */
/* From here on jobq_add()/jobq_remove()/jobq_destroy() accept the queue */
87 jq->valid = JOBQ_VALID;
88 /* Initialize the job queues */
89 jq->waiting_jobs = New(dlist(item, &item->link));
90 jq->running_jobs = New(dlist(item, &item->link));
91 jq->ready_jobs = New(dlist(item, &item->link));
96 * Destroy the job queue
98 * Returns: 0 on success
/*
 * Shut down a queue created by jobq_init().
 *
 * Marks the queue invalid, wakes idle workers with a broadcast on
 * jq->work, then waits on jq->work until jq->num_workers drops to zero
 * (jobq_server() broadcasts on jq->work when the last worker leaves).
 * Afterwards the mutex, condition variable and thread attributes are
 * destroyed and the three job lists are deleted.
 *
 * Returns the first non-zero status among the three pthread_*_destroy
 * calls, or 0 if all succeeded.
 *
 * NOTE(review): the lines that lock/unlock jq->mutex and that set
 *   jq->quit are not visible in this listing; confirm the mutex is
 *   unlocked before pthread_mutex_destroy() is reached.
 */
101 int jobq_destroy(jobq_t *jq)
103 int stat, stat1, stat2;
/* Refuse to work on a queue that is not (or no longer) initialized */
105 if (jq->valid != JOBQ_VALID) {
109 jq->valid = 0; /* prevent any more operations */
112 * If any threads are active, wake them
114 if (jq->num_workers > 0) {
/* Only idle workers sit in the condition wait, so only then is a wake-up needed */
116 if (jq->idle_workers) {
117 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
119 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
/* Block until every worker thread has exited */
124 while (jq->num_workers > 0) {
125 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
127 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
/* All three destroys are attempted even if an earlier one fails */
134 stat = pthread_mutex_destroy(&jq->mutex);
135 stat1 = pthread_cond_destroy(&jq->work);
136 stat2 = pthread_attr_destroy(&jq->attr);
137 delete jq->waiting_jobs;
138 delete jq->running_jobs;
139 delete jq->ready_jobs;
140 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
149 * Wait until schedule time arrives before starting. Normally
150 * this routine is only used for jobs started from the console
151 * for which the user explicitly specified a start time. Otherwise
152 * most jobs are put into the job queue only when their
153 * scheduled time arrives.
/*
 * Thread body started by jobq_add() for a job whose sched_time lies in
 * the future.
 *
 *   arg  points to a wait_pkt carrying the job (jcr) and the queue (jq)
 *
 * The thread sets the job status to JS_WaitStartTime and sleeps until
 * sched_time is reached, recomputing the remaining time after each
 * sleep and checking for cancellation. When done it drops its
 * reference on the jcr with free_jcr().
 *
 * NOTE(review): the lines that free arg, cap the sleep interval,
 *   re-queue the job and return are not visible in this listing.
 * NOTE(review): wtime is a time_t but is printed with %d ("secs=%d");
 *   where time_t is wider than int this is a format/argument mismatch
 *   -- consider an explicit cast.
 */
156 void *sched_wait(void *arg)
158 JCR *jcr = ((wait_pkt *)arg)->jcr;
159 jobq_t *jq = ((wait_pkt *)arg)->jq;
/* This thread is not running the job, so no jcr is attached to it */
161 set_jcr_in_tsd(INVALID_JCR);
162 Dmsg0(2300, "Enter sched_wait.\n");
/* Seconds still to wait; zero or negative once the start time has passed */
164 time_t wtime = jcr->sched_time - time(NULL);
165 jcr->setJobStatus(JS_WaitStartTime);
166 /* Wait until scheduled time arrives */
168 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
171 /* Check every 30 seconds if canceled */
173 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
174 jcr->JobId, wtime, jcr->use_count());
178 bmicrosleep(wtime, 0);
179 if (job_canceled(jcr)) {
/* Recompute the remaining time after every sleep */
182 wtime = jcr->sched_time - time(NULL);
184 Dmsg1(200, "resched use=%d\n", jcr->use_count());
186 free_jcr(jcr); /* we are done with jcr */
187 Dmsg0(2300, "Exit sched_wait\n");
192 * Add a job to the queue
193 * jq is a queue that was created with jobq_init
/*
 * Queue the job jcr on jq.
 *
 *   jq   queue set up by jobq_init()
 *   jcr  job to queue; its use count is incremented while queued
 *
 * Steps visible here:
 *   - initialize jcr->term_wait once per jcr,
 *   - reject the call if the queue is not valid,
 *   - if the job is not canceled and its sched_time is still in the
 *     future, start a detached sched_wait() thread for it,
 *   - otherwise allocate a queue entry and put it either at the head of
 *     the ready queue (canceled job) or into the waiting queue ordered
 *     by JobPriority (smaller value first),
 *   - call start_server() so that a worker looks at the queue.
 *
 * NOTE(review): locking of jq->mutex, the item field assignments and
 *   the return statements are not visible in this listing.
 * NOTE(review): the malloc() result for sched_pkt is dereferenced
 *   without a NULL check, unlike the item allocation below.
 * NOTE(review): "jcr=0x%x" is given a pointer argument; %p (or a cast)
 *   would be the matching conversion.
 */
195 int jobq_add(jobq_t *jq, JCR *jcr)
198 jobq_item_t *item, *li;
199 bool inserted = false;
/* Seconds until the scheduled start; > 0 means the job must wait first */
200 time_t wtime = jcr->sched_time - time(NULL);
204 if (!jcr->term_wait_inited) {
205 /* Initialize termination condition variable */
206 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
208 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
211 jcr->term_wait_inited = true;
214 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
215 if (jq->valid != JOBQ_VALID) {
216 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
220 jcr->inc_use_count(); /* mark jcr in use by us */
221 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/* Start time not reached yet: hand the job to a sched_wait() thread */
222 if (!job_canceled(jcr) && wtime > 0) {
223 set_thread_concurrency(jq->max_workers + 2);
224 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
225 sched_pkt->jcr = jcr;
227 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
228 if (stat != 0) { /* thread not created */
/* NOTE(review): the message text says "pthread_thread_create"; the call is pthread_create() */
230 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
237 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
238 free_jcr(jcr); /* release jcr */
243 /* While waiting in a queue this job is not attached to a thread */
244 set_jcr_in_tsd(INVALID_JCR);
245 if (job_canceled(jcr)) {
246 /* Add job to ready queue so that it is canceled quickly */
247 jq->ready_jobs->prepend(item);
248 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
250 /* Add this job to the wait queue in priority sorted order */
251 foreach_dlist(li, jq->waiting_jobs) {
252 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
253 li->jcr->JobId, li->jcr->JobPriority);
/* Insert ahead of the first waiting job with a numerically larger priority */
254 if (li->jcr->JobPriority > jcr->JobPriority) {
255 jq->waiting_jobs->insert_before(item, li);
/*
 * NOTE(review): the arguments look swapped relative to the message text:
 * the inserted job is jcr, the waiting job is li->jcr.
 */
256 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
257 li->jcr->JobId, jcr->JobId);
262 /* If no insertion point was found in the wait queue, append it */
264 jq->waiting_jobs->append(item);
265 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
269 /* Ensure that at least one server looks at the queue. */
270 stat = start_server(jq);
273 Dmsg0(2300, "Return jobq_add\n");
278 * Remove a job from the job queue. Used only by cancel_job().
279 * jq is a queue that was created with jobq_init
280 * jcr is the job to take out of the waiting queue
282 * Note, it is "removed" by moving its entry to the front of the ready queue.
283 * If you want to cancel it, you need to provide some external means
284 * of doing so (e.g. pthread_kill()).
/*
 * Pull the entry for jcr out of the waiting queue and place it at the
 * head of the ready queue, then make sure a worker is available via
 * start_server().
 *
 *   jq   queue set up by jobq_init()
 *   jcr  job whose waiting entry is to be moved
 *
 * NOTE(review): locking, the "found" bookkeeping and the return
 *   statements are not visible in this listing; the debug text below
 *   shows there is a separate path for "not in wait queue".
 * NOTE(review): "jcr=0x%x" is given a pointer argument; %p (or a cast)
 *   would be the matching conversion.
 */
286 int jobq_remove(jobq_t *jq, JCR *jcr)
292 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
293 if (jq->valid != JOBQ_VALID) {
/* Linear search of the waiting queue for the entry that owns this jcr */
298 foreach_dlist(item, jq->waiting_jobs) {
299 if (jcr == item->jcr) {
306 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
310 /* Move item to be the first on the list */
311 jq->waiting_jobs->remove(item);
312 jq->ready_jobs->prepend(item);
313 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
/* Make sure some worker picks the entry up from the ready queue */
315 stat = start_server(jq);
318 Dmsg0(2300, "Return jobq_remove\n");
324 * Start the server thread if it isn't already running
/*
 * Make sure a worker is available to service the queue.
 *
 * If at least one worker is idle, broadcast on jq->work to wake the
 * idle workers. Otherwise, if fewer than jq->max_workers threads
 * exist, create a new detached jobq_server() thread. If neither
 * condition holds, nothing is done here.
 *
 * NOTE(review): the return statements and any num_workers update are
 *   not visible in this listing.
 */
326 static int start_server(jobq_t *jq)
332 * if any threads are idle, wake one.
333 * Actually we do a broadcast because on /lib/tls
334 * these signals seem to get lost from time to time.
336 if (jq->idle_workers > 0) {
337 Dmsg0(2300, "Signal worker to wake up\n");
338 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
/* NOTE(review): the message names pthread_cond_signal, but the failing call is pthread_cond_broadcast */
340 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
343 } else if (jq->num_workers < jq->max_workers) {
344 Dmsg0(2300, "Create worker thread\n");
345 /* No idle threads so create a new one */
346 set_thread_concurrency(jq->max_workers + 1);
/* The queue itself is the thread argument; jobq_server() casts it back to jobq_t* */
348 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
351 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
360 * This is the worker thread that serves the job queue.
361 * When all the resources are acquired for the job,
362 * it will call the user's engine.
/*
 * Worker thread body; arg is the jobq_t this worker serves.
 *
 * Each pass of the main loop does the following:
 *   1. If there was no work on the previous pass and no quit request,
 *      wait on jq->work with a timeout 4 seconds from now.
 *   2. Drain the ready queue: move each entry to the running queue, run
 *      it, then take it off the running queue, give back the resource
 *      counters if they were acquired, and either reschedule the job
 *      (reschedule_job()) or free the queue entry.
 *   3. Scan the waiting queue and move every job that has an eligible
 *      priority and whose resources can be acquired
 *      (acquire_resources()) to the ready queue.
 *   4. Leave the loop when quit is requested and nothing is ready, or
 *      when the timed wait expired and nothing is ready.
 *   5. Otherwise, if work remains, sleep 2 seconds before looping.
 *
 * NOTE(review): many short lines are missing from this listing. Not
 *   visible here: the P()/V() calls on jq->mutex around the engine
 *   call, the "jcr = je->jcr" assignments, the call to jq->engine, the
 *   updates of num_workers/idle_workers and the loop/brace structure.
 *   Treat the outline above as a reading aid and confirm details
 *   against the complete file.
 */
365 void *jobq_server(void *arg)
367 struct timespec timeout;
368 jobq_t *jq = (jobq_t *)arg;
369 jobq_item_t *je; /* job entry in queue */
/* Set when the timed wait below expires; lets an idle worker exit */
371 bool timedout = false;
374 set_jcr_in_tsd(INVALID_JCR);
375 Dmsg0(2300, "Start jobq_server\n");
382 Dmsg0(2300, "Top of for loop\n");
383 if (!work && !jq->quit) {
384 gettimeofday(&tv, &tz);
/* pthread_cond_timedwait() takes an absolute deadline: now + 4 seconds */
386 timeout.tv_sec = tv.tv_sec + 4;
390 * Wait 4 seconds, then if no more work, exit
392 Dmsg0(2300, "pthread_cond_timedwait()\n");
393 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
394 if (stat == ETIMEDOUT) {
395 Dmsg0(2300, "timedwait timedout.\n");
398 } else if (stat != 0) {
399 /* This shouldn't happen */
400 Dmsg0(2300, "This shouldn't happen\n");
409 * If anything is in the ready queue, run it
411 Dmsg0(2300, "Checking ready queue.\n");
412 while (!jq->ready_jobs->empty() && !jq->quit) {
414 je = (jobq_item_t *)jq->ready_jobs->first();
416 jq->ready_jobs->remove(je);
/* More ready jobs remain: get another worker going in parallel */
417 if (!jq->ready_jobs->empty()) {
418 Dmsg0(2300, "ready queue not empty start server\n");
419 if (start_server(jq) != 0) {
425 jq->running_jobs->append(je);
427 /* Attach jcr to this thread while we run the job */
428 jcr->my_thread_id = pthread_self();
429 jcr->set_killable(true);
431 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
433 /* Release job queue lock */
436 /* Call user's routine here */
437 Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
438 jcr->use_count(), jcr->JobStatus);
441 /* Job finished detach from thread */
442 remove_jcr_from_tsd(je->jcr);
443 je->jcr->set_killable(false);
445 /* Clear the threadid, probably not necessary */
446 memset(&jcr->my_thread_id, 0, sizeof(jcr->my_thread_id));
448 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
451 /* Reacquire job queue lock */
453 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
454 jq->running_jobs->remove(je);
456 * Release locks if acquired. Note, they will not have
457 * been acquired for jobs canceled before they were
458 * put into the ready queue.
/* These decrements undo the counters bumped in acquire_resources() */
460 if (jcr->acquired_resource_locks) {
462 dec_write_store(jcr);
463 jcr->client->NumConcurrentJobs--;
464 jcr->job->NumConcurrentJobs--;
465 jcr->acquired_resource_locks = false;
/* reschedule_job() returns true only when it has already freed je and released the jcr */
468 if (reschedule_job(jcr, jq, je)) {
469 continue; /* go look for more work */
472 /* Clean up and release old jcr */
473 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
474 jcr->SDJobStatus = 0;
475 V(jq->mutex); /* release internal lock */
477 free(je); /* release job entry */
478 P(jq->mutex); /* reacquire job queue lock */
481 * If any job in the wait queue can be run,
482 * move it to the ready queue
484 Dmsg0(2300, "Done check ready, now check wait queue.\n");
485 if (!jq->waiting_jobs->empty() && !jq->quit) {
/*
 * Determine the priority to look for: that of the first running job if
 * there is one, otherwise that of the first waiting job.
 * running_allow_mix ends up true only if every running job has
 * allow_mixed_priority set.
 */
487 bool running_allow_mix = false;
488 je = (jobq_item_t *)jq->waiting_jobs->first();
489 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
491 Priority = re->jcr->JobPriority;
492 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
493 re->jcr->JobId, Priority);
494 running_allow_mix = true;
496 Dmsg2(2300, "JobId %d is also running with %s\n",
498 re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
499 if (!re->jcr->job->allow_mixed_priority) {
500 running_allow_mix = false;
503 re = (jobq_item_t *)jq->running_jobs->next(re);
505 Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
506 running_allow_mix ? "allow" : "don't allow");
508 Priority = je->jcr->JobPriority;
509 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
512 * Walk down the list of waiting jobs and attempt
513 * to acquire the resources it needs.
516 /* je is current job item on the queue, jn is the next one */
/* jn is fetched up front because je may be unlinked from the waiting queue below */
518 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
520 Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
521 jcr->JobId, jcr->JobPriority, Priority,
522 jcr->job->allow_mixed_priority ? "mix" : "no mix");
524 /* Take only jobs of correct Priority */
/*
 * Eligible: same priority value, or a numerically smaller priority when
 * both this job and all running jobs allow mixed priorities.
 */
525 if (!(jcr->JobPriority == Priority
526 || (jcr->JobPriority < Priority &&
527 jcr->job->allow_mixed_priority && running_allow_mix))) {
528 jcr->setJobStatus(JS_WaitPriority);
532 if (!acquire_resources(jcr)) {
533 /* If resource conflict, job is canceled */
534 if (!job_canceled(jcr)) {
535 je = jn; /* point to next waiting job */
541 * Got all locks, now remove it from wait queue and append it
542 * to the ready queue. Note, we may also get here if the
543 * job was canceled. Once it is "run", it will quickly
546 jq->waiting_jobs->remove(je);
547 jq->ready_jobs->append(je);
548 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
549 je = jn; /* Point to next waiting job */
554 Dmsg0(2300, "Done checking wait queue.\n");
556 * If no more ready work and we are asked to quit, then do it
558 if (jq->ready_jobs->empty() && jq->quit) {
/* The last worker out wakes jobq_destroy(), which waits on jq->work */
560 if (jq->num_workers == 0) {
561 Dmsg0(2300, "Wake up destroy routine\n");
562 /* Wake up destroy routine if he is waiting */
563 pthread_cond_broadcast(&jq->work);
567 Dmsg0(2300, "Check for work request\n");
569 * If no more work requests, and we waited long enough, quit
571 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
572 jq->ready_jobs->empty());
573 if (jq->ready_jobs->empty() && timedout) {
574 Dmsg0(2300, "break big loop\n");
579 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
582 * If a job is waiting on a Resource, don't consume all
583 * the CPU time looping looking for work, and even more
584 * important, release the lock so that a job that has
585 * terminated can give us the resource.
588 bmicrosleep(2, 0); /* pause for 2 seconds */
590 /* Recompute work as something may have changed in last 2 secs */
591 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
593 Dmsg1(2300, "Loop again. work=%d\n", work);
594 } /* end of big for loop */
596 Dmsg0(200, "unlock mutex\n");
598 Dmsg0(2300, "End jobq_server\n");
603 * Returns true if cleanup done and we should look for more work
/*
 * Decide whether the job that just ran has to be rescheduled and, if
 * so, queue it again.
 *
 *   jcr  job that has just finished running
 *   jq   queue the job came from
 *   je   queue entry of the job (freed here on the requeue path)
 *
 * A reschedule is considered when the job's RescheduleTimes is 0 or
 * reschedule_count is still below it. The visible failed-job test
 * requires RescheduleOnError, a status that is neither JS_Terminated
 * nor JS_Canceled, and job type JT_BACKUP.
 *
 * Two ways of rescheduling are visible:
 *   - JobBytes == 0: the same jcr is queued again with jobq_add(); the
 *     jcr reference and je are released and true is returned.
 *   - otherwise: a new jcr is created, selected fields are copied over
 *     and run_job() is called on it.
 *
 * NOTE(review): several short lines (assignment targets, else branches,
 *   the final return) are not visible in this listing.
 * NOTE(review): now and jcr->sched_time are time_t values printed with
 *   %u in the Dmsg4 below; where time_t is wider than unsigned int this
 *   is a format/argument mismatch.
 */
605 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
607 bool resched = false;
609 * Reschedule the job if requested and possible
611 /* Basic condition is that more reschedule times remain */
612 if (jcr->job->RescheduleTimes == 0 ||
613 jcr->reschedule_count < jcr->job->RescheduleTimes) {
615 /* Check for failed jobs */
616 (jcr->job->RescheduleOnError &&
617 !jcr->is_JobStatus(JS_Terminated) &&
618 !jcr->is_JobStatus(JS_Canceled) &&
619 jcr->is_JobType(JT_BACKUP));
/* Buffers for the formatted "now" and "re-run at" timestamps */
622 char dt[50], dt2[50];
625 * Reschedule this job by cleaning it up, but
626 * reuse the same JobId if possible.
628 time_t now = time(NULL);
629 jcr->reschedule_count++;
630 jcr->sched_time = now + jcr->job->RescheduleInterval;
631 bstrftime(dt, sizeof(dt), now);
632 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
633 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
634 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
635 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
636 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
637 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
639 jcr->setJobStatus(JS_WaitStartTime);
640 jcr->SDJobStatus = 0;
642 if (!allow_duplicate_job(jcr)) {
645 /* Only jobs that wrote no data (JobBytes == 0) can be requeued on the same JCR */
646 if (jcr->JobBytes == 0) {
647 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
650 * Special test here since a Virtual Full gets marked
651 * as a Full, so we look at the resource record
653 if (jcr->wasVirtualFull) {
654 jcr->setJobLevel(L_VIRTUAL_FULL);
657 * When we are using the same jcr then make sure to reset
658 * RealEndTime back to zero.
660 jcr->jr.RealEndTime = 0;
661 jobq_add(jq, jcr); /* queue the job to run again */
/* jobq_add() incremented the use count, so the reference held for this run can be dropped */
663 free_jcr(jcr); /* release jcr */
664 free(je); /* free the job entry */
665 return true; /* we already cleaned up */
668 * Something was actually backed up, so we cannot reuse
669 * the old JobId or there will be database record
670 * conflicts. We now create a new job, copying the
671 * appropriate fields.
673 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
674 set_jcr_defaults(njcr, jcr->job);
/* Carry the retry bookkeeping and the schedule over to the new job */
675 njcr->reschedule_count = jcr->reschedule_count;
676 njcr->sched_time = jcr->sched_time;
677 njcr->initial_sched_time = jcr->initial_sched_time;
679 * Special test here since a Virtual Full gets marked
680 * as a Full, so we look at the resource record
682 if (jcr->wasVirtualFull) {
683 njcr->setJobLevel(L_VIRTUAL_FULL);
685 njcr->setJobLevel(jcr->getJobLevel());
/* Pool selections and their run-time override flags */
687 njcr->pool = jcr->pool;
688 njcr->run_pool_override = jcr->run_pool_override;
689 njcr->next_pool = jcr->next_pool;
690 njcr->run_next_pool_override = jcr->run_next_pool_override;
691 njcr->full_pool = jcr->full_pool;
692 njcr->run_full_pool_override = jcr->run_full_pool_override;
693 njcr->inc_pool = jcr->inc_pool;
694 njcr->run_inc_pool_override = jcr->run_inc_pool_override;
695 njcr->diff_pool = jcr->diff_pool;
/* NOTE(review): presumably -1 makes setJobStatus() accept whatever status is copied next -- TODO confirm */
696 njcr->JobStatus = -1;
697 njcr->setJobStatus(jcr->JobStatus);
699 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
704 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
708 njcr->messages = jcr->messages;
709 njcr->spool_data = jcr->spool_data;
710 njcr->write_part_after_job = jcr->write_part_after_job;
711 Dmsg0(2300, "Call to run new job\n");
713 run_job(njcr); /* This creates a "new" job */
714 free_jcr(njcr); /* release "new" jcr */
716 Dmsg0(2300, "Back from running new job.\n");
722 * See if we can acquire all the necessary resources for the job (JCR)
724 * Returns: true if successful
725 * false if resource failure
/*
 * Try to reserve every concurrency slot the job needs: read storage,
 * write storage, client and job resource, in that order.
 *
 * jcr->acquired_resource_locks is cleared on entry and set to true only
 * after every counter has been incremented. When a later step fails,
 * the counters taken by earlier steps are backed out and the job
 * status is set to the matching JS_Wait*Res value.
 *
 * A job whose read and write storage are the same resource gets an
 * M_FATAL message and its status set to JS_Canceled.
 *
 * NOTE(review): the guarding "if (jcr->rstore)" / "if (jcr->wstore)"
 *   tests, the read-store back-out calls and the return statements are
 *   not visible in this listing; only dec_write_store() and the client
 *   decrement can be seen on the back-out paths.
 */
727 static bool acquire_resources(JCR *jcr)
729 bool skip_this_jcr = false;
731 jcr->acquired_resource_locks = false;
733 * Turning this code off is likely to cause some deadlocks,
734 * but we do not really have enough information here to
735 * know if this is really a deadlock (it may be a dual drive
736 * autochanger), and in principle, the SD reservation system
737 * should detect these deadlocks, so push the work off on it.
740 if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
741 Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
742 " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
743 jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
744 jcr->setJobStatus(JS_Canceled);
/* Step 1: read storage slot, taken through inc_read_store() */
749 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
750 if (!inc_read_store(jcr)) {
751 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
752 jcr->setJobStatus(JS_WaitStoreRes);
/* Step 2: write storage slot */
758 Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
759 if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
760 jcr->wstore->NumConcurrentJobs++;
761 Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
762 } else if (jcr->rstore) {
764 skip_this_jcr = true;
766 Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
767 skip_this_jcr = true;
771 jcr->setJobStatus(JS_WaitStoreRes);
/* Step 3: client slot */
775 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
776 jcr->client->NumConcurrentJobs++;
778 /* Back out previous locks */
779 dec_write_store(jcr);
781 jcr->setJobStatus(JS_WaitClientRes);
/* Step 4: job resource slot */
784 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
785 jcr->job->NumConcurrentJobs++;
787 /* Back out previous locks */
788 dec_write_store(jcr);
790 jcr->client->NumConcurrentJobs--;
791 jcr->setJobStatus(JS_WaitJobRes);
/* Everything reserved; jobq_server() releases the counters after the job has run */
795 jcr->acquired_resource_locks = true;
/*
 * File-local mutex.
 * NOTE(review): presumably serializes the read-store counter updates in
 *   inc_read_store()/dec_read_store(); the lock/unlock calls are not
 *   visible in this listing -- confirm against the full file.
 */
799 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
802 * Note: inc_read_store() and dec_read_store() are
803 * called from select_rstore() in src/dird/restore.c
/*
 * Try to take one read slot on the job's read storage (jcr->rstore).
 *
 * The slot is granted when the storage is below MaxConcurrentJobs and
 * at least one of the following holds:
 *   - the job type is JT_RESTORE,
 *   - MaxConcurrentReadJobs is 0,
 *   - NumConcurrentReadJobs is below MaxConcurrentReadJobs.
 * On success both NumConcurrentReadJobs and NumConcurrentJobs are
 * incremented.
 *
 * NOTE(review): the return statements and any locking are not visible
 *   in this listing; acquire_resources() treats a false result as
 *   "no slot available".
 */
805 bool inc_read_store(JCR *jcr)
808 if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs &&
809 (jcr->getJobType() == JT_RESTORE ||
810 jcr->rstore->MaxConcurrentReadJobs == 0 ||
811 jcr->rstore->NumConcurrentReadJobs < jcr->rstore->MaxConcurrentReadJobs)) {
812 jcr->rstore->NumConcurrentReadJobs++;
813 jcr->rstore->NumConcurrentJobs++;
814 Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
/*
 * Give back the read slot taken by inc_read_store(): decrement both
 * NumConcurrentReadJobs and NumConcurrentJobs on jcr->rstore and assert
 * that neither counter went negative.
 *
 * NOTE(review): a guard for a NULL jcr->rstore and any locking are not
 *   visible in this listing.
 */
822 void dec_read_store(JCR *jcr)
826 jcr->rstore->NumConcurrentReadJobs--; /* back out rstore */
827 jcr->rstore->NumConcurrentJobs--; /* back out rstore */
828 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
/* A negative counter means a release without a matching acquire */
830 ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
831 ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
/*
 * Give back the write storage slot: decrement NumConcurrentJobs on
 * jcr->wstore and assert that the counter did not go negative.
 *
 * NOTE(review): a guard for a NULL jcr->wstore is not visible in this
 *   listing -- confirm against the full file.
 */
835 static void dec_write_store(JCR *jcr)
838 jcr->wstore->NumConcurrentJobs--;
839 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
840 ASSERT(jcr->wstore->NumConcurrentJobs >= 0);