]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
95168218b45ac94c26f22b8ea03e06e80b31a820
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula job queue routines.
21  *
22  *  This code consists of three queues, the waiting_jobs
23  *  queue, where jobs are initially queued, the ready_jobs
24  *  queue, where jobs are placed when all the resources are
25  *  allocated and they can immediately be run, and the
26  *  running queue where jobs are placed when they are
27  *  running.
28  *
29  *  Kern Sibbald, July MMIII
30  *
31  *
32  *  This code was adapted from the Bacula workq, which was
33  *    adapted from "Programming with POSIX Threads", by
34  *    David R. Butenhof
35  *
36  */
37
38 #include "bacula.h"
39 #include "dird.h"
40
41 extern JCR *jobs;
42
43 /* Forward referenced functions */
44 extern "C" void *jobq_server(void *arg);
45 extern "C" void *sched_wait(void *arg);
46
47 static int  start_server(jobq_t *jq);
48 static bool acquire_resources(JCR *jcr);
49 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
50 static void dec_write_store(JCR *jcr);
51
52 /*
53  * Initialize a job queue
54  *
55  *  Returns: 0 on success
56  *           errno on failure
57  */
58 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
59 {
60    int stat;
61    jobq_item_t *item = NULL;
62
63    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
64       berrno be;
65       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&jq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
73       berrno be;
74       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
75       pthread_attr_destroy(&jq->attr);
76       return stat;
77    }
78    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
79       berrno be;
80       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
81       pthread_mutex_destroy(&jq->mutex);
82       pthread_attr_destroy(&jq->attr);
83       return stat;
84    }
85    jq->quit = false;
86    jq->max_workers = threads;         /* max threads to create */
87    jq->num_workers = 0;               /* no threads yet */
88    jq->idle_workers = 0;              /* no idle threads */
89    jq->engine = engine;               /* routine to run */
90    jq->valid = JOBQ_VALID;
91    /* Initialize the job queues */
92    jq->waiting_jobs = New(dlist(item, &item->link));
93    jq->running_jobs = New(dlist(item, &item->link));
94    jq->ready_jobs = New(dlist(item, &item->link));
95    return 0;
96 }
97
98 /*
99  * Destroy the job queue
100  *
101  * Returns: 0 on success
102  *          errno on failure
103  */
104 int jobq_destroy(jobq_t *jq)
105 {
106    int stat, stat1, stat2;
107
108    if (jq->valid != JOBQ_VALID) {
109       return EINVAL;
110    }
111    P(jq->mutex);
112    jq->valid = 0;                      /* prevent any more operations */
113
114    /*
115     * If any threads are active, wake them
116     */
117    if (jq->num_workers > 0) {
118       jq->quit = true;
119       if (jq->idle_workers) {
120          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
121             berrno be;
122             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
123             V(jq->mutex);
124             return stat;
125          }
126       }
127       while (jq->num_workers > 0) {
128          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
129             berrno be;
130             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
131             V(jq->mutex);
132             return stat;
133          }
134       }
135    }
136    V(jq->mutex);
137    stat  = pthread_mutex_destroy(&jq->mutex);
138    stat1 = pthread_cond_destroy(&jq->work);
139    stat2 = pthread_attr_destroy(&jq->attr);
140    delete jq->waiting_jobs;
141    delete jq->running_jobs;
142    delete jq->ready_jobs;
143    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
144 }
145
146 struct wait_pkt {
147    JCR *jcr;
148    jobq_t *jq;
149 };
150
151 /*
152  * Wait until schedule time arrives before starting. Normally
153  *  this routine is only used for jobs started from the console
154  *  for which the user explicitly specified a start time. Otherwise
155  *  most jobs are put into the job queue only when their
156  *  scheduled time arives.
157  */
158 extern "C"
159 void *sched_wait(void *arg)
160 {
161    JCR *jcr = ((wait_pkt *)arg)->jcr;
162    jobq_t *jq = ((wait_pkt *)arg)->jq;
163
164    set_jcr_in_tsd(INVALID_JCR);
165    Dmsg0(2300, "Enter sched_wait.\n");
166    free(arg);
167    time_t wtime = jcr->sched_time - time(NULL);
168    jcr->setJobStatus(JS_WaitStartTime);
169    /* Wait until scheduled time arrives */
170    if (wtime > 0) {
171       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
172          jcr->Job, wtime);
173    }
174    /* Check every 30 seconds if canceled */
175    while (wtime > 0) {
176       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
177          jcr->JobId, wtime, jcr->use_count());
178       if (wtime > 30) {
179          wtime = 30;
180       }
181       bmicrosleep(wtime, 0);
182       if (job_canceled(jcr)) {
183          break;
184       }
185       wtime = jcr->sched_time - time(NULL);
186    }
187    Dmsg1(200, "resched use=%d\n", jcr->use_count());
188    jobq_add(jq, jcr);
189    free_jcr(jcr);                     /* we are done with jcr */
190    Dmsg0(2300, "Exit sched_wait\n");
191    return NULL;
192 }
193
194 /* Procedure to update the Client->NumConcurrentJobs */
195 static void update_client_numconcurrentjobs(JCR *jcr, int val)
196 {
197    if (!jcr->client) {
198       return;
199    }
200
201    switch (jcr->getJobType())
202    {
203    case JT_MIGRATE:
204    case JT_COPY:
205    case JT_ADMIN:
206       break;
207    case JT_BACKUP:
208       if (jcr->no_client_used()) {
209          break;
210       }
211    /* Failback wanted */
212    default:
213       jcr->client->NumConcurrentJobs += val;
214       break;
215    }
216 }
217
218 /*
219  *  Add a job to the queue
220  *    jq is a queue that was created with jobq_init
221  */
222 int jobq_add(jobq_t *jq, JCR *jcr)
223 {
224    int stat;
225    jobq_item_t *item, *li;
226    bool inserted = false;
227    time_t wtime = jcr->sched_time - time(NULL);
228    pthread_t id;
229    wait_pkt *sched_pkt;
230
231    if (!jcr->term_wait_inited) {
232       /* Initialize termination condition variable */
233       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
234          berrno be;
235          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
236          return stat;
237       }
238       jcr->term_wait_inited = true;
239    }
240
241    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
242    if (jq->valid != JOBQ_VALID) {
243       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
244       return EINVAL;
245    }
246
247    jcr->inc_use_count();                 /* mark jcr in use by us */
248    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
249    if (!job_canceled(jcr) && wtime > 0) {
250       set_thread_concurrency(jq->max_workers + 2);
251       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
252       sched_pkt->jcr = jcr;
253       sched_pkt->jq = jq;
254       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
255       if (stat != 0) {                /* thread not created */
256          berrno be;
257          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
258       }
259       return stat;
260    }
261
262    P(jq->mutex);
263
264    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
265       free_jcr(jcr);                    /* release jcr */
266       return ENOMEM;
267    }
268    item->jcr = jcr;
269
270    /* While waiting in a queue this job is not attached to a thread */
271    set_jcr_in_tsd(INVALID_JCR);
272    if (job_canceled(jcr)) {
273       /* Add job to ready queue so that it is canceled quickly */
274       jq->ready_jobs->prepend(item);
275       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
276    } else {
277       /* Add this job to the wait queue in priority sorted order */
278       foreach_dlist(li, jq->waiting_jobs) {
279          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
280             li->jcr->JobId, li->jcr->JobPriority);
281          if (li->jcr->JobPriority > jcr->JobPriority) {
282             jq->waiting_jobs->insert_before(item, li);
283             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
284                li->jcr->JobId, jcr->JobId);
285             inserted = true;
286             break;
287          }
288       }
289       /* If not jobs in wait queue, append it */
290       if (!inserted) {
291          jq->waiting_jobs->append(item);
292          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
293       }
294    }
295
296    /* Ensure that at least one server looks at the queue. */
297    stat = start_server(jq);
298
299    V(jq->mutex);
300    Dmsg0(2300, "Return jobq_add\n");
301    return stat;
302 }
303
304 /*
305  *  Remove a job from the job queue. Used only by cancel_job().
306  *    jq is a queue that was created with jobq_init
307  *    work_item is an element of work
308  *
309  *   Note, it is "removed" from the job queue.
310  *    If you want to cancel it, you need to provide some external means
311  *    of doing so (e.g. pthread_kill()).
312  */
313 int jobq_remove(jobq_t *jq, JCR *jcr)
314 {
315    int stat;
316    bool found = false;
317    jobq_item_t *item;
318
319    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
320    if (jq->valid != JOBQ_VALID) {
321       return EINVAL;
322    }
323
324    P(jq->mutex);
325    foreach_dlist(item, jq->waiting_jobs) {
326       if (jcr == item->jcr) {
327          found = true;
328          break;
329       }
330    }
331    if (!found) {
332       V(jq->mutex);
333       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
334       return EINVAL;
335    }
336
337    /* Move item to be the first on the list */
338    jq->waiting_jobs->remove(item);
339    jq->ready_jobs->prepend(item);
340    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
341
342    stat = start_server(jq);
343
344    V(jq->mutex);
345    Dmsg0(2300, "Return jobq_remove\n");
346    return stat;
347 }
348
349
350 /*
351  * Start the server thread if it isn't already running
352  */
353 static int start_server(jobq_t *jq)
354 {
355    int stat = 0;
356    pthread_t id;
357
358    /*
359     * if any threads are idle, wake one.
360     *   Actually we do a broadcast because on /lib/tls
361     *   these signals seem to get lost from time to time.
362     */
363    if (jq->idle_workers > 0) {
364       Dmsg0(2300, "Signal worker to wake up\n");
365       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
366          berrno be;
367          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
368          return stat;
369       }
370    } else if (jq->num_workers < jq->max_workers) {
371       Dmsg0(2300, "Create worker thread\n");
372       /* No idle threads so create a new one */
373       set_thread_concurrency(jq->max_workers + 1);
374       jq->num_workers++;
375       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
376          berrno be;
377          jq->num_workers--;
378          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
379          return stat;
380       }
381    }
382    return stat;
383 }
384
385
386 /*
387  * This is the worker thread that serves the job queue.
388  * When all the resources are acquired for the job,
389  *  it will call the user's engine.
390  */
391 extern "C"
392 void *jobq_server(void *arg)
393 {
394    struct timespec timeout;
395    jobq_t *jq = (jobq_t *)arg;
396    jobq_item_t *je;                   /* job entry in queue */
397    int stat;
398    bool timedout = false;
399    bool work = true;
400
401    set_jcr_in_tsd(INVALID_JCR);
402    Dmsg0(2300, "Start jobq_server\n");
403    P(jq->mutex);
404
405    for (;;) {
406       struct timeval tv;
407       struct timezone tz;
408
409       Dmsg0(2300, "Top of for loop\n");
410       if (!work && !jq->quit) {
411          gettimeofday(&tv, &tz);
412          timeout.tv_nsec = 0;
413          timeout.tv_sec = tv.tv_sec + 4;
414
415          while (!jq->quit) {
416             /*
417              * Wait 4 seconds, then if no more work, exit
418              */
419             Dmsg0(2300, "pthread_cond_timedwait()\n");
420             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
421             if (stat == ETIMEDOUT) {
422                Dmsg0(2300, "timedwait timedout.\n");
423                timedout = true;
424                break;
425             } else if (stat != 0) {
426                /* This shouldn't happen */
427                Dmsg0(2300, "This shouldn't happen\n");
428                jq->num_workers--;
429                V(jq->mutex);
430                return NULL;
431             }
432             break;
433          }
434       }
435       /*
436        * If anything is in the ready queue, run it
437        */
438       Dmsg0(2300, "Checking ready queue.\n");
439       while (!jq->ready_jobs->empty() && !jq->quit) {
440          JCR *jcr;
441          je = (jobq_item_t *)jq->ready_jobs->first();
442          jcr = je->jcr;
443          jq->ready_jobs->remove(je);
444          if (!jq->ready_jobs->empty()) {
445             Dmsg0(2300, "ready queue not empty start server\n");
446             if (start_server(jq) != 0) {
447                jq->num_workers--;
448                V(jq->mutex);
449                return NULL;
450             }
451          }
452          jq->running_jobs->append(je);
453
454          /* Attach jcr to this thread while we run the job */
455          jcr->my_thread_id = pthread_self();
456          jcr->set_killable(true);
457          set_jcr_in_tsd(jcr);
458          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
459
460          /* Release job queue lock */
461          V(jq->mutex);
462
463          /* Call user's routine here */
464          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
465             jcr->use_count(), jcr->JobStatus);
466          jq->engine(je->jcr);
467
468          /* Job finished detach from thread */
469          remove_jcr_from_tsd(je->jcr);
470          je->jcr->set_killable(false);
471
472          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
473             jcr->use_count());
474
475          /* Reacquire job queue lock */
476          P(jq->mutex);
477          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
478          jq->running_jobs->remove(je);
479          /*
480           * Release locks if acquired. Note, they will not have
481           *  been acquired for jobs canceled before they were
482           *  put into the ready queue.
483           */
484          if (jcr->acquired_resource_locks) {
485             dec_read_store(jcr);
486             dec_write_store(jcr);
487             update_client_numconcurrentjobs(jcr, -1);
488             jcr->job->NumConcurrentJobs--;
489             jcr->acquired_resource_locks = false;
490          }
491
492          if (reschedule_job(jcr, jq, je)) {
493             continue;              /* go look for more work */
494          }
495
496          /* Clean up and release old jcr */
497          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
498          jcr->SDJobStatus = 0;
499          V(jq->mutex);                /* release internal lock */
500          free_jcr(jcr);
501          free(je);                    /* release job entry */
502          P(jq->mutex);                /* reacquire job queue lock */
503       }
504       /*
505        * If any job in the wait queue can be run,
506        *  move it to the ready queue
507        */
508       Dmsg0(2300, "Done check ready, now check wait queue.\n");
509       if (!jq->waiting_jobs->empty() && !jq->quit) {
510          int Priority;
511          bool running_allow_mix = false;
512          je = (jobq_item_t *)jq->waiting_jobs->first();
513          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
514          if (re) {
515             Priority = re->jcr->JobPriority;
516             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
517                   re->jcr->JobId, Priority);
518             running_allow_mix = true;
519             for ( ; re; ) {
520                Dmsg2(2300, "JobId %d is also running with %s\n",
521                      re->jcr->JobId,
522                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
523                if (!re->jcr->job->allow_mixed_priority) {
524                   running_allow_mix = false;
525                   break;
526                }
527                re = (jobq_item_t *)jq->running_jobs->next(re);
528             }
529             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
530                   running_allow_mix ? "allow" : "don't allow");
531          } else {
532             Priority = je->jcr->JobPriority;
533             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
534          }
535          /*
536           * Walk down the list of waiting jobs and attempt
537           *   to acquire the resources it needs.
538           */
539          for ( ; je;  ) {
540             /* je is current job item on the queue, jn is the next one */
541             JCR *jcr = je->jcr;
542             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
543
544             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
545                   jcr->JobId, jcr->JobPriority, Priority,
546                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
547
548             /* Take only jobs of correct Priority */
549             if (!(jcr->JobPriority == Priority
550                   || (jcr->JobPriority < Priority &&
551                       jcr->job->allow_mixed_priority && running_allow_mix))) {
552                jcr->setJobStatus(JS_WaitPriority);
553                break;
554             }
555
556             if (!acquire_resources(jcr)) {
557                /* If resource conflict, job is canceled */
558                if (!job_canceled(jcr)) {
559                   je = jn;            /* point to next waiting job */
560                   continue;
561                }
562             }
563
564             /*
565              * Got all locks, now remove it from wait queue and append it
566              *   to the ready queue.  Note, we may also get here if the
567              *    job was canceled.  Once it is "run", it will quickly
568              *    terminate.
569              */
570             jq->waiting_jobs->remove(je);
571             jq->ready_jobs->append(je);
572             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
573             je = jn;                  /* Point to next waiting job */
574          } /* end for loop */
575
576       } /* end if */
577
578       Dmsg0(2300, "Done checking wait queue.\n");
579       /*
580        * If no more ready work and we are asked to quit, then do it
581        */
582       if (jq->ready_jobs->empty() && jq->quit) {
583          jq->num_workers--;
584          if (jq->num_workers == 0) {
585             Dmsg0(2300, "Wake up destroy routine\n");
586             /* Wake up destroy routine if he is waiting */
587             pthread_cond_broadcast(&jq->work);
588          }
589          break;
590       }
591       Dmsg0(2300, "Check for work request\n");
592       /*
593        * If no more work requests, and we waited long enough, quit
594        */
595       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
596          jq->ready_jobs->empty());
597       if (jq->ready_jobs->empty() && timedout) {
598          Dmsg0(2300, "break big loop\n");
599          jq->num_workers--;
600          break;
601       }
602
603       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
604       if (work) {
605          /*
606           * If a job is waiting on a Resource, don't consume all
607           *   the CPU time looping looking for work, and even more
608           *   important, release the lock so that a job that has
609           *   terminated can give us the resource.
610           */
611          V(jq->mutex);
612          bmicrosleep(2, 0);              /* pause for 2 seconds */
613          P(jq->mutex);
614          /* Recompute work as something may have changed in last 2 secs */
615          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
616       }
617       Dmsg1(2300, "Loop again. work=%d\n", work);
618    } /* end of big for loop */
619
620    Dmsg0(200, "unlock mutex\n");
621    V(jq->mutex);
622    Dmsg0(2300, "End jobq_server\n");
623    return NULL;
624 }
625
626 /*
627  * Returns true if cleanup done and we should look for more work
628  */
629 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
630 {
631    bool resched = false;
632    /*
633     * Reschedule the job if requested and possible
634     */
635    /* Basic condition is that more reschedule times remain */
636    if (jcr->job->RescheduleTimes == 0 ||
637        jcr->reschedule_count < jcr->job->RescheduleTimes) {
638       resched =
639          /* Check for incomplete jobs */
640          (jcr->RescheduleIncompleteJobs &&
641           jcr->is_incomplete() && jcr->is_JobType(JT_BACKUP) &&
642           !(jcr->HasBase||jcr->is_JobLevel(L_BASE))) ||
643          /* Check for failed jobs */
644          (jcr->job->RescheduleOnError &&
645           !jcr->is_JobStatus(JS_Terminated) &&
646           !jcr->is_JobStatus(JS_Canceled) &&
647           jcr->is_JobType(JT_BACKUP));
648    }
649    if (resched) {
650        char dt[50], dt2[50];
651
652        /*
653         * Reschedule this job by cleaning it up, but
654         *  reuse the same JobId if possible.
655         */
656       jcr->rerunning = jcr->is_incomplete();   /* save incomplete status */
657       time_t now = time(NULL);
658       jcr->reschedule_count++;
659       jcr->sched_time = now + jcr->job->RescheduleInterval;
660       bstrftime(dt, sizeof(dt), now);
661       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
662       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
663             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
664       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
665            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
666       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
667       jcr->JobStatus = -1;
668       jcr->setJobStatus(JS_WaitStartTime);
669       jcr->SDJobStatus = 0;
670       jcr->JobErrors = 0;
671       if (!allow_duplicate_job(jcr)) {
672          return false;
673       }
674       /* Only jobs with no output or Incomplete jobs can run on same JCR */
675       if (jcr->JobBytes == 0 || jcr->rerunning) {
676          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
677          V(jq->mutex);
678          /*
679           * Special test here since a Virtual Full gets marked
680           *  as a Full, so we look at the resource record
681           */
682          if (jcr->wasVirtualFull) {
683             jcr->setJobLevel(L_VIRTUAL_FULL);
684          }
685          /*
686           * When we are using the same jcr then make sure to reset
687           *   RealEndTime back to zero.
688           */
689          jcr->jr.RealEndTime = 0;
690          jobq_add(jq, jcr);     /* queue the job to run again */
691          P(jq->mutex);
692          free_jcr(jcr);         /* release jcr */
693          free(je);              /* free the job entry */
694          return true;           /* we already cleaned up */
695       }
696       /*
697        * Something was actually backed up, so we cannot reuse
698        *   the old JobId or there will be database record
699        *   conflicts.  We now create a new job, copying the
700        *   appropriate fields.
701        */
702       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
703       set_jcr_defaults(njcr, jcr->job);
704       /*
705        * Eliminate the new job_end_push, then copy the one from
706        *  the old job, and set the old one to be empty.
707        */
708       void *v;
709       lock_jobs();              /* protect ourself from reload_config() */
710       LockRes();
711       foreach_alist(v, (&jcr->job_end_push)) {
712          njcr->job_end_push.append(v);
713       }
714       jcr->job_end_push.destroy();
715       jcr->job_end_push.init(1, false);
716       UnlockRes();
717       unlock_jobs();
718
719       njcr->reschedule_count = jcr->reschedule_count;
720       njcr->sched_time = jcr->sched_time;
721       njcr->initial_sched_time = jcr->initial_sched_time;
722       /*
723        * Special test here since a Virtual Full gets marked
724        *  as a Full, so we look at the resource record
725        */
726       if (jcr->wasVirtualFull) {
727          njcr->setJobLevel(L_VIRTUAL_FULL);
728       } else {
729          njcr->setJobLevel(jcr->getJobLevel());
730       }
731       njcr->pool = jcr->pool;
732       njcr->run_pool_override = jcr->run_pool_override;
733       njcr->next_pool = jcr->next_pool;
734       njcr->run_next_pool_override = jcr->run_next_pool_override;
735       njcr->full_pool = jcr->full_pool;
736       njcr->vfull_pool = jcr->vfull_pool;
737       njcr->run_full_pool_override = jcr->run_full_pool_override;
738       njcr->run_vfull_pool_override = jcr->run_vfull_pool_override;
739       njcr->inc_pool = jcr->inc_pool;
740       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
741       njcr->diff_pool = jcr->diff_pool;
742       njcr->JobStatus = -1;
743       njcr->setJobStatus(jcr->JobStatus);
744       if (jcr->rstore) {
745          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
746       } else {
747          free_rstorage(njcr);
748       }
749       if (jcr->wstore) {
750          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
751       } else {
752          free_wstorage(njcr);
753       }
754       njcr->messages = jcr->messages;
755       njcr->spool_data = jcr->spool_data;
756       njcr->write_part_after_job = jcr->write_part_after_job;
757       Dmsg0(2300, "Call to run new job\n");
758       V(jq->mutex);
759       run_job(njcr);            /* This creates a "new" job */
760       free_jcr(njcr);           /* release "new" jcr */
761       P(jq->mutex);
762       Dmsg0(2300, "Back from running new job.\n");
763    }
764    return false;
765 }
766
767 /*
768  * See if we can acquire all the necessary resources for the job (JCR)
769  *
770  *  Returns: true  if successful
771  *           false if resource failure
772  */
773 static bool acquire_resources(JCR *jcr)
774 {
775    bool skip_this_jcr = false;
776
777    jcr->acquired_resource_locks = false;
778 /*
779  * Turning this code off is likely to cause some deadlocks,
780  *   but we do not really have enough information here to
781  *   know if this is really a deadlock (it may be a dual drive
782  *   autochanger), and in principle, the SD reservation system
783  *   should detect these deadlocks, so push the work off on it.
784  */
785 #ifdef xxx
786    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
787       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
788          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
789          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
790       jcr->setJobStatus(JS_Canceled);
791       return false;
792    }
793 #endif
794    if (jcr->rstore) {
795       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
796       if (!inc_read_store(jcr)) {
797          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
798          jcr->setJobStatus(JS_WaitStoreRes);
799          return false;
800       }
801    }
802
803    if (jcr->wstore) {
804       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
805       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
806          jcr->wstore->NumConcurrentJobs++;
807          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
808       } else if (jcr->rstore) {
809          dec_read_store(jcr);
810          skip_this_jcr = true;
811       } else {
812          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
813          skip_this_jcr = true;
814       }
815    }
816    if (skip_this_jcr) {
817       jcr->setJobStatus(JS_WaitStoreRes);
818       return false;
819    }
820
821    if (jcr->client) {
822       if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
823          update_client_numconcurrentjobs(jcr, 1);
824       } else {
825          /* Back out previous locks */
826          dec_write_store(jcr);
827          dec_read_store(jcr);
828          jcr->setJobStatus(JS_WaitClientRes);
829          return false;
830       }
831    }
832    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
833       jcr->job->NumConcurrentJobs++;
834    } else {
835       /* Back out previous locks */
836       dec_write_store(jcr);
837       dec_read_store(jcr);
838       update_client_numconcurrentjobs(jcr, -1);
839       jcr->setJobStatus(JS_WaitJobRes);
840       return false;
841    }
842
843    jcr->acquired_resource_locks = true;
844    return true;
845 }
846
847 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
848
849 /*
850  * Note: inc_read_store() and dec_read_store() are
851  *   called from select_rstore() in src/dird/restore.c
852  */
853 bool inc_read_store(JCR *jcr)
854 {
855    P(rstore_mutex);
856    if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs &&
857        (jcr->getJobType() == JT_RESTORE ||
858         jcr->rstore->MaxConcurrentReadJobs == 0 ||
859         jcr->rstore->NumConcurrentReadJobs < jcr->rstore->MaxConcurrentReadJobs)) {
860       jcr->rstore->NumConcurrentReadJobs++;
861       jcr->rstore->NumConcurrentJobs++;
862       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
863       V(rstore_mutex);
864       return true;
865    }
866    V(rstore_mutex);
867    return false;
868 }
869
870 void dec_read_store(JCR *jcr)
871 {
872    if (jcr->rstore) {
873       P(rstore_mutex);
874       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
875       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
876       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
877       V(rstore_mutex);
878       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
879       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
880    }
881 }
882
883 static void dec_write_store(JCR *jcr)
884 {
885    if (jcr->wstore) {
886       jcr->wstore->NumConcurrentJobs--;
887       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
888       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
889    }
890 }