]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
Restore win32 dir from Branch-5.2 and update it
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula job queue routines.
21  *
22  *  This code consists of three queues, the waiting_jobs
23  *  queue, where jobs are initially queued, the ready_jobs
24  *  queue, where jobs are placed when all the resources are
25  *  allocated and they can immediately be run, and the
26  *  running queue where jobs are placed when they are
27  *  running.
28  *
29  *  Kern Sibbald, July MMIII
30  *
31  *
32  *  This code was adapted from the Bacula workq, which was
33  *    adapted from "Programming with POSIX Threads", by
34  *    David R. Butenhof
35  *
36  */
37
38 #include "bacula.h"
39 #include "dird.h"
40
41 extern JCR *jobs;
42
43 /* Forward referenced functions */
44 extern "C" void *jobq_server(void *arg);
45 extern "C" void *sched_wait(void *arg);
46
47 static int  start_server(jobq_t *jq);
48 static bool acquire_resources(JCR *jcr);
49 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
50 static void dec_write_store(JCR *jcr);
51
52 /*
53  * Initialize a job queue
54  *
55  *  Returns: 0 on success
56  *           errno on failure
57  */
58 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
59 {
60    int stat;
61    jobq_item_t *item = NULL;
62
63    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
64       berrno be;
65       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&jq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
73       berrno be;
74       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
75       pthread_attr_destroy(&jq->attr);
76       return stat;
77    }
78    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
79       berrno be;
80       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
81       pthread_mutex_destroy(&jq->mutex);
82       pthread_attr_destroy(&jq->attr);
83       return stat;
84    }
85    jq->quit = false;
86    jq->max_workers = threads;         /* max threads to create */
87    jq->num_workers = 0;               /* no threads yet */
88    jq->idle_workers = 0;              /* no idle threads */
89    jq->engine = engine;               /* routine to run */
90    jq->valid = JOBQ_VALID;
91    /* Initialize the job queues */
92    jq->waiting_jobs = New(dlist(item, &item->link));
93    jq->running_jobs = New(dlist(item, &item->link));
94    jq->ready_jobs = New(dlist(item, &item->link));
95    return 0;
96 }
97
98 /*
99  * Destroy the job queue
100  *
101  * Returns: 0 on success
102  *          errno on failure
103  */
104 int jobq_destroy(jobq_t *jq)
105 {
106    int stat, stat1, stat2;
107
108    if (jq->valid != JOBQ_VALID) {
109       return EINVAL;
110    }
111    P(jq->mutex);
112    jq->valid = 0;                      /* prevent any more operations */
113
114    /*
115     * If any threads are active, wake them
116     */
117    if (jq->num_workers > 0) {
118       jq->quit = true;
119       if (jq->idle_workers) {
120          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
121             berrno be;
122             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
123             V(jq->mutex);
124             return stat;
125          }
126       }
127       while (jq->num_workers > 0) {
128          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
129             berrno be;
130             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
131             V(jq->mutex);
132             return stat;
133          }
134       }
135    }
136    V(jq->mutex);
137    stat  = pthread_mutex_destroy(&jq->mutex);
138    stat1 = pthread_cond_destroy(&jq->work);
139    stat2 = pthread_attr_destroy(&jq->attr);
140    delete jq->waiting_jobs;
141    delete jq->running_jobs;
142    delete jq->ready_jobs;
143    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
144 }
145
146 struct wait_pkt {
147    JCR *jcr;
148    jobq_t *jq;
149 };
150
151 /*
152  * Wait until schedule time arrives before starting. Normally
153  *  this routine is only used for jobs started from the console
154  *  for which the user explicitly specified a start time. Otherwise
155  *  most jobs are put into the job queue only when their
156  *  scheduled time arrives.
157  */
158 extern "C"
159 void *sched_wait(void *arg)
160 {
161    JCR *jcr = ((wait_pkt *)arg)->jcr;
162    jobq_t *jq = ((wait_pkt *)arg)->jq;
163
164    set_jcr_in_tsd(INVALID_JCR);
165    Dmsg0(2300, "Enter sched_wait.\n");
166    free(arg);
167    time_t wtime = jcr->sched_time - time(NULL);
168    jcr->setJobStatus(JS_WaitStartTime);
169    /* Wait until scheduled time arrives */
170    if (wtime > 0) {
171       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
172          jcr->Job, wtime);
173    }
174    /* Check every 30 seconds if canceled */
175    while (wtime > 0) {
176       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
177          jcr->JobId, wtime, jcr->use_count());
178       if (wtime > 30) {
179          wtime = 30;
180       }
181       bmicrosleep(wtime, 0);
182       if (job_canceled(jcr)) {
183          break;
184       }
185       wtime = jcr->sched_time - time(NULL);
186    }
187    Dmsg1(200, "resched use=%d\n", jcr->use_count());
188    jobq_add(jq, jcr);
189    free_jcr(jcr);                     /* we are done with jcr */
190    Dmsg0(2300, "Exit sched_wait\n");
191    return NULL;
192 }
193
194 /* Procedure to update the client->NumConcurrentJobs */
195 static void update_client_numconcurrentjobs(JCR *jcr, int val)
196 {
197    int num;
198    if (!jcr->client) {
199       return;
200    }
201
202    switch (jcr->getJobType())
203    {
204    case JT_MIGRATE:
205    case JT_COPY:
206    case JT_ADMIN:
207       break;
208    case JT_BACKUP:
209    /* Fall through wanted */
210    default:
211       if (jcr->no_client_used() || jcr->wasVirtualFull) {
212          break;
213       }
214       num = jcr->client->getNumConcurrentJobs();
215       jcr->client->setNumConcurrentJobs(num + val);
216       break;
217    }
218 }
219
220 /*
221  *  Add a job to the queue
222  *    jq is a queue that was created with jobq_init
223  */
224 int jobq_add(jobq_t *jq, JCR *jcr)
225 {
226    int stat;
227    jobq_item_t *item, *li;
228    bool inserted = false;
229    time_t wtime = jcr->sched_time - time(NULL);
230    pthread_t id;
231    wait_pkt *sched_pkt;
232
233    if (!jcr->term_wait_inited) {
234       /* Initialize termination condition variable */
235       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
236          berrno be;
237          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
238          return stat;
239       }
240       jcr->term_wait_inited = true;
241    }
242
243    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
244    if (jq->valid != JOBQ_VALID) {
245       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
246       return EINVAL;
247    }
248
249    jcr->inc_use_count();                 /* mark jcr in use by us */
250    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
251    if (!job_canceled(jcr) && wtime > 0) {
252       set_thread_concurrency(jq->max_workers + 2);
253       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
254       sched_pkt->jcr = jcr;
255       sched_pkt->jq = jq;
256       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
257       if (stat != 0) {                /* thread not created */
258          berrno be;
259          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
260       }
261       return stat;
262    }
263
264    P(jq->mutex);
265
266    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
267       free_jcr(jcr);                    /* release jcr */
268       return ENOMEM;
269    }
270    item->jcr = jcr;
271
272    /* While waiting in a queue this job is not attached to a thread */
273    set_jcr_in_tsd(INVALID_JCR);
274    if (job_canceled(jcr)) {
275       /* Add job to ready queue so that it is canceled quickly */
276       jq->ready_jobs->prepend(item);
277       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
278    } else {
279       /* Add this job to the wait queue in priority sorted order */
280       foreach_dlist(li, jq->waiting_jobs) {
281          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
282             li->jcr->JobId, li->jcr->JobPriority);
283          if (li->jcr->JobPriority > jcr->JobPriority) {
284             jq->waiting_jobs->insert_before(item, li);
285             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
286                li->jcr->JobId, jcr->JobId);
287             inserted = true;
288             break;
289          }
290       }
291       /* If not jobs in wait queue, append it */
292       if (!inserted) {
293          jq->waiting_jobs->append(item);
294          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
295       }
296    }
297
298    /* Ensure that at least one server looks at the queue. */
299    stat = start_server(jq);
300
301    V(jq->mutex);
302    Dmsg0(2300, "Return jobq_add\n");
303    return stat;
304 }
305
306 /*
307  *  Remove a job from the job queue. Used only by cancel_job().
308  *    jq is a queue that was created with jobq_init
309  *    work_item is an element of work
310  *
311  *   Note, it is "removed" from the job queue.
312  *    If you want to cancel it, you need to provide some external means
313  *    of doing so (e.g. pthread_kill()).
314  */
315 int jobq_remove(jobq_t *jq, JCR *jcr)
316 {
317    int stat;
318    bool found = false;
319    jobq_item_t *item;
320
321    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
322    if (jq->valid != JOBQ_VALID) {
323       return EINVAL;
324    }
325
326    P(jq->mutex);
327    foreach_dlist(item, jq->waiting_jobs) {
328       if (jcr == item->jcr) {
329          found = true;
330          break;
331       }
332    }
333    if (!found) {
334       V(jq->mutex);
335       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
336       return EINVAL;
337    }
338
339    /* Move item to be the first on the list */
340    jq->waiting_jobs->remove(item);
341    jq->ready_jobs->prepend(item);
342    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
343
344    stat = start_server(jq);
345
346    V(jq->mutex);
347    Dmsg0(2300, "Return jobq_remove\n");
348    return stat;
349 }
350
351
352 /*
353  * Start the server thread if it isn't already running
354  */
355 static int start_server(jobq_t *jq)
356 {
357    int stat = 0;
358    pthread_t id;
359
360    /*
361     * if any threads are idle, wake one.
362     *   Actually we do a broadcast because on /lib/tls
363     *   these signals seem to get lost from time to time.
364     */
365    if (jq->idle_workers > 0) {
366       Dmsg0(2300, "Signal worker to wake up\n");
367       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
368          berrno be;
369          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
370          return stat;
371       }
372    } else if (jq->num_workers < jq->max_workers) {
373       Dmsg0(2300, "Create worker thread\n");
374       /* No idle threads so create a new one */
375       set_thread_concurrency(jq->max_workers + 1);
376       jq->num_workers++;
377       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
378          berrno be;
379          jq->num_workers--;
380          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
381          return stat;
382       }
383    }
384    return stat;
385 }
386
387
388 /*
389  * This is the worker thread that serves the job queue.
390  * When all the resources are acquired for the job,
391  *  it will call the user's engine.
392  */
393 extern "C"
394 void *jobq_server(void *arg)
395 {
396    struct timespec timeout;
397    jobq_t *jq = (jobq_t *)arg;
398    jobq_item_t *je;                   /* job entry in queue */
399    int stat;
400    bool timedout = false;
401    bool work = true;
402
403    set_jcr_in_tsd(INVALID_JCR);
404    Dmsg0(2300, "Start jobq_server\n");
405    P(jq->mutex);
406
407    for (;;) {
408       struct timeval tv;
409       struct timezone tz;
410
411       Dmsg0(2300, "Top of for loop\n");
412       if (!work && !jq->quit) {
413          gettimeofday(&tv, &tz);
414          timeout.tv_nsec = 0;
415          timeout.tv_sec = tv.tv_sec + 4;
416
417          while (!jq->quit) {
418             /*
419              * Wait 4 seconds, then if no more work, exit
420              */
421             Dmsg0(2300, "pthread_cond_timedwait()\n");
422             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
423             if (stat == ETIMEDOUT) {
424                Dmsg0(2300, "timedwait timedout.\n");
425                timedout = true;
426                break;
427             } else if (stat != 0) {
428                /* This shouldn't happen */
429                Dmsg0(2300, "This shouldn't happen\n");
430                jq->num_workers--;
431                V(jq->mutex);
432                return NULL;
433             }
434             break;
435          }
436       }
437       /*
438        * If anything is in the ready queue, run it
439        */
440       Dmsg0(2300, "Checking ready queue.\n");
441       while (!jq->ready_jobs->empty() && !jq->quit) {
442          JCR *jcr;
443          je = (jobq_item_t *)jq->ready_jobs->first();
444          jcr = je->jcr;
445          jq->ready_jobs->remove(je);
446          if (!jq->ready_jobs->empty()) {
447             Dmsg0(2300, "ready queue not empty start server\n");
448             if (start_server(jq) != 0) {
449                jq->num_workers--;
450                V(jq->mutex);
451                return NULL;
452             }
453          }
454          jq->running_jobs->append(je);
455
456          /* Attach jcr to this thread while we run the job */
457          jcr->my_thread_id = pthread_self();
458          jcr->set_killable(true);
459          set_jcr_in_tsd(jcr);
460          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
461
462          /* Release job queue lock */
463          V(jq->mutex);
464
465          /* Call user's routine here */
466          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
467             jcr->use_count(), jcr->JobStatus);
468          jq->engine(je->jcr);
469
470          /* Job finished detach from thread */
471          remove_jcr_from_tsd(je->jcr);
472          je->jcr->set_killable(false);
473
474          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
475             jcr->use_count());
476
477          /* Reacquire job queue lock */
478          P(jq->mutex);
479          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
480          jq->running_jobs->remove(je);
481          /*
482           * Release locks if acquired. Note, they will not have
483           *  been acquired for jobs canceled before they were
484           *  put into the ready queue.
485           */
486          if (jcr->acquired_resource_locks) {
487             int num;
488             dec_read_store(jcr);
489             dec_write_store(jcr);
490             update_client_numconcurrentjobs(jcr, -1);
491             num = jcr->job->getNumConcurrentJobs() - 1;
492             jcr->job->setNumConcurrentJobs(num);
493             jcr->acquired_resource_locks = false;
494          }
495
496          if (reschedule_job(jcr, jq, je)) {
497             continue;              /* go look for more work */
498          }
499
500          /* Clean up and release old jcr */
501          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
502          jcr->SDJobStatus = 0;
503          V(jq->mutex);                /* release internal lock */
504          free_jcr(jcr);
505          free(je);                    /* release job entry */
506          P(jq->mutex);                /* reacquire job queue lock */
507       }
508       /*
509        * If any job in the wait queue can be run,
510        *  move it to the ready queue
511        */
512       Dmsg0(2300, "Done check ready, now check wait queue.\n");
513       if (!jq->waiting_jobs->empty() && !jq->quit) {
514          int Priority;
515          bool running_allow_mix = false;
516          je = (jobq_item_t *)jq->waiting_jobs->first();
517          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
518          if (re) {
519             Priority = re->jcr->JobPriority;
520             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
521                   re->jcr->JobId, Priority);
522             running_allow_mix = true;
523             for ( ; re; ) {
524                Dmsg2(2300, "JobId %d is also running with %s\n",
525                      re->jcr->JobId,
526                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
527                if (!re->jcr->job->allow_mixed_priority) {
528                   running_allow_mix = false;
529                   break;
530                }
531                re = (jobq_item_t *)jq->running_jobs->next(re);
532             }
533             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
534                   running_allow_mix ? "allow" : "don't allow");
535          } else {
536             Priority = je->jcr->JobPriority;
537             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
538          }
539          /*
540           * Walk down the list of waiting jobs and attempt
541           *   to acquire the resources it needs.
542           */
543          for ( ; je;  ) {
544             /* je is current job item on the queue, jn is the next one */
545             JCR *jcr = je->jcr;
546             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
547
548             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
549                   jcr->JobId, jcr->JobPriority, Priority,
550                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
551
552             /* Take only jobs of correct Priority */
553             if (!(jcr->JobPriority == Priority
554                   || (jcr->JobPriority < Priority &&
555                       jcr->job->allow_mixed_priority && running_allow_mix))) {
556                jcr->setJobStatus(JS_WaitPriority);
557                break;
558             }
559
560             if (!acquire_resources(jcr)) {
561                /* If resource conflict, job is canceled */
562                if (!job_canceled(jcr)) {
563                   je = jn;            /* point to next waiting job */
564                   continue;
565                }
566             }
567
568             /*
569              * Got all locks, now remove it from wait queue and append it
570              *   to the ready queue.  Note, we may also get here if the
571              *    job was canceled.  Once it is "run", it will quickly
572              *    terminate.
573              */
574             jq->waiting_jobs->remove(je);
575             jq->ready_jobs->append(je);
576             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
577             je = jn;                  /* Point to next waiting job */
578          } /* end for loop */
579
580       } /* end if */
581
582       Dmsg0(2300, "Done checking wait queue.\n");
583       /*
584        * If no more ready work and we are asked to quit, then do it
585        */
586       if (jq->ready_jobs->empty() && jq->quit) {
587          jq->num_workers--;
588          if (jq->num_workers == 0) {
589             Dmsg0(2300, "Wake up destroy routine\n");
590             /* Wake up destroy routine if he is waiting */
591             pthread_cond_broadcast(&jq->work);
592          }
593          break;
594       }
595       Dmsg0(2300, "Check for work request\n");
596       /*
597        * If no more work requests, and we waited long enough, quit
598        */
599       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
600          jq->ready_jobs->empty());
601       if (jq->ready_jobs->empty() && timedout) {
602          Dmsg0(2300, "break big loop\n");
603          jq->num_workers--;
604          break;
605       }
606
607       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
608       if (work) {
609          /*
610           * If a job is waiting on a Resource, don't consume all
611           *   the CPU time looping looking for work, and even more
612           *   important, release the lock so that a job that has
613           *   terminated can give us the resource.
614           */
615          V(jq->mutex);
616          bmicrosleep(2, 0);              /* pause for 2 seconds */
617          P(jq->mutex);
618          /* Recompute work as something may have changed in last 2 secs */
619          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
620       }
621       Dmsg1(2300, "Loop again. work=%d\n", work);
622    } /* end of big for loop */
623
624    Dmsg0(200, "unlock mutex\n");
625    V(jq->mutex);
626    Dmsg0(2300, "End jobq_server\n");
627    return NULL;
628 }
629
630 /*
631  * Returns true if cleanup done and we should look for more work
632  */
633 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
634 {
635    bool resched = false;
636    /*
637     * Reschedule the job if requested and possible
638     */
639    /* Basic condition is that more reschedule times remain */
640    if (jcr->job->RescheduleTimes == 0 ||
641        jcr->reschedule_count < jcr->job->RescheduleTimes) {
642
643       /* Check for incomplete jobs */
644       if (jcr->is_incomplete()) {
645          resched = (jcr->RescheduleIncompleteJobs && jcr->is_JobType(JT_BACKUP) &&
646                     !(jcr->HasBase||jcr->is_JobLevel(L_BASE)));
647       } else {
648          /* Check for failed jobs */
649          resched = (jcr->job->RescheduleOnError &&
650                     !jcr->is_JobStatus(JS_Terminated) &&
651                     !jcr->is_JobStatus(JS_Canceled) &&
652                     jcr->is_JobType(JT_BACKUP));
653       }
654    }
655    if (resched) {
656        char dt[50], dt2[50];
657
658        /*
659         * Reschedule this job by cleaning it up, but
660         *  reuse the same JobId if possible.
661         */
662       jcr->rerunning = jcr->is_incomplete();   /* save incomplete status */
663       time_t now = time(NULL);
664       jcr->reschedule_count++;
665       jcr->sched_time = now + jcr->job->RescheduleInterval;
666       bstrftime(dt, sizeof(dt), now);
667       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
668       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
669             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
670       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
671            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
672       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
673       jcr->JobStatus = -1;
674       jcr->setJobStatus(JS_WaitStartTime);
675       jcr->SDJobStatus = 0;
676       jcr->JobErrors = 0;
677       if (!allow_duplicate_job(jcr)) {
678          return false;
679       }
680       /* Only jobs with no output or Incomplete jobs can run on same JCR */
681       if (jcr->JobBytes == 0 || jcr->rerunning) {
682          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
683          V(jq->mutex);
684          /*
685           * Special test here since a Virtual Full gets marked
686           *  as a Full, so we look at the resource record
687           */
688          if (jcr->wasVirtualFull) {
689             jcr->setJobLevel(L_VIRTUAL_FULL);
690          }
691          /*
692           * When we are using the same jcr then make sure to reset
693           *   RealEndTime back to zero.
694           */
695          jcr->jr.RealEndTime = 0;
696          jobq_add(jq, jcr);     /* queue the job to run again */
697          P(jq->mutex);
698          free_jcr(jcr);         /* release jcr */
699          free(je);              /* free the job entry */
700          return true;           /* we already cleaned up */
701       }
702       /*
703        * Something was actually backed up, so we cannot reuse
704        *   the old JobId or there will be database record
705        *   conflicts.  We now create a new job, copying the
706        *   appropriate fields.
707        */
708       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
709       set_jcr_defaults(njcr, jcr->job);
710       /*
711        * Eliminate the new job_end_push, then copy the one from
712        *  the old job, and set the old one to be empty.
713        */
714       void *v;
715       lock_jobs();              /* protect ourself from reload_config() */
716       LockRes();
717       foreach_alist(v, (&jcr->job_end_push)) {
718          njcr->job_end_push.append(v);
719       }
720       jcr->job_end_push.destroy();
721       jcr->job_end_push.init(1, false);
722       UnlockRes();
723       unlock_jobs();
724
725       njcr->reschedule_count = jcr->reschedule_count;
726       njcr->sched_time = jcr->sched_time;
727       njcr->initial_sched_time = jcr->initial_sched_time;
728       /*
729        * Special test here since a Virtual Full gets marked
730        *  as a Full, so we look at the resource record
731        */
732       if (jcr->wasVirtualFull) {
733          njcr->setJobLevel(L_VIRTUAL_FULL);
734       } else {
735          njcr->setJobLevel(jcr->getJobLevel());
736       }
737       njcr->pool = jcr->pool;
738       njcr->run_pool_override = jcr->run_pool_override;
739       njcr->next_pool = jcr->next_pool;
740       njcr->run_next_pool_override = jcr->run_next_pool_override;
741       njcr->full_pool = jcr->full_pool;
742       njcr->vfull_pool = jcr->vfull_pool;
743       njcr->run_full_pool_override = jcr->run_full_pool_override;
744       njcr->run_vfull_pool_override = jcr->run_vfull_pool_override;
745       njcr->inc_pool = jcr->inc_pool;
746       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
747       njcr->diff_pool = jcr->diff_pool;
748       njcr->JobStatus = -1;
749       njcr->setJobStatus(jcr->JobStatus);
750       if (jcr->rstore) {
751          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
752       } else {
753          free_rstorage(njcr);
754       }
755       if (jcr->wstore) {
756          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
757       } else {
758          free_wstorage(njcr);
759       }
760       njcr->messages = jcr->messages;
761       njcr->spool_data = jcr->spool_data;
762       njcr->write_part_after_job = jcr->write_part_after_job;
763       Dmsg0(2300, "Call to run new job\n");
764       V(jq->mutex);
765       run_job(njcr);            /* This creates a "new" job */
766       free_jcr(njcr);           /* release "new" jcr */
767       P(jq->mutex);
768       Dmsg0(2300, "Back from running new job.\n");
769    }
770    return false;
771 }
772
773 /*
774  * See if we can acquire all the necessary resources for the job (JCR)
775  *
776  *  Returns: true  if successful
777  *           false if resource failure
778  */
779 static bool acquire_resources(JCR *jcr)
780 {
781    bool skip_this_jcr = false;
782
783    jcr->acquired_resource_locks = false;
784 /*
785  * Turning this code off is likely to cause some deadlocks,
786  *   but we do not really have enough information here to
787  *   know if this is really a deadlock (it may be a dual drive
788  *   autochanger), and in principle, the SD reservation system
789  *   should detect these deadlocks, so push the work off on it.
790  */
791 #ifdef xxx
792    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
793       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
794          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
795          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
796       jcr->setJobStatus(JS_Canceled);
797       return false;
798    }
799 #endif
800    if (jcr->rstore) {
801       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
802       if (!inc_read_store(jcr)) {
803          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->getNumConcurrentJobs());
804          jcr->setJobStatus(JS_WaitStoreRes);
805          return false;
806       }
807    }
808
809    if (jcr->wstore) {
810       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
811       int num = jcr->wstore->getNumConcurrentJobs();
812       if (num < jcr->wstore->MaxConcurrentJobs) {
813          Dmsg1(200, "Inc wncj=%d\n", num + 1);
814          jcr->wstore->setNumConcurrentJobs(num + 1);
815       } else if (jcr->rstore) {
816          dec_read_store(jcr);
817          skip_this_jcr = true;
818       } else {
819          Dmsg1(200, "Fail wncj=%d\n", num);
820          skip_this_jcr = true;
821       }
822    }
823    if (skip_this_jcr) {
824       jcr->setJobStatus(JS_WaitStoreRes);
825       return false;
826    }
827
828    if (jcr->client) {
829       if (jcr->client->getNumConcurrentJobs() < jcr->client->MaxConcurrentJobs) {
830          update_client_numconcurrentjobs(jcr, 1);
831       } else {
832          /* Back out previous locks */
833          dec_write_store(jcr);
834          dec_read_store(jcr);
835          jcr->setJobStatus(JS_WaitClientRes);
836          return false;
837       }
838    }
839    if (jcr->job->getNumConcurrentJobs() < jcr->job->MaxConcurrentJobs) {
840       int num;
841       num = jcr->job->getNumConcurrentJobs() + 1;
842       jcr->job->setNumConcurrentJobs(num);
843    } else {
844       /* Back out previous locks */
845       dec_write_store(jcr);
846       dec_read_store(jcr);
847       update_client_numconcurrentjobs(jcr, -1);
848       jcr->setJobStatus(JS_WaitJobRes);
849       return false;
850    }
851
852    jcr->acquired_resource_locks = true;
853    return true;
854 }
855
856 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
857
858 /*
859  * Note: inc_read_store() and dec_read_store() are
860  *   called from select_rstore() in src/dird/restore.c
861  */
862 bool inc_read_store(JCR *jcr)
863 {
864    P(rstore_mutex);
865    int num = jcr->rstore->getNumConcurrentJobs();
866    int numread = jcr->rstore->getNumConcurrentReadJobs();
867    int maxread = jcr->rstore->MaxConcurrentReadJobs;
868    if (num < jcr->rstore->MaxConcurrentJobs &&
869        (jcr->getJobType() == JT_RESTORE ||
870         numread == 0     ||
871         maxread == 0     ||     /* No limit set */
872         numread < maxread))     /* Below the limit */
873    {
874       num++;
875       numread++;
876       jcr->rstore->setNumConcurrentReadJobs(numread);
877       jcr->rstore->setNumConcurrentJobs(num);
878       Dmsg1(200, "Inc rncj=%d\n", num);
879       V(rstore_mutex);
880       return true;
881    }
882    V(rstore_mutex);
883    return false;
884 }
885
886 void dec_read_store(JCR *jcr)
887 {
888    if (jcr->rstore) {
889       P(rstore_mutex);
890       int numread = jcr->rstore->getNumConcurrentReadJobs() - 1;
891       int num = jcr->rstore->getNumConcurrentJobs() - 1;
892       jcr->rstore->setNumConcurrentReadJobs(numread);
893       jcr->rstore->setNumConcurrentJobs(num);
894       Dmsg1(200, "Dec rncj=%d\n", num);
895       V(rstore_mutex);
896    }
897 }
898
899 static void dec_write_store(JCR *jcr)
900 {
901    if (jcr->wstore) {
902       int num = jcr->wstore->getNumConcurrentJobs() - 1;
903       Dmsg1(200, "Dec wncj=%d\n", num);
904       jcr->wstore->setNumConcurrentJobs(num);
905    }
906 }