2 * Bacula job queue routines.
4 * This code consists of three queues, the waiting_jobs
5 * queue, where jobs are initially queued, the ready_jobs
6 * queue, where jobs are placed when all the resources are
7 * allocated and they can immediately be run, and the
8 * running queue where jobs are placed when they are
11 * Kern Sibbald, July MMIII
15 * This code was adapted from the Bacula workq, which was
16 * adapted from "Programming with POSIX Threads", by
21 Copyright (C) 2003-2006 Kern Sibbald
23 This program is free software; you can redistribute it and/or
24 modify it under the terms of the GNU General Public License
25 version 2 as amended with additional clauses defined in the
26 file LICENSE in the main source directory.
28 This program is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 file LICENSE for additional details.
40 /* Forward referenced functions */
/*
 * jobq_server() and sched_wait() are used as pthread_create() start
 * routines (by start_server() and jobq_add() respectively) and are
 * declared with C linkage.
 */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
/* File-local helpers used by the queue routines below. */
44 static int start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
50 * Initialize a job queue
 *
 *  jq      - queue to initialize
 *  threads - maximum number of worker threads (stored in jq->max_workers)
 *  engine  - routine the workers run for each job (stored in jq->engine)
 *
 * Sets up detached-thread attributes, the queue mutex and the "work"
 * condition variable, then creates the empty waiting, running and
 * ready job lists and marks the queue JOBQ_VALID.
 *
52 * Returns: 0 on success
 *          NOTE(review): the failure returns are not visible in this
 *          listing; presumably the failing pthread status is returned.
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
58 jobq_item_t *item = NULL;
/*
 * Each initialization step is checked; on failure the objects set up
 * by the earlier steps are destroyed again before giving up.
 */
60 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
62 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat));
/*
 * Threads created with jq->attr are detached: nobody joins them, and
 * jobq_destroy() instead waits for jq->num_workers to drop to 0.
 */
65 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66 pthread_attr_destroy(&jq->attr);
69 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
71 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat));
72 pthread_attr_destroy(&jq->attr);
75 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
77 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat));
78 pthread_mutex_destroy(&jq->mutex);
79 pthread_attr_destroy(&jq->attr);
83 jq->max_workers = threads; /* max threads to create */
84 jq->num_workers = 0; /* no threads yet */
85 jq->idle_workers = 0; /* no idle threads */
86 jq->engine = engine; /* routine to run */
87 jq->valid = JOBQ_VALID;
88 /* Initialize the job queues */
/*
 * item is only a NULL prototype pointer; presumably dlist uses
 * &item->link solely to compute the offset of the link member inside
 * jobq_item_t -- confirm against the dlist implementation.
 */
89 jq->waiting_jobs = New(dlist(item, &item->link));
90 jq->running_jobs = New(dlist(item, &item->link));
91 jq->ready_jobs = New(dlist(item, &item->link));
96 * Destroy the job queue
 *
 * Marks the queue invalid so no further operations are accepted, wakes
 * any idle workers, waits until every worker has exited, then destroys
 * the mutex, condition variable and thread attributes and deletes the
 * three job lists.
 *
98 * Returns: 0 on success
 *          on the normal path, the first non-zero status among the
 *          mutex, condition variable and attribute destroy calls.
 *          NOTE(review): the early-exit return values (invalid queue,
 *          lock/broadcast/wait failures) are not visible in this listing.
101 int jobq_destroy(jobq_t *jq)
103 int stat, stat1, stat2;
105 if (jq->valid != JOBQ_VALID) {
108 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
110 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
113 jq->valid = 0; /* prevent any more operations */
116 * If any threads are active, wake them
118 if (jq->num_workers > 0) {
120 if (jq->idle_workers) {
121 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
123 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat));
124 pthread_mutex_unlock(&jq->mutex);
/*
 * Workers are detached, so they cannot be joined.  Instead wait on the
 * shared "work" condition variable: jobq_server() broadcasts on it when
 * it is quitting and finds jq->num_workers has dropped to 0.
 * NOTE(review): jobq_server() only takes that exit path when jq->quit is
 * set; that assignment is not visible here -- confirm it happens above.
 */
128 while (jq->num_workers > 0) {
129 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
131 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat));
132 pthread_mutex_unlock(&jq->mutex);
137 if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
139 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat));
/*
 * All workers are gone: tear down the synchronization objects and the
 * lists.  Each destroy status is kept so the first failure is reported.
 */
142 stat = pthread_mutex_destroy(&jq->mutex);
143 stat1 = pthread_cond_destroy(&jq->work);
144 stat2 = pthread_attr_destroy(&jq->attr);
145 delete jq->waiting_jobs;
146 delete jq->running_jobs;
147 delete jq->ready_jobs;
148 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
157 * Wait until schedule time arrives before starting. Normally
158 * this routine is only used for jobs started from the console
159 * for which the user explicitly specified a start time. Otherwise
160 * most jobs are put into the job queue only when their
161 * scheduled time arrives.
 *
 * Thread start routine created by jobq_add() when a job's sched_time
 * is still in the future.  arg points to a wait_pkt carrying the JCR
 * and the job queue.  The JCR reference is dropped with free_jcr()
 * before the thread exits.
164 void *sched_wait(void *arg)
166 JCR *jcr = ((wait_pkt *)arg)->jcr;
167 jobq_t *jq = ((wait_pkt *)arg)->jq;
/*
 * NOTE(review): the wait_pkt is malloc()ed by jobq_add(); its free() is
 * not visible in this listing -- confirm it is released.  jq is also not
 * used in the visible lines; presumably the job is re-queued with
 * jobq_add(jq, jcr) once the wait is over -- confirm.
 */
169 Dmsg0(2300, "Enter sched_wait.\n");
171 time_t wtime = jcr->sched_time - time(NULL);
172 set_jcr_job_status(jcr, JS_WaitStartTime);
173 /* Wait until scheduled time arrives */
175 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
178 /* Check every 30 seconds if canceled */
180 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
181 jcr->JobId, wtime, jcr->use_count());
/*
 * NOTE(review): wtime is a time_t but is printed with %d above, and the
 * "every 30 seconds" cap is not visible before the sleep below --
 * confirm wtime is limited to 30 in lines not shown.
 */
185 bmicrosleep(wtime, 0);
186 if (job_canceled(jcr)) {
189 wtime = jcr->sched_time - time(NULL);
191 Dmsg1(200, "resched use=%d\n", jcr->use_count());
/*
 * Presumably balances the jcr->inc_use_count() done by jobq_add()
 * before it started this thread.
 */
193 free_jcr(jcr); /* we are done with jcr */
194 Dmsg0(2300, "Exit sched_wait\n");
199 * Add a job to the queue
200 * jq is a queue that was created with jobq_init
 *
 * Lazily initializes jcr->term_wait and takes a reference on jcr
 * (jcr->inc_use_count()).  If the job is not canceled and its
 * sched_time is still in the future, a sched_wait() thread is started
 * with jq->attr (detached) to wait for it.  When the job is queued
 * directly, under jq->mutex:
 *   - canceled jobs are prepended to the ready queue so they are
 *     processed quickly;
 *   - other jobs are inserted into the waiting queue in ascending
 *     JobPriority order, after any waiting jobs of equal priority;
 * and start_server() is called so a worker looks at the queue.
 *
 * NOTE(review): the return statements (including the early return after
 * starting sched_wait) are not visible in this listing; presumably 0 on
 * success or an error status otherwise.
202 int jobq_add(jobq_t *jq, JCR *jcr)
205 jobq_item_t *item, *li;
206 bool inserted = false;
207 time_t wtime = jcr->sched_time - time(NULL);
211 if (!jcr->term_wait_inited) {
212 /* Initialize termination condition variable */
213 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
215 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.strerror(stat));
218 jcr->term_wait_inited = true;
221 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/*
 * NOTE(review): jcr is a pointer but is printed with 0x%x here and
 * below; that mismatches the argument type on LP64 systems (%p is the
 * matching conversion).
 */
222 if (jq->valid != JOBQ_VALID) {
223 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
227 jcr->inc_use_count(); /* mark jcr in use by us */
228 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
/* Future start time: hand the job to a sched_wait() thread instead. */
229 if (!job_canceled(jcr) && wtime > 0) {
230 set_thread_concurrency(jq->max_workers + 2);
231 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
232 sched_pkt->jcr = jcr;
234 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
235 if (stat != 0) { /* thread not created */
237 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat));
242 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
244 Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
245 free_jcr(jcr); /* release jcr */
249 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
/*
 * NOTE(review): jq->mutex is held at this point (locked just above); no
 * unlock is visible on this failure path -- confirm it is released
 * before returning.
 */
250 free_jcr(jcr); /* release jcr */
255 if (job_canceled(jcr)) {
256 /* Add job to ready queue so that it is canceled quickly */
257 jq->ready_jobs->prepend(item);
258 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
260 /* Add this job to the wait queue in priority sorted order */
261 foreach_dlist(li, jq->waiting_jobs) {
262 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
263 li->jcr->JobId, li->jcr->JobPriority);
264 if (li->jcr->JobPriority > jcr->JobPriority) {
265 jq->waiting_jobs->insert_before(item, li);
/*
 * NOTE(review): the debug message below passes li->jcr->JobId and
 * jcr->JobId in the opposite order to its format text
 * ("jobid=%d before waiting job=%d").
 */
266 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
267 li->jcr->JobId, jcr->JobId);
272 /* If no waiting job has a larger JobPriority (or the queue is empty), append it */
274 jq->waiting_jobs->append(item);
275 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
279 /* Ensure that at least one server looks at the queue. */
280 stat = start_server(jq);
282 pthread_mutex_unlock(&jq->mutex);
283 Dmsg0(2300, "Return jobq_add\n");
288 * Remove a job from the job queue. Used only by cancel_job().
289 * jq is a queue that was created with jobq_init
290 * jcr is the job to remove; only the waiting queue is searched, and
 * if the job is not found there the queue is left unchanged.
292 * Note, it is "removed" from the job queue.
 * More precisely, the item is moved from the waiting queue to the head
 * of the ready queue and start_server() is called, so that a worker
 * picks the job up promptly.
293 * If you want to cancel it, you need to provide some external means
294 * of doing so (e.g. pthread_kill()).
296 int jobq_remove(jobq_t *jq, JCR *jcr)
302 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
303 if (jq->valid != JOBQ_VALID) {
307 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
309 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
/* Look for the job's entry in the waiting queue (under jq->mutex). */
313 foreach_dlist(item, jq->waiting_jobs) {
314 if (jcr == item->jcr) {
320 pthread_mutex_unlock(&jq->mutex);
321 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
325 /* Move item from the waiting queue to the head of the ready queue */
326 jq->waiting_jobs->remove(item);
327 jq->ready_jobs->prepend(item);
328 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
330 stat = start_server(jq);
332 pthread_mutex_unlock(&jq->mutex);
333 Dmsg0(2300, "Return jobq_remove\n");
339 * Start the server thread if it isn't already running
 *
 * If any worker is idle, all waiters on jq->work are woken; otherwise,
 * if fewer than jq->max_workers workers exist, a new jobq_server()
 * thread is created with jq->attr (detached).  When neither applies,
 * nothing is started and presumably an existing worker finds the job
 * on its next pass.
 *
 * Every caller in this file invokes it with jq->mutex held.
341 static int start_server(jobq_t *jq)
347 * if any threads are idle, wake one --
348 * actually we do a broadcast because on /lib/tls
349 * these signals seem to get lost from time to time.
351 if (jq->idle_workers > 0) {
352 Dmsg0(2300, "Signal worker to wake up\n");
353 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
/*
 * NOTE(review): the error text below names pthread_cond_signal although
 * pthread_cond_broadcast is the call that failed.
 */
355 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat));
358 } else if (jq->num_workers < jq->max_workers) {
359 Dmsg0(2300, "Create worker thread\n");
360 /* No idle threads so create a new one */
361 set_thread_concurrency(jq->max_workers + 1);
362 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
364 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat));
373 * This is the worker thread that serves the job queue.
374 * When all the resources are acquired for the job,
375 * it will call the user's engine.
 *
 * arg is the jobq_t passed by start_server().  The thread holds
 * jq->mutex while inspecting the queues and releases it around the
 * call to the engine.  On each pass of the main loop it:
 *   1. runs every job on the ready queue, keeping it on the running
 *      queue for the duration of the engine call, then releases the
 *      job's resource counts and, for failed backups configured with
 *      RescheduleOnError, reschedules it;
 *   2. moves waiting jobs of the selected priority whose resources
 *      can be acquired (acquire_resources()) to the ready queue;
 *   3. leaves the loop when asked to quit with an empty ready queue,
 *      or when the 4 second timed wait expired and no ready work
 *      remains.
378 void *jobq_server(void *arg)
380 struct timespec timeout;
381 jobq_t *jq = (jobq_t *)arg;
382 jobq_item_t *je; /* job entry in queue */
384 bool timedout = false;
387 Dmsg0(2300, "Start jobq_server\n");
388 if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
390 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
399 Dmsg0(2300, "Top of for loop\n");
400 if (!work && !jq->quit) {
401 gettimeofday(&tv, &tz);
403 timeout.tv_sec = tv.tv_sec + 4;
407 * Wait 4 seconds, then if no more work, exit
409 Dmsg0(2300, "pthread_cond_timedwait()\n");
410 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
411 if (stat == ETIMEDOUT) {
412 Dmsg0(2300, "timedwait timedout.\n");
415 } else if (stat != 0) {
416 /* This shouldn't happen */
417 Dmsg0(2300, "This shouldn't happen\n");
419 pthread_mutex_unlock(&jq->mutex);
426 * If anything is in the ready queue, run it
428 Dmsg0(2300, "Checking ready queue.\n");
429 while (!jq->ready_jobs->empty() && !jq->quit) {
431 je = (jobq_item_t *)jq->ready_jobs->first();
433 jq->ready_jobs->remove(je);
434 if (!jq->ready_jobs->empty()) {
435 Dmsg0(2300, "ready queue not empty start server\n");
436 if (start_server(jq) != 0) {
438 pthread_mutex_unlock(&jq->mutex);
442 jq->running_jobs->append(je);
443 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
445 /* Release job queue lock */
448 /* Call user's routine here */
449 Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
453 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
456 /* Reacquire job queue lock */
458 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
459 jq->running_jobs->remove(je);
461 * Release locks if acquired. Note, they will not have
462 * been acquired for jobs canceled before they were
463 * put into the ready queue.
465 if (jcr->acquired_resource_locks) {
/*
 * Undo the counts taken by acquire_resources().  The read storage is
 * reset to 0 rather than decremented because acquire_resources() only
 * ever grants it exclusively (0 -> 1).
 */
467 jcr->rstore->NumConcurrentJobs = 0;
468 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
471 jcr->wstore->NumConcurrentJobs--;
472 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
474 jcr->client->NumConcurrentJobs--;
475 jcr->job->NumConcurrentJobs--;
476 jcr->acquired_resource_locks = false;
480 * Reschedule the job if necessary and requested
482 if (jcr->job->RescheduleOnError &&
483 jcr->JobStatus != JS_Terminated &&
484 jcr->JobStatus != JS_Canceled &&
485 jcr->job->RescheduleTimes > 0 &&
486 jcr->JobType == JT_BACKUP &&
487 (jcr->job->RescheduleTimes == 0 ||
488 jcr->reschedule_count < jcr->job->RescheduleTimes)) {
/*
 * NOTE(review): the "RescheduleTimes == 0" alternative above can never
 * be true, because "RescheduleTimes > 0" is also required.  If 0 is
 * meant to mean "reschedule without limit", that case is unreachable.
 */
489 char dt[50], dt2[50];
492 * Reschedule this job by cleaning it up, but
493 * reuse the same JobId if possible.
495 time_t now = time(NULL);
496 jcr->reschedule_count++;
497 jcr->sched_time = now + jcr->job->RescheduleInterval;
498 bstrftime(dt, sizeof(dt), now);
499 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
500 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
501 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
/*
 * NOTE(review): now is a time_t but is printed with %u above; that is
 * only correct where time_t has the size of unsigned int.
 */
502 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
503 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
504 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
506 set_jcr_job_status(jcr, JS_WaitStartTime);
507 jcr->SDJobStatus = 0;
508 if (jcr->JobBytes == 0) {
509 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
/*
 * NOTE(review): jobq_add() locks jq->mutex itself; confirm the lock is
 * released around this call in lines not shown, otherwise this
 * self-deadlocks on the non-recursive mutex.
 */
511 jobq_add(jq, jcr); /* queue the job to run again */
513 free_jcr(jcr); /* release jcr */
514 free(je); /* free the job entry */
515 continue; /* look for another job to run */
518 * Something was actually backed up, so we cannot reuse
519 * the old JobId or there will be database record
520 * conflicts. We now create a new job, copying the
521 * appropriate fields.
523 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
524 set_jcr_defaults(njcr, jcr->job);
525 njcr->reschedule_count = jcr->reschedule_count;
526 njcr->sched_time = jcr->sched_time;
527 njcr->JobLevel = jcr->JobLevel;
528 njcr->JobStatus = -1;
529 set_jcr_job_status(njcr, jcr->JobStatus);
531 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
536 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
540 njcr->messages = jcr->messages;
541 Dmsg0(2300, "Call to run new job\n");
543 run_job(njcr); /* This creates a "new" job */
544 free_jcr(njcr); /* release "new" jcr */
546 Dmsg0(2300, "Back from running new job.\n");
548 /* Clean up and release old jcr */
550 db_close_database(jcr, jcr->db);
553 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
554 jcr->SDJobStatus = 0;
555 V(jq->mutex); /* release internal lock */
557 free(je); /* release job entry */
558 P(jq->mutex); /* reacquire job queue lock */
561 * If any job in the wait queue can be run,
562 * move it to the ready queue
564 Dmsg0(2300, "Done check ready, now check wait queue.\n");
/*
 * Only waiting jobs whose JobPriority equals the selected Priority are
 * considered.  That is the priority of the first running job or, if no
 * job is running, of the head of the waiting queue.  Jobs with any
 * other priority are marked JS_WaitPriority.
 */
565 if (!jq->waiting_jobs->empty() && !jq->quit) {
567 je = (jobq_item_t *)jq->waiting_jobs->first();
568 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
570 Priority = re->jcr->JobPriority;
571 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
573 Priority = je->jcr->JobPriority;
574 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
577 * Walk down the list of waiting jobs and attempt
578 * to acquire the resources it needs.
581 /* je is current job item on the queue, jn is the next one */
583 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
585 Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
586 jcr->JobId, jcr->JobPriority, Priority);
588 /* Take only jobs of correct Priority */
589 if (jcr->JobPriority != Priority) {
590 set_jcr_job_status(jcr, JS_WaitPriority);
594 if (!acquire_resources(jcr)) {
595 je = jn; /* point to next waiting job */
599 /* Got all locks, now remove it from wait queue and append it
602 jq->waiting_jobs->remove(je);
603 jq->ready_jobs->append(je);
604 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
605 je = jn; /* Point to next waiting job */
610 Dmsg0(2300, "Done checking wait queue.\n");
612 * If no more ready work and we are asked to quit, then do it
614 if (jq->ready_jobs->empty() && jq->quit) {
616 if (jq->num_workers == 0) {
617 Dmsg0(2300, "Wake up destroy routine\n");
618 /* Wake up destroy routine if he is waiting */
619 pthread_cond_broadcast(&jq->work);
623 Dmsg0(2300, "Check for work request\n");
625 * If no more work requests, and we waited long enough, quit
627 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
628 jq->ready_jobs->empty());
629 if (jq->ready_jobs->empty() && timedout) {
630 Dmsg0(2300, "break big loop\n");
635 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
638 * If a job is waiting on a Resource, don't consume all
639 * the CPU time looping looking for work, and even more
640 * important, release the lock so that a job that has
641 * terminated can give us the resource.
644 bmicrosleep(2, 0); /* pause for 2 seconds */
646 /* Recompute work as something may have changed in last 2 secs */
647 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
649 Dmsg1(2300, "Loop again. work=%d\n", work);
650 } /* end of big for loop */
652 Dmsg0(200, "unlock mutex\n");
654 Dmsg0(2300, "End jobq_server\n");
659 * See if we can acquire all the necessary resources for the job (JCR)
661 * Returns: true if successful
662 * false if resource failure
 *
 * Resources are taken in the order read storage, write storage,
 * client, job, by updating the corresponding NumConcurrentJobs
 * counters.  The read storage is only ever granted exclusively
 * (0 -> 1); the others are incremented up to MaxConcurrentJobs.  If a
 * later resource is unavailable, the counts already taken are backed
 * out and the job status is set to the matching wait state
 * (JS_WaitStoreRes, JS_WaitClientRes or JS_WaitJobRes).  On success
 * jcr->acquired_resource_locks is set, which tells jobq_server() to
 * release the counts when the job finishes.
 *
 * Called from jobq_server() while it walks the waiting queue under
 * jq->mutex.
 *
 * NOTE(review): jcr->rstore->name() is dereferenced in the first debug
 * message, while later code tests "if (jcr->rstore)"; confirm a NULL
 * check precedes that dereference in lines not shown.
664 static bool acquire_resources(JCR *jcr)
666 bool skip_this_jcr = false;
668 jcr->acquired_resource_locks = false;
670 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
672 * Let only one Restore/verify job run at a time regardless
673 * of MaxConcurrentJobs.
675 if (jcr->rstore->NumConcurrentJobs == 0) {
676 jcr->rstore->NumConcurrentJobs = 1;
677 Dmsg0(200, "Set rncj=1\n");
679 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
680 set_jcr_job_status(jcr, JS_WaitStoreRes);
686 if (jcr->wstore->NumConcurrentJobs == 0 &&
687 jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
688 /* Simple case, first job */
689 jcr->wstore->NumConcurrentJobs = 1;
690 Dmsg0(200, "Set wncj=1\n");
691 } else if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
692 jcr->wstore->NumConcurrentJobs++;
693 Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
694 } else if (jcr->rstore) {
695 jcr->rstore->NumConcurrentJobs = 0; /* back out rstore */
696 Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
697 skip_this_jcr = true;
699 Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
700 skip_this_jcr = true;
704 set_jcr_job_status(jcr, JS_WaitStoreRes);
708 if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
709 jcr->client->NumConcurrentJobs++;
711 /* Back out previous locks */
713 jcr->wstore->NumConcurrentJobs--;
714 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
717 jcr->rstore->NumConcurrentJobs = 0;
718 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
720 set_jcr_job_status(jcr, JS_WaitClientRes);
723 if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
724 jcr->job->NumConcurrentJobs++;
726 /* Back out previous locks */
728 jcr->wstore->NumConcurrentJobs--;
729 Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
732 jcr->rstore->NumConcurrentJobs = 0;
733 Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
735 jcr->client->NumConcurrentJobs--;
736 set_jcr_job_status(jcr, JS_WaitJobRes);
739 /* Check actual device availability */
743 jcr->acquired_resource_locks = true;