]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
Fix bug #1582 Restore from multiple storage daemons breaks subsequent backups
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *
41  *  This code was adapted from the Bacula workq, which was
42  *    adapted from "Programming with POSIX Threads", by
43  *    David R. Butenhof
44  *
45  */
46
47 #include "bacula.h"
48 #include "dird.h"
49
50 extern JCR *jobs;
51
52 /* Forward referenced functions */
53 extern "C" void *jobq_server(void *arg);
54 extern "C" void *sched_wait(void *arg);
55
56 static int  start_server(jobq_t *jq);
57 static bool acquire_resources(JCR *jcr);
58 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
59 static void dec_write_store(JCR *jcr);
60
61 /*
62  * Initialize a job queue
63  *
64  *  Returns: 0 on success
65  *           errno on failure
66  */
67 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
68 {
69    int stat;
70    jobq_item_t *item = NULL;
71
72    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
73       berrno be;
74       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
75       return stat;
76    }
77    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
78       pthread_attr_destroy(&jq->attr);
79       return stat;
80    }
81    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
82       berrno be;
83       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
84       pthread_attr_destroy(&jq->attr);
85       return stat;
86    }
87    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
88       berrno be;
89       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
90       pthread_mutex_destroy(&jq->mutex);
91       pthread_attr_destroy(&jq->attr);
92       return stat;
93    }
94    jq->quit = false;
95    jq->max_workers = threads;         /* max threads to create */
96    jq->num_workers = 0;               /* no threads yet */
97    jq->idle_workers = 0;              /* no idle threads */
98    jq->engine = engine;               /* routine to run */
99    jq->valid = JOBQ_VALID;
100    /* Initialize the job queues */
101    jq->waiting_jobs = New(dlist(item, &item->link));
102    jq->running_jobs = New(dlist(item, &item->link));
103    jq->ready_jobs = New(dlist(item, &item->link));
104    return 0;
105 }
106
107 /*
108  * Destroy the job queue
109  *
110  * Returns: 0 on success
111  *          errno on failure
112  */
113 int jobq_destroy(jobq_t *jq)
114 {
115    int stat, stat1, stat2;
116
117    if (jq->valid != JOBQ_VALID) {
118       return EINVAL;
119    }
120    P(jq->mutex);
121    jq->valid = 0;                      /* prevent any more operations */
122
123    /* 
124     * If any threads are active, wake them 
125     */
126    if (jq->num_workers > 0) {
127       jq->quit = true;
128       if (jq->idle_workers) {
129          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
130             berrno be;
131             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
132             V(jq->mutex);
133             return stat;
134          }
135       }
136       while (jq->num_workers > 0) {
137          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
138             berrno be;
139             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
140             V(jq->mutex);
141             return stat;
142          }
143       }
144    }
145    V(jq->mutex);
146    stat  = pthread_mutex_destroy(&jq->mutex);
147    stat1 = pthread_cond_destroy(&jq->work);
148    stat2 = pthread_attr_destroy(&jq->attr);
149    delete jq->waiting_jobs;
150    delete jq->running_jobs;
151    delete jq->ready_jobs;
152    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
153 }
154
155 struct wait_pkt {
156    JCR *jcr;
157    jobq_t *jq;
158 };
159
160 /*
161  * Wait until schedule time arrives before starting. Normally
162  *  this routine is only used for jobs started from the console
163  *  for which the user explicitly specified a start time. Otherwise
164  *  most jobs are put into the job queue only when their
165  *  scheduled time arives.
166  */
167 extern "C"
168 void *sched_wait(void *arg)
169 {
170    JCR *jcr = ((wait_pkt *)arg)->jcr;
171    jobq_t *jq = ((wait_pkt *)arg)->jq;
172
173    set_jcr_in_tsd(jcr);
174    Dmsg0(2300, "Enter sched_wait.\n");
175    free(arg);
176    time_t wtime = jcr->sched_time - time(NULL);
177    set_jcr_job_status(jcr, JS_WaitStartTime);
178    /* Wait until scheduled time arrives */
179    if (wtime > 0) {
180       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
181          jcr->Job, wtime);
182    }
183    /* Check every 30 seconds if canceled */
184    while (wtime > 0) {
185       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
186          jcr->JobId, wtime, jcr->use_count());
187       if (wtime > 30) {
188          wtime = 30;
189       }
190       bmicrosleep(wtime, 0);
191       if (job_canceled(jcr)) {
192          break;
193       }
194       wtime = jcr->sched_time - time(NULL);
195    }
196    Dmsg1(200, "resched use=%d\n", jcr->use_count());
197    jobq_add(jq, jcr);
198    free_jcr(jcr);                     /* we are done with jcr */
199    Dmsg0(2300, "Exit sched_wait\n");
200    return NULL;
201 }
202
203 /*
204  *  Add a job to the queue
205  *    jq is a queue that was created with jobq_init
206  */
207 int jobq_add(jobq_t *jq, JCR *jcr)
208 {
209    int stat;
210    jobq_item_t *item, *li;
211    bool inserted = false;
212    time_t wtime = jcr->sched_time - time(NULL);
213    pthread_t id;
214    wait_pkt *sched_pkt;
215
216    if (!jcr->term_wait_inited) { 
217       /* Initialize termination condition variable */
218       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
219          berrno be;
220          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
221          return stat;
222       }
223       jcr->term_wait_inited = true;
224    }                           
225                              
226    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
227    if (jq->valid != JOBQ_VALID) {
228       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
229       return EINVAL;
230    }
231
232    jcr->inc_use_count();                 /* mark jcr in use by us */
233    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
234    if (!job_canceled(jcr) && wtime > 0) {
235       set_thread_concurrency(jq->max_workers + 2);
236       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
237       sched_pkt->jcr = jcr;
238       sched_pkt->jq = jq;
239       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
240       if (stat != 0) {                /* thread not created */
241          berrno be;
242          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
243       }
244       return stat;
245    }
246
247    P(jq->mutex);
248
249    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
250       free_jcr(jcr);                    /* release jcr */
251       return ENOMEM;
252    }
253    item->jcr = jcr;
254
255    /* While waiting in a queue this job is not attached to a thread */
256    set_jcr_in_tsd(INVALID_JCR);
257    if (job_canceled(jcr)) {
258       /* Add job to ready queue so that it is canceled quickly */
259       jq->ready_jobs->prepend(item);
260       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
261    } else {
262       /* Add this job to the wait queue in priority sorted order */
263       foreach_dlist(li, jq->waiting_jobs) {
264          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
265             li->jcr->JobId, li->jcr->JobPriority);
266          if (li->jcr->JobPriority > jcr->JobPriority) {
267             jq->waiting_jobs->insert_before(item, li);
268             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
269                li->jcr->JobId, jcr->JobId);
270             inserted = true;
271             break;
272          }
273       }
274       /* If not jobs in wait queue, append it */
275       if (!inserted) {
276          jq->waiting_jobs->append(item);
277          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
278       }
279    }
280
281    /* Ensure that at least one server looks at the queue. */
282    stat = start_server(jq);
283
284    V(jq->mutex);
285    Dmsg0(2300, "Return jobq_add\n");
286    return stat;
287 }
288
289 /*
290  *  Remove a job from the job queue. Used only by cancel_job().
291  *    jq is a queue that was created with jobq_init
292  *    work_item is an element of work
293  *
294  *   Note, it is "removed" from the job queue.
295  *    If you want to cancel it, you need to provide some external means
296  *    of doing so (e.g. pthread_kill()).
297  */
298 int jobq_remove(jobq_t *jq, JCR *jcr)
299 {
300    int stat;
301    bool found = false;
302    jobq_item_t *item;
303
304    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
305    if (jq->valid != JOBQ_VALID) {
306       return EINVAL;
307    }
308
309    P(jq->mutex);
310    foreach_dlist(item, jq->waiting_jobs) {
311       if (jcr == item->jcr) {
312          found = true;
313          break;
314       }
315    }
316    if (!found) {
317       V(jq->mutex);
318       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
319       return EINVAL;
320    }
321
322    /* Move item to be the first on the list */
323    jq->waiting_jobs->remove(item);
324    jq->ready_jobs->prepend(item);
325    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
326
327    stat = start_server(jq);
328
329    V(jq->mutex);
330    Dmsg0(2300, "Return jobq_remove\n");
331    return stat;
332 }
333
334
335 /*
336  * Start the server thread if it isn't already running
337  */
338 static int start_server(jobq_t *jq)
339 {
340    int stat = 0;
341    pthread_t id;
342
343    /*
344     * if any threads are idle, wake one.
345     *   Actually we do a broadcast because on /lib/tls 
346     *   these signals seem to get lost from time to time.
347     */
348    if (jq->idle_workers > 0) {
349       Dmsg0(2300, "Signal worker to wake up\n");
350       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
351          berrno be;
352          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
353          return stat;
354       }
355    } else if (jq->num_workers < jq->max_workers) {
356       Dmsg0(2300, "Create worker thread\n");
357       /* No idle threads so create a new one */
358       set_thread_concurrency(jq->max_workers + 1);
359       jq->num_workers++;
360       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
361          berrno be;
362          jq->num_workers--;
363          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
364          return stat;
365       }
366    }
367    return stat;
368 }
369
370
371 /*
372  * This is the worker thread that serves the job queue.
373  * When all the resources are acquired for the job,
374  *  it will call the user's engine.
375  */
376 extern "C"
377 void *jobq_server(void *arg)
378 {
379    struct timespec timeout;
380    jobq_t *jq = (jobq_t *)arg;
381    jobq_item_t *je;                   /* job entry in queue */
382    int stat;
383    bool timedout = false;
384    bool work = true;
385
386    set_jcr_in_tsd(INVALID_JCR);
387    Dmsg0(2300, "Start jobq_server\n");
388    P(jq->mutex);
389
390    for (;;) {
391       struct timeval tv;
392       struct timezone tz;
393
394       Dmsg0(2300, "Top of for loop\n");
395       if (!work && !jq->quit) {
396          gettimeofday(&tv, &tz);
397          timeout.tv_nsec = 0;
398          timeout.tv_sec = tv.tv_sec + 4;
399
400          while (!jq->quit) {
401             /*
402              * Wait 4 seconds, then if no more work, exit
403              */
404             Dmsg0(2300, "pthread_cond_timedwait()\n");
405             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
406             if (stat == ETIMEDOUT) {
407                Dmsg0(2300, "timedwait timedout.\n");
408                timedout = true;
409                break;
410             } else if (stat != 0) {
411                /* This shouldn't happen */
412                Dmsg0(2300, "This shouldn't happen\n");
413                jq->num_workers--;
414                V(jq->mutex);
415                return NULL;
416             }
417             break;
418          }
419       }
420       /*
421        * If anything is in the ready queue, run it
422        */
423       Dmsg0(2300, "Checking ready queue.\n");
424       while (!jq->ready_jobs->empty() && !jq->quit) {
425          JCR *jcr;
426          je = (jobq_item_t *)jq->ready_jobs->first();
427          jcr = je->jcr;
428          jq->ready_jobs->remove(je);
429          if (!jq->ready_jobs->empty()) {
430             Dmsg0(2300, "ready queue not empty start server\n");
431             if (start_server(jq) != 0) {
432                jq->num_workers--;
433                V(jq->mutex);
434                return NULL;
435             }
436          }
437          jq->running_jobs->append(je);
438
439          /* Attach jcr to this thread while we run the job */
440          set_jcr_in_tsd(jcr);
441          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
442
443          /* Release job queue lock */
444          V(jq->mutex);
445
446          /* Call user's routine here */
447          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
448             jcr->use_count(), jcr->JobStatus);
449          jq->engine(je->jcr);
450
451          /* Job finished detach from thread */
452          remove_jcr_from_tsd(je->jcr);
453
454          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
455             jcr->use_count());
456
457          /* Reacquire job queue lock */
458          P(jq->mutex);
459          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
460          jq->running_jobs->remove(je);
461          /*
462           * Release locks if acquired. Note, they will not have
463           *  been acquired for jobs canceled before they were
464           *  put into the ready queue.
465           */
466          if (jcr->acquired_resource_locks) {
467             dec_read_store(jcr);
468             dec_write_store(jcr);
469             jcr->client->NumConcurrentJobs--;
470             jcr->job->NumConcurrentJobs--;
471             jcr->acquired_resource_locks = false;
472          }
473
474          if (reschedule_job(jcr, jq, je)) {
475             continue;              /* go look for more work */
476          }
477
478          /* Clean up and release old jcr */
479          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
480          jcr->SDJobStatus = 0;
481          V(jq->mutex);                /* release internal lock */
482          free_jcr(jcr);
483          free(je);                    /* release job entry */
484          P(jq->mutex);                /* reacquire job queue lock */
485       }
486       /*
487        * If any job in the wait queue can be run,
488        *  move it to the ready queue
489        */
490       Dmsg0(2300, "Done check ready, now check wait queue.\n");
491       if (!jq->waiting_jobs->empty() && !jq->quit) {
492          int Priority;
493          bool running_allow_mix = false;
494          je = (jobq_item_t *)jq->waiting_jobs->first();
495          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
496          if (re) {
497             Priority = re->jcr->JobPriority;
498             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
499                   re->jcr->JobId, Priority);
500             running_allow_mix = true;
501             for ( ; re; ) {
502                Dmsg2(2300, "JobId %d is also running with %s\n",
503                      re->jcr->JobId, 
504                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
505                if (!re->jcr->job->allow_mixed_priority) {
506                   running_allow_mix = false;
507                   break;
508                }
509                re = (jobq_item_t *)jq->running_jobs->next(re);
510             }
511             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
512                   running_allow_mix ? "allow" : "don't allow");
513          } else {
514             Priority = je->jcr->JobPriority;
515             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
516          }
517          /*
518           * Walk down the list of waiting jobs and attempt
519           *   to acquire the resources it needs.
520           */
521          for ( ; je;  ) {
522             /* je is current job item on the queue, jn is the next one */
523             JCR *jcr = je->jcr;
524             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
525
526             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
527                   jcr->JobId, jcr->JobPriority, Priority,
528                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
529
530             /* Take only jobs of correct Priority */
531             if (!(jcr->JobPriority == Priority
532                   || (jcr->JobPriority < Priority &&
533                       jcr->job->allow_mixed_priority && running_allow_mix))) {
534                set_jcr_job_status(jcr, JS_WaitPriority);
535                break;
536             }
537
538             if (!acquire_resources(jcr)) {
539                /* If resource conflict, job is canceled */
540                if (!job_canceled(jcr)) {
541                   je = jn;            /* point to next waiting job */
542                   continue;
543                }
544             }
545
546             /*
547              * Got all locks, now remove it from wait queue and append it
548              *   to the ready queue.  Note, we may also get here if the
549              *    job was canceled.  Once it is "run", it will quickly
550              *    terminate.
551              */
552             jq->waiting_jobs->remove(je);
553             jq->ready_jobs->append(je);
554             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
555             je = jn;                  /* Point to next waiting job */
556          } /* end for loop */
557
558       } /* end if */
559
560       Dmsg0(2300, "Done checking wait queue.\n");
561       /*
562        * If no more ready work and we are asked to quit, then do it
563        */
564       if (jq->ready_jobs->empty() && jq->quit) {
565          jq->num_workers--;
566          if (jq->num_workers == 0) {
567             Dmsg0(2300, "Wake up destroy routine\n");
568             /* Wake up destroy routine if he is waiting */
569             pthread_cond_broadcast(&jq->work);
570          }
571          break;
572       }
573       Dmsg0(2300, "Check for work request\n");
574       /*
575        * If no more work requests, and we waited long enough, quit
576        */
577       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
578          jq->ready_jobs->empty());
579       if (jq->ready_jobs->empty() && timedout) {
580          Dmsg0(2300, "break big loop\n");
581          jq->num_workers--;
582          break;
583       }
584
585       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
586       if (work) {
587          /*
588           * If a job is waiting on a Resource, don't consume all
589           *   the CPU time looping looking for work, and even more
590           *   important, release the lock so that a job that has
591           *   terminated can give us the resource.
592           */
593          V(jq->mutex);
594          bmicrosleep(2, 0);              /* pause for 2 seconds */
595          P(jq->mutex);
596          /* Recompute work as something may have changed in last 2 secs */
597          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
598       }
599       Dmsg1(2300, "Loop again. work=%d\n", work);
600    } /* end of big for loop */
601
602    Dmsg0(200, "unlock mutex\n");
603    V(jq->mutex);
604    Dmsg0(2300, "End jobq_server\n");
605    return NULL;
606 }
607
608 /*
609  * Returns true if cleanup done and we should look for more work
610  */
611 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
612 {
613    /*
614     * Reschedule the job if necessary and requested
615     */
616    if (jcr->job->RescheduleOnError &&
617        jcr->JobStatus != JS_Terminated &&
618        jcr->JobStatus != JS_Canceled &&
619        jcr->getJobType() == JT_BACKUP &&
620        (jcr->job->RescheduleTimes == 0 ||
621         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
622        char dt[50], dt2[50];
623
624        /*
625         * Reschedule this job by cleaning it up, but
626         *  reuse the same JobId if possible.
627         */
628       time_t now = time(NULL);
629       jcr->reschedule_count++;
630       jcr->sched_time = now + jcr->job->RescheduleInterval;
631       bstrftime(dt, sizeof(dt), now);
632       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
633       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
634             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
635       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
636            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
637       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
638       jcr->JobStatus = -1;
639       set_jcr_job_status(jcr, JS_WaitStartTime);
640       jcr->SDJobStatus = 0;
641       if (!allow_duplicate_job(jcr)) {
642          return false;
643       }
644       if (jcr->JobBytes == 0) {
645          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
646          V(jq->mutex);
647          jobq_add(jq, jcr);     /* queue the job to run again */
648          P(jq->mutex);
649          free_jcr(jcr);         /* release jcr */
650          free(je);              /* free the job entry */
651          return true;           /* we already cleaned up */
652       }
653       /*
654        * Something was actually backed up, so we cannot reuse
655        *   the old JobId or there will be database record
656        *   conflicts.  We now create a new job, copying the
657        *   appropriate fields.
658        */           
659       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
660       set_jcr_defaults(njcr, jcr->job);
661       njcr->reschedule_count = jcr->reschedule_count;
662       njcr->sched_time = jcr->sched_time;
663       njcr->set_JobLevel(jcr->getJobLevel());
664       njcr->pool = jcr->pool;
665       njcr->run_pool_override = jcr->run_pool_override;
666       njcr->full_pool = jcr->full_pool;
667       njcr->run_full_pool_override = jcr->run_full_pool_override;
668       njcr->inc_pool = jcr->inc_pool;
669       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
670       njcr->diff_pool = jcr->diff_pool;
671       njcr->JobStatus = -1;
672       set_jcr_job_status(njcr, jcr->JobStatus);
673       if (jcr->rstore) {
674          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
675       } else {
676          free_rstorage(njcr);
677       }
678       if (jcr->wstore) {
679          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
680       } else {
681          free_wstorage(njcr);
682       }
683       njcr->messages = jcr->messages;
684       njcr->spool_data = jcr->spool_data;
685       njcr->write_part_after_job = jcr->write_part_after_job;
686       Dmsg0(2300, "Call to run new job\n");
687       V(jq->mutex);
688       run_job(njcr);            /* This creates a "new" job */
689       free_jcr(njcr);           /* release "new" jcr */
690       P(jq->mutex);
691       Dmsg0(2300, "Back from running new job.\n");
692    }
693    return false;
694 }
695
696 /*
697  * See if we can acquire all the necessary resources for the job (JCR)
698  *
699  *  Returns: true  if successful
700  *           false if resource failure
701  */
702 static bool acquire_resources(JCR *jcr)
703 {
704    bool skip_this_jcr = false;
705
706    jcr->acquired_resource_locks = false;
707 /*
708  * Turning this code off is likely to cause some deadlocks,
709  *   but we do not really have enough information here to
710  *   know if this is really a deadlock (it may be a dual drive
711  *   autochanger), and in principle, the SD reservation system
712  *   should detect these deadlocks, so push the work off on it.
713  */
714 #ifdef xxx
715    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
716       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
717          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
718          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
719       set_jcr_job_status(jcr, JS_Canceled);
720       return false;
721    }
722 #endif
723    if (jcr->rstore) {
724       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
725       if (!inc_read_store(jcr)) {
726          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
727          set_jcr_job_status(jcr, JS_WaitStoreRes);
728          return false;
729       }
730    }
731    
732    if (jcr->wstore) {
733       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
734       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
735          jcr->wstore->NumConcurrentJobs++;
736          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
737       } else if (jcr->rstore) {
738          dec_read_store(jcr);
739          skip_this_jcr = true;
740       } else {
741          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
742          skip_this_jcr = true;
743       }
744    }
745    if (skip_this_jcr) {
746       set_jcr_job_status(jcr, JS_WaitStoreRes);
747       return false;
748    }
749
750    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
751       jcr->client->NumConcurrentJobs++;
752    } else {
753       /* Back out previous locks */
754       dec_write_store(jcr);
755       dec_read_store(jcr);
756       set_jcr_job_status(jcr, JS_WaitClientRes);
757       return false;
758    }
759    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
760       jcr->job->NumConcurrentJobs++;
761    } else {
762       /* Back out previous locks */
763       dec_write_store(jcr);
764       dec_read_store(jcr);
765       jcr->client->NumConcurrentJobs--;
766       set_jcr_job_status(jcr, JS_WaitJobRes);
767       return false;
768    }
769
770    jcr->acquired_resource_locks = true;
771    return true;
772 }
773
774 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
775
776 /* 
777  * Note: inc_read_store() and dec_read_store() are
778  *   called from select_rstore() in src/dird/restore.c
779  */
780 bool inc_read_store(JCR *jcr)
781 {
782    P(rstore_mutex);
783    if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
784       jcr->rstore->NumConcurrentReadJobs++;
785       jcr->rstore->NumConcurrentJobs++;
786       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
787       V(rstore_mutex);
788       return true;
789    }
790    V(rstore_mutex);
791    return false;
792 }
793
794 void dec_read_store(JCR *jcr)
795 {
796    if (jcr->rstore) {
797       P(rstore_mutex);
798       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
799       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
800       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
801       V(rstore_mutex);
802       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
803       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
804    }
805 }
806
807 static void dec_write_store(JCR *jcr)
808 {
809    if (jcr->wstore) {
810       jcr->wstore->NumConcurrentJobs--;
811       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
812       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
813    }
814 }