]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
Restart incomplete jobs only if type Backup
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *
41  *  This code was adapted from the Bacula workq, which was
42  *    adapted from "Programming with POSIX Threads", by
43  *    David R. Butenhof
44  *
45  */
46
47 #include "bacula.h"
48 #include "dird.h"
49
50 extern JCR *jobs;
51
52 /* Forward referenced functions */
53 extern "C" void *jobq_server(void *arg);
54 extern "C" void *sched_wait(void *arg);
55
56 static int  start_server(jobq_t *jq);
57 static bool acquire_resources(JCR *jcr);
58 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
59 static void dec_write_store(JCR *jcr);
60
61 /*
62  * Initialize a job queue
63  *
64  *  Returns: 0 on success
65  *           errno on failure
66  */
67 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
68 {
69    int stat;
70    jobq_item_t *item = NULL;
71
72    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
73       berrno be;
74       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
75       return stat;
76    }
77    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
78       pthread_attr_destroy(&jq->attr);
79       return stat;
80    }
81    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
82       berrno be;
83       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
84       pthread_attr_destroy(&jq->attr);
85       return stat;
86    }
87    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
88       berrno be;
89       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
90       pthread_mutex_destroy(&jq->mutex);
91       pthread_attr_destroy(&jq->attr);
92       return stat;
93    }
94    jq->quit = false;
95    jq->max_workers = threads;         /* max threads to create */
96    jq->num_workers = 0;               /* no threads yet */
97    jq->idle_workers = 0;              /* no idle threads */
98    jq->engine = engine;               /* routine to run */
99    jq->valid = JOBQ_VALID;
100    /* Initialize the job queues */
101    jq->waiting_jobs = New(dlist(item, &item->link));
102    jq->running_jobs = New(dlist(item, &item->link));
103    jq->ready_jobs = New(dlist(item, &item->link));
104    return 0;
105 }
106
107 /*
108  * Destroy the job queue
109  *
110  * Returns: 0 on success
111  *          errno on failure
112  */
113 int jobq_destroy(jobq_t *jq)
114 {
115    int stat, stat1, stat2;
116
117    if (jq->valid != JOBQ_VALID) {
118       return EINVAL;
119    }
120    P(jq->mutex);
121    jq->valid = 0;                      /* prevent any more operations */
122
123    /* 
124     * If any threads are active, wake them 
125     */
126    if (jq->num_workers > 0) {
127       jq->quit = true;
128       if (jq->idle_workers) {
129          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
130             berrno be;
131             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
132             V(jq->mutex);
133             return stat;
134          }
135       }
136       while (jq->num_workers > 0) {
137          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
138             berrno be;
139             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
140             V(jq->mutex);
141             return stat;
142          }
143       }
144    }
145    V(jq->mutex);
146    stat  = pthread_mutex_destroy(&jq->mutex);
147    stat1 = pthread_cond_destroy(&jq->work);
148    stat2 = pthread_attr_destroy(&jq->attr);
149    delete jq->waiting_jobs;
150    delete jq->running_jobs;
151    delete jq->ready_jobs;
152    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
153 }
154
155 struct wait_pkt {
156    JCR *jcr;
157    jobq_t *jq;
158 };
159
160 /*
161  * Wait until schedule time arrives before starting. Normally
162  *  this routine is only used for jobs started from the console
163  *  for which the user explicitly specified a start time. Otherwise
164  *  most jobs are put into the job queue only when their
165  *  scheduled time arives.
166  */
167 extern "C"
168 void *sched_wait(void *arg)
169 {
170    JCR *jcr = ((wait_pkt *)arg)->jcr;
171    jobq_t *jq = ((wait_pkt *)arg)->jq;
172
173    set_jcr_in_tsd(INVALID_JCR);
174    Dmsg0(2300, "Enter sched_wait.\n");
175    free(arg);
176    time_t wtime = jcr->sched_time - time(NULL);
177    set_jcr_job_status(jcr, JS_WaitStartTime);
178    /* Wait until scheduled time arrives */
179    if (wtime > 0) {
180       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
181          jcr->Job, wtime);
182    }
183    /* Check every 30 seconds if canceled */
184    while (wtime > 0) {
185       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
186          jcr->JobId, wtime, jcr->use_count());
187       if (wtime > 30) {
188          wtime = 30;
189       }
190       bmicrosleep(wtime, 0);
191       if (job_canceled(jcr)) {
192          break;
193       }
194       wtime = jcr->sched_time - time(NULL);
195    }
196    Dmsg1(200, "resched use=%d\n", jcr->use_count());
197    jobq_add(jq, jcr);
198    free_jcr(jcr);                     /* we are done with jcr */
199    Dmsg0(2300, "Exit sched_wait\n");
200    return NULL;
201 }
202
203 /*
204  *  Add a job to the queue
205  *    jq is a queue that was created with jobq_init
206  */
207 int jobq_add(jobq_t *jq, JCR *jcr)
208 {
209    int stat;
210    jobq_item_t *item, *li;
211    bool inserted = false;
212    time_t wtime = jcr->sched_time - time(NULL);
213    pthread_t id;
214    wait_pkt *sched_pkt;
215
216    if (!jcr->term_wait_inited) { 
217       /* Initialize termination condition variable */
218       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
219          berrno be;
220          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
221          return stat;
222       }
223       jcr->term_wait_inited = true;
224    }                           
225                              
226    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
227    if (jq->valid != JOBQ_VALID) {
228       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
229       return EINVAL;
230    }
231
232    jcr->inc_use_count();                 /* mark jcr in use by us */
233    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
234    if (!job_canceled(jcr) && wtime > 0) {
235       set_thread_concurrency(jq->max_workers + 2);
236       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
237       sched_pkt->jcr = jcr;
238       sched_pkt->jq = jq;
239       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
240       if (stat != 0) {                /* thread not created */
241          berrno be;
242          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
243       }
244       return stat;
245    }
246
247    P(jq->mutex);
248
249    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
250       free_jcr(jcr);                    /* release jcr */
251       return ENOMEM;
252    }
253    item->jcr = jcr;
254
255    /* While waiting in a queue this job is not attached to a thread */
256    set_jcr_in_tsd(INVALID_JCR);
257    if (job_canceled(jcr)) {
258       /* Add job to ready queue so that it is canceled quickly */
259       jq->ready_jobs->prepend(item);
260       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
261    } else {
262       /* Add this job to the wait queue in priority sorted order */
263       foreach_dlist(li, jq->waiting_jobs) {
264          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
265             li->jcr->JobId, li->jcr->JobPriority);
266          if (li->jcr->JobPriority > jcr->JobPriority) {
267             jq->waiting_jobs->insert_before(item, li);
268             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
269                li->jcr->JobId, jcr->JobId);
270             inserted = true;
271             break;
272          }
273       }
274       /* If not jobs in wait queue, append it */
275       if (!inserted) {
276          jq->waiting_jobs->append(item);
277          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
278       }
279    }
280
281    /* Ensure that at least one server looks at the queue. */
282    stat = start_server(jq);
283
284    V(jq->mutex);
285    Dmsg0(2300, "Return jobq_add\n");
286    return stat;
287 }
288
289 /*
290  *  Remove a job from the job queue. Used only by cancel_job().
291  *    jq is a queue that was created with jobq_init
292  *    work_item is an element of work
293  *
294  *   Note, it is "removed" from the job queue.
295  *    If you want to cancel it, you need to provide some external means
296  *    of doing so (e.g. pthread_kill()).
297  */
298 int jobq_remove(jobq_t *jq, JCR *jcr)
299 {
300    int stat;
301    bool found = false;
302    jobq_item_t *item;
303
304    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
305    if (jq->valid != JOBQ_VALID) {
306       return EINVAL;
307    }
308
309    P(jq->mutex);
310    foreach_dlist(item, jq->waiting_jobs) {
311       if (jcr == item->jcr) {
312          found = true;
313          break;
314       }
315    }
316    if (!found) {
317       V(jq->mutex);
318       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
319       return EINVAL;
320    }
321
322    /* Move item to be the first on the list */
323    jq->waiting_jobs->remove(item);
324    jq->ready_jobs->prepend(item);
325    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
326
327    stat = start_server(jq);
328
329    V(jq->mutex);
330    Dmsg0(2300, "Return jobq_remove\n");
331    return stat;
332 }
333
334
335 /*
336  * Start the server thread if it isn't already running
337  */
338 static int start_server(jobq_t *jq)
339 {
340    int stat = 0;
341    pthread_t id;
342
343    /*
344     * if any threads are idle, wake one.
345     *   Actually we do a broadcast because on /lib/tls 
346     *   these signals seem to get lost from time to time.
347     */
348    if (jq->idle_workers > 0) {
349       Dmsg0(2300, "Signal worker to wake up\n");
350       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
351          berrno be;
352          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
353          return stat;
354       }
355    } else if (jq->num_workers < jq->max_workers) {
356       Dmsg0(2300, "Create worker thread\n");
357       /* No idle threads so create a new one */
358       set_thread_concurrency(jq->max_workers + 1);
359       jq->num_workers++;
360       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
361          berrno be;
362          jq->num_workers--;
363          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
364          return stat;
365       }
366    }
367    return stat;
368 }
369
370
371 /*
372  * This is the worker thread that serves the job queue.
373  * When all the resources are acquired for the job,
374  *  it will call the user's engine.
375  */
376 extern "C"
377 void *jobq_server(void *arg)
378 {
379    struct timespec timeout;
380    jobq_t *jq = (jobq_t *)arg;
381    jobq_item_t *je;                   /* job entry in queue */
382    int stat;
383    bool timedout = false;
384    bool work = true;
385
386    set_jcr_in_tsd(INVALID_JCR);
387    Dmsg0(2300, "Start jobq_server\n");
388    P(jq->mutex);
389
390    for (;;) {
391       struct timeval tv;
392       struct timezone tz;
393
394       Dmsg0(2300, "Top of for loop\n");
395       if (!work && !jq->quit) {
396          gettimeofday(&tv, &tz);
397          timeout.tv_nsec = 0;
398          timeout.tv_sec = tv.tv_sec + 4;
399
400          while (!jq->quit) {
401             /*
402              * Wait 4 seconds, then if no more work, exit
403              */
404             Dmsg0(2300, "pthread_cond_timedwait()\n");
405             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
406             if (stat == ETIMEDOUT) {
407                Dmsg0(2300, "timedwait timedout.\n");
408                timedout = true;
409                break;
410             } else if (stat != 0) {
411                /* This shouldn't happen */
412                Dmsg0(2300, "This shouldn't happen\n");
413                jq->num_workers--;
414                V(jq->mutex);
415                return NULL;
416             }
417             break;
418          }
419       }
420       /*
421        * If anything is in the ready queue, run it
422        */
423       Dmsg0(2300, "Checking ready queue.\n");
424       while (!jq->ready_jobs->empty() && !jq->quit) {
425          JCR *jcr;
426          je = (jobq_item_t *)jq->ready_jobs->first();
427          jcr = je->jcr;
428          jq->ready_jobs->remove(je);
429          if (!jq->ready_jobs->empty()) {
430             Dmsg0(2300, "ready queue not empty start server\n");
431             if (start_server(jq) != 0) {
432                jq->num_workers--;
433                V(jq->mutex);
434                return NULL;
435             }
436          }
437          jq->running_jobs->append(je);
438
439          /* Attach jcr to this thread while we run the job */
440          jcr->set_killable(true);
441          set_jcr_in_tsd(jcr);
442          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
443
444          /* Release job queue lock */
445          V(jq->mutex);
446
447          /* Call user's routine here */
448          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
449             jcr->use_count(), jcr->JobStatus);
450          jq->engine(je->jcr);
451
452          /* Job finished detach from thread */
453          remove_jcr_from_tsd(je->jcr);
454          je->jcr->set_killable(false);
455
456          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
457             jcr->use_count());
458
459          /* Reacquire job queue lock */
460          P(jq->mutex);
461          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
462          jq->running_jobs->remove(je);
463          /*
464           * Release locks if acquired. Note, they will not have
465           *  been acquired for jobs canceled before they were
466           *  put into the ready queue.
467           */
468          if (jcr->acquired_resource_locks) {
469             dec_read_store(jcr);
470             dec_write_store(jcr);
471             jcr->client->NumConcurrentJobs--;
472             jcr->job->NumConcurrentJobs--;
473             jcr->acquired_resource_locks = false;
474          }
475
476          if (reschedule_job(jcr, jq, je)) {
477             continue;              /* go look for more work */
478          }
479
480          /* Clean up and release old jcr */
481          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
482          jcr->SDJobStatus = 0;
483          V(jq->mutex);                /* release internal lock */
484          free_jcr(jcr);
485          free(je);                    /* release job entry */
486          P(jq->mutex);                /* reacquire job queue lock */
487       }
488       /*
489        * If any job in the wait queue can be run,
490        *  move it to the ready queue
491        */
492       Dmsg0(2300, "Done check ready, now check wait queue.\n");
493       if (!jq->waiting_jobs->empty() && !jq->quit) {
494          int Priority;
495          bool running_allow_mix = false;
496          je = (jobq_item_t *)jq->waiting_jobs->first();
497          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
498          if (re) {
499             Priority = re->jcr->JobPriority;
500             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
501                   re->jcr->JobId, Priority);
502             running_allow_mix = true;
503             for ( ; re; ) {
504                Dmsg2(2300, "JobId %d is also running with %s\n",
505                      re->jcr->JobId, 
506                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
507                if (!re->jcr->job->allow_mixed_priority) {
508                   running_allow_mix = false;
509                   break;
510                }
511                re = (jobq_item_t *)jq->running_jobs->next(re);
512             }
513             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
514                   running_allow_mix ? "allow" : "don't allow");
515          } else {
516             Priority = je->jcr->JobPriority;
517             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
518          }
519          /*
520           * Walk down the list of waiting jobs and attempt
521           *   to acquire the resources it needs.
522           */
523          for ( ; je;  ) {
524             /* je is current job item on the queue, jn is the next one */
525             JCR *jcr = je->jcr;
526             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
527
528             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
529                   jcr->JobId, jcr->JobPriority, Priority,
530                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
531
532             /* Take only jobs of correct Priority */
533             if (!(jcr->JobPriority == Priority
534                   || (jcr->JobPriority < Priority &&
535                       jcr->job->allow_mixed_priority && running_allow_mix))) {
536                set_jcr_job_status(jcr, JS_WaitPriority);
537                break;
538             }
539
540             if (!acquire_resources(jcr)) {
541                /* If resource conflict, job is canceled */
542                if (!job_canceled(jcr)) {
543                   je = jn;            /* point to next waiting job */
544                   continue;
545                }
546             }
547
548             /*
549              * Got all locks, now remove it from wait queue and append it
550              *   to the ready queue.  Note, we may also get here if the
551              *    job was canceled.  Once it is "run", it will quickly
552              *    terminate.
553              */
554             jq->waiting_jobs->remove(je);
555             jq->ready_jobs->append(je);
556             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
557             je = jn;                  /* Point to next waiting job */
558          } /* end for loop */
559
560       } /* end if */
561
562       Dmsg0(2300, "Done checking wait queue.\n");
563       /*
564        * If no more ready work and we are asked to quit, then do it
565        */
566       if (jq->ready_jobs->empty() && jq->quit) {
567          jq->num_workers--;
568          if (jq->num_workers == 0) {
569             Dmsg0(2300, "Wake up destroy routine\n");
570             /* Wake up destroy routine if he is waiting */
571             pthread_cond_broadcast(&jq->work);
572          }
573          break;
574       }
575       Dmsg0(2300, "Check for work request\n");
576       /*
577        * If no more work requests, and we waited long enough, quit
578        */
579       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
580          jq->ready_jobs->empty());
581       if (jq->ready_jobs->empty() && timedout) {
582          Dmsg0(2300, "break big loop\n");
583          jq->num_workers--;
584          break;
585       }
586
587       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
588       if (work) {
589          /*
590           * If a job is waiting on a Resource, don't consume all
591           *   the CPU time looping looking for work, and even more
592           *   important, release the lock so that a job that has
593           *   terminated can give us the resource.
594           */
595          V(jq->mutex);
596          bmicrosleep(2, 0);              /* pause for 2 seconds */
597          P(jq->mutex);
598          /* Recompute work as something may have changed in last 2 secs */
599          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
600       }
601       Dmsg1(2300, "Loop again. work=%d\n", work);
602    } /* end of big for loop */
603
604    Dmsg0(200, "unlock mutex\n");
605    V(jq->mutex);
606    Dmsg0(2300, "End jobq_server\n");
607    return NULL;
608 }
609
610 /*
611  * Returns true if cleanup done and we should look for more work
612  */
613 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
614 {
615    bool resched = false;
616    /*
617     * Reschedule the job if requested and possible
618     */
619    /* Basic condition is that more reschedule times remain */
620    if (jcr->job->RescheduleTimes == 0 ||
621        jcr->reschedule_count < jcr->job->RescheduleTimes) {
622       resched = 
623          /* Check for incomplete jobs */
624          (jcr->job->RescheduleIncompleteJobs && 
625           jcr->is_incomplete() && jcr->is_JobType(JT_BACKUP)) ||
626          /* Check for failed jobs */
627          (jcr->job->RescheduleOnError &&
628           !jcr->is_JobStatus(JS_Terminated) &&
629           !jcr->is_JobStatus(JS_Canceled) &&
630           jcr->is_JobType(JT_BACKUP));
631    }
632    if (resched) {
633        char dt[50], dt2[50];
634
635        /*
636         * Reschedule this job by cleaning it up, but
637         *  reuse the same JobId if possible.
638         */
639       jcr->incomplete = jcr->is_incomplete();   /* save incomplete status */
640       time_t now = time(NULL);
641       jcr->reschedule_count++;
642       jcr->sched_time = now + jcr->job->RescheduleInterval;
643       bstrftime(dt, sizeof(dt), now);
644       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
645       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
646             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
647       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
648            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
649       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
650       jcr->JobStatus = -1;
651       set_jcr_job_status(jcr, JS_WaitStartTime);
652       jcr->SDJobStatus = 0;
653       jcr->JobErrors = 0;
654       if (!allow_duplicate_job(jcr)) {
655          return false;
656       }
657       /* Only jobs with no output or Incomplete jobs can run on same JCR */
658       if (jcr->JobBytes == 0 || jcr->incomplete) {
659          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
660          V(jq->mutex);
661          jobq_add(jq, jcr);     /* queue the job to run again */
662          P(jq->mutex);
663          free_jcr(jcr);         /* release jcr */
664          free(je);              /* free the job entry */
665          return true;           /* we already cleaned up */
666       }
667       /*
668        * Something was actually backed up, so we cannot reuse
669        *   the old JobId or there will be database record
670        *   conflicts.  We now create a new job, copying the
671        *   appropriate fields.
672        */           
673       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
674       set_jcr_defaults(njcr, jcr->job);
675       njcr->reschedule_count = jcr->reschedule_count;
676       njcr->sched_time = jcr->sched_time;
677       njcr->set_JobLevel(jcr->getJobLevel());
678       njcr->pool = jcr->pool;
679       njcr->run_pool_override = jcr->run_pool_override;
680       njcr->full_pool = jcr->full_pool;
681       njcr->run_full_pool_override = jcr->run_full_pool_override;
682       njcr->inc_pool = jcr->inc_pool;
683       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
684       njcr->diff_pool = jcr->diff_pool;
685       njcr->JobStatus = -1;
686       set_jcr_job_status(njcr, jcr->JobStatus);
687       if (jcr->rstore) {
688          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
689       } else {
690          free_rstorage(njcr);
691       }
692       if (jcr->wstore) {
693          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
694       } else {
695          free_wstorage(njcr);
696       }
697       njcr->messages = jcr->messages;
698       njcr->spool_data = jcr->spool_data;
699       njcr->write_part_after_job = jcr->write_part_after_job;
700       Dmsg0(2300, "Call to run new job\n");
701       V(jq->mutex);
702       run_job(njcr);            /* This creates a "new" job */
703       free_jcr(njcr);           /* release "new" jcr */
704       P(jq->mutex);
705       Dmsg0(2300, "Back from running new job.\n");
706    }
707    return false;
708 }
709
710 /*
711  * See if we can acquire all the necessary resources for the job (JCR)
712  *
713  *  Returns: true  if successful
714  *           false if resource failure
715  */
716 static bool acquire_resources(JCR *jcr)
717 {
718    bool skip_this_jcr = false;
719
720    jcr->acquired_resource_locks = false;
721 /*
722  * Turning this code off is likely to cause some deadlocks,
723  *   but we do not really have enough information here to
724  *   know if this is really a deadlock (it may be a dual drive
725  *   autochanger), and in principle, the SD reservation system
726  *   should detect these deadlocks, so push the work off on it.
727  */
728 #ifdef xxx
729    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
730       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
731          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
732          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
733       set_jcr_job_status(jcr, JS_Canceled);
734       return false;
735    }
736 #endif
737    if (jcr->rstore) {
738       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
739       if (!inc_read_store(jcr)) {
740          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
741          set_jcr_job_status(jcr, JS_WaitStoreRes);
742          return false;
743       }
744    }
745    
746    if (jcr->wstore) {
747       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
748       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
749          jcr->wstore->NumConcurrentJobs++;
750          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
751       } else if (jcr->rstore) {
752          dec_read_store(jcr);
753          skip_this_jcr = true;
754       } else {
755          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
756          skip_this_jcr = true;
757       }
758    }
759    if (skip_this_jcr) {
760       set_jcr_job_status(jcr, JS_WaitStoreRes);
761       return false;
762    }
763
764    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
765       jcr->client->NumConcurrentJobs++;
766    } else {
767       /* Back out previous locks */
768       dec_write_store(jcr);
769       dec_read_store(jcr);
770       set_jcr_job_status(jcr, JS_WaitClientRes);
771       return false;
772    }
773    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
774       jcr->job->NumConcurrentJobs++;
775    } else {
776       /* Back out previous locks */
777       dec_write_store(jcr);
778       dec_read_store(jcr);
779       jcr->client->NumConcurrentJobs--;
780       set_jcr_job_status(jcr, JS_WaitJobRes);
781       return false;
782    }
783
784    jcr->acquired_resource_locks = true;
785    return true;
786 }
787
788 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
789
790 /* 
791  * Note: inc_read_store() and dec_read_store() are
792  *   called from select_rstore() in src/dird/restore.c
793  */
794 bool inc_read_store(JCR *jcr)
795 {
796    P(rstore_mutex);
797    if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
798       jcr->rstore->NumConcurrentReadJobs++;
799       jcr->rstore->NumConcurrentJobs++;
800       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
801       V(rstore_mutex);
802       return true;
803    }
804    V(rstore_mutex);
805    return false;
806 }
807
808 void dec_read_store(JCR *jcr)
809 {
810    if (jcr->rstore) {
811       P(rstore_mutex);
812       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
813       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
814       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
815       V(rstore_mutex);
816       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
817       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
818    }
819 }
820
821 static void dec_write_store(JCR *jcr)
822 {
823    if (jcr->wstore) {
824       jcr->wstore->NumConcurrentJobs--;
825       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
826       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
827    }
828 }