]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
Fix seg fault in jobq.c
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  * Bacula job queue routines.
18  *
19  *  This code consists of three queues, the waiting_jobs
20  *  queue, where jobs are initially queued, the ready_jobs
21  *  queue, where jobs are placed when all the resources are
22  *  allocated and they can immediately be run, and the
23  *  running queue where jobs are placed when they are
24  *  running.
25  *
26  *  Kern Sibbald, July MMIII
27  *
28  *
29  *  This code was adapted from the Bacula workq, which was
30  *    adapted from "Programming with POSIX Threads", by
31  *    David R. Butenhof
32  *
33  */
34
35 #include "bacula.h"
36 #include "dird.h"
37
38 extern JCR *jobs;
39
40 /* Forward referenced functions */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
43
44 static int  start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
46 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
47 static void dec_write_store(JCR *jcr);
48
49 /*
50  * Initialize a job queue
51  *
52  *  Returns: 0 on success
53  *           errno on failure
54  */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
56 {
57    int stat;
58    jobq_item_t *item = NULL;
59
60    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
61       berrno be;
62       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
63       return stat;
64    }
65    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66       pthread_attr_destroy(&jq->attr);
67       return stat;
68    }
69    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
70       berrno be;
71       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
72       pthread_attr_destroy(&jq->attr);
73       return stat;
74    }
75    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
76       berrno be;
77       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
78       pthread_mutex_destroy(&jq->mutex);
79       pthread_attr_destroy(&jq->attr);
80       return stat;
81    }
82    jq->quit = false;
83    jq->max_workers = threads;         /* max threads to create */
84    jq->num_workers = 0;               /* no threads yet */
85    jq->idle_workers = 0;              /* no idle threads */
86    jq->engine = engine;               /* routine to run */
87    jq->valid = JOBQ_VALID;
88    /* Initialize the job queues */
89    jq->waiting_jobs = New(dlist(item, &item->link));
90    jq->running_jobs = New(dlist(item, &item->link));
91    jq->ready_jobs = New(dlist(item, &item->link));
92    return 0;
93 }
94
95 /*
96  * Destroy the job queue
97  *
98  * Returns: 0 on success
99  *          errno on failure
100  */
101 int jobq_destroy(jobq_t *jq)
102 {
103    int stat, stat1, stat2;
104
105    if (jq->valid != JOBQ_VALID) {
106       return EINVAL;
107    }
108    P(jq->mutex);
109    jq->valid = 0;                      /* prevent any more operations */
110
111    /*
112     * If any threads are active, wake them
113     */
114    if (jq->num_workers > 0) {
115       jq->quit = true;
116       if (jq->idle_workers) {
117          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
118             berrno be;
119             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
120             V(jq->mutex);
121             return stat;
122          }
123       }
124       while (jq->num_workers > 0) {
125          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
126             berrno be;
127             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
128             V(jq->mutex);
129             return stat;
130          }
131       }
132    }
133    V(jq->mutex);
134    stat  = pthread_mutex_destroy(&jq->mutex);
135    stat1 = pthread_cond_destroy(&jq->work);
136    stat2 = pthread_attr_destroy(&jq->attr);
137    delete jq->waiting_jobs;
138    delete jq->running_jobs;
139    delete jq->ready_jobs;
140    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
141 }
142
143 struct wait_pkt {
144    JCR *jcr;
145    jobq_t *jq;
146 };
147
148 /*
149  * Wait until schedule time arrives before starting. Normally
150  *  this routine is only used for jobs started from the console
151  *  for which the user explicitly specified a start time. Otherwise
152  *  most jobs are put into the job queue only when their
153  *  scheduled time arives.
154  */
155 extern "C"
156 void *sched_wait(void *arg)
157 {
158    JCR *jcr = ((wait_pkt *)arg)->jcr;
159    jobq_t *jq = ((wait_pkt *)arg)->jq;
160
161    set_jcr_in_tsd(INVALID_JCR);
162    Dmsg0(2300, "Enter sched_wait.\n");
163    free(arg);
164    time_t wtime = jcr->sched_time - time(NULL);
165    jcr->setJobStatus(JS_WaitStartTime);
166    /* Wait until scheduled time arrives */
167    if (wtime > 0) {
168       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
169          jcr->Job, wtime);
170    }
171    /* Check every 30 seconds if canceled */
172    while (wtime > 0) {
173       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
174          jcr->JobId, wtime, jcr->use_count());
175       if (wtime > 30) {
176          wtime = 30;
177       }
178       bmicrosleep(wtime, 0);
179       if (job_canceled(jcr)) {
180          break;
181       }
182       wtime = jcr->sched_time - time(NULL);
183    }
184    Dmsg1(200, "resched use=%d\n", jcr->use_count());
185    jobq_add(jq, jcr);
186    free_jcr(jcr);                     /* we are done with jcr */
187    Dmsg0(2300, "Exit sched_wait\n");
188    return NULL;
189 }
190
191 /*
192  *  Add a job to the queue
193  *    jq is a queue that was created with jobq_init
194  */
195 int jobq_add(jobq_t *jq, JCR *jcr)
196 {
197    int stat;
198    jobq_item_t *item, *li;
199    bool inserted = false;
200    time_t wtime = jcr->sched_time - time(NULL);
201    pthread_t id;
202    wait_pkt *sched_pkt;
203
204    if (!jcr->term_wait_inited) {
205       /* Initialize termination condition variable */
206       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
207          berrno be;
208          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
209          return stat;
210       }
211       jcr->term_wait_inited = true;
212    }
213
214    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
215    if (jq->valid != JOBQ_VALID) {
216       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
217       return EINVAL;
218    }
219
220    jcr->inc_use_count();                 /* mark jcr in use by us */
221    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
222    if (!job_canceled(jcr) && wtime > 0) {
223       set_thread_concurrency(jq->max_workers + 2);
224       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
225       sched_pkt->jcr = jcr;
226       sched_pkt->jq = jq;
227       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
228       if (stat != 0) {                /* thread not created */
229          berrno be;
230          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
231       }
232       return stat;
233    }
234
235    P(jq->mutex);
236
237    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
238       free_jcr(jcr);                    /* release jcr */
239       return ENOMEM;
240    }
241    item->jcr = jcr;
242
243    /* While waiting in a queue this job is not attached to a thread */
244    set_jcr_in_tsd(INVALID_JCR);
245    if (job_canceled(jcr)) {
246       /* Add job to ready queue so that it is canceled quickly */
247       jq->ready_jobs->prepend(item);
248       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
249    } else {
250       /* Add this job to the wait queue in priority sorted order */
251       foreach_dlist(li, jq->waiting_jobs) {
252          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
253             li->jcr->JobId, li->jcr->JobPriority);
254          if (li->jcr->JobPriority > jcr->JobPriority) {
255             jq->waiting_jobs->insert_before(item, li);
256             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
257                li->jcr->JobId, jcr->JobId);
258             inserted = true;
259             break;
260          }
261       }
262       /* If not jobs in wait queue, append it */
263       if (!inserted) {
264          jq->waiting_jobs->append(item);
265          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
266       }
267    }
268
269    /* Ensure that at least one server looks at the queue. */
270    stat = start_server(jq);
271
272    V(jq->mutex);
273    Dmsg0(2300, "Return jobq_add\n");
274    return stat;
275 }
276
277 /*
278  *  Remove a job from the job queue. Used only by cancel_job().
279  *    jq is a queue that was created with jobq_init
280  *    work_item is an element of work
281  *
282  *   Note, it is "removed" from the job queue.
283  *    If you want to cancel it, you need to provide some external means
284  *    of doing so (e.g. pthread_kill()).
285  */
286 int jobq_remove(jobq_t *jq, JCR *jcr)
287 {
288    int stat;
289    bool found = false;
290    jobq_item_t *item;
291
292    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
293    if (jq->valid != JOBQ_VALID) {
294       return EINVAL;
295    }
296
297    P(jq->mutex);
298    foreach_dlist(item, jq->waiting_jobs) {
299       if (jcr == item->jcr) {
300          found = true;
301          break;
302       }
303    }
304    if (!found) {
305       V(jq->mutex);
306       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
307       return EINVAL;
308    }
309
310    /* Move item to be the first on the list */
311    jq->waiting_jobs->remove(item);
312    jq->ready_jobs->prepend(item);
313    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
314
315    stat = start_server(jq);
316
317    V(jq->mutex);
318    Dmsg0(2300, "Return jobq_remove\n");
319    return stat;
320 }
321
322
323 /*
324  * Start the server thread if it isn't already running
325  */
326 static int start_server(jobq_t *jq)
327 {
328    int stat = 0;
329    pthread_t id;
330
331    /*
332     * if any threads are idle, wake one.
333     *   Actually we do a broadcast because on /lib/tls
334     *   these signals seem to get lost from time to time.
335     */
336    if (jq->idle_workers > 0) {
337       Dmsg0(2300, "Signal worker to wake up\n");
338       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
339          berrno be;
340          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
341          return stat;
342       }
343    } else if (jq->num_workers < jq->max_workers) {
344       Dmsg0(2300, "Create worker thread\n");
345       /* No idle threads so create a new one */
346       set_thread_concurrency(jq->max_workers + 1);
347       jq->num_workers++;
348       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
349          berrno be;
350          jq->num_workers--;
351          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
352          return stat;
353       }
354    }
355    return stat;
356 }
357
358
359 /*
360  * This is the worker thread that serves the job queue.
361  * When all the resources are acquired for the job,
362  *  it will call the user's engine.
363  */
364 extern "C"
365 void *jobq_server(void *arg)
366 {
367    struct timespec timeout;
368    jobq_t *jq = (jobq_t *)arg;
369    jobq_item_t *je;                   /* job entry in queue */
370    int stat;
371    bool timedout = false;
372    bool work = true;
373
374    set_jcr_in_tsd(INVALID_JCR);
375    Dmsg0(2300, "Start jobq_server\n");
376    P(jq->mutex);
377
378    for (;;) {
379       struct timeval tv;
380       struct timezone tz;
381
382       Dmsg0(2300, "Top of for loop\n");
383       if (!work && !jq->quit) {
384          gettimeofday(&tv, &tz);
385          timeout.tv_nsec = 0;
386          timeout.tv_sec = tv.tv_sec + 4;
387
388          while (!jq->quit) {
389             /*
390              * Wait 4 seconds, then if no more work, exit
391              */
392             Dmsg0(2300, "pthread_cond_timedwait()\n");
393             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
394             if (stat == ETIMEDOUT) {
395                Dmsg0(2300, "timedwait timedout.\n");
396                timedout = true;
397                break;
398             } else if (stat != 0) {
399                /* This shouldn't happen */
400                Dmsg0(2300, "This shouldn't happen\n");
401                jq->num_workers--;
402                V(jq->mutex);
403                return NULL;
404             }
405             break;
406          }
407       }
408       /*
409        * If anything is in the ready queue, run it
410        */
411       Dmsg0(2300, "Checking ready queue.\n");
412       while (!jq->ready_jobs->empty() && !jq->quit) {
413          JCR *jcr;
414          je = (jobq_item_t *)jq->ready_jobs->first();
415          jcr = je->jcr;
416          jq->ready_jobs->remove(je);
417          if (!jq->ready_jobs->empty()) {
418             Dmsg0(2300, "ready queue not empty start server\n");
419             if (start_server(jq) != 0) {
420                jq->num_workers--;
421                V(jq->mutex);
422                return NULL;
423             }
424          }
425          jq->running_jobs->append(je);
426
427          /* Attach jcr to this thread while we run the job */
428          jcr->my_thread_id = pthread_self();
429          jcr->set_killable(true);
430          set_jcr_in_tsd(jcr);
431          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
432
433          /* Release job queue lock */
434          V(jq->mutex);
435
436          /* Call user's routine here */
437          Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
438             jcr->use_count(), jcr->JobStatus);
439          jq->engine(je->jcr);
440
441          /* Job finished detach from thread */
442          remove_jcr_from_tsd(je->jcr);
443          je->jcr->set_killable(false);
444
445          /* Clear the threadid, probably not necessary */
446          memset(&jcr->my_thread_id, 0, sizeof(jcr->my_thread_id));
447
448          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
449             jcr->use_count());
450
451          /* Reacquire job queue lock */
452          P(jq->mutex);
453          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
454          jq->running_jobs->remove(je);
455          /*
456           * Release locks if acquired. Note, they will not have
457           *  been acquired for jobs canceled before they were
458           *  put into the ready queue.
459           */
460          if (jcr->acquired_resource_locks) {
461             dec_read_store(jcr);
462             dec_write_store(jcr);
463             if (jcr->client) {
464                jcr->client->NumConcurrentJobs--;
465             }
466             jcr->job->NumConcurrentJobs--;
467             jcr->acquired_resource_locks = false;
468          }
469
470          if (reschedule_job(jcr, jq, je)) {
471             continue;              /* go look for more work */
472          }
473
474          /* Clean up and release old jcr */
475          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
476          jcr->SDJobStatus = 0;
477          V(jq->mutex);                /* release internal lock */
478          free_jcr(jcr);
479          free(je);                    /* release job entry */
480          P(jq->mutex);                /* reacquire job queue lock */
481       }
482       /*
483        * If any job in the wait queue can be run,
484        *  move it to the ready queue
485        */
486       Dmsg0(2300, "Done check ready, now check wait queue.\n");
487       if (!jq->waiting_jobs->empty() && !jq->quit) {
488          int Priority;
489          bool running_allow_mix = false;
490          je = (jobq_item_t *)jq->waiting_jobs->first();
491          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
492          if (re) {
493             Priority = re->jcr->JobPriority;
494             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
495                   re->jcr->JobId, Priority);
496             running_allow_mix = true;
497             for ( ; re; ) {
498                Dmsg2(2300, "JobId %d is also running with %s\n",
499                      re->jcr->JobId,
500                      re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
501                if (!re->jcr->job->allow_mixed_priority) {
502                   running_allow_mix = false;
503                   break;
504                }
505                re = (jobq_item_t *)jq->running_jobs->next(re);
506             }
507             Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
508                   running_allow_mix ? "allow" : "don't allow");
509          } else {
510             Priority = je->jcr->JobPriority;
511             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
512          }
513          /*
514           * Walk down the list of waiting jobs and attempt
515           *   to acquire the resources it needs.
516           */
517          for ( ; je;  ) {
518             /* je is current job item on the queue, jn is the next one */
519             JCR *jcr = je->jcr;
520             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
521
522             Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
523                   jcr->JobId, jcr->JobPriority, Priority,
524                   jcr->job->allow_mixed_priority ? "mix" : "no mix");
525
526             /* Take only jobs of correct Priority */
527             if (!(jcr->JobPriority == Priority
528                   || (jcr->JobPriority < Priority &&
529                       jcr->job->allow_mixed_priority && running_allow_mix))) {
530                jcr->setJobStatus(JS_WaitPriority);
531                break;
532             }
533
534             if (!acquire_resources(jcr)) {
535                /* If resource conflict, job is canceled */
536                if (!job_canceled(jcr)) {
537                   je = jn;            /* point to next waiting job */
538                   continue;
539                }
540             }
541
542             /*
543              * Got all locks, now remove it from wait queue and append it
544              *   to the ready queue.  Note, we may also get here if the
545              *    job was canceled.  Once it is "run", it will quickly
546              *    terminate.
547              */
548             jq->waiting_jobs->remove(je);
549             jq->ready_jobs->append(je);
550             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
551             je = jn;                  /* Point to next waiting job */
552          } /* end for loop */
553
554       } /* end if */
555
556       Dmsg0(2300, "Done checking wait queue.\n");
557       /*
558        * If no more ready work and we are asked to quit, then do it
559        */
560       if (jq->ready_jobs->empty() && jq->quit) {
561          jq->num_workers--;
562          if (jq->num_workers == 0) {
563             Dmsg0(2300, "Wake up destroy routine\n");
564             /* Wake up destroy routine if he is waiting */
565             pthread_cond_broadcast(&jq->work);
566          }
567          break;
568       }
569       Dmsg0(2300, "Check for work request\n");
570       /*
571        * If no more work requests, and we waited long enough, quit
572        */
573       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
574          jq->ready_jobs->empty());
575       if (jq->ready_jobs->empty() && timedout) {
576          Dmsg0(2300, "break big loop\n");
577          jq->num_workers--;
578          break;
579       }
580
581       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
582       if (work) {
583          /*
584           * If a job is waiting on a Resource, don't consume all
585           *   the CPU time looping looking for work, and even more
586           *   important, release the lock so that a job that has
587           *   terminated can give us the resource.
588           */
589          V(jq->mutex);
590          bmicrosleep(2, 0);              /* pause for 2 seconds */
591          P(jq->mutex);
592          /* Recompute work as something may have changed in last 2 secs */
593          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
594       }
595       Dmsg1(2300, "Loop again. work=%d\n", work);
596    } /* end of big for loop */
597
598    Dmsg0(200, "unlock mutex\n");
599    V(jq->mutex);
600    Dmsg0(2300, "End jobq_server\n");
601    return NULL;
602 }
603
604 /*
605  * Returns true if cleanup done and we should look for more work
606  */
607 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
608 {
609    bool resched = false;
610    /*
611     * Reschedule the job if requested and possible
612     */
613    /* Basic condition is that more reschedule times remain */
614    if (jcr->job->RescheduleTimes == 0 ||
615        jcr->reschedule_count < jcr->job->RescheduleTimes) {
616       resched =
617          /* Check for failed jobs */
618          (jcr->job->RescheduleOnError &&
619           !jcr->is_JobStatus(JS_Terminated) &&
620           !jcr->is_JobStatus(JS_Canceled) &&
621           jcr->is_JobType(JT_BACKUP));
622    }
623    if (resched) {
624        char dt[50], dt2[50];
625
626        /*
627         * Reschedule this job by cleaning it up, but
628         *  reuse the same JobId if possible.
629         */
630       time_t now = time(NULL);
631       jcr->reschedule_count++;
632       jcr->sched_time = now + jcr->job->RescheduleInterval;
633       bstrftime(dt, sizeof(dt), now);
634       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
635       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
636             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
637       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
638            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
639       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
640       jcr->JobStatus = -1;
641       jcr->setJobStatus(JS_WaitStartTime);
642       jcr->SDJobStatus = 0;
643       jcr->JobErrors = 0;
644       if (!allow_duplicate_job(jcr)) {
645          return false;
646       }
647       /* Only jobs with no output jobs can run on same JCR */
648       if (jcr->JobBytes == 0) {
649          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
650          V(jq->mutex);
651          /*
652           * Special test here since a Virtual Full gets marked
653           *  as a Full, so we look at the resource record
654           */
655          if (jcr->wasVirtualFull) {
656             jcr->setJobLevel(L_VIRTUAL_FULL);
657          }
658          /* 
659           * When we are using the same jcr then make sure to reset
660           *   RealEndTime back to zero.  
661           */
662          jcr->jr.RealEndTime = 0;
663          jobq_add(jq, jcr);     /* queue the job to run again */
664          P(jq->mutex);
665          free_jcr(jcr);         /* release jcr */
666          free(je);              /* free the job entry */
667          return true;           /* we already cleaned up */
668       }
669       /*
670        * Something was actually backed up, so we cannot reuse
671        *   the old JobId or there will be database record
672        *   conflicts.  We now create a new job, copying the
673        *   appropriate fields.
674        */
675       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
676       set_jcr_defaults(njcr, jcr->job);
677       njcr->reschedule_count = jcr->reschedule_count;
678       njcr->sched_time = jcr->sched_time;
679       njcr->initial_sched_time = jcr->initial_sched_time;
680       /*
681        * Special test here since a Virtual Full gets marked
682        *  as a Full, so we look at the resource record
683        */
684       if (jcr->wasVirtualFull) {
685          njcr->setJobLevel(L_VIRTUAL_FULL);
686       } else {
687          njcr->setJobLevel(jcr->getJobLevel());
688       }
689       njcr->pool = jcr->pool;
690       njcr->run_pool_override = jcr->run_pool_override;
691       njcr->next_pool = jcr->next_pool;
692       njcr->run_next_pool_override = jcr->run_next_pool_override;
693       njcr->full_pool = jcr->full_pool;
694       njcr->run_full_pool_override = jcr->run_full_pool_override;
695       njcr->inc_pool = jcr->inc_pool;
696       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
697       njcr->diff_pool = jcr->diff_pool;
698       njcr->JobStatus = -1;
699       njcr->setJobStatus(jcr->JobStatus);
700       if (jcr->rstore) {
701          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
702       } else {
703          free_rstorage(njcr);
704       }
705       if (jcr->wstore) {
706          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
707       } else {
708          free_wstorage(njcr);
709       }
710       njcr->messages = jcr->messages;
711       njcr->spool_data = jcr->spool_data;
712       njcr->write_part_after_job = jcr->write_part_after_job;
713       Dmsg0(2300, "Call to run new job\n");
714       V(jq->mutex);
715       run_job(njcr);            /* This creates a "new" job */
716       free_jcr(njcr);           /* release "new" jcr */
717       P(jq->mutex);
718       Dmsg0(2300, "Back from running new job.\n");
719    }
720    return false;
721 }
722
723 /*
724  * See if we can acquire all the necessary resources for the job (JCR)
725  *
726  *  Returns: true  if successful
727  *           false if resource failure
728  */
729 static bool acquire_resources(JCR *jcr)
730 {
731    bool skip_this_jcr = false;
732
733    jcr->acquired_resource_locks = false;
734 /*
735  * Turning this code off is likely to cause some deadlocks,
736  *   but we do not really have enough information here to
737  *   know if this is really a deadlock (it may be a dual drive
738  *   autochanger), and in principle, the SD reservation system
739  *   should detect these deadlocks, so push the work off on it.
740  */
741 #ifdef xxx
742    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
743       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
744          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
745          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
746       jcr->setJobStatus(JS_Canceled);
747       return false;
748    }
749 #endif
750    if (jcr->rstore) {
751       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
752       if (!inc_read_store(jcr)) {
753          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
754          jcr->setJobStatus(JS_WaitStoreRes);
755          return false;
756       }
757    }
758
759    if (jcr->wstore) {
760       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
761       if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
762          jcr->wstore->NumConcurrentJobs++;
763          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
764       } else if (jcr->rstore) {
765          dec_read_store(jcr);
766          skip_this_jcr = true;
767       } else {
768          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
769          skip_this_jcr = true;
770       }
771    }
772    if (skip_this_jcr) {
773       jcr->setJobStatus(JS_WaitStoreRes);
774       return false;
775    }
776
777    if (jcr->client) {
778       if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
779          jcr->client->NumConcurrentJobs++;
780       } else {
781          /* Back out previous locks */
782          dec_write_store(jcr);
783          dec_read_store(jcr);
784          jcr->setJobStatus(JS_WaitClientRes);
785          return false;
786       }
787    }
788    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
789       jcr->job->NumConcurrentJobs++;
790    } else {
791       /* Back out previous locks */
792       dec_write_store(jcr);
793       dec_read_store(jcr);
794       if (jcr->client) {
795          jcr->client->NumConcurrentJobs--;
796       }
797       jcr->setJobStatus(JS_WaitJobRes);
798       return false;
799    }
800
801    jcr->acquired_resource_locks = true;
802    return true;
803 }
804
805 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
806
807 /*
808  * Note: inc_read_store() and dec_read_store() are
809  *   called from select_rstore() in src/dird/restore.c
810  */
811 bool inc_read_store(JCR *jcr)
812 {
813    P(rstore_mutex);
814    if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs &&
815        (jcr->getJobType() == JT_RESTORE ||
816         jcr->rstore->MaxConcurrentReadJobs == 0 ||
817         jcr->rstore->NumConcurrentReadJobs < jcr->rstore->MaxConcurrentReadJobs)) {
818       jcr->rstore->NumConcurrentReadJobs++;
819       jcr->rstore->NumConcurrentJobs++;
820       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
821       V(rstore_mutex);
822       return true;
823    }
824    V(rstore_mutex);
825    return false;
826 }
827
828 void dec_read_store(JCR *jcr)
829 {
830    if (jcr->rstore) {
831       P(rstore_mutex);
832       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
833       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
834       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
835       V(rstore_mutex);
836       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
837       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
838    }
839 }
840
841 static void dec_write_store(JCR *jcr)
842 {
843    if (jcr->wstore) {
844       jcr->wstore->NumConcurrentJobs--;
845       Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
846       ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
847    }
848 }