]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
More changes to ensure that during thread switches the jcr
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *   Version $Id$
41  *
42  *  This code was adapted from the Bacula workq, which was
43  *    adapted from "Programming with POSIX Threads", by
44  *    David R. Butenhof
45  *
46  */
47
48 #include "bacula.h"
49 #include "dird.h"
50
51 extern JCR *jobs;
52
53 /* Forward referenced functions */
54 extern "C" void *jobq_server(void *arg);
55 extern "C" void *sched_wait(void *arg);
56
57 static int  start_server(jobq_t *jq);
58 static bool acquire_resources(JCR *jcr);
59 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
60 static void dec_read_store(JCR *jcr);
61 static void dec_write_store(JCR *jcr);
62
63 /*
64  * Initialize a job queue
65  *
66  *  Returns: 0 on success
67  *           errno on failure
68  */
69 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
70 {
71    int stat;
72    jobq_item_t *item = NULL;
73
74    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
75       berrno be;
76       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
77       return stat;
78    }
79    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80       pthread_attr_destroy(&jq->attr);
81       return stat;
82    }
83    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
84       berrno be;
85       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
86       pthread_attr_destroy(&jq->attr);
87       return stat;
88    }
89    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
90       berrno be;
91       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
92       pthread_mutex_destroy(&jq->mutex);
93       pthread_attr_destroy(&jq->attr);
94       return stat;
95    }
96    jq->quit = false;
97    jq->max_workers = threads;         /* max threads to create */
98    jq->num_workers = 0;               /* no threads yet */
99    jq->idle_workers = 0;              /* no idle threads */
100    jq->engine = engine;               /* routine to run */
101    jq->valid = JOBQ_VALID;
102    /* Initialize the job queues */
103    jq->waiting_jobs = New(dlist(item, &item->link));
104    jq->running_jobs = New(dlist(item, &item->link));
105    jq->ready_jobs = New(dlist(item, &item->link));
106    return 0;
107 }
108
109 /*
110  * Destroy the job queue
111  *
112  * Returns: 0 on success
113  *          errno on failure
114  */
115 int jobq_destroy(jobq_t *jq)
116 {
117    int stat, stat1, stat2;
118
119    if (jq->valid != JOBQ_VALID) {
120       return EINVAL;
121    }
122    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
123       berrno be;
124       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
125       return stat;
126    }
127    jq->valid = 0;                      /* prevent any more operations */
128
129    /* 
130     * If any threads are active, wake them 
131     */
132    if (jq->num_workers > 0) {
133       jq->quit = true;
134       if (jq->idle_workers) {
135          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
136             berrno be;
137             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
138             pthread_mutex_unlock(&jq->mutex);
139             return stat;
140          }
141       }
142       while (jq->num_workers > 0) {
143          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
144             berrno be;
145             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
146             pthread_mutex_unlock(&jq->mutex);
147             return stat;
148          }
149       }
150    }
151    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
152       berrno be;
153       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat));
154       return stat;
155    }
156    stat  = pthread_mutex_destroy(&jq->mutex);
157    stat1 = pthread_cond_destroy(&jq->work);
158    stat2 = pthread_attr_destroy(&jq->attr);
159    delete jq->waiting_jobs;
160    delete jq->running_jobs;
161    delete jq->ready_jobs;
162    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
163 }
164
165 struct wait_pkt {
166    JCR *jcr;
167    jobq_t *jq;
168 };
169
170 /*
171  * Wait until schedule time arrives before starting. Normally
172  *  this routine is only used for jobs started from the console
173  *  for which the user explicitly specified a start time. Otherwise
174  *  most jobs are put into the job queue only when their
175  *  scheduled time arives.
176  */
177 extern "C"
178 void *sched_wait(void *arg)
179 {
180    JCR *jcr = ((wait_pkt *)arg)->jcr;
181    jobq_t *jq = ((wait_pkt *)arg)->jq;
182
183    set_jcr_in_tsd(jcr);
184    Dmsg0(2300, "Enter sched_wait.\n");
185    free(arg);
186    time_t wtime = jcr->sched_time - time(NULL);
187    set_jcr_job_status(jcr, JS_WaitStartTime);
188    /* Wait until scheduled time arrives */
189    if (wtime > 0) {
190       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
191          jcr->Job, wtime);
192    }
193    /* Check every 30 seconds if canceled */
194    while (wtime > 0) {
195       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
196          jcr->JobId, wtime, jcr->use_count());
197       if (wtime > 30) {
198          wtime = 30;
199       }
200       bmicrosleep(wtime, 0);
201       if (job_canceled(jcr)) {
202          break;
203       }
204       wtime = jcr->sched_time - time(NULL);
205    }
206    Dmsg1(200, "resched use=%d\n", jcr->use_count());
207    jobq_add(jq, jcr);
208    free_jcr(jcr);                     /* we are done with jcr */
209    Dmsg0(2300, "Exit sched_wait\n");
210    return NULL;
211 }
212
213 /*
214  *  Add a job to the queue
215  *    jq is a queue that was created with jobq_init
216  */
217 int jobq_add(jobq_t *jq, JCR *jcr)
218 {
219    int stat;
220    jobq_item_t *item, *li;
221    bool inserted = false;
222    time_t wtime = jcr->sched_time - time(NULL);
223    pthread_t id;
224    wait_pkt *sched_pkt;
225
226    if (!jcr->term_wait_inited) { 
227       /* Initialize termination condition variable */
228       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
229          berrno be;
230          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
231          return stat;
232       }
233       jcr->term_wait_inited = true;
234    }                           
235                              
236    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
237    if (jq->valid != JOBQ_VALID) {
238       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
239       return EINVAL;
240    }
241
242    jcr->inc_use_count();                 /* mark jcr in use by us */
243    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
244    if (!job_canceled(jcr) && wtime > 0) {
245       set_thread_concurrency(jq->max_workers + 2);
246       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
247       sched_pkt->jcr = jcr;
248       sched_pkt->jq = jq;
249       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
250       if (stat != 0) {                /* thread not created */
251          berrno be;
252          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
253       }
254       return stat;
255    }
256
257    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
258       berrno be;
259       Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
260       free_jcr(jcr);                    /* release jcr */
261       return stat;
262    }
263
264    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
265       free_jcr(jcr);                    /* release jcr */
266       return ENOMEM;
267    }
268    item->jcr = jcr;
269
270    /* While waiting in a queue this job is not attached to a thread */
271    set_jcr_in_tsd(INVALID_JCR);
272    if (job_canceled(jcr)) {
273       /* Add job to ready queue so that it is canceled quickly */
274       jq->ready_jobs->prepend(item);
275       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
276    } else {
277       /* Add this job to the wait queue in priority sorted order */
278       foreach_dlist(li, jq->waiting_jobs) {
279          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
280             li->jcr->JobId, li->jcr->JobPriority);
281          if (li->jcr->JobPriority > jcr->JobPriority) {
282             jq->waiting_jobs->insert_before(item, li);
283             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
284                li->jcr->JobId, jcr->JobId);
285             inserted = true;
286             break;
287          }
288       }
289       /* If not jobs in wait queue, append it */
290       if (!inserted) {
291          jq->waiting_jobs->append(item);
292          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
293       }
294    }
295
296    /* Ensure that at least one server looks at the queue. */
297    stat = start_server(jq);
298
299    pthread_mutex_unlock(&jq->mutex);
300    Dmsg0(2300, "Return jobq_add\n");
301    return stat;
302 }
303
304 /*
305  *  Remove a job from the job queue. Used only by cancel_job().
306  *    jq is a queue that was created with jobq_init
307  *    work_item is an element of work
308  *
309  *   Note, it is "removed" from the job queue.
310  *    If you want to cancel it, you need to provide some external means
311  *    of doing so (e.g. pthread_kill()).
312  */
313 int jobq_remove(jobq_t *jq, JCR *jcr)
314 {
315    int stat;
316    bool found = false;
317    jobq_item_t *item;
318
319    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
320    if (jq->valid != JOBQ_VALID) {
321       return EINVAL;
322    }
323
324    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
325       berrno be;
326       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
327       return stat;
328    }
329
330    foreach_dlist(item, jq->waiting_jobs) {
331       if (jcr == item->jcr) {
332          found = true;
333          break;
334       }
335    }
336    if (!found) {
337       pthread_mutex_unlock(&jq->mutex);
338       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
339       return EINVAL;
340    }
341
342    /* Move item to be the first on the list */
343    jq->waiting_jobs->remove(item);
344    jq->ready_jobs->prepend(item);
345    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
346
347    stat = start_server(jq);
348
349    pthread_mutex_unlock(&jq->mutex);
350    Dmsg0(2300, "Return jobq_remove\n");
351    return stat;
352 }
353
354
355 /*
356  * Start the server thread if it isn't already running
357  */
358 static int start_server(jobq_t *jq)
359 {
360    int stat = 0;
361    pthread_t id;
362
363    /*
364     * if any threads are idle, wake one.
365     *   Actually we do a broadcast because on /lib/tls 
366     *   these signals seem to get lost from time to time.
367     */
368    if (jq->idle_workers > 0) {
369       Dmsg0(2300, "Signal worker to wake up\n");
370       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
371          berrno be;
372          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
373          return stat;
374       }
375    } else if (jq->num_workers < jq->max_workers) {
376       Dmsg0(2300, "Create worker thread\n");
377       /* No idle threads so create a new one */
378       set_thread_concurrency(jq->max_workers + 1);
379       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
380          berrno be;
381          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
382          return stat;
383       }
384    }
385    return stat;
386 }
387
388
389 /*
390  * This is the worker thread that serves the job queue.
391  * When all the resources are acquired for the job,
392  *  it will call the user's engine.
393  */
394 extern "C"
395 void *jobq_server(void *arg)
396 {
397    struct timespec timeout;
398    jobq_t *jq = (jobq_t *)arg;
399    jobq_item_t *je;                   /* job entry in queue */
400    int stat;
401    bool timedout = false;
402    bool work = true;
403
404    set_jcr_in_tsd(INVALID_JCR);
405    Dmsg0(2300, "Start jobq_server\n");
406    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
407       berrno be;
408       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
409       return NULL;
410    }
411    jq->num_workers++;
412
413    for (;;) {
414       struct timeval tv;
415       struct timezone tz;
416
417       Dmsg0(2300, "Top of for loop\n");
418       if (!work && !jq->quit) {
419          gettimeofday(&tv, &tz);
420          timeout.tv_nsec = 0;
421          timeout.tv_sec = tv.tv_sec + 4;
422
423          while (!jq->quit) {
424             /*
425              * Wait 4 seconds, then if no more work, exit
426              */
427             Dmsg0(2300, "pthread_cond_timedwait()\n");
428             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
429             if (stat == ETIMEDOUT) {
430                Dmsg0(2300, "timedwait timedout.\n");
431                timedout = true;
432                break;
433             } else if (stat != 0) {
434                /* This shouldn't happen */
435                Dmsg0(2300, "This shouldn't happen\n");
436                jq->num_workers--;
437                pthread_mutex_unlock(&jq->mutex);
438                return NULL;
439             }
440             break;
441          }
442       }
443       /*
444        * If anything is in the ready queue, run it
445        */
446       Dmsg0(2300, "Checking ready queue.\n");
447       while (!jq->ready_jobs->empty() && !jq->quit) {
448          JCR *jcr;
449          je = (jobq_item_t *)jq->ready_jobs->first();
450          jcr = je->jcr;
451          jq->ready_jobs->remove(je);
452          if (!jq->ready_jobs->empty()) {
453             Dmsg0(2300, "ready queue not empty start server\n");
454             if (start_server(jq) != 0) {
455                jq->num_workers--;
456                pthread_mutex_unlock(&jq->mutex);
457                return NULL;
458             }
459          }
460          jq->running_jobs->append(je);
461
462          /* Attach jcr to this thread while we run the job */
463          set_jcr_in_tsd(jcr);
464          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
465
466          /* Release job queue lock */
467          V(jq->mutex);
468
469          /* Call user's routine here */
470          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
471             jcr->use_count());
472          jq->engine(je->jcr);
473
474          /* Job finished detach from thread */
475          set_jcr_in_tsd(INVALID_JCR);
476
477          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
478             jcr->use_count());
479
480          /* Reacquire job queue lock */
481          P(jq->mutex);
482          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
483          jq->running_jobs->remove(je);
484          /*
485           * Release locks if acquired. Note, they will not have
486           *  been acquired for jobs canceled before they were
487           *  put into the ready queue.
488           */
489          if (jcr->acquired_resource_locks) {
490             dec_read_store(jcr);
491             dec_write_store(jcr);
492             jcr->client->NumConcurrentJobs--;
493             jcr->job->NumConcurrentJobs--;
494             jcr->acquired_resource_locks = false;
495          }
496
497          if (reschedule_job(jcr, jq, je)) {
498             continue;              /* go look for more work */
499          }
500
501          /* Clean up and release old jcr */
502          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
503          jcr->SDJobStatus = 0;
504          V(jq->mutex);                /* release internal lock */
505          free_jcr(jcr);
506          free(je);                    /* release job entry */
507          P(jq->mutex);                /* reacquire job queue lock */
508       }
509       /*
510        * If any job in the wait queue can be run,
511        *  move it to the ready queue
512        */
513       Dmsg0(2300, "Done check ready, now check wait queue.\n");
514       if (!jq->waiting_jobs->empty() && !jq->quit) {
515          int Priority;
516          bool running_allow_mix = false;
517          je = (jobq_item_t *)jq->waiting_jobs->first();
518          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
519          if (re) {
520             Priority = re->jcr->JobPriority;
521             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
522                   re->jcr->JobId, Priority);
523             running_allow_mix = true;
524             for ( ; re; ) {
525                Dmsg2(2300, "JobId %d is also running with %s\n",
526                      re->jcr->JobId, 
527                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
528                if (!re->jcr->job->allow_mixed_priority) {
529                   running_allow_mix = false;
530                   break;
531                }
532                re = (jobq_item_t *)jq->running_jobs->next(re);
533             }
534             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
535                   running_allow_mix ? "allow" : "don't allow");
536          } else {
537             Priority = je->jcr->JobPriority;
538             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
539          }
540          /*
541           * Walk down the list of waiting jobs and attempt
542           *   to acquire the resources it needs.
543           */
544          for ( ; je;  ) {
545             /* je is current job item on the queue, jn is the next one */
546             JCR *jcr = je->jcr;
547             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
548
549             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
550                   jcr->JobId, jcr->JobPriority, Priority,
551                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
552
553             /* Take only jobs of correct Priority */
554             if (!(jcr->JobPriority == Priority
555                   || (jcr->JobPriority < Priority &&
556                       jcr->job->allow_mixed_priority && running_allow_mix))) {
557                set_jcr_job_status(jcr, JS_WaitPriority);
558                break;
559             }
560
561             if (!acquire_resources(jcr)) {
562                /* If resource conflict, job is canceled */
563                if (!job_canceled(jcr)) {
564                   je = jn;            /* point to next waiting job */
565                   continue;
566                }
567             }
568
569             /*
570              * Got all locks, now remove it from wait queue and append it
571              *   to the ready queue.  Note, we may also get here if the
572              *    job was canceled.  Once it is "run", it will quickly
573              *    terminate.
574              */
575             jq->waiting_jobs->remove(je);
576             jq->ready_jobs->append(je);
577             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
578             je = jn;                  /* Point to next waiting job */
579          } /* end for loop */
580
581       } /* end if */
582
583       Dmsg0(2300, "Done checking wait queue.\n");
584       /*
585        * If no more ready work and we are asked to quit, then do it
586        */
587       if (jq->ready_jobs->empty() && jq->quit) {
588          jq->num_workers--;
589          if (jq->num_workers == 0) {
590             Dmsg0(2300, "Wake up destroy routine\n");
591             /* Wake up destroy routine if he is waiting */
592             pthread_cond_broadcast(&jq->work);
593          }
594          break;
595       }
596       Dmsg0(2300, "Check for work request\n");
597       /*
598        * If no more work requests, and we waited long enough, quit
599        */
600       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
601          jq->ready_jobs->empty());
602       if (jq->ready_jobs->empty() && timedout) {
603          Dmsg0(2300, "break big loop\n");
604          jq->num_workers--;
605          break;
606       }
607
608       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
609       if (work) {
610          /*
611           * If a job is waiting on a Resource, don't consume all
612           *   the CPU time looping looking for work, and even more
613           *   important, release the lock so that a job that has
614           *   terminated can give us the resource.
615           */
616          V(jq->mutex);
617          bmicrosleep(2, 0);              /* pause for 2 seconds */
618          P(jq->mutex);
619          /* Recompute work as something may have changed in last 2 secs */
620          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
621       }
622       Dmsg1(2300, "Loop again. work=%d\n", work);
623    } /* end of big for loop */
624
625    Dmsg0(200, "unlock mutex\n");
626    V(jq->mutex);
627    Dmsg0(2300, "End jobq_server\n");
628    return NULL;
629 }
630
631 /*
632  * Returns true if cleanup done and we should look for more work
633  */
634 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
635 {
636    /*
637     * Reschedule the job if necessary and requested
638     */
639    if (jcr->job->RescheduleOnError &&
640        jcr->JobStatus != JS_Terminated &&
641        jcr->JobStatus != JS_Canceled &&
642        jcr->get_JobType() == JT_BACKUP &&
643        (jcr->job->RescheduleTimes == 0 ||
644         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
645        char dt[50], dt2[50];
646
647        /*
648         * Reschedule this job by cleaning it up, but
649         *  reuse the same JobId if possible.
650         */
651       time_t now = time(NULL);
652       jcr->reschedule_count++;
653       jcr->sched_time = now + jcr->job->RescheduleInterval;
654       bstrftime(dt, sizeof(dt), now);
655       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
656       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
657             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
658       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
659            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
660       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
661       jcr->JobStatus = -1;
662       set_jcr_job_status(jcr, JS_WaitStartTime);
663       jcr->SDJobStatus = 0;
664       if (!allow_duplicate_job(jcr)) {
665          return false;
666       }
667       if (jcr->JobBytes == 0) {
668          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
669          V(jq->mutex);
670          jobq_add(jq, jcr);     /* queue the job to run again */
671          P(jq->mutex);
672          free_jcr(jcr);         /* release jcr */
673          free(je);              /* free the job entry */
674          return true;           /* we already cleaned up */
675       }
676       /*
677        * Something was actually backed up, so we cannot reuse
678        *   the old JobId or there will be database record
679        *   conflicts.  We now create a new job, copying the
680        *   appropriate fields.
681        */           
682       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
683       set_jcr_defaults(njcr, jcr->job);
684       njcr->reschedule_count = jcr->reschedule_count;
685       njcr->sched_time = jcr->sched_time;
686       njcr->set_JobLevel(jcr->get_JobLevel());
687       njcr->pool = jcr->pool;
688       njcr->run_pool_override = jcr->run_pool_override;
689       njcr->full_pool = jcr->full_pool;
690       njcr->run_full_pool_override = jcr->run_full_pool_override;
691       njcr->inc_pool = jcr->inc_pool;
692       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
693       njcr->diff_pool = jcr->diff_pool;
694       njcr->JobStatus = -1;
695       set_jcr_job_status(njcr, jcr->JobStatus);
696       if (jcr->rstore) {
697          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
698       } else {
699          free_rstorage(njcr);
700       }
701       if (jcr->wstore) {
702          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
703       } else {
704          free_wstorage(njcr);
705       }
706       njcr->messages = jcr->messages;
707       njcr->spool_data = jcr->spool_data;
708       njcr->write_part_after_job = jcr->write_part_after_job;
709       Dmsg0(2300, "Call to run new job\n");
710       V(jq->mutex);
711       run_job(njcr);            /* This creates a "new" job */
712       free_jcr(njcr);           /* release "new" jcr */
713       P(jq->mutex);
714       Dmsg0(2300, "Back from running new job.\n");
715    }
716    return false;
717 }
718
719 /*
720  * See if we can acquire all the necessary resources for the job (JCR)
721  *
722  *  Returns: true  if successful
723  *           false if resource failure
724  */
725 static bool acquire_resources(JCR *jcr)
726 {
727    bool skip_this_jcr = false;
728
729    jcr->acquired_resource_locks = false;
730 /*
731  * Turning this code off is likely to cause some deadlocks,
732  *   but we do not really have enough information here to
733  *   know if this is really a deadlock (it may be a dual drive
734  *   autochanger), and in principle, the SD reservation system
735  *   should detect these deadlocks, so push the work off on is.
736  */
737 #ifdef xxx
738    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
739       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
740          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
741          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
742       set_jcr_job_status(jcr, JS_Canceled);
743       return false;
744    }
745 #endif
746    if (jcr->rstore) {
747       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
748       if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
749          jcr->rstore->NumConcurrentReadJobs++;
750          jcr->rstore->NumConcurrentJobs++;
751          Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
752       } else {
753          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
754          set_jcr_job_status(jcr, JS_WaitStoreRes);
755          return false;
756       }
757    }
758    
759    if (jcr->wstore) {
760       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
761       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
762          jcr->wstore->NumConcurrentJobs++;
763          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
764       } else if (jcr->rstore) {
765          dec_read_store(jcr);
766          skip_this_jcr = true;
767       } else {
768          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
769          skip_this_jcr = true;
770       }
771    }
772    if (skip_this_jcr) {
773       set_jcr_job_status(jcr, JS_WaitStoreRes);
774       return false;
775    }
776
777    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
778       jcr->client->NumConcurrentJobs++;
779    } else {
780       /* Back out previous locks */
781       dec_write_store(jcr);
782       dec_read_store(jcr);
783       set_jcr_job_status(jcr, JS_WaitClientRes);
784       return false;
785    }
786    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
787       jcr->job->NumConcurrentJobs++;
788    } else {
789       /* Back out previous locks */
790       dec_write_store(jcr);
791       dec_read_store(jcr);
792       jcr->client->NumConcurrentJobs--;
793       set_jcr_job_status(jcr, JS_WaitJobRes);
794       return false;
795    }
796
797    jcr->acquired_resource_locks = true;
798    return true;
799 }
800
801 static void dec_read_store(JCR *jcr)
802 {
803    if (jcr->rstore) {
804       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
805       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
806       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
807       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
808       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
809    }
810 }
811
812 static void dec_write_store(JCR *jcr)
813 {
814    if (jcr->wstore) {
815       jcr->wstore->NumConcurrentJobs--;
816       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
817       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
818    }
819 }