]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
kes Implement bsr block level checking for disk files. However,
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *   Version $Id$
41  *
42  *  This code was adapted from the Bacula workq, which was
43  *    adapted from "Programming with POSIX Threads", by
44  *    David R. Butenhof
45  *
46  */
47
48 #include "bacula.h"
49 #include "dird.h"
50
51 extern JCR *jobs;
52
53 /* Forward referenced functions */
54 extern "C" void *jobq_server(void *arg);
55 extern "C" void *sched_wait(void *arg);
56
57 static int  start_server(jobq_t *jq);
58 static bool acquire_resources(JCR *jcr);
59 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
60 static void dec_read_store(JCR *jcr);
61 static void dec_write_store(JCR *jcr);
62
63 /*
64  * Initialize a job queue
65  *
66  *  Returns: 0 on success
67  *           errno on failure
68  */
69 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
70 {
71    int stat;
72    jobq_item_t *item = NULL;
73
74    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
75       berrno be;
76       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
77       return stat;
78    }
79    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80       pthread_attr_destroy(&jq->attr);
81       return stat;
82    }
83    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
84       berrno be;
85       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
86       pthread_attr_destroy(&jq->attr);
87       return stat;
88    }
89    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
90       berrno be;
91       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
92       pthread_mutex_destroy(&jq->mutex);
93       pthread_attr_destroy(&jq->attr);
94       return stat;
95    }
96    jq->quit = false;
97    jq->max_workers = threads;         /* max threads to create */
98    jq->num_workers = 0;               /* no threads yet */
99    jq->idle_workers = 0;              /* no idle threads */
100    jq->engine = engine;               /* routine to run */
101    jq->valid = JOBQ_VALID;
102    /* Initialize the job queues */
103    jq->waiting_jobs = New(dlist(item, &item->link));
104    jq->running_jobs = New(dlist(item, &item->link));
105    jq->ready_jobs = New(dlist(item, &item->link));
106    return 0;
107 }
108
109 /*
110  * Destroy the job queue
111  *
112  * Returns: 0 on success
113  *          errno on failure
114  */
115 int jobq_destroy(jobq_t *jq)
116 {
117    int stat, stat1, stat2;
118
119    if (jq->valid != JOBQ_VALID) {
120       return EINVAL;
121    }
122    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
123       berrno be;
124       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
125       return stat;
126    }
127    jq->valid = 0;                      /* prevent any more operations */
128
129    /* 
130     * If any threads are active, wake them 
131     */
132    if (jq->num_workers > 0) {
133       jq->quit = true;
134       if (jq->idle_workers) {
135          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
136             berrno be;
137             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
138             pthread_mutex_unlock(&jq->mutex);
139             return stat;
140          }
141       }
142       while (jq->num_workers > 0) {
143          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
144             berrno be;
145             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
146             pthread_mutex_unlock(&jq->mutex);
147             return stat;
148          }
149       }
150    }
151    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
152       berrno be;
153       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat));
154       return stat;
155    }
156    stat  = pthread_mutex_destroy(&jq->mutex);
157    stat1 = pthread_cond_destroy(&jq->work);
158    stat2 = pthread_attr_destroy(&jq->attr);
159    delete jq->waiting_jobs;
160    delete jq->running_jobs;
161    delete jq->ready_jobs;
162    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
163 }
164
165 struct wait_pkt {
166    JCR *jcr;
167    jobq_t *jq;
168 };
169
170 /*
171  * Wait until schedule time arrives before starting. Normally
172  *  this routine is only used for jobs started from the console
173  *  for which the user explicitly specified a start time. Otherwise
174  *  most jobs are put into the job queue only when their
175  *  scheduled time arives.
176  */
177 extern "C"
178 void *sched_wait(void *arg)
179 {
180    JCR *jcr = ((wait_pkt *)arg)->jcr;
181    jobq_t *jq = ((wait_pkt *)arg)->jq;
182
183    Dmsg0(2300, "Enter sched_wait.\n");
184    free(arg);
185    time_t wtime = jcr->sched_time - time(NULL);
186    set_jcr_job_status(jcr, JS_WaitStartTime);
187    /* Wait until scheduled time arrives */
188    if (wtime > 0) {
189       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
190          jcr->Job, wtime);
191    }
192    /* Check every 30 seconds if canceled */
193    while (wtime > 0) {
194       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
195          jcr->JobId, wtime, jcr->use_count());
196       if (wtime > 30) {
197          wtime = 30;
198       }
199       bmicrosleep(wtime, 0);
200       if (job_canceled(jcr)) {
201          break;
202       }
203       wtime = jcr->sched_time - time(NULL);
204    }
205    Dmsg1(200, "resched use=%d\n", jcr->use_count());
206    jobq_add(jq, jcr);
207    free_jcr(jcr);                     /* we are done with jcr */
208    Dmsg0(2300, "Exit sched_wait\n");
209    return NULL;
210 }
211
212 /*
213  *  Add a job to the queue
214  *    jq is a queue that was created with jobq_init
215  */
216 int jobq_add(jobq_t *jq, JCR *jcr)
217 {
218    int stat;
219    jobq_item_t *item, *li;
220    bool inserted = false;
221    time_t wtime = jcr->sched_time - time(NULL);
222    pthread_t id;
223    wait_pkt *sched_pkt;
224
225    if (!jcr->term_wait_inited) { 
226       /* Initialize termination condition variable */
227       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
228          berrno be;
229          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
230          return stat;
231       }
232       jcr->term_wait_inited = true;
233    }                           
234                              
235    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
236    if (jq->valid != JOBQ_VALID) {
237       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
238       return EINVAL;
239    }
240
241    jcr->inc_use_count();                 /* mark jcr in use by us */
242    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
243    if (!job_canceled(jcr) && wtime > 0) {
244       set_thread_concurrency(jq->max_workers + 2);
245       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
246       sched_pkt->jcr = jcr;
247       sched_pkt->jq = jq;
248       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
249       if (stat != 0) {                /* thread not created */
250          berrno be;
251          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
252       }
253       return stat;
254    }
255
256    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
257       berrno be;
258       Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
259       free_jcr(jcr);                    /* release jcr */
260       return stat;
261    }
262
263    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
264       free_jcr(jcr);                    /* release jcr */
265       return ENOMEM;
266    }
267    item->jcr = jcr;
268
269    if (job_canceled(jcr)) {
270       /* Add job to ready queue so that it is canceled quickly */
271       jq->ready_jobs->prepend(item);
272       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
273    } else {
274       /* Add this job to the wait queue in priority sorted order */
275       foreach_dlist(li, jq->waiting_jobs) {
276          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
277             li->jcr->JobId, li->jcr->JobPriority);
278          if (li->jcr->JobPriority > jcr->JobPriority) {
279             jq->waiting_jobs->insert_before(item, li);
280             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
281                li->jcr->JobId, jcr->JobId);
282             inserted = true;
283             break;
284          }
285       }
286       /* If not jobs in wait queue, append it */
287       if (!inserted) {
288          jq->waiting_jobs->append(item);
289          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
290       }
291    }
292
293    /* Ensure that at least one server looks at the queue. */
294    stat = start_server(jq);
295
296    pthread_mutex_unlock(&jq->mutex);
297    Dmsg0(2300, "Return jobq_add\n");
298    return stat;
299 }
300
301 /*
302  *  Remove a job from the job queue. Used only by cancel_job().
303  *    jq is a queue that was created with jobq_init
304  *    work_item is an element of work
305  *
306  *   Note, it is "removed" from the job queue.
307  *    If you want to cancel it, you need to provide some external means
308  *    of doing so (e.g. pthread_kill()).
309  */
310 int jobq_remove(jobq_t *jq, JCR *jcr)
311 {
312    int stat;
313    bool found = false;
314    jobq_item_t *item;
315
316    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
317    if (jq->valid != JOBQ_VALID) {
318       return EINVAL;
319    }
320
321    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
322       berrno be;
323       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
324       return stat;
325    }
326
327    foreach_dlist(item, jq->waiting_jobs) {
328       if (jcr == item->jcr) {
329          found = true;
330          break;
331       }
332    }
333    if (!found) {
334       pthread_mutex_unlock(&jq->mutex);
335       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
336       return EINVAL;
337    }
338
339    /* Move item to be the first on the list */
340    jq->waiting_jobs->remove(item);
341    jq->ready_jobs->prepend(item);
342    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
343
344    stat = start_server(jq);
345
346    pthread_mutex_unlock(&jq->mutex);
347    Dmsg0(2300, "Return jobq_remove\n");
348    return stat;
349 }
350
351
352 /*
353  * Start the server thread if it isn't already running
354  */
355 static int start_server(jobq_t *jq)
356 {
357    int stat = 0;
358    pthread_t id;
359
360    /*
361     * if any threads are idle, wake one.
362     *   Actually we do a broadcast because on /lib/tls 
363     *   these signals seem to get lost from time to time.
364     */
365    if (jq->idle_workers > 0) {
366       Dmsg0(2300, "Signal worker to wake up\n");
367       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
368          berrno be;
369          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
370          return stat;
371       }
372    } else if (jq->num_workers < jq->max_workers) {
373       Dmsg0(2300, "Create worker thread\n");
374       /* No idle threads so create a new one */
375       set_thread_concurrency(jq->max_workers + 1);
376       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
377          berrno be;
378          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
379          return stat;
380       }
381    }
382    return stat;
383 }
384
385
386 /*
387  * This is the worker thread that serves the job queue.
388  * When all the resources are acquired for the job,
389  *  it will call the user's engine.
390  */
391 extern "C"
392 void *jobq_server(void *arg)
393 {
394    struct timespec timeout;
395    jobq_t *jq = (jobq_t *)arg;
396    jobq_item_t *je;                   /* job entry in queue */
397    int stat;
398    bool timedout = false;
399    bool work = true;
400
401    Dmsg0(2300, "Start jobq_server\n");
402    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
403       berrno be;
404       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
405       return NULL;
406    }
407    jq->num_workers++;
408
409    for (;;) {
410       struct timeval tv;
411       struct timezone tz;
412
413       Dmsg0(2300, "Top of for loop\n");
414       if (!work && !jq->quit) {
415          gettimeofday(&tv, &tz);
416          timeout.tv_nsec = 0;
417          timeout.tv_sec = tv.tv_sec + 4;
418
419          while (!jq->quit) {
420             /*
421              * Wait 4 seconds, then if no more work, exit
422              */
423             Dmsg0(2300, "pthread_cond_timedwait()\n");
424             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
425             if (stat == ETIMEDOUT) {
426                Dmsg0(2300, "timedwait timedout.\n");
427                timedout = true;
428                break;
429             } else if (stat != 0) {
430                /* This shouldn't happen */
431                Dmsg0(2300, "This shouldn't happen\n");
432                jq->num_workers--;
433                pthread_mutex_unlock(&jq->mutex);
434                return NULL;
435             }
436             break;
437          }
438       }
439       /*
440        * If anything is in the ready queue, run it
441        */
442       Dmsg0(2300, "Checking ready queue.\n");
443       while (!jq->ready_jobs->empty() && !jq->quit) {
444          JCR *jcr;
445          je = (jobq_item_t *)jq->ready_jobs->first();
446          jcr = je->jcr;
447          jq->ready_jobs->remove(je);
448          if (!jq->ready_jobs->empty()) {
449             Dmsg0(2300, "ready queue not empty start server\n");
450             if (start_server(jq) != 0) {
451                jq->num_workers--;
452                pthread_mutex_unlock(&jq->mutex);
453                return NULL;
454             }
455          }
456          jq->running_jobs->append(je);
457          set_jcr_in_tsd(jcr);
458          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
459
460          /* Release job queue lock */
461          V(jq->mutex);
462
463          /* Call user's routine here */
464          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
465             jcr->use_count());
466          jq->engine(je->jcr);
467
468          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
469             jcr->use_count());
470
471          /* Reacquire job queue lock */
472          P(jq->mutex);
473          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
474          jq->running_jobs->remove(je);
475          /*
476           * Release locks if acquired. Note, they will not have
477           *  been acquired for jobs canceled before they were
478           *  put into the ready queue.
479           */
480          if (jcr->acquired_resource_locks) {
481             dec_read_store(jcr);
482             dec_write_store(jcr);
483             jcr->client->NumConcurrentJobs--;
484             jcr->job->NumConcurrentJobs--;
485             jcr->acquired_resource_locks = false;
486          }
487
488          if (reschedule_job(jcr, jq, je)) {
489             continue;              /* go look for more work */
490          }
491
492          /* Clean up and release old jcr */
493          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
494          jcr->SDJobStatus = 0;
495          V(jq->mutex);                /* release internal lock */
496          free_jcr(jcr);
497          free(je);                    /* release job entry */
498          P(jq->mutex);                /* reacquire job queue lock */
499       }
500       /*
501        * If any job in the wait queue can be run,
502        *  move it to the ready queue
503        */
504       Dmsg0(2300, "Done check ready, now check wait queue.\n");
505       if (!jq->waiting_jobs->empty() && !jq->quit) {
506          int Priority;
507          bool running_allow_mix = false;
508          je = (jobq_item_t *)jq->waiting_jobs->first();
509          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
510          if (re) {
511             Priority = re->jcr->JobPriority;
512             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
513                   re->jcr->JobId, Priority);
514             running_allow_mix = true;
515             for ( ; re; ) {
516                Dmsg2(2300, "JobId %d is also running with %s\n",
517                      re->jcr->JobId, 
518                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
519                if (!re->jcr->job->allow_mixed_priority) {
520                   running_allow_mix = false;
521                   break;
522                }
523                re = (jobq_item_t *)jq->running_jobs->next(re);
524             }
525             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
526                   running_allow_mix ? "allow" : "don't allow");
527          } else {
528             Priority = je->jcr->JobPriority;
529             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
530          }
531          /*
532           * Walk down the list of waiting jobs and attempt
533           *   to acquire the resources it needs.
534           */
535          for ( ; je;  ) {
536             /* je is current job item on the queue, jn is the next one */
537             JCR *jcr = je->jcr;
538             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
539
540             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
541                   jcr->JobId, jcr->JobPriority, Priority,
542                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
543
544             /* Take only jobs of correct Priority */
545             if (!(jcr->JobPriority == Priority
546                   || (jcr->JobPriority < Priority &&
547                       jcr->job->allow_mixed_priority && running_allow_mix))) {
548                set_jcr_job_status(jcr, JS_WaitPriority);
549                break;
550             }
551
552             if (!acquire_resources(jcr)) {
553                /* If resource conflict, job is canceled */
554                if (!job_canceled(jcr)) {
555                   je = jn;            /* point to next waiting job */
556                   continue;
557                }
558             }
559
560             /*
561              * Got all locks, now remove it from wait queue and append it
562              *   to the ready queue.  Note, we may also get here if the
563              *    job was canceled.  Once it is "run", it will quickly
564              *    terminate.
565              */
566             jq->waiting_jobs->remove(je);
567             jq->ready_jobs->append(je);
568             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
569             je = jn;                  /* Point to next waiting job */
570          } /* end for loop */
571
572       } /* end if */
573
574       Dmsg0(2300, "Done checking wait queue.\n");
575       /*
576        * If no more ready work and we are asked to quit, then do it
577        */
578       if (jq->ready_jobs->empty() && jq->quit) {
579          jq->num_workers--;
580          if (jq->num_workers == 0) {
581             Dmsg0(2300, "Wake up destroy routine\n");
582             /* Wake up destroy routine if he is waiting */
583             pthread_cond_broadcast(&jq->work);
584          }
585          break;
586       }
587       Dmsg0(2300, "Check for work request\n");
588       /*
589        * If no more work requests, and we waited long enough, quit
590        */
591       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
592          jq->ready_jobs->empty());
593       if (jq->ready_jobs->empty() && timedout) {
594          Dmsg0(2300, "break big loop\n");
595          jq->num_workers--;
596          break;
597       }
598
599       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
600       if (work) {
601          /*
602           * If a job is waiting on a Resource, don't consume all
603           *   the CPU time looping looking for work, and even more
604           *   important, release the lock so that a job that has
605           *   terminated can give us the resource.
606           */
607          V(jq->mutex);
608          bmicrosleep(2, 0);              /* pause for 2 seconds */
609          P(jq->mutex);
610          /* Recompute work as something may have changed in last 2 secs */
611          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
612       }
613       Dmsg1(2300, "Loop again. work=%d\n", work);
614    } /* end of big for loop */
615
616    Dmsg0(200, "unlock mutex\n");
617    V(jq->mutex);
618    Dmsg0(2300, "End jobq_server\n");
619    return NULL;
620 }
621
622 /*
623  * Returns true if cleanup done and we should look for more work
624  */
625 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
626 {
627    /*
628     * Reschedule the job if necessary and requested
629     */
630    if (jcr->job->RescheduleOnError &&
631        jcr->JobStatus != JS_Terminated &&
632        jcr->JobStatus != JS_Canceled &&
633        jcr->get_JobType() == JT_BACKUP &&
634        (jcr->job->RescheduleTimes == 0 ||
635         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
636        char dt[50], dt2[50];
637
638        /*
639         * Reschedule this job by cleaning it up, but
640         *  reuse the same JobId if possible.
641         */
642       time_t now = time(NULL);
643       jcr->reschedule_count++;
644       jcr->sched_time = now + jcr->job->RescheduleInterval;
645       bstrftime(dt, sizeof(dt), now);
646       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
647       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
648             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
649       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
650            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
651       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
652       jcr->JobStatus = -1;
653       set_jcr_job_status(jcr, JS_WaitStartTime);
654       jcr->SDJobStatus = 0;
655       if (!allow_duplicate_job(jcr)) {
656          return false;
657       }
658       if (jcr->JobBytes == 0) {
659          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
660          V(jq->mutex);
661          jobq_add(jq, jcr);     /* queue the job to run again */
662          P(jq->mutex);
663          free_jcr(jcr);         /* release jcr */
664          free(je);              /* free the job entry */
665          return true;           /* we already cleaned up */
666       }
667       /*
668        * Something was actually backed up, so we cannot reuse
669        *   the old JobId or there will be database record
670        *   conflicts.  We now create a new job, copying the
671        *   appropriate fields.
672        */           
673       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
674       set_jcr_defaults(njcr, jcr->job);
675       njcr->reschedule_count = jcr->reschedule_count;
676       njcr->sched_time = jcr->sched_time;
677       njcr->set_JobLevel(jcr->get_JobLevel());
678       njcr->pool = jcr->pool;
679       njcr->run_pool_override = jcr->run_pool_override;
680       njcr->full_pool = jcr->full_pool;
681       njcr->run_full_pool_override = jcr->run_full_pool_override;
682       njcr->inc_pool = jcr->inc_pool;
683       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
684       njcr->diff_pool = jcr->diff_pool;
685       njcr->JobStatus = -1;
686       set_jcr_job_status(njcr, jcr->JobStatus);
687       if (jcr->rstore) {
688          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
689       } else {
690          free_rstorage(njcr);
691       }
692       if (jcr->wstore) {
693          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
694       } else {
695          free_wstorage(njcr);
696       }
697       njcr->messages = jcr->messages;
698       njcr->spool_data = jcr->spool_data;
699       njcr->write_part_after_job = jcr->write_part_after_job;
700       Dmsg0(2300, "Call to run new job\n");
701       V(jq->mutex);
702       run_job(njcr);            /* This creates a "new" job */
703       free_jcr(njcr);           /* release "new" jcr */
704       P(jq->mutex);
705       Dmsg0(2300, "Back from running new job.\n");
706    }
707    return false;
708 }
709
710 /*
711  * See if we can acquire all the necessary resources for the job (JCR)
712  *
713  *  Returns: true  if successful
714  *           false if resource failure
715  */
716 static bool acquire_resources(JCR *jcr)
717 {
718    bool skip_this_jcr = false;
719
720    jcr->acquired_resource_locks = false;
721 /*
722  * Turning this code off is likely to cause some deadlocks,
723  *   but we do not really have enough information here to
724  *   know if this is really a deadlock (it may be a dual drive
725  *   autochanger), and in principle, the SD reservation system
726  *   should detect these deadlocks, so push the work off on is.
727  */
728 #ifdef xxx
729    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
730       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
731          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
732          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
733       set_jcr_job_status(jcr, JS_Canceled);
734       return false;
735    }
736 #endif
737    if (jcr->rstore) {
738       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
739       if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
740          jcr->rstore->NumConcurrentReadJobs++;
741          jcr->rstore->NumConcurrentJobs++;
742          Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
743       } else {
744          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
745          set_jcr_job_status(jcr, JS_WaitStoreRes);
746          return false;
747       }
748    }
749    
750    if (jcr->wstore) {
751       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
752       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
753          jcr->wstore->NumConcurrentJobs++;
754          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
755       } else if (jcr->rstore) {
756          dec_read_store(jcr);
757          skip_this_jcr = true;
758       } else {
759          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
760          skip_this_jcr = true;
761       }
762    }
763    if (skip_this_jcr) {
764       set_jcr_job_status(jcr, JS_WaitStoreRes);
765       return false;
766    }
767
768    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
769       jcr->client->NumConcurrentJobs++;
770    } else {
771       /* Back out previous locks */
772       dec_write_store(jcr);
773       dec_read_store(jcr);
774       set_jcr_job_status(jcr, JS_WaitClientRes);
775       return false;
776    }
777    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
778       jcr->job->NumConcurrentJobs++;
779    } else {
780       /* Back out previous locks */
781       dec_write_store(jcr);
782       dec_read_store(jcr);
783       jcr->client->NumConcurrentJobs--;
784       set_jcr_job_status(jcr, JS_WaitJobRes);
785       return false;
786    }
787
788    jcr->acquired_resource_locks = true;
789    return true;
790 }
791
792 static void dec_read_store(JCR *jcr)
793 {
794    if (jcr->rstore) {
795       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
796       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
797       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
798       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
799       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
800    }
801 }
802
803 static void dec_write_store(JCR *jcr)
804 {
805    if (jcr->wstore) {
806       jcr->wstore->NumConcurrentJobs--;
807       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
808       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
809    }
810 }