2 * Bacula job queue routines.
4 * This code consists of three queues, the waiting_jobs
5 * queue, where jobs are initially queued, the ready_jobs
6 * queue, where jobs are placed when all the resources are
7 * allocated and they can immediately be run, and the
8 * running queue where jobs are placed when they are
11 * Kern Sibbald, July MMIII
15 * This code was adapted from the Bacula workq, which was
16 * adapted from "Programming with POSIX Threads", by
21 Copyright (C) 2003-2006 Kern Sibbald
23 This program is free software; you can redistribute it and/or
24 modify it under the terms of the GNU General Public License
25 version 2 as amended with additional clauses defined in the
26 file LICENSE in the main source directory.
28 This program is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 the file LICENSE for additional details.
40 /* Forward referenced functions */
/* Thread entry points passed to pthread_create() in this file; C linkage */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
/* File-local helpers */
44 static int start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
50 * Initialize a job queue
52 * Returns: 0 on success
/*
 * Prepare a job queue for use.
 *
 *  jq      - queue structure to initialize
 *  threads - stored as the maximum number of worker threads
 *  engine  - routine stored in jq->engine (the routine to run)
 *
 * Initializes the thread attributes (detached), the mutex and the work
 * condition variable, destroying the objects already created when a
 * later step fails, then resets the worker counters, marks the queue
 * valid and creates the waiting, running and ready lists.
 *
 * NOTE(review): this listing omits some short source lines (braces,
 * a few declarations, return statements); only the statements shown
 * here are described.
 */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
/* item stays NULL; it is only handed to dlist() together with the */
/* address of its link member in the New() calls below */
58 jobq_item_t *item = NULL;
60 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
62 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat));
/* Threads created with jq->attr are detached */
65 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66 pthread_attr_destroy(&jq->attr);
69 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
71 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat));
72 pthread_attr_destroy(&jq->attr);
75 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
77 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat));
/* Undo the mutex and attribute initialization done above */
78 pthread_mutex_destroy(&jq->mutex);
79 pthread_attr_destroy(&jq->attr);
83 jq->max_workers = threads; /* max threads to create */
84 jq->num_workers = 0; /* no threads yet */
85 jq->idle_workers = 0; /* no idle threads */
86 jq->engine = engine; /* routine to run */
/* Marker tested by the other queue routines before they touch jq */
87 jq->valid = JOBQ_VALID;
88 /* Initialize the job queues */
89 jq->waiting_jobs = New(dlist(item, &item->link));
90 jq->running_jobs = New(dlist(item, &item->link));
91 jq->ready_jobs = New(dlist(item, &item->link));
96 * Destroy the job queue
98 * Returns: 0 on success
/*
 * Tear down a job queue created by jobq_init().
 *
 * With the queue mutex held, the queue is marked invalid. If worker
 * threads exist, idle ones are woken with a broadcast and this routine
 * then waits on the work condition until num_workers drops to zero.
 * Afterwards the mutex, condition variable and thread attributes are
 * destroyed and the three job lists are deleted.
 *
 * The final return yields the first non-zero status among the three
 * destroy calls, or zero when all succeed.
 *
 * NOTE(review): this listing omits some short source lines (braces,
 * early returns and possibly other short statements); only the
 * statements shown here are described.
 */
101 int jobq_destroy(jobq_t *jq)
103 int stat, stat1, stat2;
/* Refuse to operate on a queue that is not marked valid */
105 if (jq->valid != JOBQ_VALID) {
108 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
110 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
113 jq->valid = 0; /* prevent any more operations */
116 * If any threads are active, wake them
118 if (jq->num_workers > 0) {
120 if (jq->idle_workers) {
121 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
123 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat));
124 pthread_mutex_unlock(&jq->mutex);
/* Wait for the workers to finish; jobq_server broadcasts on the same */
/* condition once num_workers reaches zero */
128 while (jq->num_workers > 0) {
129 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
131 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat));
132 pthread_mutex_unlock(&jq->mutex);
137 if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
139 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat));
/* Release the synchronization objects, keeping each status separately */
142 stat = pthread_mutex_destroy(&jq->mutex);
143 stat1 = pthread_cond_destroy(&jq->work);
144 stat2 = pthread_attr_destroy(&jq->attr);
145 delete jq->waiting_jobs;
146 delete jq->running_jobs;
147 delete jq->ready_jobs;
/* Report the first failure among the three destroy calls */
148 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
157 * Wait until schedule time arrives before starting. Normally
158 * this routine is only used for jobs started from the console
159 * for which the user explicitly specified a start time. Otherwise
160 * most jobs are put into the job queue only when their
161 * scheduled time arrives.
/*
 * Thread entry point that holds a job back until its scheduled time.
 *
 * arg points to a wait_pkt from which the JCR and the queue are read.
 * The job status is set to JS_WaitStartTime, the remaining wait is
 * computed from jcr->sched_time, and after each sleep the job is
 * checked for cancellation and the remaining wait is recomputed.
 * The JCR reference is released with free_jcr() before exit.
 *
 * NOTE(review): this listing omits some short source lines; the loop
 * header, the statements that bound each sleep and whatever is done
 * with jq after the wait are not visible here.
 */
164 void *sched_wait(void *arg)
166 JCR *jcr = ((wait_pkt *)arg)->jcr;
167 jobq_t *jq = ((wait_pkt *)arg)->jq;
169 Dmsg0(2300, "Enter sched_wait.\n");
/* Seconds remaining until the scheduled start */
171 time_t wtime = jcr->sched_time - time(NULL);
172 set_jcr_job_status(jcr, JS_WaitStartTime);
173 /* Wait until scheduled time arrives */
175 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
178 /* Check every 30 seconds if canceled */
/* NOTE(review): wtime is a time_t but is formatted with %d below; */
/* confirm this is safe where time_t is wider than int */
180 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
181 jcr->JobId, wtime, jcr->use_count());
185 bmicrosleep(wtime, 0);
186 if (job_canceled(jcr)) {
/* Recompute the remaining wait after the sleep */
189 wtime = jcr->sched_time - time(NULL);
191 Dmsg1(200, "resched use=%d\n", jcr->use_count());
193 free_jcr(jcr); /* we are done with jcr */
194 Dmsg0(2300, "Exit sched_wait\n");
199 * Add a job to the queue
200 * jq is a queue that was created with jobq_init
/*
 * Queue a job.
 *
 *  jq  - queue created with jobq_init()
 *  jcr - job to add; its use count is incremented here
 *
 * If the job is not canceled and its scheduled time lies in the
 * future, a detached sched_wait thread is created for it instead of
 * queueing it now. Otherwise, with the queue mutex held, a queue item
 * is allocated and either prepended to the ready queue (canceled job)
 * or inserted into the waiting queue ahead of the first entry whose
 * JobPriority value is larger, or appended. Finally start_server() is
 * called so that a worker looks at the queue.
 *
 * NOTE(review): this listing omits some short source lines (braces,
 * return statements, a few declarations and assignments); only the
 * statements shown here are described.
 */
202 int jobq_add(jobq_t *jq, JCR *jcr)
205 jobq_item_t *item, *li;
206 bool inserted = false;
/* Seconds until the scheduled start; positive means not yet due */
207 time_t wtime = jcr->sched_time - time(NULL);
/* NOTE(review): the jcr pointer is formatted with %x in the debug */
/* messages of this function; confirm on targets with wide pointers */
211 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
212 if (jq->valid != JOBQ_VALID) {
213 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
217 jcr->inc_use_count(); /* mark jcr in use by us */
218 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/* Not yet due: hand the job to a sched_wait thread */
219 if (!job_canceled(jcr) && wtime > 0) {
220 set_thread_concurrency(jq->max_workers + 2);
/* NOTE(review): the malloc result is dereferenced on the next line */
/* without a NULL check */
221 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
222 sched_pkt->jcr = jcr;
224 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
/* NOTE(review): no statement visible here frees sched_pkt or drops */
/* the use count taken above when thread creation fails; confirm */
225 if (stat != 0) { /* thread not created */
227 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat));
232 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
234 Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
235 free_jcr(jcr); /* release jcr */
/* NOTE(review): the queue mutex is held at this point and no unlock */
/* is visible on this allocation failure path; confirm */
239 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
240 free_jcr(jcr); /* release jcr */
245 if (job_canceled(jcr)) {
246 /* Add job to ready queue so that it is canceled quickly */
247 jq->ready_jobs->prepend(item);
248 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
250 /* Add this job to the wait queue in priority sorted order */
251 foreach_dlist(li, jq->waiting_jobs) {
252 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
253 li->jcr->JobId, li->jcr->JobPriority);
/* Insert ahead of the first waiting job with a larger priority value */
254 if (li->jcr->JobPriority > jcr->JobPriority) {
255 jq->waiting_jobs->insert_before(item, li);
/* NOTE(review): the message reads as new job before waiting job, but */
/* the waiting job id is passed first; the arguments look swapped */
256 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
257 li->jcr->JobId, jcr->JobId);
262 /* If not jobs in wait queue, append it */
264 jq->waiting_jobs->append(item);
265 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
269 /* Ensure that at least one server looks at the queue. */
270 stat = start_server(jq);
272 pthread_mutex_unlock(&jq->mutex);
273 Dmsg0(2300, "Return jobq_add\n");
278 * Remove a job from the job queue. Used only by cancel_job().
279 * jq is a queue that was created with jobq_init
280 * jcr is the job whose queue item is to be moved
282 * Note, it is "removed" from the job queue.
283 * If you want to cancel it, you need to provide some external means
284 * of doing so (e.g. pthread_kill()).
/*
 * Pull a waiting job forward so that it is handled promptly.
 *
 *  jq  - queue created with jobq_init()
 *  jcr - job to look for in the waiting queue
 *
 * With the queue mutex held, the waiting queue is searched for the
 * item that refers to jcr. When it is found, the item is removed from
 * the waiting queue and prepended to the ready queue, and
 * start_server() is called. The item itself is not freed here.
 *
 * NOTE(review): this listing omits some short source lines (braces,
 * return statements, declarations); only the statements shown here
 * are described.
 */
286 int jobq_remove(jobq_t *jq, JCR *jcr)
292 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
293 if (jq->valid != JOBQ_VALID) {
297 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
299 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
/* Look for the queue item that belongs to this jcr */
303 foreach_dlist(item, jq->waiting_jobs) {
304 if (jcr == item->jcr) {
/* Not found path: release the mutex before reporting */
310 pthread_mutex_unlock(&jq->mutex);
311 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
315 /* Move item to be the first on the list */
316 jq->waiting_jobs->remove(item);
317 jq->ready_jobs->prepend(item);
318 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
/* Make sure a worker picks up the ready queue */
320 stat = start_server(jq);
322 pthread_mutex_unlock(&jq->mutex);
323 Dmsg0(2300, "Return jobq_remove\n");
329 * Start the server thread if it isn't already running
/*
 * Make sure some worker thread will look at the queue.
 *
 * If idle workers exist, they are woken with a broadcast on the work
 * condition. Otherwise, when fewer than max_workers threads exist, a
 * new jobq_server thread is created with the queue attributes.
 * Nothing is done when there are no idle workers and the worker limit
 * has been reached.
 *
 * The visible callers in this file invoke it while holding jq->mutex.
 *
 * NOTE(review): this listing omits some short source lines (braces,
 * declarations, return statements and possibly counter updates); only
 * the statements shown here are described.
 */
331 static int start_server(jobq_t *jq)
337 * if any threads are idle, wake one --
338 * actually we do a broadcast because on /lib/tls
339 * these signals seem to get lost from time to time.
341 if (jq->idle_workers > 0) {
342 Dmsg0(2300, "Signal worker to wake up\n");
/* NOTE(review): the error text below names pthread_cond_signal while */
/* the call made is pthread_cond_broadcast */
343 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
345 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat));
348 } else if (jq->num_workers < jq->max_workers) {
349 Dmsg0(2300, "Create worker thread\n");
350 /* No idle threads so create a new one */
351 set_thread_concurrency(jq->max_workers + 1);
/* The new thread runs jobq_server with this queue as its argument */
352 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
354 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat));
363 * This is the worker thread that serves the job queue.
364 * When all the resources are acquired for the job,
365 * it will call the user's engine.
/*
 * Worker thread body; arg is the jobq_t to serve.
 *
 * Each pass of the main loop does the following:
 *   1. When the previous pass found no work and no quit is requested,
 *      wait up to 4 seconds on the work condition.
 *   2. Take jobs from the ready queue one at a time, move each to the
 *      running queue, run it, then give back the resource counters it
 *      held and either reschedule it or release it.
 *   3. Walk the waiting queue and move every job of the selected
 *      priority whose resources can be acquired to the ready queue.
 *   4. Leave the loop on quit or after a timed out wait with an empty
 *      ready queue; otherwise recompute the work flag and loop again.
 *
 * NOTE(review): this listing omits some short source lines (braces,
 * declarations, break and return statements, lock and unlock calls,
 * counter updates and the engine call itself); only the statements
 * shown here are described.
 */
368 void *jobq_server(void *arg)
370 struct timespec timeout;
371 jobq_t *jq = (jobq_t *)arg;
372 jobq_item_t *je; /* job entry in queue */
374 bool timedout = false;
377 Dmsg0(2300, "Start jobq_server\n");
/* Take the queue mutex before looking at the queues */
378 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
380 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
389 Dmsg0(2300, "Top of for loop\n");
/* Idle wait, only when the last pass found no work and no quit is set */
390 if (!work && !jq->quit) {
391 gettimeofday(&tv, &tz);
/* Absolute deadline 4 seconds from now for the timed wait */
393 timeout.tv_sec = tv.tv_sec + 4;
397 * Wait 4 seconds, then if no more work, exit
399 Dmsg0(2300, "pthread_cond_timedwait()\n");
400 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
401 if (stat == ETIMEDOUT) {
402 Dmsg0(2300, "timedwait timedout.\n");
405 } else if (stat != 0) {
406 /* This shouldn't happen */
407 Dmsg0(2300, "This shouldn't happen\n");
409 pthread_mutex_unlock(&jq->mutex);
416 * If anything is in the ready queue, run it
418 Dmsg0(2300, "Checking ready queue.\n");
419 while (!jq->ready_jobs->empty() && !jq->quit) {
/* Take the job at the head of the ready queue */
421 je = (jobq_item_t *)jq->ready_jobs->first();
423 jq->ready_jobs->remove(je);
/* More ready jobs remain, so ask for another worker to handle them */
424 if (!jq->ready_jobs->empty()) {
425 Dmsg0(2300, "ready queue not empty start server\n");
426 if (start_server(jq) != 0) {
428 pthread_mutex_unlock(&jq->mutex);
432 jq->running_jobs->append(je);
433 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
435 /* Release job queue lock */
438 /* Call user's routine here */
439 Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
443 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
446 /* Reacquire job queue lock */
448 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
449 jq->running_jobs->remove(je);
451 * Release locks if acquired. Note, they will not have
452 * been acquired for jobs canceled before they were
453 * put into the ready queue.
/* Give back the per storage, client and job concurrency counts */
455 if (jcr->acquired_resource_locks) {
456 jcr->store->NumConcurrentJobs--;
457 jcr->client->NumConcurrentJobs--;
458 jcr->job->NumConcurrentJobs--;
462 * Reschedule the job if necessary and requested
/* Only backup jobs that did not terminate normally and were not */
/* canceled are rescheduled, and only while the configured number */
/* of reschedule attempts has not been used up */
464 if (jcr->job->RescheduleOnError &&
465 jcr->JobStatus != JS_Terminated &&
466 jcr->JobStatus != JS_Canceled &&
467 jcr->job->RescheduleTimes > 0 &&
468 jcr->JobType == JT_BACKUP &&
469 jcr->reschedule_count < jcr->job->RescheduleTimes) {
473 * Reschedule this job by cleaning it up, but
474 * reuse the same JobId if possible.
476 jcr->reschedule_count++;
/* New start time is now plus the configured reschedule interval */
477 jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval;
478 Dmsg2(2300, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job,
479 (int)jcr->job->RescheduleInterval);
480 bstrftime(dt, sizeof(dt), time(NULL));
481 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds.\n"),
482 jcr->Job, dt, (int)jcr->job->RescheduleInterval);
483 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
484 jcr->JobStatus = JS_WaitStartTime;
485 jcr->SDJobStatus = 0;
/* Nothing was written, so the same jcr is queued again */
486 if (jcr->JobBytes == 0) {
487 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
/* NOTE(review): JobStatus already holds JS_WaitStartTime from the */
/* assignment above; this one looks redundant */
488 jcr->JobStatus = JS_WaitStartTime;
490 jobq_add(jq, jcr); /* queue the job to run again */
492 free_jcr(jcr); /* release jcr */
493 free(je); /* free the job entry */
494 continue; /* look for another job to run */
497 * Something was actually backed up, so we cannot reuse
498 * the old JobId or there will be database record
499 * conflicts. We now create a new job, copying the
500 * appropriate fields.
502 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
503 set_jcr_defaults(njcr, jcr->job);
/* Carry over the fields the new job needs from the old jcr */
504 njcr->reschedule_count = jcr->reschedule_count;
505 njcr->JobLevel = jcr->JobLevel;
506 njcr->JobStatus = jcr->JobStatus;
507 copy_storage(njcr, jcr->storage);
508 njcr->messages = jcr->messages;
509 Dmsg0(2300, "Call to run new job\n");
511 run_job(njcr); /* This creates a "new" job */
512 free_jcr(njcr); /* release "new" jcr */
514 Dmsg0(2300, "Back from running new job.\n");
516 /* Clean up and release old jcr */
518 db_close_database(jcr, jcr->db);
521 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
522 jcr->SDJobStatus = 0;
/* The queue lock is dropped while the job entry is released */
523 V(jq->mutex); /* release internal lock */
525 free(je); /* release job entry */
526 P(jq->mutex); /* reacquire job queue lock */
529 * If any job in the wait queue can be run,
530 * move it to the ready queue
532 Dmsg0(2300, "Done check ready, now check wait queue.\n");
533 if (!jq->waiting_jobs->empty() && !jq->quit) {
/* Pick the priority to serve: judging by the two debug messages, */
/* that of the first running job if there is one, otherwise that */
/* of the first waiting job */
535 je = (jobq_item_t *)jq->waiting_jobs->first();
536 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
538 Priority = re->jcr->JobPriority;
539 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
541 Priority = je->jcr->JobPriority;
542 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
545 * Walk down the list of waiting jobs and attempt
546 * to acquire the resources it needs.
549 /* je is current job item on the queue, jn is the next one */
/* jn is fetched first because je may be unlinked from the list below */
551 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
553 Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
554 jcr->JobId, jcr->JobPriority, Priority);
556 /* Take only jobs of correct Priority */
557 if (jcr->JobPriority != Priority) {
558 set_jcr_job_status(jcr, JS_WaitPriority);
/* Resources not available: leave the job waiting and move on */
562 if (!acquire_resources(jcr)) {
563 je = jn; /* point to next waiting job */
567 /* Got all locks, now remove it from wait queue and append it
570 jcr->acquired_resource_locks = true;
571 jq->waiting_jobs->remove(je);
572 jq->ready_jobs->append(je);
573 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
574 je = jn; /* Point to next waiting job */
579 Dmsg0(2300, "Done checking wait queue.\n");
581 * If no more ready work and we are asked to quit, then do it
583 if (jq->ready_jobs->empty() && jq->quit) {
/* Last worker out signals jobq_destroy, which waits on this condition */
585 if (jq->num_workers == 0) {
586 Dmsg0(2300, "Wake up destroy routine\n");
587 /* Wake up destroy routine if he is waiting */
588 pthread_cond_broadcast(&jq->work);
592 Dmsg0(2300, "Check for work request\n");
594 * If no more work requests, and we waited long enough, quit
596 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
597 jq->ready_jobs->empty());
598 if (jq->ready_jobs->empty() && timedout) {
599 Dmsg0(2300, "break big loop\n");
/* Work exists while either the ready or the waiting queue has entries */
604 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
607 * If a job is waiting on a Resource, don't consume all
608 * the CPU time looping looking for work, and even more
609 * important, release the lock so that a job that has
610 * terminated can give us the resource.
613 bmicrosleep(2, 0); /* pause for 2 seconds */
615 /* Recompute work as something may have changed in last 2 secs */
616 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
618 Dmsg1(2300, "Loop again. work=%d\n", work);
619 } /* end of big for loop */
621 Dmsg0(200, "unlock mutex\n");
623 Dmsg0(2300, "End jobq_server\n");
628 * See if we can acquire all the necessary resources for the job (JCR)
630 * Returns: true if successful
631 * false if resource failure
633 static bool acquire_resources(JCR *jcr)
635 bool skip_this_jcr = false;
637 if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
639 * Let only one Restore/verify job run at a time regardless
640 * of MaxConcurrentJobs.
642 if (jcr->store->NumConcurrentJobs == 0) {
643 jcr->store->NumConcurrentJobs = 1;
645 set_jcr_job_status(jcr, JS_WaitStoreRes);
648 /* We are not doing a Restore or Verify */
649 } else if (jcr->store->NumConcurrentJobs == 0 &&
650 jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
651 /* Simple case, first job */
652 jcr->store->NumConcurrentJobs = 1;
653 } else if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
654 jcr->store->NumConcurrentJobs++;
656 skip_this_jcr = true;
659 set_jcr_job_status(jcr, JS_WaitStoreRes);
663 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
664 jcr->client->NumConcurrentJobs++;
666 /* Back out previous locks */
667 jcr->store->NumConcurrentJobs--;
668 set_jcr_job_status(jcr, JS_WaitClientRes);
671 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
672 jcr->job->NumConcurrentJobs++;
674 /* Back out previous locks */
675 jcr->store->NumConcurrentJobs--;
676 jcr->client->NumConcurrentJobs--;
677 set_jcr_job_status(jcr, JS_WaitJobRes);
680 /* Check actual device availability */