]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2  * Bacula job queue routines.
3  *
4  *  This code consists of three queues, the waiting_jobs
5  *  queue, where jobs are initially queued, the ready_jobs
6  *  queue, where jobs are placed when all the resources are
7  *  allocated and they can immediately be run, and the
8  *  running queue where jobs are placed when they are
9  *  running.
10  *
11  *  Kern Sibbald, July MMIII
12  *
13  *   Version $Id$
14  *
15  *  This code was adapted from the Bacula workq, which was
16  *    adapted from "Programming with POSIX Threads", by
17  *    David R. Butenhof
18  *
19  */
20 /*
21    Copyright (C) 2003-2006 Kern Sibbald
22
23    This program is free software; you can redistribute it and/or
24    modify it under the terms of the GNU General Public License
25    version 2 as amended with additional clauses defined in the
26    file LICENSE in the main source directory.
27
28    This program is distributed in the hope that it will be useful,
29    but WITHOUT ANY WARRANTY; without even the implied warranty of
30    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
31    the file LICENSE for additional details.
32
33  */
34
35 #include "bacula.h"
36 #include "dird.h"
37
38 extern JCR *jobs;
39
40 /* Forward referenced functions */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
43
44 static int  start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
46
47
48
49 /*
50  * Initialize a job queue
51  *
52  *  Returns: 0 on success
53  *           errno on failure
54  */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
56 {
57    int stat;
58    jobq_item_t *item = NULL;
59
60    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
61       berrno be;
62       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat));
63       return stat;
64    }
65    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66       pthread_attr_destroy(&jq->attr);
67       return stat;
68    }
69    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
70       berrno be;
71       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat));
72       pthread_attr_destroy(&jq->attr);
73       return stat;
74    }
75    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
76       berrno be;
77       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat));
78       pthread_mutex_destroy(&jq->mutex);
79       pthread_attr_destroy(&jq->attr);
80       return stat;
81    }
82    jq->quit = false;
83    jq->max_workers = threads;         /* max threads to create */
84    jq->num_workers = 0;               /* no threads yet */
85    jq->idle_workers = 0;              /* no idle threads */
86    jq->engine = engine;               /* routine to run */
87    jq->valid = JOBQ_VALID;
88    /* Initialize the job queues */
89    jq->waiting_jobs = New(dlist(item, &item->link));
90    jq->running_jobs = New(dlist(item, &item->link));
91    jq->ready_jobs = New(dlist(item, &item->link));
92    return 0;
93 }
94
95 /*
96  * Destroy the job queue
97  *
98  * Returns: 0 on success
99  *          errno on failure
100  */
101 int jobq_destroy(jobq_t *jq)
102 {
103    int stat, stat1, stat2;
104
105    if (jq->valid != JOBQ_VALID) {
106       return EINVAL;
107    }
108    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
109       berrno be;
110       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
111       return stat;
112    }
113    jq->valid = 0;                      /* prevent any more operations */
114
115    /* 
116     * If any threads are active, wake them 
117     */
118    if (jq->num_workers > 0) {
119       jq->quit = true;
120       if (jq->idle_workers) {
121          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
122             berrno be;
123             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat));
124             pthread_mutex_unlock(&jq->mutex);
125             return stat;
126          }
127       }
128       while (jq->num_workers > 0) {
129          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
130             berrno be;
131             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat));
132             pthread_mutex_unlock(&jq->mutex);
133             return stat;
134          }
135       }
136    }
137    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
138       berrno be;
139       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat));
140       return stat;
141    }
142    stat  = pthread_mutex_destroy(&jq->mutex);
143    stat1 = pthread_cond_destroy(&jq->work);
144    stat2 = pthread_attr_destroy(&jq->attr);
145    delete jq->waiting_jobs;
146    delete jq->running_jobs;
147    delete jq->ready_jobs;
148    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
149 }
150
151 struct wait_pkt {
152    JCR *jcr;
153    jobq_t *jq;
154 };
155
156 /*
157  * Wait until schedule time arrives before starting. Normally
158  *  this routine is only used for jobs started from the console
159  *  for which the user explicitly specified a start time. Otherwise
160  *  most jobs are put into the job queue only when their
161  *  scheduled time arives.
162  */
163 extern "C"
164 void *sched_wait(void *arg)
165 {
166    JCR *jcr = ((wait_pkt *)arg)->jcr;
167    jobq_t *jq = ((wait_pkt *)arg)->jq;
168
169    Dmsg0(2300, "Enter sched_wait.\n");
170    free(arg);
171    time_t wtime = jcr->sched_time - time(NULL);
172    set_jcr_job_status(jcr, JS_WaitStartTime);
173    /* Wait until scheduled time arrives */
174    if (wtime > 0) {
175       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
176          jcr->Job, wtime);
177    }
178    /* Check every 30 seconds if canceled */
179    while (wtime > 0) {
180       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
181          jcr->JobId, wtime, jcr->use_count());
182       if (wtime > 30) {
183          wtime = 30;
184       }
185       bmicrosleep(wtime, 0);
186       if (job_canceled(jcr)) {
187          break;
188       }
189       wtime = jcr->sched_time - time(NULL);
190    }
191    Dmsg1(200, "resched use=%d\n", jcr->use_count());
192    jobq_add(jq, jcr);
193    free_jcr(jcr);                     /* we are done with jcr */
194    Dmsg0(2300, "Exit sched_wait\n");
195    return NULL;
196 }
197
198 /*
199  *  Add a job to the queue
200  *    jq is a queue that was created with jobq_init
201  */
202 int jobq_add(jobq_t *jq, JCR *jcr)
203 {
204    int stat;
205    jobq_item_t *item, *li;
206    bool inserted = false;
207    time_t wtime = jcr->sched_time - time(NULL);
208    pthread_t id;
209    wait_pkt *sched_pkt;
210
211    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
212    if (jq->valid != JOBQ_VALID) {
213       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
214       return EINVAL;
215    }
216
217    jcr->inc_use_count();                 /* mark jcr in use by us */
218    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
219    if (!job_canceled(jcr) && wtime > 0) {
220       set_thread_concurrency(jq->max_workers + 2);
221       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
222       sched_pkt->jcr = jcr;
223       sched_pkt->jq = jq;
224       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
225       if (stat != 0) {                /* thread not created */
226          berrno be;
227          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat));
228       }
229       return stat;
230    }
231
232    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
233       berrno be;
234       Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
235       free_jcr(jcr);                    /* release jcr */
236       return stat;
237    }
238
239    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
240       free_jcr(jcr);                    /* release jcr */
241       return ENOMEM;
242    }
243    item->jcr = jcr;
244
245    if (job_canceled(jcr)) {
246       /* Add job to ready queue so that it is canceled quickly */
247       jq->ready_jobs->prepend(item);
248       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
249    } else {
250       /* Add this job to the wait queue in priority sorted order */
251       foreach_dlist(li, jq->waiting_jobs) {
252          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
253             li->jcr->JobId, li->jcr->JobPriority);
254          if (li->jcr->JobPriority > jcr->JobPriority) {
255             jq->waiting_jobs->insert_before(item, li);
256             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
257                li->jcr->JobId, jcr->JobId);
258             inserted = true;
259             break;
260          }
261       }
262       /* If not jobs in wait queue, append it */
263       if (!inserted) {
264          jq->waiting_jobs->append(item);
265          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
266       }
267    }
268
269    /* Ensure that at least one server looks at the queue. */
270    stat = start_server(jq);
271
272    pthread_mutex_unlock(&jq->mutex);
273    Dmsg0(2300, "Return jobq_add\n");
274    return stat;
275 }
276
277 /*
278  *  Remove a job from the job queue. Used only by cancel_job().
279  *    jq is a queue that was created with jobq_init
280  *    work_item is an element of work
281  *
282  *   Note, it is "removed" from the job queue.
283  *    If you want to cancel it, you need to provide some external means
284  *    of doing so (e.g. pthread_kill()).
285  */
286 int jobq_remove(jobq_t *jq, JCR *jcr)
287 {
288    int stat;
289    bool found = false;
290    jobq_item_t *item;
291
292    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
293    if (jq->valid != JOBQ_VALID) {
294       return EINVAL;
295    }
296
297    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
298       berrno be;
299       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
300       return stat;
301    }
302
303    foreach_dlist(item, jq->waiting_jobs) {
304       if (jcr == item->jcr) {
305          found = true;
306          break;
307       }
308    }
309    if (!found) {
310       pthread_mutex_unlock(&jq->mutex);
311       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
312       return EINVAL;
313    }
314
315    /* Move item to be the first on the list */
316    jq->waiting_jobs->remove(item);
317    jq->ready_jobs->prepend(item);
318    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
319
320    stat = start_server(jq);
321
322    pthread_mutex_unlock(&jq->mutex);
323    Dmsg0(2300, "Return jobq_remove\n");
324    return stat;
325 }
326
327
328 /*
329  * Start the server thread if it isn't already running
330  */
331 static int start_server(jobq_t *jq)
332 {
333    int stat = 0;
334    pthread_t id;
335
336    /*
337     * if any threads are idle, wake one --                
338     *   actually we do a broadcast because on /lib/tls 
339     *   these signals seem to get lost from time to time.
340     */
341    if (jq->idle_workers > 0) {
342       Dmsg0(2300, "Signal worker to wake up\n");
343       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
344          berrno be;
345          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat));
346          return stat;
347       }
348    } else if (jq->num_workers < jq->max_workers) {
349       Dmsg0(2300, "Create worker thread\n");
350       /* No idle threads so create a new one */
351       set_thread_concurrency(jq->max_workers + 1);
352       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
353          berrno be;
354          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat));
355          return stat;
356       }
357    }
358    return stat;
359 }
360
361
362 /*
363  * This is the worker thread that serves the job queue.
364  * When all the resources are acquired for the job,
365  *  it will call the user's engine.
366  */
367 extern "C"
368 void *jobq_server(void *arg)
369 {
370    struct timespec timeout;
371    jobq_t *jq = (jobq_t *)arg;
372    jobq_item_t *je;                   /* job entry in queue */
373    int stat;
374    bool timedout = false;
375    bool work = true;
376
377    Dmsg0(2300, "Start jobq_server\n");
378    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
379       berrno be;
380       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
381       return NULL;
382    }
383    jq->num_workers++;
384
385    for (;;) {
386       struct timeval tv;
387       struct timezone tz;
388
389       Dmsg0(2300, "Top of for loop\n");
390       if (!work && !jq->quit) {
391          gettimeofday(&tv, &tz);
392          timeout.tv_nsec = 0;
393          timeout.tv_sec = tv.tv_sec + 4;
394
395          while (!jq->quit) {
396             /*
397              * Wait 4 seconds, then if no more work, exit
398              */
399             Dmsg0(2300, "pthread_cond_timedwait()\n");
400             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
401             if (stat == ETIMEDOUT) {
402                Dmsg0(2300, "timedwait timedout.\n");
403                timedout = true;
404                break;
405             } else if (stat != 0) {
406                /* This shouldn't happen */
407                Dmsg0(2300, "This shouldn't happen\n");
408                jq->num_workers--;
409                pthread_mutex_unlock(&jq->mutex);
410                return NULL;
411             }
412             break;
413          }
414       }
415       /*
416        * If anything is in the ready queue, run it
417        */
418       Dmsg0(2300, "Checking ready queue.\n");
419       while (!jq->ready_jobs->empty() && !jq->quit) {
420          JCR *jcr;
421          je = (jobq_item_t *)jq->ready_jobs->first();
422          jcr = je->jcr;
423          jq->ready_jobs->remove(je);
424          if (!jq->ready_jobs->empty()) {
425             Dmsg0(2300, "ready queue not empty start server\n");
426             if (start_server(jq) != 0) {
427                jq->num_workers--;
428                pthread_mutex_unlock(&jq->mutex);
429                return NULL;
430             }
431          }
432          jq->running_jobs->append(je);
433          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
434
435          /* Release job queue lock */
436          V(jq->mutex);
437
438          /* Call user's routine here */
439          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
440             jcr->use_count());
441          jq->engine(je->jcr);
442
443          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
444             jcr->use_count());
445
446          /* Reacquire job queue lock */
447          P(jq->mutex);
448          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
449          jq->running_jobs->remove(je);
450          /*
451           * Release locks if acquired. Note, they will not have
452           *  been acquired for jobs canceled before they were
453           *  put into the ready queue.
454           */
455          if (jcr->acquired_resource_locks) {
456             jcr->store->NumConcurrentJobs--;
457             jcr->client->NumConcurrentJobs--;
458             jcr->job->NumConcurrentJobs--;
459          }
460
461          /*
462           * Reschedule the job if necessary and requested
463           */
464          if (jcr->job->RescheduleOnError &&
465              jcr->JobStatus != JS_Terminated &&
466              jcr->JobStatus != JS_Canceled &&
467              jcr->job->RescheduleTimes > 0 &&
468              jcr->JobType == JT_BACKUP &&
469              jcr->reschedule_count < jcr->job->RescheduleTimes) {
470              char dt[50];
471
472              /*
473               * Reschedule this job by cleaning it up, but
474               *  reuse the same JobId if possible.
475               */
476             jcr->reschedule_count++;
477             jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval;
478             Dmsg2(2300, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job,
479                (int)jcr->job->RescheduleInterval);
480             bstrftime(dt, sizeof(dt), time(NULL));
481             Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds.\n"),
482                jcr->Job, dt, (int)jcr->job->RescheduleInterval);
483             dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
484             jcr->JobStatus = JS_WaitStartTime;
485             jcr->SDJobStatus = 0;
486             if (jcr->JobBytes == 0) {
487                Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
488                jcr->JobStatus = JS_WaitStartTime;
489                V(jq->mutex);
490                jobq_add(jq, jcr);     /* queue the job to run again */
491                P(jq->mutex);
492                free_jcr(jcr);         /* release jcr */
493                free(je);              /* free the job entry */
494                continue;              /* look for another job to run */
495             }
496             /*
497              * Something was actually backed up, so we cannot reuse
498              *   the old JobId or there will be database record
499              *   conflicts.  We now create a new job, copying the
500              *   appropriate fields.
501              */
502             JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
503             set_jcr_defaults(njcr, jcr->job);
504             njcr->reschedule_count = jcr->reschedule_count;
505             njcr->JobLevel = jcr->JobLevel;
506             njcr->JobStatus = jcr->JobStatus;
507             copy_storage(njcr, jcr);
508             njcr->messages = jcr->messages;
509             Dmsg0(2300, "Call to run new job\n");
510             V(jq->mutex);
511             run_job(njcr);            /* This creates a "new" job */
512             free_jcr(njcr);           /* release "new" jcr */
513             P(jq->mutex);
514             Dmsg0(2300, "Back from running new job.\n");
515          }
516          /* Clean up and release old jcr */
517          if (jcr->db) {
518             db_close_database(jcr, jcr->db);
519             jcr->db = NULL;
520          }
521          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
522          jcr->SDJobStatus = 0;
523          V(jq->mutex);                /* release internal lock */
524          free_jcr(jcr);
525          free(je);                    /* release job entry */
526          P(jq->mutex);                /* reacquire job queue lock */
527       }
528       /*
529        * If any job in the wait queue can be run,
530        *  move it to the ready queue
531        */
532       Dmsg0(2300, "Done check ready, now check wait queue.\n");
533       if (!jq->waiting_jobs->empty() && !jq->quit) {
534          int Priority;
535          je = (jobq_item_t *)jq->waiting_jobs->first();
536          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
537          if (re) {
538             Priority = re->jcr->JobPriority;
539             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
540          } else {
541             Priority = je->jcr->JobPriority;
542             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
543          }
544          /*
545           * Walk down the list of waiting jobs and attempt
546           *   to acquire the resources it needs.
547           */
548          for ( ; je;  ) {
549             /* je is current job item on the queue, jn is the next one */
550             JCR *jcr = je->jcr;
551             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
552
553             Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
554                jcr->JobId, jcr->JobPriority, Priority);
555
556             /* Take only jobs of correct Priority */
557             if (jcr->JobPriority != Priority) {
558                set_jcr_job_status(jcr, JS_WaitPriority);
559                break;
560             }
561
562             if (!acquire_resources(jcr)) {
563                je = jn;            /* point to next waiting job */
564                continue;
565             }
566
567             /* Got all locks, now remove it from wait queue and append it
568              *   to the ready queue
569              */
570             jcr->acquired_resource_locks = true;
571             jq->waiting_jobs->remove(je);
572             jq->ready_jobs->append(je);
573             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
574             je = jn;                  /* Point to next waiting job */
575          } /* end for loop */
576
577       } /* end if */
578
579       Dmsg0(2300, "Done checking wait queue.\n");
580       /*
581        * If no more ready work and we are asked to quit, then do it
582        */
583       if (jq->ready_jobs->empty() && jq->quit) {
584          jq->num_workers--;
585          if (jq->num_workers == 0) {
586             Dmsg0(2300, "Wake up destroy routine\n");
587             /* Wake up destroy routine if he is waiting */
588             pthread_cond_broadcast(&jq->work);
589          }
590          break;
591       }
592       Dmsg0(2300, "Check for work request\n");
593       /*
594        * If no more work requests, and we waited long enough, quit
595        */
596       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
597          jq->ready_jobs->empty());
598       if (jq->ready_jobs->empty() && timedout) {
599          Dmsg0(2300, "break big loop\n");
600          jq->num_workers--;
601          break;
602       }
603
604       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
605       if (work) {
606          /*
607           * If a job is waiting on a Resource, don't consume all
608           *   the CPU time looping looking for work, and even more
609           *   important, release the lock so that a job that has
610           *   terminated can give us the resource.
611           */
612          V(jq->mutex);
613          bmicrosleep(2, 0);              /* pause for 2 seconds */
614          P(jq->mutex);
615          /* Recompute work as something may have changed in last 2 secs */
616          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
617       }
618       Dmsg1(2300, "Loop again. work=%d\n", work);
619    } /* end of big for loop */
620
621    Dmsg0(200, "unlock mutex\n");
622    V(jq->mutex);
623    Dmsg0(2300, "End jobq_server\n");
624    return NULL;
625 }
626
627 /*
628  * See if we can acquire all the necessary resources for the job (JCR)
629  *
630  *  Returns: true  if successful
631  *           false if resource failure
632  */
633 static bool acquire_resources(JCR *jcr)
634 {
635    bool skip_this_jcr = false;
636
637    if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
638       /* 
639        * Let only one Restore/verify job run at a time regardless
640        *   of MaxConcurrentJobs.
641        */ 
642       if (jcr->store->NumConcurrentJobs == 0) {
643          jcr->store->NumConcurrentJobs = 1;
644       } else {
645          set_jcr_job_status(jcr, JS_WaitStoreRes);
646          return false;
647       }
648    /* We are not doing a Restore or Verify */
649    } else if (jcr->store->NumConcurrentJobs == 0 &&
650               jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
651        /* Simple case, first job */
652        jcr->store->NumConcurrentJobs = 1;
653    } else if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
654        jcr->store->NumConcurrentJobs++;
655    } else {
656       skip_this_jcr = true;
657    }
658    if (skip_this_jcr) {
659       set_jcr_job_status(jcr, JS_WaitStoreRes);
660       return false;
661    }
662
663    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
664       jcr->client->NumConcurrentJobs++;
665    } else {
666       /* Back out previous locks */
667       jcr->store->NumConcurrentJobs--;
668       set_jcr_job_status(jcr, JS_WaitClientRes);
669       return false;
670    }
671    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
672       jcr->job->NumConcurrentJobs++;
673    } else {
674       /* Back out previous locks */
675       jcr->store->NumConcurrentJobs--;
676       jcr->client->NumConcurrentJobs--;
677       set_jcr_job_status(jcr, JS_WaitJobRes);
678       return false;
679    }
680    /* Check actual device availability */
681    /* ***FIXME****/
682    return true;
683 }