]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2  * Bacula job queue routines.
3  *
4  *  This code consists of three queues, the waiting_jobs
5  *  queue, where jobs are initially queued, the ready_jobs
6  *  queue, where jobs are placed when all the resources are
7  *  allocated and they can immediately be run, and the
8  *  running queue where jobs are placed when they are
9  *  running.
10  *
11  *  Kern Sibbald, July MMIII
12  *
13  *   Version $Id$
14  *
15  *  This code was adapted from the Bacula workq, which was
16  *    adapted from "Programming with POSIX Threads", by
17  *    David R. Butenhof
18  *
19  */
20 /*
21    Copyright (C) 2003-2006 Kern Sibbald
22
23    This program is free software; you can redistribute it and/or
24    modify it under the terms of the GNU General Public License
25    version 2 as amended with additional clauses defined in the
26    file LICENSE in the main source directory.
27
28    This program is distributed in the hope that it will be useful,
29    but WITHOUT ANY WARRANTY; without even the implied warranty of
30    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
31    the file LICENSE for additional details.
32
33  */
34
35 #include "bacula.h"
36 #include "dird.h"
37
38 extern JCR *jobs;
39
40 /* Forward referenced functions */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
43
44 static int  start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
46
47
48
49 /*
50  * Initialize a job queue
51  *
52  *  Returns: 0 on success
53  *           errno on failure
54  */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
56 {
57    int stat;
58    jobq_item_t *item = NULL;
59
60    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
61       berrno be;
62       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat));
63       return stat;
64    }
65    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66       pthread_attr_destroy(&jq->attr);
67       return stat;
68    }
69    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
70       berrno be;
71       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat));
72       pthread_attr_destroy(&jq->attr);
73       return stat;
74    }
75    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
76       berrno be;
77       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat));
78       pthread_mutex_destroy(&jq->mutex);
79       pthread_attr_destroy(&jq->attr);
80       return stat;
81    }
82    jq->quit = false;
83    jq->max_workers = threads;         /* max threads to create */
84    jq->num_workers = 0;               /* no threads yet */
85    jq->idle_workers = 0;              /* no idle threads */
86    jq->engine = engine;               /* routine to run */
87    jq->valid = JOBQ_VALID;
88    /* Initialize the job queues */
89    jq->waiting_jobs = New(dlist(item, &item->link));
90    jq->running_jobs = New(dlist(item, &item->link));
91    jq->ready_jobs = New(dlist(item, &item->link));
92    return 0;
93 }
94
95 /*
96  * Destroy the job queue
97  *
98  * Returns: 0 on success
99  *          errno on failure
100  */
101 int jobq_destroy(jobq_t *jq)
102 {
103    int stat, stat1, stat2;
104
105    if (jq->valid != JOBQ_VALID) {
106       return EINVAL;
107    }
108    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
109       berrno be;
110       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
111       return stat;
112    }
113    jq->valid = 0;                      /* prevent any more operations */
114
115    /* 
116     * If any threads are active, wake them 
117     */
118    if (jq->num_workers > 0) {
119       jq->quit = true;
120       if (jq->idle_workers) {
121          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
122             berrno be;
123             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat));
124             pthread_mutex_unlock(&jq->mutex);
125             return stat;
126          }
127       }
128       while (jq->num_workers > 0) {
129          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
130             berrno be;
131             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat));
132             pthread_mutex_unlock(&jq->mutex);
133             return stat;
134          }
135       }
136    }
137    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
138       berrno be;
139       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat));
140       return stat;
141    }
142    stat  = pthread_mutex_destroy(&jq->mutex);
143    stat1 = pthread_cond_destroy(&jq->work);
144    stat2 = pthread_attr_destroy(&jq->attr);
145    delete jq->waiting_jobs;
146    delete jq->running_jobs;
147    delete jq->ready_jobs;
148    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
149 }
150
151 struct wait_pkt {
152    JCR *jcr;
153    jobq_t *jq;
154 };
155
156 /*
157  * Wait until schedule time arrives before starting. Normally
158  *  this routine is only used for jobs started from the console
159  *  for which the user explicitly specified a start time. Otherwise
160  *  most jobs are put into the job queue only when their
161  *  scheduled time arives.
162  */
163 extern "C"
164 void *sched_wait(void *arg)
165 {
166    JCR *jcr = ((wait_pkt *)arg)->jcr;
167    jobq_t *jq = ((wait_pkt *)arg)->jq;
168
169    Dmsg0(2300, "Enter sched_wait.\n");
170    free(arg);
171    time_t wtime = jcr->sched_time - time(NULL);
172    set_jcr_job_status(jcr, JS_WaitStartTime);
173    /* Wait until scheduled time arrives */
174    if (wtime > 0) {
175       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
176          jcr->Job, wtime);
177    }
178    /* Check every 30 seconds if canceled */
179    while (wtime > 0) {
180       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
181          jcr->JobId, wtime, jcr->use_count());
182       if (wtime > 30) {
183          wtime = 30;
184       }
185       bmicrosleep(wtime, 0);
186       if (job_canceled(jcr)) {
187          break;
188       }
189       wtime = jcr->sched_time - time(NULL);
190    }
191    Dmsg1(200, "resched use=%d\n", jcr->use_count());
192    jobq_add(jq, jcr);
193    free_jcr(jcr);                     /* we are done with jcr */
194    Dmsg0(2300, "Exit sched_wait\n");
195    return NULL;
196 }
197
198 /*
199  *  Add a job to the queue
200  *    jq is a queue that was created with jobq_init
201  */
202 int jobq_add(jobq_t *jq, JCR *jcr)
203 {
204    int stat;
205    jobq_item_t *item, *li;
206    bool inserted = false;
207    time_t wtime = jcr->sched_time - time(NULL);
208    pthread_t id;
209    wait_pkt *sched_pkt;
210
211    if (!jcr->term_wait_inited) { 
212       /* Initialize termination condition variable */
213       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
214          berrno be;
215          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.strerror(stat));
216          return stat;
217       }
218       jcr->term_wait_inited = true;
219    }                           
220                              
221    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
222    if (jq->valid != JOBQ_VALID) {
223       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
224       return EINVAL;
225    }
226
227    jcr->inc_use_count();                 /* mark jcr in use by us */
228    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
229    if (!job_canceled(jcr) && wtime > 0) {
230       set_thread_concurrency(jq->max_workers + 2);
231       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
232       sched_pkt->jcr = jcr;
233       sched_pkt->jq = jq;
234       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
235       if (stat != 0) {                /* thread not created */
236          berrno be;
237          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat));
238       }
239       return stat;
240    }
241
242    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
243       berrno be;
244       Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
245       free_jcr(jcr);                    /* release jcr */
246       return stat;
247    }
248
249    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
250       free_jcr(jcr);                    /* release jcr */
251       return ENOMEM;
252    }
253    item->jcr = jcr;
254
255    if (job_canceled(jcr)) {
256       /* Add job to ready queue so that it is canceled quickly */
257       jq->ready_jobs->prepend(item);
258       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
259    } else {
260       /* Add this job to the wait queue in priority sorted order */
261       foreach_dlist(li, jq->waiting_jobs) {
262          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
263             li->jcr->JobId, li->jcr->JobPriority);
264          if (li->jcr->JobPriority > jcr->JobPriority) {
265             jq->waiting_jobs->insert_before(item, li);
266             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
267                li->jcr->JobId, jcr->JobId);
268             inserted = true;
269             break;
270          }
271       }
272       /* If not jobs in wait queue, append it */
273       if (!inserted) {
274          jq->waiting_jobs->append(item);
275          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
276       }
277    }
278
279    /* Ensure that at least one server looks at the queue. */
280    stat = start_server(jq);
281
282    pthread_mutex_unlock(&jq->mutex);
283    Dmsg0(2300, "Return jobq_add\n");
284    return stat;
285 }
286
287 /*
288  *  Remove a job from the job queue. Used only by cancel_job().
289  *    jq is a queue that was created with jobq_init
290  *    work_item is an element of work
291  *
292  *   Note, it is "removed" from the job queue.
293  *    If you want to cancel it, you need to provide some external means
294  *    of doing so (e.g. pthread_kill()).
295  */
296 int jobq_remove(jobq_t *jq, JCR *jcr)
297 {
298    int stat;
299    bool found = false;
300    jobq_item_t *item;
301
302    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
303    if (jq->valid != JOBQ_VALID) {
304       return EINVAL;
305    }
306
307    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
308       berrno be;
309       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
310       return stat;
311    }
312
313    foreach_dlist(item, jq->waiting_jobs) {
314       if (jcr == item->jcr) {
315          found = true;
316          break;
317       }
318    }
319    if (!found) {
320       pthread_mutex_unlock(&jq->mutex);
321       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
322       return EINVAL;
323    }
324
325    /* Move item to be the first on the list */
326    jq->waiting_jobs->remove(item);
327    jq->ready_jobs->prepend(item);
328    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
329
330    stat = start_server(jq);
331
332    pthread_mutex_unlock(&jq->mutex);
333    Dmsg0(2300, "Return jobq_remove\n");
334    return stat;
335 }
336
337
338 /*
339  * Start the server thread if it isn't already running
340  */
341 static int start_server(jobq_t *jq)
342 {
343    int stat = 0;
344    pthread_t id;
345
346    /*
347     * if any threads are idle, wake one --                
348     *   actually we do a broadcast because on /lib/tls 
349     *   these signals seem to get lost from time to time.
350     */
351    if (jq->idle_workers > 0) {
352       Dmsg0(2300, "Signal worker to wake up\n");
353       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
354          berrno be;
355          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat));
356          return stat;
357       }
358    } else if (jq->num_workers < jq->max_workers) {
359       Dmsg0(2300, "Create worker thread\n");
360       /* No idle threads so create a new one */
361       set_thread_concurrency(jq->max_workers + 1);
362       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
363          berrno be;
364          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat));
365          return stat;
366       }
367    }
368    return stat;
369 }
370
371
372 /*
373  * This is the worker thread that serves the job queue.
374  * When all the resources are acquired for the job,
375  *  it will call the user's engine.
376  */
377 extern "C"
378 void *jobq_server(void *arg)
379 {
380    struct timespec timeout;
381    jobq_t *jq = (jobq_t *)arg;
382    jobq_item_t *je;                   /* job entry in queue */
383    int stat;
384    bool timedout = false;
385    bool work = true;
386
387    Dmsg0(2300, "Start jobq_server\n");
388    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
389       berrno be;
390       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
391       return NULL;
392    }
393    jq->num_workers++;
394
395    for (;;) {
396       struct timeval tv;
397       struct timezone tz;
398
399       Dmsg0(2300, "Top of for loop\n");
400       if (!work && !jq->quit) {
401          gettimeofday(&tv, &tz);
402          timeout.tv_nsec = 0;
403          timeout.tv_sec = tv.tv_sec + 4;
404
405          while (!jq->quit) {
406             /*
407              * Wait 4 seconds, then if no more work, exit
408              */
409             Dmsg0(2300, "pthread_cond_timedwait()\n");
410             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
411             if (stat == ETIMEDOUT) {
412                Dmsg0(2300, "timedwait timedout.\n");
413                timedout = true;
414                break;
415             } else if (stat != 0) {
416                /* This shouldn't happen */
417                Dmsg0(2300, "This shouldn't happen\n");
418                jq->num_workers--;
419                pthread_mutex_unlock(&jq->mutex);
420                return NULL;
421             }
422             break;
423          }
424       }
425       /*
426        * If anything is in the ready queue, run it
427        */
428       Dmsg0(2300, "Checking ready queue.\n");
429       while (!jq->ready_jobs->empty() && !jq->quit) {
430          JCR *jcr;
431          je = (jobq_item_t *)jq->ready_jobs->first();
432          jcr = je->jcr;
433          jq->ready_jobs->remove(je);
434          if (!jq->ready_jobs->empty()) {
435             Dmsg0(2300, "ready queue not empty start server\n");
436             if (start_server(jq) != 0) {
437                jq->num_workers--;
438                pthread_mutex_unlock(&jq->mutex);
439                return NULL;
440             }
441          }
442          jq->running_jobs->append(je);
443          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
444
445          /* Release job queue lock */
446          V(jq->mutex);
447
448          /* Call user's routine here */
449          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
450             jcr->use_count());
451          jq->engine(je->jcr);
452
453          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
454             jcr->use_count());
455
456          /* Reacquire job queue lock */
457          P(jq->mutex);
458          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
459          jq->running_jobs->remove(je);
460          /*
461           * Release locks if acquired. Note, they will not have
462           *  been acquired for jobs canceled before they were
463           *  put into the ready queue.
464           */
465          if (jcr->acquired_resource_locks) {
466             jcr->store->NumConcurrentJobs--;
467             jcr->client->NumConcurrentJobs--;
468             jcr->job->NumConcurrentJobs--;
469          }
470
471          /*
472           * Reschedule the job if necessary and requested
473           */
474          if (jcr->job->RescheduleOnError &&
475              jcr->JobStatus != JS_Terminated &&
476              jcr->JobStatus != JS_Canceled &&
477              jcr->job->RescheduleTimes > 0 &&
478              jcr->JobType == JT_BACKUP &&
479              (jcr->job->RescheduleTimes == 0 ||
480               jcr->reschedule_count < jcr->job->RescheduleTimes)) {
481              char dt[50], dt2[50];
482
483              /*
484               * Reschedule this job by cleaning it up, but
485               *  reuse the same JobId if possible.
486               */
487             time_t now = time(NULL);
488             jcr->reschedule_count++;
489             jcr->sched_time = now + jcr->job->RescheduleInterval;
490             bstrftime(dt, sizeof(dt), now);
491             bstrftime(dt2, sizeof(dt2), jcr->sched_time);
492             Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
493                   (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
494             Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
495                  jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
496             dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
497             jcr->JobStatus = -1;
498             set_jcr_job_status(jcr, JS_WaitStartTime);
499             jcr->SDJobStatus = 0;
500             if (jcr->JobBytes == 0) {
501                Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
502                V(jq->mutex);
503                jobq_add(jq, jcr);     /* queue the job to run again */
504                P(jq->mutex);
505                free_jcr(jcr);         /* release jcr */
506                free(je);              /* free the job entry */
507                continue;              /* look for another job to run */
508             }
509             /*
510              * Something was actually backed up, so we cannot reuse
511              *   the old JobId or there will be database record
512              *   conflicts.  We now create a new job, copying the
513              *   appropriate fields.
514              */           
515             JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
516             set_jcr_defaults(njcr, jcr->job);
517             njcr->reschedule_count = jcr->reschedule_count;
518             njcr->sched_time = jcr->sched_time;
519             njcr->JobLevel = jcr->JobLevel;
520             njcr->JobStatus = -1;
521             set_jcr_job_status(njcr, jcr->JobStatus);
522             copy_storage(njcr, jcr);
523             njcr->messages = jcr->messages;
524             Dmsg0(2300, "Call to run new job\n");
525             V(jq->mutex);
526             run_job(njcr);            /* This creates a "new" job */
527             free_jcr(njcr);           /* release "new" jcr */
528             P(jq->mutex);
529             Dmsg0(2300, "Back from running new job.\n");
530          }
531          /* Clean up and release old jcr */
532          if (jcr->db) {
533             db_close_database(jcr, jcr->db);
534             jcr->db = NULL;
535          }
536          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
537          jcr->SDJobStatus = 0;
538          V(jq->mutex);                /* release internal lock */
539          free_jcr(jcr);
540          free(je);                    /* release job entry */
541          P(jq->mutex);                /* reacquire job queue lock */
542       }
543       /*
544        * If any job in the wait queue can be run,
545        *  move it to the ready queue
546        */
547       Dmsg0(2300, "Done check ready, now check wait queue.\n");
548       if (!jq->waiting_jobs->empty() && !jq->quit) {
549          int Priority;
550          je = (jobq_item_t *)jq->waiting_jobs->first();
551          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
552          if (re) {
553             Priority = re->jcr->JobPriority;
554             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
555          } else {
556             Priority = je->jcr->JobPriority;
557             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
558          }
559          /*
560           * Walk down the list of waiting jobs and attempt
561           *   to acquire the resources it needs.
562           */
563          for ( ; je;  ) {
564             /* je is current job item on the queue, jn is the next one */
565             JCR *jcr = je->jcr;
566             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
567
568             Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
569                jcr->JobId, jcr->JobPriority, Priority);
570
571             /* Take only jobs of correct Priority */
572             if (jcr->JobPriority != Priority) {
573                set_jcr_job_status(jcr, JS_WaitPriority);
574                break;
575             }
576
577             if (!acquire_resources(jcr)) {
578                je = jn;            /* point to next waiting job */
579                continue;
580             }
581
582             /* Got all locks, now remove it from wait queue and append it
583              *   to the ready queue
584              */
585             jcr->acquired_resource_locks = true;
586             jq->waiting_jobs->remove(je);
587             jq->ready_jobs->append(je);
588             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
589             je = jn;                  /* Point to next waiting job */
590          } /* end for loop */
591
592       } /* end if */
593
594       Dmsg0(2300, "Done checking wait queue.\n");
595       /*
596        * If no more ready work and we are asked to quit, then do it
597        */
598       if (jq->ready_jobs->empty() && jq->quit) {
599          jq->num_workers--;
600          if (jq->num_workers == 0) {
601             Dmsg0(2300, "Wake up destroy routine\n");
602             /* Wake up destroy routine if he is waiting */
603             pthread_cond_broadcast(&jq->work);
604          }
605          break;
606       }
607       Dmsg0(2300, "Check for work request\n");
608       /*
609        * If no more work requests, and we waited long enough, quit
610        */
611       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
612          jq->ready_jobs->empty());
613       if (jq->ready_jobs->empty() && timedout) {
614          Dmsg0(2300, "break big loop\n");
615          jq->num_workers--;
616          break;
617       }
618
619       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
620       if (work) {
621          /*
622           * If a job is waiting on a Resource, don't consume all
623           *   the CPU time looping looking for work, and even more
624           *   important, release the lock so that a job that has
625           *   terminated can give us the resource.
626           */
627          V(jq->mutex);
628          bmicrosleep(2, 0);              /* pause for 2 seconds */
629          P(jq->mutex);
630          /* Recompute work as something may have changed in last 2 secs */
631          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
632       }
633       Dmsg1(2300, "Loop again. work=%d\n", work);
634    } /* end of big for loop */
635
636    Dmsg0(200, "unlock mutex\n");
637    V(jq->mutex);
638    Dmsg0(2300, "End jobq_server\n");
639    return NULL;
640 }
641
642 /*
643  * See if we can acquire all the necessary resources for the job (JCR)
644  *
645  *  Returns: true  if successful
646  *           false if resource failure
647  */
648 static bool acquire_resources(JCR *jcr)
649 {
650    bool skip_this_jcr = false;
651
652    if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
653       /* 
654        * Let only one Restore/verify job run at a time regardless
655        *   of MaxConcurrentJobs.
656        */ 
657       if (jcr->store->NumConcurrentJobs == 0) {
658          jcr->store->NumConcurrentJobs = 1;
659       } else {
660          set_jcr_job_status(jcr, JS_WaitStoreRes);
661          return false;
662       }
663    /* We are not doing a Restore or Verify */
664    } else if (jcr->store->NumConcurrentJobs == 0 &&
665               jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
666        /* Simple case, first job */
667        jcr->store->NumConcurrentJobs = 1;
668    } else if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
669        jcr->store->NumConcurrentJobs++;
670    } else {
671       skip_this_jcr = true;
672    }
673    if (skip_this_jcr) {
674       set_jcr_job_status(jcr, JS_WaitStoreRes);
675       return false;
676    }
677
678    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
679       jcr->client->NumConcurrentJobs++;
680    } else {
681       /* Back out previous locks */
682       jcr->store->NumConcurrentJobs--;
683       set_jcr_job_status(jcr, JS_WaitClientRes);
684       return false;
685    }
686    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
687       jcr->job->NumConcurrentJobs++;
688    } else {
689       /* Back out previous locks */
690       jcr->store->NumConcurrentJobs--;
691       jcr->client->NumConcurrentJobs--;
692       set_jcr_job_status(jcr, JS_WaitJobRes);
693       return false;
694    }
695    /* Check actual device availability */
696    /* ***FIXME****/
697    return true;
698 }