]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
Backport from Bacula Enterprise
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  * Bacula job queue routines.
22  *
23  *  This code consists of three queues, the waiting_jobs
24  *  queue, where jobs are initially queued, the ready_jobs
25  *  queue, where jobs are placed when all the resources are
26  *  allocated and they can immediately be run, and the
27  *  running queue where jobs are placed when they are
28  *  running.
29  *
30  *  Kern Sibbald, July MMIII
31  *
32  *
33  *  This code was adapted from the Bacula workq, which was
34  *    adapted from "Programming with POSIX Threads", by
35  *    David R. Butenhof
36  *
37  */
38
39 #include "bacula.h"
40 #include "dird.h"
41
42 extern JCR *jobs;
43
44 /* Forward referenced functions */
45 extern "C" void *jobq_server(void *arg);
46 extern "C" void *sched_wait(void *arg);
47
48 static int  start_server(jobq_t *jq);
49 static bool acquire_resources(JCR *jcr);
50 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
51 static void dec_write_store(JCR *jcr);
52
53 /*
54  * Initialize a job queue
55  *
56  *  Returns: 0 on success
57  *           errno on failure
58  */
59 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
60 {
61    int stat;
62    jobq_item_t *item = NULL;
63
64    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
65       berrno be;
66       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
67       return stat;
68    }
69    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
70       pthread_attr_destroy(&jq->attr);
71       return stat;
72    }
73    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
74       berrno be;
75       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
76       pthread_attr_destroy(&jq->attr);
77       return stat;
78    }
79    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
80       berrno be;
81       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
82       pthread_mutex_destroy(&jq->mutex);
83       pthread_attr_destroy(&jq->attr);
84       return stat;
85    }
86    jq->quit = false;
87    jq->max_workers = threads;         /* max threads to create */
88    jq->num_workers = 0;               /* no threads yet */
89    jq->idle_workers = 0;              /* no idle threads */
90    jq->engine = engine;               /* routine to run */
91    jq->valid = JOBQ_VALID;
92    /* Initialize the job queues */
93    jq->waiting_jobs = New(dlist(item, &item->link));
94    jq->running_jobs = New(dlist(item, &item->link));
95    jq->ready_jobs = New(dlist(item, &item->link));
96    return 0;
97 }
98
99 /*
100  * Destroy the job queue
101  *
102  * Returns: 0 on success
103  *          errno on failure
104  */
105 int jobq_destroy(jobq_t *jq)
106 {
107    int stat, stat1, stat2;
108
109    if (jq->valid != JOBQ_VALID) {
110       return EINVAL;
111    }
112    P(jq->mutex);
113    jq->valid = 0;                      /* prevent any more operations */
114
115    /*
116     * If any threads are active, wake them
117     */
118    if (jq->num_workers > 0) {
119       jq->quit = true;
120       if (jq->idle_workers) {
121          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
122             berrno be;
123             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
124             V(jq->mutex);
125             return stat;
126          }
127       }
128       while (jq->num_workers > 0) {
129          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
130             berrno be;
131             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
132             V(jq->mutex);
133             return stat;
134          }
135       }
136    }
137    V(jq->mutex);
138    stat  = pthread_mutex_destroy(&jq->mutex);
139    stat1 = pthread_cond_destroy(&jq->work);
140    stat2 = pthread_attr_destroy(&jq->attr);
141    delete jq->waiting_jobs;
142    delete jq->running_jobs;
143    delete jq->ready_jobs;
144    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
145 }
146
147 struct wait_pkt {
148    JCR *jcr;
149    jobq_t *jq;
150 };
151
152 /*
153  * Wait until schedule time arrives before starting. Normally
154  *  this routine is only used for jobs started from the console
155  *  for which the user explicitly specified a start time. Otherwise
156  *  most jobs are put into the job queue only when their
157  *  scheduled time arives.
158  */
159 extern "C"
160 void *sched_wait(void *arg)
161 {
162    JCR *jcr = ((wait_pkt *)arg)->jcr;
163    jobq_t *jq = ((wait_pkt *)arg)->jq;
164
165    set_jcr_in_tsd(INVALID_JCR);
166    Dmsg0(2300, "Enter sched_wait.\n");
167    free(arg);
168    time_t wtime = jcr->sched_time - time(NULL);
169    jcr->setJobStatus(JS_WaitStartTime);
170    /* Wait until scheduled time arrives */
171    if (wtime > 0) {
172       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
173          jcr->Job, wtime);
174    }
175    /* Check every 30 seconds if canceled */
176    while (wtime > 0) {
177       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
178          jcr->JobId, wtime, jcr->use_count());
179       if (wtime > 30) {
180          wtime = 30;
181       }
182       bmicrosleep(wtime, 0);
183       if (job_canceled(jcr)) {
184          break;
185       }
186       wtime = jcr->sched_time - time(NULL);
187    }
188    Dmsg1(200, "resched use=%d\n", jcr->use_count());
189    jobq_add(jq, jcr);
190    free_jcr(jcr);                     /* we are done with jcr */
191    Dmsg0(2300, "Exit sched_wait\n");
192    return NULL;
193 }
194
195 /* Procedure to update the Client->NumConcurrentJobs */
196 static void update_client_numconcurrentjobs(JCR *jcr, int val)
197 {
198    if (!jcr->client) {
199       return;
200    }
201
202    switch (jcr->getJobType())
203    {
204    case JT_MIGRATE:
205    case JT_COPY:
206    case JT_ADMIN:
207       break;
208    case JT_BACKUP:
209       if (jcr->no_client_used()) {
210          break;
211       }
212    /* Failback wanted */
213    default:
214       jcr->client->NumConcurrentJobs += val;
215       break;
216    }
217 }
218
219 /*
220  *  Add a job to the queue
221  *    jq is a queue that was created with jobq_init
222  */
223 int jobq_add(jobq_t *jq, JCR *jcr)
224 {
225    int stat;
226    jobq_item_t *item, *li;
227    bool inserted = false;
228    time_t wtime = jcr->sched_time - time(NULL);
229    pthread_t id;
230    wait_pkt *sched_pkt;
231
232    if (!jcr->term_wait_inited) {
233       /* Initialize termination condition variable */
234       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
235          berrno be;
236          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
237          return stat;
238       }
239       jcr->term_wait_inited = true;
240    }
241
242    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
243    if (jq->valid != JOBQ_VALID) {
244       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
245       return EINVAL;
246    }
247
248    jcr->inc_use_count();                 /* mark jcr in use by us */
249    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
250    if (!job_canceled(jcr) && wtime > 0) {
251       set_thread_concurrency(jq->max_workers + 2);
252       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
253       sched_pkt->jcr = jcr;
254       sched_pkt->jq = jq;
255       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
256       if (stat != 0) {                /* thread not created */
257          berrno be;
258          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
259       }
260       return stat;
261    }
262
263    P(jq->mutex);
264
265    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
266       free_jcr(jcr);                    /* release jcr */
267       return ENOMEM;
268    }
269    item->jcr = jcr;
270
271    /* While waiting in a queue this job is not attached to a thread */
272    set_jcr_in_tsd(INVALID_JCR);
273    if (job_canceled(jcr)) {
274       /* Add job to ready queue so that it is canceled quickly */
275       jq->ready_jobs->prepend(item);
276       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
277    } else {
278       /* Add this job to the wait queue in priority sorted order */
279       foreach_dlist(li, jq->waiting_jobs) {
280          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
281             li->jcr->JobId, li->jcr->JobPriority);
282          if (li->jcr->JobPriority > jcr->JobPriority) {
283             jq->waiting_jobs->insert_before(item, li);
284             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
285                li->jcr->JobId, jcr->JobId);
286             inserted = true;
287             break;
288          }
289       }
290       /* If not jobs in wait queue, append it */
291       if (!inserted) {
292          jq->waiting_jobs->append(item);
293          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
294       }
295    }
296
297    /* Ensure that at least one server looks at the queue. */
298    stat = start_server(jq);
299
300    V(jq->mutex);
301    Dmsg0(2300, "Return jobq_add\n");
302    return stat;
303 }
304
305 /*
306  *  Remove a job from the job queue. Used only by cancel_job().
307  *    jq is a queue that was created with jobq_init
308  *    work_item is an element of work
309  *
310  *   Note, it is "removed" from the job queue.
311  *    If you want to cancel it, you need to provide some external means
312  *    of doing so (e.g. pthread_kill()).
313  */
314 int jobq_remove(jobq_t *jq, JCR *jcr)
315 {
316    int stat;
317    bool found = false;
318    jobq_item_t *item;
319
320    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
321    if (jq->valid != JOBQ_VALID) {
322       return EINVAL;
323    }
324
325    P(jq->mutex);
326    foreach_dlist(item, jq->waiting_jobs) {
327       if (jcr == item->jcr) {
328          found = true;
329          break;
330       }
331    }
332    if (!found) {
333       V(jq->mutex);
334       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
335       return EINVAL;
336    }
337
338    /* Move item to be the first on the list */
339    jq->waiting_jobs->remove(item);
340    jq->ready_jobs->prepend(item);
341    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
342
343    stat = start_server(jq);
344
345    V(jq->mutex);
346    Dmsg0(2300, "Return jobq_remove\n");
347    return stat;
348 }
349
350
351 /*
352  * Start the server thread if it isn't already running
353  */
354 static int start_server(jobq_t *jq)
355 {
356    int stat = 0;
357    pthread_t id;
358
359    /*
360     * if any threads are idle, wake one.
361     *   Actually we do a broadcast because on /lib/tls
362     *   these signals seem to get lost from time to time.
363     */
364    if (jq->idle_workers > 0) {
365       Dmsg0(2300, "Signal worker to wake up\n");
366       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
367          berrno be;
368          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
369          return stat;
370       }
371    } else if (jq->num_workers < jq->max_workers) {
372       Dmsg0(2300, "Create worker thread\n");
373       /* No idle threads so create a new one */
374       set_thread_concurrency(jq->max_workers + 1);
375       jq->num_workers++;
376       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
377          berrno be;
378          jq->num_workers--;
379          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
380          return stat;
381       }
382    }
383    return stat;
384 }
385
386
387 /*
388  * This is the worker thread that serves the job queue.
389  * When all the resources are acquired for the job,
390  *  it will call the user's engine.
391  */
392 extern "C"
393 void *jobq_server(void *arg)
394 {
395    struct timespec timeout;
396    jobq_t *jq = (jobq_t *)arg;
397    jobq_item_t *je;                   /* job entry in queue */
398    int stat;
399    bool timedout = false;
400    bool work = true;
401
402    set_jcr_in_tsd(INVALID_JCR);
403    Dmsg0(2300, "Start jobq_server\n");
404    P(jq->mutex);
405
406    for (;;) {
407       struct timeval tv;
408       struct timezone tz;
409
410       Dmsg0(2300, "Top of for loop\n");
411       if (!work && !jq->quit) {
412          gettimeofday(&tv, &tz);
413          timeout.tv_nsec = 0;
414          timeout.tv_sec = tv.tv_sec + 4;
415
416          while (!jq->quit) {
417             /*
418              * Wait 4 seconds, then if no more work, exit
419              */
420             Dmsg0(2300, "pthread_cond_timedwait()\n");
421             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
422             if (stat == ETIMEDOUT) {
423                Dmsg0(2300, "timedwait timedout.\n");
424                timedout = true;
425                break;
426             } else if (stat != 0) {
427                /* This shouldn't happen */
428                Dmsg0(2300, "This shouldn't happen\n");
429                jq->num_workers--;
430                V(jq->mutex);
431                return NULL;
432             }
433             break;
434          }
435       }
436       /*
437        * If anything is in the ready queue, run it
438        */
439       Dmsg0(2300, "Checking ready queue.\n");
440       while (!jq->ready_jobs->empty() && !jq->quit) {
441          JCR *jcr;
442          je = (jobq_item_t *)jq->ready_jobs->first();
443          jcr = je->jcr;
444          jq->ready_jobs->remove(je);
445          if (!jq->ready_jobs->empty()) {
446             Dmsg0(2300, "ready queue not empty start server\n");
447             if (start_server(jq) != 0) {
448                jq->num_workers--;
449                V(jq->mutex);
450                return NULL;
451             }
452          }
453          jq->running_jobs->append(je);
454
455          /* Attach jcr to this thread while we run the job */
456          jcr->my_thread_id = pthread_self();
457          jcr->set_killable(true);
458          set_jcr_in_tsd(jcr);
459          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
460
461          /* Release job queue lock */
462          V(jq->mutex);
463
464          /* Call user's routine here */
465          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
466             jcr->use_count(), jcr->JobStatus);
467          jq->engine(je->jcr);
468
469          /* Job finished detach from thread */
470          remove_jcr_from_tsd(je->jcr);
471          je->jcr->set_killable(false);
472
473          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
474             jcr->use_count());
475
476          /* Reacquire job queue lock */
477          P(jq->mutex);
478          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
479          jq->running_jobs->remove(je);
480          /*
481           * Release locks if acquired. Note, they will not have
482           *  been acquired for jobs canceled before they were
483           *  put into the ready queue.
484           */
485          if (jcr->acquired_resource_locks) {
486             dec_read_store(jcr);
487             dec_write_store(jcr);
488             update_client_numconcurrentjobs(jcr, -1);
489             jcr->job->NumConcurrentJobs--;
490             jcr->acquired_resource_locks = false;
491          }
492
493          if (reschedule_job(jcr, jq, je)) {
494             continue;              /* go look for more work */
495          }
496
497          /* Clean up and release old jcr */
498          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
499          jcr->SDJobStatus = 0;
500          V(jq->mutex);                /* release internal lock */
501          free_jcr(jcr);
502          free(je);                    /* release job entry */
503          P(jq->mutex);                /* reacquire job queue lock */
504       }
505       /*
506        * If any job in the wait queue can be run,
507        *  move it to the ready queue
508        */
509       Dmsg0(2300, "Done check ready, now check wait queue.\n");
510       if (!jq->waiting_jobs->empty() && !jq->quit) {
511          int Priority;
512          bool running_allow_mix = false;
513          je = (jobq_item_t *)jq->waiting_jobs->first();
514          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
515          if (re) {
516             Priority = re->jcr->JobPriority;
517             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
518                   re->jcr->JobId, Priority);
519             running_allow_mix = true;
520             for ( ; re; ) {
521                Dmsg2(2300, "JobId %d is also running with %s\n",
522                      re->jcr->JobId,
523                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
524                if (!re->jcr->job->allow_mixed_priority) {
525                   running_allow_mix = false;
526                   break;
527                }
528                re = (jobq_item_t *)jq->running_jobs->next(re);
529             }
530             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
531                   running_allow_mix ? "allow" : "don't allow");
532          } else {
533             Priority = je->jcr->JobPriority;
534             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
535          }
536          /*
537           * Walk down the list of waiting jobs and attempt
538           *   to acquire the resources it needs.
539           */
540          for ( ; je;  ) {
541             /* je is current job item on the queue, jn is the next one */
542             JCR *jcr = je->jcr;
543             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
544
545             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
546                   jcr->JobId, jcr->JobPriority, Priority,
547                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
548
549             /* Take only jobs of correct Priority */
550             if (!(jcr->JobPriority == Priority
551                   || (jcr->JobPriority < Priority &&
552                       jcr->job->allow_mixed_priority && running_allow_mix))) {
553                jcr->setJobStatus(JS_WaitPriority);
554                break;
555             }
556
557             if (!acquire_resources(jcr)) {
558                /* If resource conflict, job is canceled */
559                if (!job_canceled(jcr)) {
560                   je = jn;            /* point to next waiting job */
561                   continue;
562                }
563             }
564
565             /*
566              * Got all locks, now remove it from wait queue and append it
567              *   to the ready queue.  Note, we may also get here if the
568              *    job was canceled.  Once it is "run", it will quickly
569              *    terminate.
570              */
571             jq->waiting_jobs->remove(je);
572             jq->ready_jobs->append(je);
573             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
574             je = jn;                  /* Point to next waiting job */
575          } /* end for loop */
576
577       } /* end if */
578
579       Dmsg0(2300, "Done checking wait queue.\n");
580       /*
581        * If no more ready work and we are asked to quit, then do it
582        */
583       if (jq->ready_jobs->empty() && jq->quit) {
584          jq->num_workers--;
585          if (jq->num_workers == 0) {
586             Dmsg0(2300, "Wake up destroy routine\n");
587             /* Wake up destroy routine if he is waiting */
588             pthread_cond_broadcast(&jq->work);
589          }
590          break;
591       }
592       Dmsg0(2300, "Check for work request\n");
593       /*
594        * If no more work requests, and we waited long enough, quit
595        */
596       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
597          jq->ready_jobs->empty());
598       if (jq->ready_jobs->empty() && timedout) {
599          Dmsg0(2300, "break big loop\n");
600          jq->num_workers--;
601          break;
602       }
603
604       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
605       if (work) {
606          /*
607           * If a job is waiting on a Resource, don't consume all
608           *   the CPU time looping looking for work, and even more
609           *   important, release the lock so that a job that has
610           *   terminated can give us the resource.
611           */
612          V(jq->mutex);
613          bmicrosleep(2, 0);              /* pause for 2 seconds */
614          P(jq->mutex);
615          /* Recompute work as something may have changed in last 2 secs */
616          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
617       }
618       Dmsg1(2300, "Loop again. work=%d\n", work);
619    } /* end of big for loop */
620
621    Dmsg0(200, "unlock mutex\n");
622    V(jq->mutex);
623    Dmsg0(2300, "End jobq_server\n");
624    return NULL;
625 }
626
627 /*
628  * Returns true if cleanup done and we should look for more work
629  */
630 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
631 {
632    bool resched = false;
633    /*
634     * Reschedule the job if requested and possible
635     */
636    /* Basic condition is that more reschedule times remain */
637    if (jcr->job->RescheduleTimes == 0 ||
638        jcr->reschedule_count < jcr->job->RescheduleTimes) {
639       resched =
640          /* Check for incomplete jobs */
641          (jcr->RescheduleIncompleteJobs &&
642           jcr->is_incomplete() && jcr->is_JobType(JT_BACKUP) &&
643           !(jcr->HasBase||jcr->is_JobLevel(L_BASE))) ||
644          /* Check for failed jobs */
645          (jcr->job->RescheduleOnError &&
646           !jcr->is_JobStatus(JS_Terminated) &&
647           !jcr->is_JobStatus(JS_Canceled) &&
648           jcr->is_JobType(JT_BACKUP));
649    }
650    if (resched) {
651        char dt[50], dt2[50];
652
653        /*
654         * Reschedule this job by cleaning it up, but
655         *  reuse the same JobId if possible.
656         */
657       jcr->rerunning = jcr->is_incomplete();   /* save incomplete status */
658       time_t now = time(NULL);
659       jcr->reschedule_count++;
660       jcr->sched_time = now + jcr->job->RescheduleInterval;
661       bstrftime(dt, sizeof(dt), now);
662       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
663       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
664             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
665       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
666            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
667       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
668       jcr->JobStatus = -1;
669       jcr->setJobStatus(JS_WaitStartTime);
670       jcr->SDJobStatus = 0;
671       jcr->JobErrors = 0;
672       if (!allow_duplicate_job(jcr)) {
673          return false;
674       }
675       /* Only jobs with no output or Incomplete jobs can run on same JCR */
676       if (jcr->JobBytes == 0 || jcr->rerunning) {
677          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
678          V(jq->mutex);
679          /*
680           * Special test here since a Virtual Full gets marked
681           *  as a Full, so we look at the resource record
682           */
683          if (jcr->wasVirtualFull) {
684             jcr->setJobLevel(L_VIRTUAL_FULL);
685          }
686          /*
687           * When we are using the same jcr then make sure to reset
688           *   RealEndTime back to zero.
689           */
690          jcr->jr.RealEndTime = 0;
691          jobq_add(jq, jcr);     /* queue the job to run again */
692          P(jq->mutex);
693          free_jcr(jcr);         /* release jcr */
694          free(je);              /* free the job entry */
695          return true;           /* we already cleaned up */
696       }
697       /*
698        * Something was actually backed up, so we cannot reuse
699        *   the old JobId or there will be database record
700        *   conflicts.  We now create a new job, copying the
701        *   appropriate fields.
702        */
703       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
704       set_jcr_defaults(njcr, jcr->job);
705       /*
706        * Eliminate the new job_end_push, then copy the one from
707        *  the old job, and set the old one to be empty.
708        */
709       void *v;
710       lock_jobs();              /* protect ourself from reload_config() */
711       LockRes();
712       foreach_alist(v, (&jcr->job_end_push)) {
713          njcr->job_end_push.append(v);
714       }
715       jcr->job_end_push.destroy();
716       jcr->job_end_push.init(1, false);
717       UnlockRes();
718       unlock_jobs();
719
720       njcr->reschedule_count = jcr->reschedule_count;
721       njcr->sched_time = jcr->sched_time;
722       njcr->initial_sched_time = jcr->initial_sched_time;
723       /*
724        * Special test here since a Virtual Full gets marked
725        *  as a Full, so we look at the resource record
726        */
727       if (jcr->wasVirtualFull) {
728          njcr->setJobLevel(L_VIRTUAL_FULL);
729       } else {
730          njcr->setJobLevel(jcr->getJobLevel());
731       }
732       njcr->pool = jcr->pool;
733       njcr->run_pool_override = jcr->run_pool_override;
734       njcr->next_pool = jcr->next_pool;
735       njcr->run_next_pool_override = jcr->run_next_pool_override;
736       njcr->full_pool = jcr->full_pool;
737       njcr->run_full_pool_override = jcr->run_full_pool_override;
738       njcr->inc_pool = jcr->inc_pool;
739       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
740       njcr->diff_pool = jcr->diff_pool;
741       njcr->JobStatus = -1;
742       njcr->setJobStatus(jcr->JobStatus);
743       if (jcr->rstore) {
744          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
745       } else {
746          free_rstorage(njcr);
747       }
748       if (jcr->wstore) {
749          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
750       } else {
751          free_wstorage(njcr);
752       }
753       njcr->messages = jcr->messages;
754       njcr->spool_data = jcr->spool_data;
755       njcr->write_part_after_job = jcr->write_part_after_job;
756       Dmsg0(2300, "Call to run new job\n");
757       V(jq->mutex);
758       run_job(njcr);            /* This creates a "new" job */
759       free_jcr(njcr);           /* release "new" jcr */
760       P(jq->mutex);
761       Dmsg0(2300, "Back from running new job.\n");
762    }
763    return false;
764 }
765
766 /*
767  * See if we can acquire all the necessary resources for the job (JCR)
768  *
769  *  Returns: true  if successful
770  *           false if resource failure
771  */
772 static bool acquire_resources(JCR *jcr)
773 {
774    bool skip_this_jcr = false;
775
776    jcr->acquired_resource_locks = false;
777 /*
778  * Turning this code off is likely to cause some deadlocks,
779  *   but we do not really have enough information here to
780  *   know if this is really a deadlock (it may be a dual drive
781  *   autochanger), and in principle, the SD reservation system
782  *   should detect these deadlocks, so push the work off on it.
783  */
784 #ifdef xxx
785    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
786       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
787          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
788          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
789       jcr->setJobStatus(JS_Canceled);
790       return false;
791    }
792 #endif
793    if (jcr->rstore) {
794       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
795       if (!inc_read_store(jcr)) {
796          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
797          jcr->setJobStatus(JS_WaitStoreRes);
798          return false;
799       }
800    }
801
802    if (jcr->wstore) {
803       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
804       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
805          jcr->wstore->NumConcurrentJobs++;
806          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
807       } else if (jcr->rstore) {
808          dec_read_store(jcr);
809          skip_this_jcr = true;
810       } else {
811          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
812          skip_this_jcr = true;
813       }
814    }
815    if (skip_this_jcr) {
816       jcr->setJobStatus(JS_WaitStoreRes);
817       return false;
818    }
819
820    if (jcr->client) {
821       if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
822          update_client_numconcurrentjobs(jcr, 1);
823       } else {
824          /* Back out previous locks */
825          dec_write_store(jcr);
826          dec_read_store(jcr);
827          jcr->setJobStatus(JS_WaitClientRes);
828          return false;
829       }
830    }
831    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
832       jcr->job->NumConcurrentJobs++;
833    } else {
834       /* Back out previous locks */
835       dec_write_store(jcr);
836       dec_read_store(jcr);
837       update_client_numconcurrentjobs(jcr, -1);
838       jcr->setJobStatus(JS_WaitJobRes);
839       return false;
840    }
841
842    jcr->acquired_resource_locks = true;
843    return true;
844 }
845
846 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
847
848 /*
849  * Note: inc_read_store() and dec_read_store() are
850  *   called from select_rstore() in src/dird/restore.c
851  */
852 bool inc_read_store(JCR *jcr)
853 {
854    P(rstore_mutex);
855    if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs &&
856        (jcr->getJobType() == JT_RESTORE ||
857         jcr->rstore->MaxConcurrentReadJobs == 0 ||
858         jcr->rstore->NumConcurrentReadJobs < jcr->rstore->MaxConcurrentReadJobs)) {
859       jcr->rstore->NumConcurrentReadJobs++;
860       jcr->rstore->NumConcurrentJobs++;
861       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
862       V(rstore_mutex);
863       return true;
864    }
865    V(rstore_mutex);
866    return false;
867 }
868
869 void dec_read_store(JCR *jcr)
870 {
871    if (jcr->rstore) {
872       P(rstore_mutex);
873       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
874       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
875       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
876       V(rstore_mutex);
877       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
878       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
879    }
880 }
881
882 static void dec_write_store(JCR *jcr)
883 {
884    if (jcr->wstore) {
885       jcr->wstore->NumConcurrentJobs--;
886       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
887       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
888    }
889 }