]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
Fix and document new queries
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *   Version $Id$
41  *
42  *  This code was adapted from the Bacula workq, which was
43  *    adapted from "Programming with POSIX Threads", by
44  *    David R. Butenhof
45  *
46  */
47
48 #include "bacula.h"
49 #include "dird.h"
50
51 extern JCR *jobs;
52
53 /* Forward referenced functions */
54 extern "C" void *jobq_server(void *arg);
55 extern "C" void *sched_wait(void *arg);
56
57 static int  start_server(jobq_t *jq);
58 static bool acquire_resources(JCR *jcr);
59 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
60 static void dec_read_store(JCR *jcr);
61 static void dec_write_store(JCR *jcr);
62
63 /*
64  * Initialize a job queue
65  *
66  *  Returns: 0 on success
67  *           errno on failure
68  */
69 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
70 {
71    int stat;
72    jobq_item_t *item = NULL;
73
74    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
75       berrno be;
76       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
77       return stat;
78    }
79    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80       pthread_attr_destroy(&jq->attr);
81       return stat;
82    }
83    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
84       berrno be;
85       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
86       pthread_attr_destroy(&jq->attr);
87       return stat;
88    }
89    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
90       berrno be;
91       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
92       pthread_mutex_destroy(&jq->mutex);
93       pthread_attr_destroy(&jq->attr);
94       return stat;
95    }
96    jq->quit = false;
97    jq->max_workers = threads;         /* max threads to create */
98    jq->num_workers = 0;               /* no threads yet */
99    jq->idle_workers = 0;              /* no idle threads */
100    jq->engine = engine;               /* routine to run */
101    jq->valid = JOBQ_VALID;
102    /* Initialize the job queues */
103    jq->waiting_jobs = New(dlist(item, &item->link));
104    jq->running_jobs = New(dlist(item, &item->link));
105    jq->ready_jobs = New(dlist(item, &item->link));
106    return 0;
107 }
108
109 /*
110  * Destroy the job queue
111  *
112  * Returns: 0 on success
113  *          errno on failure
114  */
115 int jobq_destroy(jobq_t *jq)
116 {
117    int stat, stat1, stat2;
118
119    if (jq->valid != JOBQ_VALID) {
120       return EINVAL;
121    }
122    P(jq->mutex);
123    jq->valid = 0;                      /* prevent any more operations */
124
125    /* 
126     * If any threads are active, wake them 
127     */
128    if (jq->num_workers > 0) {
129       jq->quit = true;
130       if (jq->idle_workers) {
131          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
132             berrno be;
133             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
134             V(jq->mutex);
135             return stat;
136          }
137       }
138       while (jq->num_workers > 0) {
139          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
140             berrno be;
141             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
142             V(jq->mutex);
143             return stat;
144          }
145       }
146    }
147    V(jq->mutex);
148    stat  = pthread_mutex_destroy(&jq->mutex);
149    stat1 = pthread_cond_destroy(&jq->work);
150    stat2 = pthread_attr_destroy(&jq->attr);
151    delete jq->waiting_jobs;
152    delete jq->running_jobs;
153    delete jq->ready_jobs;
154    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
155 }
156
157 struct wait_pkt {
158    JCR *jcr;
159    jobq_t *jq;
160 };
161
162 /*
163  * Wait until schedule time arrives before starting. Normally
164  *  this routine is only used for jobs started from the console
165  *  for which the user explicitly specified a start time. Otherwise
166  *  most jobs are put into the job queue only when their
167  *  scheduled time arives.
168  */
169 extern "C"
170 void *sched_wait(void *arg)
171 {
172    JCR *jcr = ((wait_pkt *)arg)->jcr;
173    jobq_t *jq = ((wait_pkt *)arg)->jq;
174
175    set_jcr_in_tsd(jcr);
176    Dmsg0(2300, "Enter sched_wait.\n");
177    free(arg);
178    time_t wtime = jcr->sched_time - time(NULL);
179    set_jcr_job_status(jcr, JS_WaitStartTime);
180    /* Wait until scheduled time arrives */
181    if (wtime > 0) {
182       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
183          jcr->Job, wtime);
184    }
185    /* Check every 30 seconds if canceled */
186    while (wtime > 0) {
187       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
188          jcr->JobId, wtime, jcr->use_count());
189       if (wtime > 30) {
190          wtime = 30;
191       }
192       bmicrosleep(wtime, 0);
193       if (job_canceled(jcr)) {
194          break;
195       }
196       wtime = jcr->sched_time - time(NULL);
197    }
198    Dmsg1(200, "resched use=%d\n", jcr->use_count());
199    jobq_add(jq, jcr);
200    free_jcr(jcr);                     /* we are done with jcr */
201    Dmsg0(2300, "Exit sched_wait\n");
202    return NULL;
203 }
204
205 /*
206  *  Add a job to the queue
207  *    jq is a queue that was created with jobq_init
208  */
209 int jobq_add(jobq_t *jq, JCR *jcr)
210 {
211    int stat;
212    jobq_item_t *item, *li;
213    bool inserted = false;
214    time_t wtime = jcr->sched_time - time(NULL);
215    pthread_t id;
216    wait_pkt *sched_pkt;
217
218    if (!jcr->term_wait_inited) { 
219       /* Initialize termination condition variable */
220       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
221          berrno be;
222          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
223          return stat;
224       }
225       jcr->term_wait_inited = true;
226    }                           
227                              
228    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
229    if (jq->valid != JOBQ_VALID) {
230       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
231       return EINVAL;
232    }
233
234    jcr->inc_use_count();                 /* mark jcr in use by us */
235    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
236    if (!job_canceled(jcr) && wtime > 0) {
237       set_thread_concurrency(jq->max_workers + 2);
238       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
239       sched_pkt->jcr = jcr;
240       sched_pkt->jq = jq;
241       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
242       if (stat != 0) {                /* thread not created */
243          berrno be;
244          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
245       }
246       return stat;
247    }
248
249    P(jq->mutex);
250
251    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
252       free_jcr(jcr);                    /* release jcr */
253       return ENOMEM;
254    }
255    item->jcr = jcr;
256
257    /* While waiting in a queue this job is not attached to a thread */
258    set_jcr_in_tsd(INVALID_JCR);
259    if (job_canceled(jcr)) {
260       /* Add job to ready queue so that it is canceled quickly */
261       jq->ready_jobs->prepend(item);
262       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
263    } else {
264       /* Add this job to the wait queue in priority sorted order */
265       foreach_dlist(li, jq->waiting_jobs) {
266          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
267             li->jcr->JobId, li->jcr->JobPriority);
268          if (li->jcr->JobPriority > jcr->JobPriority) {
269             jq->waiting_jobs->insert_before(item, li);
270             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
271                li->jcr->JobId, jcr->JobId);
272             inserted = true;
273             break;
274          }
275       }
276       /* If not jobs in wait queue, append it */
277       if (!inserted) {
278          jq->waiting_jobs->append(item);
279          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
280       }
281    }
282
283    /* Ensure that at least one server looks at the queue. */
284    stat = start_server(jq);
285
286    V(jq->mutex);
287    Dmsg0(2300, "Return jobq_add\n");
288    return stat;
289 }
290
291 /*
292  *  Remove a job from the job queue. Used only by cancel_job().
293  *    jq is a queue that was created with jobq_init
294  *    work_item is an element of work
295  *
296  *   Note, it is "removed" from the job queue.
297  *    If you want to cancel it, you need to provide some external means
298  *    of doing so (e.g. pthread_kill()).
299  */
300 int jobq_remove(jobq_t *jq, JCR *jcr)
301 {
302    int stat;
303    bool found = false;
304    jobq_item_t *item;
305
306    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
307    if (jq->valid != JOBQ_VALID) {
308       return EINVAL;
309    }
310
311    P(jq->mutex);
312    foreach_dlist(item, jq->waiting_jobs) {
313       if (jcr == item->jcr) {
314          found = true;
315          break;
316       }
317    }
318    if (!found) {
319       V(jq->mutex);
320       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
321       return EINVAL;
322    }
323
324    /* Move item to be the first on the list */
325    jq->waiting_jobs->remove(item);
326    jq->ready_jobs->prepend(item);
327    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
328
329    stat = start_server(jq);
330
331    V(jq->mutex);
332    Dmsg0(2300, "Return jobq_remove\n");
333    return stat;
334 }
335
336
337 /*
338  * Start the server thread if it isn't already running
339  */
340 static int start_server(jobq_t *jq)
341 {
342    int stat = 0;
343    pthread_t id;
344
345    /*
346     * if any threads are idle, wake one.
347     *   Actually we do a broadcast because on /lib/tls 
348     *   these signals seem to get lost from time to time.
349     */
350    if (jq->idle_workers > 0) {
351       Dmsg0(2300, "Signal worker to wake up\n");
352       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
353          berrno be;
354          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
355          return stat;
356       }
357    } else if (jq->num_workers < jq->max_workers) {
358       Dmsg0(2300, "Create worker thread\n");
359       /* No idle threads so create a new one */
360       set_thread_concurrency(jq->max_workers + 1);
361       jq->num_workers++;
362       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
363          berrno be;
364          jq->num_workers--;
365          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
366          return stat;
367       }
368    }
369    return stat;
370 }
371
372
373 /*
374  * This is the worker thread that serves the job queue.
375  * When all the resources are acquired for the job,
376  *  it will call the user's engine.
377  */
378 extern "C"
379 void *jobq_server(void *arg)
380 {
381    struct timespec timeout;
382    jobq_t *jq = (jobq_t *)arg;
383    jobq_item_t *je;                   /* job entry in queue */
384    int stat;
385    bool timedout = false;
386    bool work = true;
387
388    set_jcr_in_tsd(INVALID_JCR);
389    Dmsg0(2300, "Start jobq_server\n");
390    P(jq->mutex);
391
392    for (;;) {
393       struct timeval tv;
394       struct timezone tz;
395
396       Dmsg0(2300, "Top of for loop\n");
397       if (!work && !jq->quit) {
398          gettimeofday(&tv, &tz);
399          timeout.tv_nsec = 0;
400          timeout.tv_sec = tv.tv_sec + 4;
401
402          while (!jq->quit) {
403             /*
404              * Wait 4 seconds, then if no more work, exit
405              */
406             Dmsg0(2300, "pthread_cond_timedwait()\n");
407             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
408             if (stat == ETIMEDOUT) {
409                Dmsg0(2300, "timedwait timedout.\n");
410                timedout = true;
411                break;
412             } else if (stat != 0) {
413                /* This shouldn't happen */
414                Dmsg0(2300, "This shouldn't happen\n");
415                jq->num_workers--;
416                V(jq->mutex);
417                return NULL;
418             }
419             break;
420          }
421       }
422       /*
423        * If anything is in the ready queue, run it
424        */
425       Dmsg0(2300, "Checking ready queue.\n");
426       while (!jq->ready_jobs->empty() && !jq->quit) {
427          JCR *jcr;
428          je = (jobq_item_t *)jq->ready_jobs->first();
429          jcr = je->jcr;
430          jq->ready_jobs->remove(je);
431          if (!jq->ready_jobs->empty()) {
432             Dmsg0(2300, "ready queue not empty start server\n");
433             if (start_server(jq) != 0) {
434                jq->num_workers--;
435                V(jq->mutex);
436                return NULL;
437             }
438          }
439          jq->running_jobs->append(je);
440
441          /* Attach jcr to this thread while we run the job */
442          set_jcr_in_tsd(jcr);
443          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
444
445          /* Release job queue lock */
446          V(jq->mutex);
447
448          /* Call user's routine here */
449          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
450             jcr->use_count());
451          jq->engine(je->jcr);
452
453          /* Job finished detach from thread */
454          set_jcr_in_tsd(INVALID_JCR);
455
456          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
457             jcr->use_count());
458
459          /* Reacquire job queue lock */
460          P(jq->mutex);
461          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
462          jq->running_jobs->remove(je);
463          /*
464           * Release locks if acquired. Note, they will not have
465           *  been acquired for jobs canceled before they were
466           *  put into the ready queue.
467           */
468          if (jcr->acquired_resource_locks) {
469             dec_read_store(jcr);
470             dec_write_store(jcr);
471             jcr->client->NumConcurrentJobs--;
472             jcr->job->NumConcurrentJobs--;
473             jcr->acquired_resource_locks = false;
474          }
475
476          if (reschedule_job(jcr, jq, je)) {
477             continue;              /* go look for more work */
478          }
479
480          /* Clean up and release old jcr */
481          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
482          jcr->SDJobStatus = 0;
483          V(jq->mutex);                /* release internal lock */
484          free_jcr(jcr);
485          free(je);                    /* release job entry */
486          P(jq->mutex);                /* reacquire job queue lock */
487       }
488       /*
489        * If any job in the wait queue can be run,
490        *  move it to the ready queue
491        */
492       Dmsg0(2300, "Done check ready, now check wait queue.\n");
493       if (!jq->waiting_jobs->empty() && !jq->quit) {
494          int Priority;
495          bool running_allow_mix = false;
496          je = (jobq_item_t *)jq->waiting_jobs->first();
497          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
498          if (re) {
499             Priority = re->jcr->JobPriority;
500             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
501                   re->jcr->JobId, Priority);
502             running_allow_mix = true;
503             for ( ; re; ) {
504                Dmsg2(2300, "JobId %d is also running with %s\n",
505                      re->jcr->JobId, 
506                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
507                if (!re->jcr->job->allow_mixed_priority) {
508                   running_allow_mix = false;
509                   break;
510                }
511                re = (jobq_item_t *)jq->running_jobs->next(re);
512             }
513             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
514                   running_allow_mix ? "allow" : "don't allow");
515          } else {
516             Priority = je->jcr->JobPriority;
517             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
518          }
519          /*
520           * Walk down the list of waiting jobs and attempt
521           *   to acquire the resources it needs.
522           */
523          for ( ; je;  ) {
524             /* je is current job item on the queue, jn is the next one */
525             JCR *jcr = je->jcr;
526             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
527
528             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
529                   jcr->JobId, jcr->JobPriority, Priority,
530                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
531
532             /* Take only jobs of correct Priority */
533             if (!(jcr->JobPriority == Priority
534                   || (jcr->JobPriority < Priority &&
535                       jcr->job->allow_mixed_priority && running_allow_mix))) {
536                set_jcr_job_status(jcr, JS_WaitPriority);
537                break;
538             }
539
540             if (!acquire_resources(jcr)) {
541                /* If resource conflict, job is canceled */
542                if (!job_canceled(jcr)) {
543                   je = jn;            /* point to next waiting job */
544                   continue;
545                }
546             }
547
548             /*
549              * Got all locks, now remove it from wait queue and append it
550              *   to the ready queue.  Note, we may also get here if the
551              *    job was canceled.  Once it is "run", it will quickly
552              *    terminate.
553              */
554             jq->waiting_jobs->remove(je);
555             jq->ready_jobs->append(je);
556             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
557             je = jn;                  /* Point to next waiting job */
558          } /* end for loop */
559
560       } /* end if */
561
562       Dmsg0(2300, "Done checking wait queue.\n");
563       /*
564        * If no more ready work and we are asked to quit, then do it
565        */
566       if (jq->ready_jobs->empty() && jq->quit) {
567          jq->num_workers--;
568          if (jq->num_workers == 0) {
569             Dmsg0(2300, "Wake up destroy routine\n");
570             /* Wake up destroy routine if he is waiting */
571             pthread_cond_broadcast(&jq->work);
572          }
573          break;
574       }
575       Dmsg0(2300, "Check for work request\n");
576       /*
577        * If no more work requests, and we waited long enough, quit
578        */
579       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
580          jq->ready_jobs->empty());
581       if (jq->ready_jobs->empty() && timedout) {
582          Dmsg0(2300, "break big loop\n");
583          jq->num_workers--;
584          break;
585       }
586
587       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
588       if (work) {
589          /*
590           * If a job is waiting on a Resource, don't consume all
591           *   the CPU time looping looking for work, and even more
592           *   important, release the lock so that a job that has
593           *   terminated can give us the resource.
594           */
595          V(jq->mutex);
596          bmicrosleep(2, 0);              /* pause for 2 seconds */
597          P(jq->mutex);
598          /* Recompute work as something may have changed in last 2 secs */
599          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
600       }
601       Dmsg1(2300, "Loop again. work=%d\n", work);
602    } /* end of big for loop */
603
604    Dmsg0(200, "unlock mutex\n");
605    V(jq->mutex);
606    Dmsg0(2300, "End jobq_server\n");
607    return NULL;
608 }
609
610 /*
611  * Returns true if cleanup done and we should look for more work
612  */
613 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
614 {
615    /*
616     * Reschedule the job if necessary and requested
617     */
618    if (jcr->job->RescheduleOnError &&
619        jcr->JobStatus != JS_Terminated &&
620        jcr->JobStatus != JS_Canceled &&
621        jcr->get_JobType() == JT_BACKUP &&
622        (jcr->job->RescheduleTimes == 0 ||
623         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
624        char dt[50], dt2[50];
625
626        /*
627         * Reschedule this job by cleaning it up, but
628         *  reuse the same JobId if possible.
629         */
630       time_t now = time(NULL);
631       jcr->reschedule_count++;
632       jcr->sched_time = now + jcr->job->RescheduleInterval;
633       bstrftime(dt, sizeof(dt), now);
634       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
635       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
636             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
637       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
638            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
639       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
640       jcr->JobStatus = -1;
641       set_jcr_job_status(jcr, JS_WaitStartTime);
642       jcr->SDJobStatus = 0;
643       if (!allow_duplicate_job(jcr)) {
644          return false;
645       }
646       if (jcr->JobBytes == 0) {
647          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
648          V(jq->mutex);
649          jobq_add(jq, jcr);     /* queue the job to run again */
650          P(jq->mutex);
651          free_jcr(jcr);         /* release jcr */
652          free(je);              /* free the job entry */
653          return true;           /* we already cleaned up */
654       }
655       /*
656        * Something was actually backed up, so we cannot reuse
657        *   the old JobId or there will be database record
658        *   conflicts.  We now create a new job, copying the
659        *   appropriate fields.
660        */           
661       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
662       set_jcr_defaults(njcr, jcr->job);
663       njcr->reschedule_count = jcr->reschedule_count;
664       njcr->sched_time = jcr->sched_time;
665       njcr->set_JobLevel(jcr->get_JobLevel());
666       njcr->pool = jcr->pool;
667       njcr->run_pool_override = jcr->run_pool_override;
668       njcr->full_pool = jcr->full_pool;
669       njcr->run_full_pool_override = jcr->run_full_pool_override;
670       njcr->inc_pool = jcr->inc_pool;
671       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
672       njcr->diff_pool = jcr->diff_pool;
673       njcr->JobStatus = -1;
674       set_jcr_job_status(njcr, jcr->JobStatus);
675       if (jcr->rstore) {
676          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
677       } else {
678          free_rstorage(njcr);
679       }
680       if (jcr->wstore) {
681          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
682       } else {
683          free_wstorage(njcr);
684       }
685       njcr->messages = jcr->messages;
686       njcr->spool_data = jcr->spool_data;
687       njcr->write_part_after_job = jcr->write_part_after_job;
688       Dmsg0(2300, "Call to run new job\n");
689       V(jq->mutex);
690       run_job(njcr);            /* This creates a "new" job */
691       free_jcr(njcr);           /* release "new" jcr */
692       P(jq->mutex);
693       Dmsg0(2300, "Back from running new job.\n");
694    }
695    return false;
696 }
697
698 /*
699  * See if we can acquire all the necessary resources for the job (JCR)
700  *
701  *  Returns: true  if successful
702  *           false if resource failure
703  */
704 static bool acquire_resources(JCR *jcr)
705 {
706    bool skip_this_jcr = false;
707
708    jcr->acquired_resource_locks = false;
709 /*
710  * Turning this code off is likely to cause some deadlocks,
711  *   but we do not really have enough information here to
712  *   know if this is really a deadlock (it may be a dual drive
713  *   autochanger), and in principle, the SD reservation system
714  *   should detect these deadlocks, so push the work off on is.
715  */
716 #ifdef xxx
717    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
718       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
719          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
720          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
721       set_jcr_job_status(jcr, JS_Canceled);
722       return false;
723    }
724 #endif
725    if (jcr->rstore) {
726       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
727       if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
728          jcr->rstore->NumConcurrentReadJobs++;
729          jcr->rstore->NumConcurrentJobs++;
730          Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
731       } else {
732          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
733          set_jcr_job_status(jcr, JS_WaitStoreRes);
734          return false;
735       }
736    }
737    
738    if (jcr->wstore) {
739       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
740       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
741          jcr->wstore->NumConcurrentJobs++;
742          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
743       } else if (jcr->rstore) {
744          dec_read_store(jcr);
745          skip_this_jcr = true;
746       } else {
747          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
748          skip_this_jcr = true;
749       }
750    }
751    if (skip_this_jcr) {
752       set_jcr_job_status(jcr, JS_WaitStoreRes);
753       return false;
754    }
755
756    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
757       jcr->client->NumConcurrentJobs++;
758    } else {
759       /* Back out previous locks */
760       dec_write_store(jcr);
761       dec_read_store(jcr);
762       set_jcr_job_status(jcr, JS_WaitClientRes);
763       return false;
764    }
765    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
766       jcr->job->NumConcurrentJobs++;
767    } else {
768       /* Back out previous locks */
769       dec_write_store(jcr);
770       dec_read_store(jcr);
771       jcr->client->NumConcurrentJobs--;
772       set_jcr_job_status(jcr, JS_WaitJobRes);
773       return false;
774    }
775
776    jcr->acquired_resource_locks = true;
777    return true;
778 }
779
780 static void dec_read_store(JCR *jcr)
781 {
782    if (jcr->rstore) {
783       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
784       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
785       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
786       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
787       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
788    }
789 }
790
791 static void dec_write_store(JCR *jcr)
792 {
793    if (jcr->wstore) {
794       jcr->wstore->NumConcurrentJobs--;
795       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
796       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
797    }
798 }