]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
baa81469fc64debfcf602564748bec134c5df62d
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *
41  *  This code was adapted from the Bacula workq, which was
42  *    adapted from "Programming with POSIX Threads", by
43  *    David R. Butenhof
44  *
45  */
46
47 #include "bacula.h"
48 #include "dird.h"
49
50 extern JCR *jobs;
51
52 /* Forward referenced functions */
53 extern "C" void *jobq_server(void *arg);
54 extern "C" void *sched_wait(void *arg);
55
56 static int  start_server(jobq_t *jq);
57 static bool acquire_resources(JCR *jcr);
58 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
59 static void dec_read_store(JCR *jcr);
60 static void dec_write_store(JCR *jcr);
61
62 /*
63  * Initialize a job queue
64  *
65  *  Returns: 0 on success
66  *           errno on failure
67  */
68 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
69 {
70    int stat;
71    jobq_item_t *item = NULL;
72
73    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
74       berrno be;
75       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
76       return stat;
77    }
78    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
79       pthread_attr_destroy(&jq->attr);
80       return stat;
81    }
82    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
83       berrno be;
84       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
85       pthread_attr_destroy(&jq->attr);
86       return stat;
87    }
88    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
89       berrno be;
90       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
91       pthread_mutex_destroy(&jq->mutex);
92       pthread_attr_destroy(&jq->attr);
93       return stat;
94    }
95    jq->quit = false;
96    jq->max_workers = threads;         /* max threads to create */
97    jq->num_workers = 0;               /* no threads yet */
98    jq->idle_workers = 0;              /* no idle threads */
99    jq->engine = engine;               /* routine to run */
100    jq->valid = JOBQ_VALID;
101    /* Initialize the job queues */
102    jq->waiting_jobs = New(dlist(item, &item->link));
103    jq->running_jobs = New(dlist(item, &item->link));
104    jq->ready_jobs = New(dlist(item, &item->link));
105    return 0;
106 }
107
108 /*
109  * Destroy the job queue
110  *
111  * Returns: 0 on success
112  *          errno on failure
113  */
114 int jobq_destroy(jobq_t *jq)
115 {
116    int stat, stat1, stat2;
117
118    if (jq->valid != JOBQ_VALID) {
119       return EINVAL;
120    }
121    P(jq->mutex);
122    jq->valid = 0;                      /* prevent any more operations */
123
124    /* 
125     * If any threads are active, wake them 
126     */
127    if (jq->num_workers > 0) {
128       jq->quit = true;
129       if (jq->idle_workers) {
130          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
131             berrno be;
132             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
133             V(jq->mutex);
134             return stat;
135          }
136       }
137       while (jq->num_workers > 0) {
138          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
139             berrno be;
140             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
141             V(jq->mutex);
142             return stat;
143          }
144       }
145    }
146    V(jq->mutex);
147    stat  = pthread_mutex_destroy(&jq->mutex);
148    stat1 = pthread_cond_destroy(&jq->work);
149    stat2 = pthread_attr_destroy(&jq->attr);
150    delete jq->waiting_jobs;
151    delete jq->running_jobs;
152    delete jq->ready_jobs;
153    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
154 }
155
156 struct wait_pkt {
157    JCR *jcr;
158    jobq_t *jq;
159 };
160
161 /*
162  * Wait until schedule time arrives before starting. Normally
163  *  this routine is only used for jobs started from the console
164  *  for which the user explicitly specified a start time. Otherwise
165  *  most jobs are put into the job queue only when their
166  *  scheduled time arives.
167  */
168 extern "C"
169 void *sched_wait(void *arg)
170 {
171    JCR *jcr = ((wait_pkt *)arg)->jcr;
172    jobq_t *jq = ((wait_pkt *)arg)->jq;
173
174    set_jcr_in_tsd(jcr);
175    Dmsg0(2300, "Enter sched_wait.\n");
176    free(arg);
177    time_t wtime = jcr->sched_time - time(NULL);
178    set_jcr_job_status(jcr, JS_WaitStartTime);
179    /* Wait until scheduled time arrives */
180    if (wtime > 0) {
181       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
182          jcr->Job, wtime);
183    }
184    /* Check every 30 seconds if canceled */
185    while (wtime > 0) {
186       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
187          jcr->JobId, wtime, jcr->use_count());
188       if (wtime > 30) {
189          wtime = 30;
190       }
191       bmicrosleep(wtime, 0);
192       if (job_canceled(jcr)) {
193          break;
194       }
195       wtime = jcr->sched_time - time(NULL);
196    }
197    Dmsg1(200, "resched use=%d\n", jcr->use_count());
198    jobq_add(jq, jcr);
199    free_jcr(jcr);                     /* we are done with jcr */
200    Dmsg0(2300, "Exit sched_wait\n");
201    return NULL;
202 }
203
204 /*
205  *  Add a job to the queue
206  *    jq is a queue that was created with jobq_init
207  */
208 int jobq_add(jobq_t *jq, JCR *jcr)
209 {
210    int stat;
211    jobq_item_t *item, *li;
212    bool inserted = false;
213    time_t wtime = jcr->sched_time - time(NULL);
214    pthread_t id;
215    wait_pkt *sched_pkt;
216
217    if (!jcr->term_wait_inited) { 
218       /* Initialize termination condition variable */
219       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
220          berrno be;
221          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
222          return stat;
223       }
224       jcr->term_wait_inited = true;
225    }                           
226                              
227    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
228    if (jq->valid != JOBQ_VALID) {
229       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
230       return EINVAL;
231    }
232
233    jcr->inc_use_count();                 /* mark jcr in use by us */
234    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
235    if (!job_canceled(jcr) && wtime > 0) {
236       set_thread_concurrency(jq->max_workers + 2);
237       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
238       sched_pkt->jcr = jcr;
239       sched_pkt->jq = jq;
240       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
241       if (stat != 0) {                /* thread not created */
242          berrno be;
243          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
244       }
245       return stat;
246    }
247
248    P(jq->mutex);
249
250    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
251       free_jcr(jcr);                    /* release jcr */
252       return ENOMEM;
253    }
254    item->jcr = jcr;
255
256    /* While waiting in a queue this job is not attached to a thread */
257    set_jcr_in_tsd(INVALID_JCR);
258    if (job_canceled(jcr)) {
259       /* Add job to ready queue so that it is canceled quickly */
260       jq->ready_jobs->prepend(item);
261       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
262    } else {
263       /* Add this job to the wait queue in priority sorted order */
264       foreach_dlist(li, jq->waiting_jobs) {
265          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
266             li->jcr->JobId, li->jcr->JobPriority);
267          if (li->jcr->JobPriority > jcr->JobPriority) {
268             jq->waiting_jobs->insert_before(item, li);
269             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
270                li->jcr->JobId, jcr->JobId);
271             inserted = true;
272             break;
273          }
274       }
275       /* If not jobs in wait queue, append it */
276       if (!inserted) {
277          jq->waiting_jobs->append(item);
278          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
279       }
280    }
281
282    /* Ensure that at least one server looks at the queue. */
283    stat = start_server(jq);
284
285    V(jq->mutex);
286    Dmsg0(2300, "Return jobq_add\n");
287    return stat;
288 }
289
290 /*
291  *  Remove a job from the job queue. Used only by cancel_job().
292  *    jq is a queue that was created with jobq_init
293  *    work_item is an element of work
294  *
295  *   Note, it is "removed" from the job queue.
296  *    If you want to cancel it, you need to provide some external means
297  *    of doing so (e.g. pthread_kill()).
298  */
299 int jobq_remove(jobq_t *jq, JCR *jcr)
300 {
301    int stat;
302    bool found = false;
303    jobq_item_t *item;
304
305    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
306    if (jq->valid != JOBQ_VALID) {
307       return EINVAL;
308    }
309
310    P(jq->mutex);
311    foreach_dlist(item, jq->waiting_jobs) {
312       if (jcr == item->jcr) {
313          found = true;
314          break;
315       }
316    }
317    if (!found) {
318       V(jq->mutex);
319       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
320       return EINVAL;
321    }
322
323    /* Move item to be the first on the list */
324    jq->waiting_jobs->remove(item);
325    jq->ready_jobs->prepend(item);
326    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
327
328    stat = start_server(jq);
329
330    V(jq->mutex);
331    Dmsg0(2300, "Return jobq_remove\n");
332    return stat;
333 }
334
335
336 /*
337  * Start the server thread if it isn't already running
338  */
339 static int start_server(jobq_t *jq)
340 {
341    int stat = 0;
342    pthread_t id;
343
344    /*
345     * if any threads are idle, wake one.
346     *   Actually we do a broadcast because on /lib/tls 
347     *   these signals seem to get lost from time to time.
348     */
349    if (jq->idle_workers > 0) {
350       Dmsg0(2300, "Signal worker to wake up\n");
351       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
352          berrno be;
353          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
354          return stat;
355       }
356    } else if (jq->num_workers < jq->max_workers) {
357       Dmsg0(2300, "Create worker thread\n");
358       /* No idle threads so create a new one */
359       set_thread_concurrency(jq->max_workers + 1);
360       jq->num_workers++;
361       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
362          berrno be;
363          jq->num_workers--;
364          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
365          return stat;
366       }
367    }
368    return stat;
369 }
370
371
372 /*
373  * This is the worker thread that serves the job queue.
374  * When all the resources are acquired for the job,
375  *  it will call the user's engine.
376  */
377 extern "C"
378 void *jobq_server(void *arg)
379 {
380    struct timespec timeout;
381    jobq_t *jq = (jobq_t *)arg;
382    jobq_item_t *je;                   /* job entry in queue */
383    int stat;
384    bool timedout = false;
385    bool work = true;
386
387    set_jcr_in_tsd(INVALID_JCR);
388    Dmsg0(2300, "Start jobq_server\n");
389    P(jq->mutex);
390
391    for (;;) {
392       struct timeval tv;
393       struct timezone tz;
394
395       Dmsg0(2300, "Top of for loop\n");
396       if (!work && !jq->quit) {
397          gettimeofday(&tv, &tz);
398          timeout.tv_nsec = 0;
399          timeout.tv_sec = tv.tv_sec + 4;
400
401          while (!jq->quit) {
402             /*
403              * Wait 4 seconds, then if no more work, exit
404              */
405             Dmsg0(2300, "pthread_cond_timedwait()\n");
406             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
407             if (stat == ETIMEDOUT) {
408                Dmsg0(2300, "timedwait timedout.\n");
409                timedout = true;
410                break;
411             } else if (stat != 0) {
412                /* This shouldn't happen */
413                Dmsg0(2300, "This shouldn't happen\n");
414                jq->num_workers--;
415                V(jq->mutex);
416                return NULL;
417             }
418             break;
419          }
420       }
421       /*
422        * If anything is in the ready queue, run it
423        */
424       Dmsg0(2300, "Checking ready queue.\n");
425       while (!jq->ready_jobs->empty() && !jq->quit) {
426          JCR *jcr;
427          je = (jobq_item_t *)jq->ready_jobs->first();
428          jcr = je->jcr;
429          jq->ready_jobs->remove(je);
430          if (!jq->ready_jobs->empty()) {
431             Dmsg0(2300, "ready queue not empty start server\n");
432             if (start_server(jq) != 0) {
433                jq->num_workers--;
434                V(jq->mutex);
435                return NULL;
436             }
437          }
438          jq->running_jobs->append(je);
439
440          /* Attach jcr to this thread while we run the job */
441          set_jcr_in_tsd(jcr);
442          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
443
444          /* Release job queue lock */
445          V(jq->mutex);
446
447          /* Call user's routine here */
448          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
449             jcr->use_count(), jcr->JobStatus);
450          jq->engine(je->jcr);
451
452          /* Job finished detach from thread */
453          set_jcr_in_tsd(INVALID_JCR);
454
455          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
456             jcr->use_count());
457
458          /* Reacquire job queue lock */
459          P(jq->mutex);
460          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
461          jq->running_jobs->remove(je);
462          /*
463           * Release locks if acquired. Note, they will not have
464           *  been acquired for jobs canceled before they were
465           *  put into the ready queue.
466           */
467          if (jcr->acquired_resource_locks) {
468             dec_read_store(jcr);
469             dec_write_store(jcr);
470             jcr->client->NumConcurrentJobs--;
471             jcr->job->NumConcurrentJobs--;
472             jcr->acquired_resource_locks = false;
473          }
474
475          if (reschedule_job(jcr, jq, je)) {
476             continue;              /* go look for more work */
477          }
478
479          /* Clean up and release old jcr */
480          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
481          jcr->SDJobStatus = 0;
482          V(jq->mutex);                /* release internal lock */
483          free_jcr(jcr);
484          free(je);                    /* release job entry */
485          P(jq->mutex);                /* reacquire job queue lock */
486       }
487       /*
488        * If any job in the wait queue can be run,
489        *  move it to the ready queue
490        */
491       Dmsg0(2300, "Done check ready, now check wait queue.\n");
492       if (!jq->waiting_jobs->empty() && !jq->quit) {
493          int Priority;
494          bool running_allow_mix = false;
495          je = (jobq_item_t *)jq->waiting_jobs->first();
496          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
497          if (re) {
498             Priority = re->jcr->JobPriority;
499             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
500                   re->jcr->JobId, Priority);
501             running_allow_mix = true;
502             for ( ; re; ) {
503                Dmsg2(2300, "JobId %d is also running with %s\n",
504                      re->jcr->JobId, 
505                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
506                if (!re->jcr->job->allow_mixed_priority) {
507                   running_allow_mix = false;
508                   break;
509                }
510                re = (jobq_item_t *)jq->running_jobs->next(re);
511             }
512             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
513                   running_allow_mix ? "allow" : "don't allow");
514          } else {
515             Priority = je->jcr->JobPriority;
516             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
517          }
518          /*
519           * Walk down the list of waiting jobs and attempt
520           *   to acquire the resources it needs.
521           */
522          for ( ; je;  ) {
523             /* je is current job item on the queue, jn is the next one */
524             JCR *jcr = je->jcr;
525             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
526
527             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
528                   jcr->JobId, jcr->JobPriority, Priority,
529                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
530
531             /* Take only jobs of correct Priority */
532             if (!(jcr->JobPriority == Priority
533                   || (jcr->JobPriority < Priority &&
534                       jcr->job->allow_mixed_priority && running_allow_mix))) {
535                set_jcr_job_status(jcr, JS_WaitPriority);
536                break;
537             }
538
539             if (!acquire_resources(jcr)) {
540                /* If resource conflict, job is canceled */
541                if (!job_canceled(jcr)) {
542                   je = jn;            /* point to next waiting job */
543                   continue;
544                }
545             }
546
547             /*
548              * Got all locks, now remove it from wait queue and append it
549              *   to the ready queue.  Note, we may also get here if the
550              *    job was canceled.  Once it is "run", it will quickly
551              *    terminate.
552              */
553             jq->waiting_jobs->remove(je);
554             jq->ready_jobs->append(je);
555             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
556             je = jn;                  /* Point to next waiting job */
557          } /* end for loop */
558
559       } /* end if */
560
561       Dmsg0(2300, "Done checking wait queue.\n");
562       /*
563        * If no more ready work and we are asked to quit, then do it
564        */
565       if (jq->ready_jobs->empty() && jq->quit) {
566          jq->num_workers--;
567          if (jq->num_workers == 0) {
568             Dmsg0(2300, "Wake up destroy routine\n");
569             /* Wake up destroy routine if he is waiting */
570             pthread_cond_broadcast(&jq->work);
571          }
572          break;
573       }
574       Dmsg0(2300, "Check for work request\n");
575       /*
576        * If no more work requests, and we waited long enough, quit
577        */
578       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
579          jq->ready_jobs->empty());
580       if (jq->ready_jobs->empty() && timedout) {
581          Dmsg0(2300, "break big loop\n");
582          jq->num_workers--;
583          break;
584       }
585
586       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
587       if (work) {
588          /*
589           * If a job is waiting on a Resource, don't consume all
590           *   the CPU time looping looking for work, and even more
591           *   important, release the lock so that a job that has
592           *   terminated can give us the resource.
593           */
594          V(jq->mutex);
595          bmicrosleep(2, 0);              /* pause for 2 seconds */
596          P(jq->mutex);
597          /* Recompute work as something may have changed in last 2 secs */
598          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
599       }
600       Dmsg1(2300, "Loop again. work=%d\n", work);
601    } /* end of big for loop */
602
603    Dmsg0(200, "unlock mutex\n");
604    V(jq->mutex);
605    Dmsg0(2300, "End jobq_server\n");
606    return NULL;
607 }
608
609 /*
610  * Returns true if cleanup done and we should look for more work
611  */
612 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
613 {
614    /*
615     * Reschedule the job if necessary and requested
616     */
617    if (jcr->job->RescheduleOnError &&
618        jcr->JobStatus != JS_Terminated &&
619        jcr->JobStatus != JS_Canceled &&
620        jcr->get_JobType() == JT_BACKUP &&
621        (jcr->job->RescheduleTimes == 0 ||
622         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
623        char dt[50], dt2[50];
624
625        /*
626         * Reschedule this job by cleaning it up, but
627         *  reuse the same JobId if possible.
628         */
629       time_t now = time(NULL);
630       jcr->reschedule_count++;
631       jcr->sched_time = now + jcr->job->RescheduleInterval;
632       bstrftime(dt, sizeof(dt), now);
633       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
634       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
635             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
636       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
637            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
638       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
639       jcr->JobStatus = -1;
640       set_jcr_job_status(jcr, JS_WaitStartTime);
641       jcr->SDJobStatus = 0;
642       if (!allow_duplicate_job(jcr)) {
643          return false;
644       }
645       if (jcr->JobBytes == 0) {
646          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
647          V(jq->mutex);
648          jobq_add(jq, jcr);     /* queue the job to run again */
649          P(jq->mutex);
650          free_jcr(jcr);         /* release jcr */
651          free(je);              /* free the job entry */
652          return true;           /* we already cleaned up */
653       }
654       /*
655        * Something was actually backed up, so we cannot reuse
656        *   the old JobId or there will be database record
657        *   conflicts.  We now create a new job, copying the
658        *   appropriate fields.
659        */           
660       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
661       set_jcr_defaults(njcr, jcr->job);
662       njcr->reschedule_count = jcr->reschedule_count;
663       njcr->sched_time = jcr->sched_time;
664       njcr->set_JobLevel(jcr->get_JobLevel());
665       njcr->pool = jcr->pool;
666       njcr->run_pool_override = jcr->run_pool_override;
667       njcr->full_pool = jcr->full_pool;
668       njcr->run_full_pool_override = jcr->run_full_pool_override;
669       njcr->inc_pool = jcr->inc_pool;
670       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
671       njcr->diff_pool = jcr->diff_pool;
672       njcr->JobStatus = -1;
673       set_jcr_job_status(njcr, jcr->JobStatus);
674       if (jcr->rstore) {
675          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
676       } else {
677          free_rstorage(njcr);
678       }
679       if (jcr->wstore) {
680          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
681       } else {
682          free_wstorage(njcr);
683       }
684       njcr->messages = jcr->messages;
685       njcr->spool_data = jcr->spool_data;
686       njcr->write_part_after_job = jcr->write_part_after_job;
687       Dmsg0(2300, "Call to run new job\n");
688       V(jq->mutex);
689       run_job(njcr);            /* This creates a "new" job */
690       free_jcr(njcr);           /* release "new" jcr */
691       P(jq->mutex);
692       Dmsg0(2300, "Back from running new job.\n");
693    }
694    return false;
695 }
696
697 /*
698  * See if we can acquire all the necessary resources for the job (JCR)
699  *
700  *  Returns: true  if successful
701  *           false if resource failure
702  */
703 static bool acquire_resources(JCR *jcr)
704 {
705    bool skip_this_jcr = false;
706
707    jcr->acquired_resource_locks = false;
708 /*
709  * Turning this code off is likely to cause some deadlocks,
710  *   but we do not really have enough information here to
711  *   know if this is really a deadlock (it may be a dual drive
712  *   autochanger), and in principle, the SD reservation system
713  *   should detect these deadlocks, so push the work off on is.
714  */
715 #ifdef xxx
716    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
717       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
718          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
719          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
720       set_jcr_job_status(jcr, JS_Canceled);
721       return false;
722    }
723 #endif
724    if (jcr->rstore) {
725       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
726       if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
727          jcr->rstore->NumConcurrentReadJobs++;
728          jcr->rstore->NumConcurrentJobs++;
729          Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
730       } else {
731          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
732          set_jcr_job_status(jcr, JS_WaitStoreRes);
733          return false;
734       }
735    }
736    
737    if (jcr->wstore) {
738       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
739       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
740          jcr->wstore->NumConcurrentJobs++;
741          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
742       } else if (jcr->rstore) {
743          dec_read_store(jcr);
744          skip_this_jcr = true;
745       } else {
746          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
747          skip_this_jcr = true;
748       }
749    }
750    if (skip_this_jcr) {
751       set_jcr_job_status(jcr, JS_WaitStoreRes);
752       return false;
753    }
754
755    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
756       jcr->client->NumConcurrentJobs++;
757    } else {
758       /* Back out previous locks */
759       dec_write_store(jcr);
760       dec_read_store(jcr);
761       set_jcr_job_status(jcr, JS_WaitClientRes);
762       return false;
763    }
764    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
765       jcr->job->NumConcurrentJobs++;
766    } else {
767       /* Back out previous locks */
768       dec_write_store(jcr);
769       dec_read_store(jcr);
770       jcr->client->NumConcurrentJobs--;
771       set_jcr_job_status(jcr, JS_WaitJobRes);
772       return false;
773    }
774
775    jcr->acquired_resource_locks = true;
776    return true;
777 }
778
779 static void dec_read_store(JCR *jcr)
780 {
781    if (jcr->rstore) {
782       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
783       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
784       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
785       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
786       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
787    }
788 }
789
790 static void dec_write_store(JCR *jcr)
791 {
792    if (jcr->wstore) {
793       jcr->wstore->NumConcurrentJobs--;
794       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
795       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
796    }
797 }