2 * Bacula job queue routines.
4 * This code consists of three queues, the waiting_jobs
5 * queue, where jobs are initially queued, the ready_jobs
6 * queue, where jobs are placed when all the resources are
7 * allocated and they can immediately be run, and the
8 * running queue where jobs are placed when they are
11 * Kern Sibbald, July MMIII
15 * This code was adapted from the Bacula workq, which was
16 * adapted from "Programming with POSIX Threads", by
21 Copyright (C) 2003-2006 Kern Sibbald
23 This program is free software; you can redistribute it and/or
24 modify it under the terms of the GNU General Public License
25 version 2 as amended with additional clauses defined in the
26 file LICENSE in the main source directory.
28 This program is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 file LICENSE for additional details.
40 /* Forward referenced functions */
/*
 * jobq_server and sched_wait are handed to pthread_create() as thread
 * start routines further down in this file, which is presumably why they
 * are declared with C linkage.
 */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
/* File-local helpers; both are defined further down in this file. */
44 static int start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
50 * Initialize a job queue
52 * Returns: 0 on success
/*
 * jobq_init() prepares *jq for use. It initializes the pthread attribute
 * object, the mutex and the "work" condition variable stored in the queue,
 * records the worker limit and the engine routine, marks the queue
 * JOBQ_VALID and allocates the waiting, running and ready job lists.
 *
 *   jq      - job queue structure to initialize
 *   threads - stored as jq->max_workers (upper bound on worker threads)
 *   engine  - stored as jq->engine (routine to run)
 *
 * Each failure branch destroys whatever was successfully initialized
 * before it, so a failed call leaves no pthread object behind.
 *
 * NOTE(review): this listing is missing some source lines (braces, short
 * declarations, return statements), so the exact error returns are not
 * visible here -- confirm against the complete file.
 */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
58 jobq_item_t *item = NULL;
/* jq->attr is the attribute object later passed to every pthread_create() on this queue. */
60 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
62 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat));
/* Threads created with jq->attr are detached; on failure undo the attr init. */
65 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66 pthread_attr_destroy(&jq->attr);
/* Mutex protecting the queue; on failure undo the attr init. */
69 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
71 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat));
72 pthread_attr_destroy(&jq->attr);
/* Condition variable workers wait on; on failure undo mutex and attr init. */
75 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
77 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat));
78 pthread_mutex_destroy(&jq->mutex);
79 pthread_attr_destroy(&jq->attr);
83 jq->max_workers = threads; /* max threads to create */
84 jq->num_workers = 0; /* no threads yet */
85 jq->idle_workers = 0; /* no idle threads */
86 jq->engine = engine; /* routine to run */
87 jq->valid = JOBQ_VALID;
88 /* Initialize the job queues */
/*
 * item is NULL and is only used to form &item->link; presumably dlist
 * uses the pair to work out where the link member sits inside a
 * jobq_item_t -- TODO confirm against the dlist implementation.
 */
89 jq->waiting_jobs = New(dlist(item, &item->link));
90 jq->running_jobs = New(dlist(item, &item->link));
91 jq->ready_jobs = New(dlist(item, &item->link));
96 * Destroy the job queue
98 * Returns: 0 on success
/*
 * jobq_destroy() shuts the queue down and releases what jobq_init() set up.
 * Under the queue mutex it marks the queue invalid, wakes idle workers and
 * waits on jq->work until jq->num_workers has dropped to zero. It then
 * destroys the mutex, the condition variable and the thread attributes and
 * deletes the three job lists.
 *
 * The value returned on the last line is the first non-zero status among
 * the mutex, condition-variable and attribute destroy calls, in that order
 * (0 when all three succeed).
 *
 * NOTE(review): several lines of this function are absent from this
 * listing (braces, returns). In particular no visible line sets jq->quit,
 * which jobq_server() tests; that assignment may be among the missing
 * lines -- confirm against the complete file.
 */
101 int jobq_destroy(jobq_t *jq)
103 int stat, stat1, stat2;
/* Reject a queue that was never initialized or has already been destroyed (valid is cleared below). */
105 if (jq->valid != JOBQ_VALID) {
108 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
110 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
113 jq->valid = 0; /* prevent any more operations */
116 * If any threads are active, wake them
118 if (jq->num_workers > 0) {
/* Only idle workers are blocked on jq->work, so a broadcast is needed only when there are some. */
120 if (jq->idle_workers) {
121 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
123 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat));
124 pthread_mutex_unlock(&jq->mutex);
/*
 * Wait for the workers to finish. The same condition variable is used:
 * jobq_server() broadcasts jq->work when num_workers reaches zero.
 */
128 while (jq->num_workers > 0) {
129 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
131 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat));
132 pthread_mutex_unlock(&jq->mutex);
137 if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
139 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat));
/* All three destroy calls are attempted even if an earlier one fails; their statuses are merged below. */
142 stat = pthread_mutex_destroy(&jq->mutex);
143 stat1 = pthread_cond_destroy(&jq->work);
144 stat2 = pthread_attr_destroy(&jq->attr);
145 delete jq->waiting_jobs;
146 delete jq->running_jobs;
147 delete jq->ready_jobs;
148 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
157 * Wait until schedule time arrives before starting. Normally
158 * this routine is only used for jobs started from the console
159 * for which the user explicitly specified a start time. Otherwise
160 * most jobs are put into the job queue only when their
161 * scheduled time arrives.
/*
 * sched_wait() is the thread routine that jobq_add() starts for a job
 * whose sched_time is still in the future. arg points to a wait_pkt
 * holding the JCR and the job queue. The routine puts the job in
 * JS_WaitStartTime, sleeps with bmicrosleep() while re-checking for
 * cancellation, recomputes the remaining wait from jcr->sched_time, and
 * finally releases its JCR reference with free_jcr().
 *
 * NOTE(review): this listing is missing lines of the function (the loop
 * header, the cap on each sleep, the release of arg and whatever call
 * sits between the "resched" debug message and free_jcr()) -- confirm
 * against the complete file before relying on the details.
 */
164 void *sched_wait(void *arg)
166 JCR *jcr = ((wait_pkt *)arg)->jcr;
167 jobq_t *jq = ((wait_pkt *)arg)->jq;
169 Dmsg0(2300, "Enter sched_wait.\n");
/* Seconds left until the scheduled start time. */
171 time_t wtime = jcr->sched_time - time(NULL);
172 set_jcr_job_status(jcr, JS_WaitStartTime);
173 /* Wait until scheduled time arrives */
175 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
178 /* Check every 30 seconds if canceled */
/*
 * NOTE(review): wtime is a time_t but is printed with %d here; on
 * platforms where time_t is wider than int the format and argument do
 * not match -- consider an explicit cast.
 */
180 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
181 jcr->JobId, wtime, jcr->use_count());
185 bmicrosleep(wtime, 0);
186 if (job_canceled(jcr)) {
/* Recompute from sched_time rather than counting down, so the wait tracks the current clock and sched_time. */
189 wtime = jcr->sched_time - time(NULL);
191 Dmsg1(200, "resched use=%d\n", jcr->use_count());
/* Presumably balances the inc_use_count() taken in jobq_add() before this thread was created -- confirm. */
193 free_jcr(jcr); /* we are done with jcr */
194 Dmsg0(2300, "Exit sched_wait\n");
199 * Add a job to the queue
200 * jq is a queue that was created with jobq_init
/*
 * jobq_add() queues jcr on jq.
 *
 *   - The job's termination condition variable is initialized on first use.
 *   - A use count is taken on the JCR for the queue.
 *   - If the job is not canceled and its sched_time is still in the
 *     future, a detached sched_wait() thread is started for it instead of
 *     queueing it now.
 *   - Otherwise, under the queue mutex, a jobq_item_t is allocated. A
 *     canceled job is put at the head of the ready queue; any other job is
 *     inserted into the waiting queue before the first entry whose
 *     JobPriority value is greater, or appended.
 *   - start_server() is then called so that a worker looks at the queue.
 *
 * NOTE(review): this listing is missing lines of the function (braces,
 * return statements, some assignments), so return values and the exact
 * branch structure are not visible here -- confirm against the full file.
 */
202 int jobq_add(jobq_t *jq, JCR *jcr)
205 jobq_item_t *item, *li;
206 bool inserted = false;
/* Seconds until the job's scheduled start; > 0 means it must wait first. */
207 time_t wtime = jcr->sched_time - time(NULL);
211 if (!jcr->term_wait_inited) {
212 /* Initialize termination condition variable */
213 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
215 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.strerror(stat));
218 jcr->term_wait_inited = true;
/* NOTE(review): jcr is a pointer printed with %x here and a few lines below; %p would be the matching conversion. */
221 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
222 if (jq->valid != JOBQ_VALID) {
223 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
227 jcr->inc_use_count(); /* mark jcr in use by us */
228 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/* Start time not reached yet: hand the job to a sched_wait() thread. */
229 if (!job_canceled(jcr) && wtime > 0) {
230 set_thread_concurrency(jq->max_workers + 2);
/* NOTE(review): the malloc() result is dereferenced on the next line with no visible NULL check. */
231 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
232 sched_pkt->jcr = jcr;
234 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
235 if (stat != 0) { /* thread not created */
237 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat));
/* From here on the queue lists are touched, so the queue mutex is taken. */
242 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
244 Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
245 free_jcr(jcr); /* release jcr */
/*
 * NOTE(review): the mutex locked above is still held on this allocation
 * failure path and no unlock is visible before the branch ends -- check
 * that the (not shown) return does not leave jq->mutex locked.
 */
249 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
250 free_jcr(jcr); /* release jcr */
255 if (job_canceled(jcr)) {
256 /* Add job to ready queue so that it is canceled quickly */
257 jq->ready_jobs->prepend(item);
258 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
260 /* Add this job to the wait queue in priority sorted order */
261 foreach_dlist(li, jq->waiting_jobs) {
262 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
263 li->jcr->JobId, li->jcr->JobPriority);
/* Insert ahead of the first waiting job whose JobPriority value is strictly greater, keeping the list sorted by ascending value. */
264 if (li->jcr->JobPriority > jcr->JobPriority) {
265 jq->waiting_jobs->insert_before(item, li);
/*
 * NOTE(review): the arguments appear to be in the wrong order for this
 * message: li is the waiting job and jcr the job being inserted, yet
 * li's JobId is printed as "jobid" and jcr's as "waiting job".
 */
266 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
267 li->jcr->JobId, jcr->JobId);
272 /* If not jobs in wait queue, append it */
274 jq->waiting_jobs->append(item);
275 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
279 /* Ensure that at least one server looks at the queue. */
280 stat = start_server(jq);
282 pthread_mutex_unlock(&jq->mutex);
283 Dmsg0(2300, "Return jobq_add\n");
288 * Remove a job from the job queue. Used only by cancel_job().
289 * jq is a queue that was created with jobq_init
290 * jcr is the job whose entry is searched for in the waiting queue
292 * Note, it is "removed" only from the waiting queue: the entry is moved to the front of the ready queue.
293 * If you want to cancel it, you need to provide some external means
294 * of doing so (e.g. pthread_kill()).
/*
 * jobq_remove() takes jcr out of the waiting queue. Under the queue mutex
 * it searches waiting_jobs for the entry whose jcr matches. If none is
 * found it unlocks and logs that the job is not in the wait queue.
 * Otherwise the entry is unlinked from waiting_jobs and put at the head of
 * ready_jobs, and start_server() is called so a worker picks it up.
 *
 * The entry is therefore not discarded here: it is moved to the front of
 * the ready queue.
 *
 * NOTE(review): this listing is missing lines of the function (braces,
 * the found flag or break, return statements) -- confirm the return
 * values against the complete file.
 */
296 int jobq_remove(jobq_t *jq, JCR *jcr)
302 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
303 if (jq->valid != JOBQ_VALID) {
307 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
309 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
/* Look for the queue entry that belongs to this JCR. */
313 foreach_dlist(item, jq->waiting_jobs) {
314 if (jcr == item->jcr) {
/* Not-found path: release the mutex before reporting. */
320 pthread_mutex_unlock(&jq->mutex);
321 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
325 /* Move item to be the first on the list */
326 jq->waiting_jobs->remove(item);
327 jq->ready_jobs->prepend(item);
328 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
/* Make sure a worker is awake or created to process the moved entry. */
330 stat = start_server(jq);
332 pthread_mutex_unlock(&jq->mutex);
333 Dmsg0(2300, "Return jobq_remove\n");
339 * Start the server thread if it isn't already running
/*
 * start_server() makes sure some worker will look at the queue. If any
 * worker is idle, all waiters on jq->work are woken. Otherwise, if fewer
 * than jq->max_workers workers exist, a new thread running jobq_server()
 * is created with the queue's (detached) thread attributes. When neither
 * condition holds, none of the visible statements run.
 *
 * The callers visible in this file (jobq_add, jobq_remove, jobq_server)
 * invoke it between locking and unlocking jq->mutex.
 *
 * NOTE(review): this listing is missing lines of the function
 * (declarations, braces, the return) -- confirm the return value against
 * the complete file.
 */
341 static int start_server(jobq_t *jq)
347 * if any threads are idle, wake one --
348 * actually we do a broadcast because on /lib/tls
349 * these signals seem to get lost from time to time.
351 if (jq->idle_workers > 0) {
352 Dmsg0(2300, "Signal worker to wake up\n");
353 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
/* NOTE(review): the message names pthread_cond_signal although the failing call is pthread_cond_broadcast. */
355 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat));
358 } else if (jq->num_workers < jq->max_workers) {
359 Dmsg0(2300, "Create worker thread\n");
360 /* No idle threads so create a new one */
361 set_thread_concurrency(jq->max_workers + 1);
362 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
364 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat));
373 * This is the worker thread that serves the job queue.
374 * When all the resources are acquired for the job,
375 * it will call the user's engine.
/*
 * jobq_server() is the worker thread body started by start_server(); arg
 * is the jobq_t being served. Each pass of its main loop:
 *
 *   1. if the previous pass found no work and the queue is not quitting,
 *      waits on jq->work with a deadline about 4 seconds ahead;
 *   2. takes jobs off the ready queue, moves them to the running queue and
 *      runs them (per the comments and debug messages, through the user's
 *      engine), then gives back the concurrency counts and, for failed
 *      backup jobs with RescheduleOnError, reschedules them;
 *   3. walks the waiting queue and moves jobs of the wanted priority whose
 *      resources can be acquired onto the ready queue;
 *   4. leaves when asked to quit with nothing ready, or when the timed
 *      wait expired and nothing is ready.
 *
 * NOTE(review): this listing is missing many lines of the function
 * (declarations, braces, the loop header, counter updates, lock/unlock
 * calls, the engine call itself), so the comments below describe only
 * what the visible lines establish -- confirm details against the full
 * source.
 */
378 void *jobq_server(void *arg)
380 struct timespec timeout;
381 jobq_t *jq = (jobq_t *)arg;
382 jobq_item_t *je; /* job entry in queue */
384 bool timedout = false;
387 Dmsg0(2300, "Start jobq_server\n");
388 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
390 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
399 Dmsg0(2300, "Top of for loop\n");
/* Nothing to do on the last pass and not quitting: sleep on the condition variable. */
400 if (!work && !jq->quit) {
401 gettimeofday(&tv, &tz);
/*
 * NOTE(review): only tv_sec is set on a visible line; the tv_nsec
 * assignment is presumably among the lines missing from this listing.
 */
403 timeout.tv_sec = tv.tv_sec + 4;
407 * Wait 4 seconds, then if no more work, exit
409 Dmsg0(2300, "pthread_cond_timedwait()\n");
410 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
411 if (stat == ETIMEDOUT) {
412 Dmsg0(2300, "timedwait timedout.\n");
415 } else if (stat != 0) {
416 /* This shouldn't happen */
417 Dmsg0(2300, "This shouldn't happen\n");
419 pthread_mutex_unlock(&jq->mutex);
426 * If anything is in the ready queue, run it
428 Dmsg0(2300, "Checking ready queue.\n");
429 while (!jq->ready_jobs->empty() && !jq->quit) {
431 je = (jobq_item_t *)jq->ready_jobs->first();
433 jq->ready_jobs->remove(je);
/* More ready work remains: make sure another worker is awake or created to take it. */
434 if (!jq->ready_jobs->empty()) {
435 Dmsg0(2300, "ready queue not empty start server\n");
436 if (start_server(jq) != 0) {
438 pthread_mutex_unlock(&jq->mutex);
442 jq->running_jobs->append(je);
443 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
445 /* Release job queue lock */
448 /* Call user's routine here */
449 Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
453 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
456 /* Reacquire job queue lock */
458 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
459 jq->running_jobs->remove(je);
461 * Release locks if acquired. Note, they will not have
462 * been acquired for jobs canceled before they were
463 * put into the ready queue.
/* These are the same three counters that acquire_resources() increments. */
465 if (jcr->acquired_resource_locks) {
466 jcr->store->NumConcurrentJobs--;
467 jcr->client->NumConcurrentJobs--;
468 jcr->job->NumConcurrentJobs--;
472 * Reschedule the job if necessary and requested
/*
 * NOTE(review): the condition requires RescheduleTimes > 0, so the later
 * "RescheduleTimes == 0 ||" alternative can never be true; only the
 * reschedule_count < RescheduleTimes test has any effect.
 */
474 if (jcr->job->RescheduleOnError &&
475 jcr->JobStatus != JS_Terminated &&
476 jcr->JobStatus != JS_Canceled &&
477 jcr->job->RescheduleTimes > 0 &&
478 jcr->JobType == JT_BACKUP &&
479 (jcr->job->RescheduleTimes == 0 ||
480 jcr->reschedule_count < jcr->job->RescheduleTimes)) {
481 char dt[50], dt2[50];
484 * Reschedule this job by cleaning it up, but
485 * reuse the same JobId if possible.
487 time_t now = time(NULL);
488 jcr->reschedule_count++;
/* New start time: RescheduleInterval seconds from now. */
489 jcr->sched_time = now + jcr->job->RescheduleInterval;
490 bstrftime(dt, sizeof(dt), now);
491 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
/*
 * NOTE(review): now and jcr->sched_time are time_t values printed with
 * %u; where time_t is wider than unsigned int the format and arguments
 * do not match -- consider explicit casts.
 */
492 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
493 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
494 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
495 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
496 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
498 set_jcr_job_status(jcr, JS_WaitStartTime);
499 jcr->SDJobStatus = 0;
/* No bytes were written, so the same JCR (and JobId) is simply queued again. */
500 if (jcr->JobBytes == 0) {
501 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
503 jobq_add(jq, jcr); /* queue the job to run again */
505 free_jcr(jcr); /* release jcr */
506 free(je); /* free the job entry */
507 continue; /* look for another job to run */
510 * Something was actually backed up, so we cannot reuse
511 * the old JobId or there will be database record
512 * conflicts. We now create a new job, copying the
513 * appropriate fields.
515 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
516 set_jcr_defaults(njcr, jcr->job);
517 njcr->reschedule_count = jcr->reschedule_count;
518 njcr->sched_time = jcr->sched_time;
519 njcr->JobLevel = jcr->JobLevel;
/*
 * NOTE(review): JobStatus is forced to -1 before set_jcr_job_status() is
 * called, presumably so that the old job's status is accepted regardless
 * of any ordering that function applies -- confirm against its
 * definition.
 */
520 njcr->JobStatus = -1;
521 set_jcr_job_status(njcr, jcr->JobStatus);
522 copy_storage(njcr, jcr->storage, _("previous Job"));
/* The messages pointer is shared with the old JCR, not copied. */
523 njcr->messages = jcr->messages;
524 Dmsg0(2300, "Call to run new job\n");
526 run_job(njcr); /* This creates a "new" job */
527 free_jcr(njcr); /* release "new" jcr */
529 Dmsg0(2300, "Back from running new job.\n");
531 /* Clean up and release old jcr */
533 db_close_database(jcr, jcr->db);
536 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
537 jcr->SDJobStatus = 0;
/* Per the trailing comments, V() and P() release and reacquire the queue mutex around freeing the entry. */
538 V(jq->mutex); /* release internal lock */
540 free(je); /* release job entry */
541 P(jq->mutex); /* reacquire job queue lock */
544 * If any job in the wait queue can be run,
545 * move it to the ready queue
547 Dmsg0(2300, "Done check ready, now check wait queue.\n");
548 if (!jq->waiting_jobs->empty() && !jq->quit) {
/*
 * Choose the priority to look for: that of the first running job when
 * one exists, otherwise that of the first waiting job (see the two
 * debug messages below).
 */
550 je = (jobq_item_t *)jq->waiting_jobs->first();
551 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
553 Priority = re->jcr->JobPriority;
554 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
556 Priority = je->jcr->JobPriority;
557 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
560 * Walk down the list of waiting jobs and attempt
561 * to acquire the resources it needs.
564 /* je is current job item on the queue, jn is the next one */
/* The successor is fetched first because je may be unlinked from the waiting list below. */
566 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
568 Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
569 jcr->JobId, jcr->JobPriority, Priority);
571 /* Take only jobs of correct Priority */
572 if (jcr->JobPriority != Priority) {
573 set_jcr_job_status(jcr, JS_WaitPriority);
/* Resources not available now: leave the job on the waiting list and go on to the next one. */
577 if (!acquire_resources(jcr)) {
578 je = jn; /* point to next waiting job */
582 /* Got all locks, now remove it from wait queue and append it
585 jcr->acquired_resource_locks = true;
586 jq->waiting_jobs->remove(je);
587 jq->ready_jobs->append(je);
588 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
589 je = jn; /* Point to next waiting job */
594 Dmsg0(2300, "Done checking wait queue.\n");
596 * If no more ready work and we are asked to quit, then do it
598 if (jq->ready_jobs->empty() && jq->quit) {
/* The last worker to leave wakes jobq_destroy(), which waits on jq->work until num_workers is zero. */
600 if (jq->num_workers == 0) {
601 Dmsg0(2300, "Wake up destroy routine\n");
602 /* Wake up destroy routine if he is waiting */
603 pthread_cond_broadcast(&jq->work);
607 Dmsg0(2300, "Check for work request\n");
609 * If no more work requests, and we waited long enough, quit
611 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
612 jq->ready_jobs->empty());
613 if (jq->ready_jobs->empty() && timedout) {
614 Dmsg0(2300, "break big loop\n");
/* There is work if either the ready or the waiting queue holds an entry. */
619 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
622 * If a job is waiting on a Resource, don't consume all
623 * the CPU time looping looking for work, and even more
624 * important, release the lock so that a job that has
625 * terminated can give us the resource.
/*
 * NOTE(review): the text above says the lock is released around this
 * sleep, but no unlock or relock statement is visible in this listing
 * -- presumably they are among the missing lines; confirm.
 */
628 bmicrosleep(2, 0); /* pause for 2 seconds */
630 /* Recompute work as something may have changed in last 2 secs */
631 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
633 Dmsg1(2300, "Loop again. work=%d\n", work);
634 } /* end of big for loop */
636 Dmsg0(200, "unlock mutex\n");
638 Dmsg0(2300, "End jobq_server\n");
643 * See if we can acquire all the necessary resources for the job (JCR)
645 * Returns: true if successful
646 * false if resource failure
648 static bool acquire_resources(JCR *jcr)
650 bool skip_this_jcr = false;
652 if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
654 * Let only one Restore/verify job run at a time regardless
655 * of MaxConcurrentJobs.
657 if (jcr->store->NumConcurrentJobs == 0) {
658 jcr->store->NumConcurrentJobs = 1;
660 set_jcr_job_status(jcr, JS_WaitStoreRes);
663 /* We are not doing a Restore or Verify */
664 } else if (jcr->store->NumConcurrentJobs == 0 &&
665 jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
666 /* Simple case, first job */
667 jcr->store->NumConcurrentJobs = 1;
668 } else if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
669 jcr->store->NumConcurrentJobs++;
671 skip_this_jcr = true;
674 set_jcr_job_status(jcr, JS_WaitStoreRes);
678 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
679 jcr->client->NumConcurrentJobs++;
681 /* Back out previous locks */
682 jcr->store->NumConcurrentJobs--;
683 set_jcr_job_status(jcr, JS_WaitClientRes);
686 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
687 jcr->job->NumConcurrentJobs++;
689 /* Back out previous locks */
690 jcr->store->NumConcurrentJobs--;
691 jcr->client->NumConcurrentJobs--;
692 set_jcr_job_status(jcr, JS_WaitJobRes);
695 /* Check actual device availability */