]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
======================= Warning ==========================
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2  * Bacula job queue routines.
3  *
4  *  This code consists of three queues, the waiting_jobs
5  *  queue, where jobs are initially queued, the ready_jobs
6  *  queue, where jobs are placed when all the resources are
7  *  allocated and they can immediately be run, and the
8  *  running queue where jobs are placed when they are
9  *  running.
10  *
11  *  Kern Sibbald, July MMIII
12  *
13  *   Version $Id$
14  *
15  *  This code was adapted from the Bacula workq, which was
16  *    adapted from "Programming with POSIX Threads", by
17  *    David R. Butenhof
18  *
19  */
20 /*
21    Copyright (C) 2003-2006 Kern Sibbald
22
23    This program is free software; you can redistribute it and/or
24    modify it under the terms of the GNU General Public License
25    version 2 as amended with additional clauses defined in the
26    file LICENSE in the main source directory.
27
28    This program is distributed in the hope that it will be useful,
29    but WITHOUT ANY WARRANTY; without even the implied warranty of
30    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
31    the file LICENSE for additional details.
32
33  */
34
35 #include "bacula.h"
36 #include "dird.h"
37
38 extern JCR *jobs;
39
40 /* Forward referenced functions */
41 extern "C" void *jobq_server(void *arg);
42 extern "C" void *sched_wait(void *arg);
43
44 static int  start_server(jobq_t *jq);
45 static bool acquire_resources(JCR *jcr);
46
47
48
49 /*
50  * Initialize a job queue
51  *
52  *  Returns: 0 on success
53  *           errno on failure
54  */
55 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
56 {
57    int stat;
58    jobq_item_t *item = NULL;
59
60    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
61       berrno be;
62       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat));
63       return stat;
64    }
65    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66       pthread_attr_destroy(&jq->attr);
67       return stat;
68    }
69    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
70       berrno be;
71       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat));
72       pthread_attr_destroy(&jq->attr);
73       return stat;
74    }
75    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
76       berrno be;
77       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat));
78       pthread_mutex_destroy(&jq->mutex);
79       pthread_attr_destroy(&jq->attr);
80       return stat;
81    }
82    jq->quit = false;
83    jq->max_workers = threads;         /* max threads to create */
84    jq->num_workers = 0;               /* no threads yet */
85    jq->idle_workers = 0;              /* no idle threads */
86    jq->engine = engine;               /* routine to run */
87    jq->valid = JOBQ_VALID;
88    /* Initialize the job queues */
89    jq->waiting_jobs = New(dlist(item, &item->link));
90    jq->running_jobs = New(dlist(item, &item->link));
91    jq->ready_jobs = New(dlist(item, &item->link));
92    return 0;
93 }
94
95 /*
96  * Destroy the job queue
97  *
98  * Returns: 0 on success
99  *          errno on failure
100  */
101 int jobq_destroy(jobq_t *jq)
102 {
103    int stat, stat1, stat2;
104
105    if (jq->valid != JOBQ_VALID) {
106       return EINVAL;
107    }
108    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
109       berrno be;
110       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
111       return stat;
112    }
113    jq->valid = 0;                      /* prevent any more operations */
114
115    /* 
116     * If any threads are active, wake them 
117     */
118    if (jq->num_workers > 0) {
119       jq->quit = true;
120       if (jq->idle_workers) {
121          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
122             berrno be;
123             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat));
124             pthread_mutex_unlock(&jq->mutex);
125             return stat;
126          }
127       }
128       while (jq->num_workers > 0) {
129          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
130             berrno be;
131             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat));
132             pthread_mutex_unlock(&jq->mutex);
133             return stat;
134          }
135       }
136    }
137    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
138       berrno be;
139       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat));
140       return stat;
141    }
142    stat  = pthread_mutex_destroy(&jq->mutex);
143    stat1 = pthread_cond_destroy(&jq->work);
144    stat2 = pthread_attr_destroy(&jq->attr);
145    delete jq->waiting_jobs;
146    delete jq->running_jobs;
147    delete jq->ready_jobs;
148    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
149 }
150
151 struct wait_pkt {
152    JCR *jcr;
153    jobq_t *jq;
154 };
155
156 /*
157  * Wait until schedule time arrives before starting. Normally
158  *  this routine is only used for jobs started from the console
159  *  for which the user explicitly specified a start time. Otherwise
160  *  most jobs are put into the job queue only when their
161  *  scheduled time arives.
162  */
163 extern "C"
164 void *sched_wait(void *arg)
165 {
166    JCR *jcr = ((wait_pkt *)arg)->jcr;
167    jobq_t *jq = ((wait_pkt *)arg)->jq;
168
169    Dmsg0(2300, "Enter sched_wait.\n");
170    free(arg);
171    time_t wtime = jcr->sched_time - time(NULL);
172    set_jcr_job_status(jcr, JS_WaitStartTime);
173    /* Wait until scheduled time arrives */
174    if (wtime > 0) {
175       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
176          jcr->Job, wtime);
177    }
178    /* Check every 30 seconds if canceled */
179    while (wtime > 0) {
180       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
181          jcr->JobId, wtime, jcr->use_count());
182       if (wtime > 30) {
183          wtime = 30;
184       }
185       bmicrosleep(wtime, 0);
186       if (job_canceled(jcr)) {
187          break;
188       }
189       wtime = jcr->sched_time - time(NULL);
190    }
191    Dmsg1(200, "resched use=%d\n", jcr->use_count());
192    jobq_add(jq, jcr);
193    free_jcr(jcr);                     /* we are done with jcr */
194    Dmsg0(2300, "Exit sched_wait\n");
195    return NULL;
196 }
197
198 /*
199  *  Add a job to the queue
200  *    jq is a queue that was created with jobq_init
201  */
202 int jobq_add(jobq_t *jq, JCR *jcr)
203 {
204    int stat;
205    jobq_item_t *item, *li;
206    bool inserted = false;
207    time_t wtime = jcr->sched_time - time(NULL);
208    pthread_t id;
209    wait_pkt *sched_pkt;
210
211    if (!jcr->term_wait_inited) { 
212       /* Initialize termination condition variable */
213       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
214          berrno be;
215          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.strerror(stat));
216          return stat;
217       }
218       jcr->term_wait_inited = true;
219    }                           
220                              
221    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
222    if (jq->valid != JOBQ_VALID) {
223       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
224       return EINVAL;
225    }
226
227    jcr->inc_use_count();                 /* mark jcr in use by us */
228    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
229    if (!job_canceled(jcr) && wtime > 0) {
230       set_thread_concurrency(jq->max_workers + 2);
231       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
232       sched_pkt->jcr = jcr;
233       sched_pkt->jq = jq;
234       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
235       if (stat != 0) {                /* thread not created */
236          berrno be;
237          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat));
238       }
239       return stat;
240    }
241
242    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
243       berrno be;
244       Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
245       free_jcr(jcr);                    /* release jcr */
246       return stat;
247    }
248
249    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
250       free_jcr(jcr);                    /* release jcr */
251       return ENOMEM;
252    }
253    item->jcr = jcr;
254
255    if (job_canceled(jcr)) {
256       /* Add job to ready queue so that it is canceled quickly */
257       jq->ready_jobs->prepend(item);
258       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
259    } else {
260       /* Add this job to the wait queue in priority sorted order */
261       foreach_dlist(li, jq->waiting_jobs) {
262          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
263             li->jcr->JobId, li->jcr->JobPriority);
264          if (li->jcr->JobPriority > jcr->JobPriority) {
265             jq->waiting_jobs->insert_before(item, li);
266             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
267                li->jcr->JobId, jcr->JobId);
268             inserted = true;
269             break;
270          }
271       }
272       /* If not jobs in wait queue, append it */
273       if (!inserted) {
274          jq->waiting_jobs->append(item);
275          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
276       }
277    }
278
279    /* Ensure that at least one server looks at the queue. */
280    stat = start_server(jq);
281
282    pthread_mutex_unlock(&jq->mutex);
283    Dmsg0(2300, "Return jobq_add\n");
284    return stat;
285 }
286
287 /*
288  *  Remove a job from the job queue. Used only by cancel_job().
289  *    jq is a queue that was created with jobq_init
290  *    work_item is an element of work
291  *
292  *   Note, it is "removed" from the job queue.
293  *    If you want to cancel it, you need to provide some external means
294  *    of doing so (e.g. pthread_kill()).
295  */
296 int jobq_remove(jobq_t *jq, JCR *jcr)
297 {
298    int stat;
299    bool found = false;
300    jobq_item_t *item;
301
302    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
303    if (jq->valid != JOBQ_VALID) {
304       return EINVAL;
305    }
306
307    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
308       berrno be;
309       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
310       return stat;
311    }
312
313    foreach_dlist(item, jq->waiting_jobs) {
314       if (jcr == item->jcr) {
315          found = true;
316          break;
317       }
318    }
319    if (!found) {
320       pthread_mutex_unlock(&jq->mutex);
321       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
322       return EINVAL;
323    }
324
325    /* Move item to be the first on the list */
326    jq->waiting_jobs->remove(item);
327    jq->ready_jobs->prepend(item);
328    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
329
330    stat = start_server(jq);
331
332    pthread_mutex_unlock(&jq->mutex);
333    Dmsg0(2300, "Return jobq_remove\n");
334    return stat;
335 }
336
337
338 /*
339  * Start the server thread if it isn't already running
340  */
341 static int start_server(jobq_t *jq)
342 {
343    int stat = 0;
344    pthread_t id;
345
346    /*
347     * if any threads are idle, wake one --                
348     *   actually we do a broadcast because on /lib/tls 
349     *   these signals seem to get lost from time to time.
350     */
351    if (jq->idle_workers > 0) {
352       Dmsg0(2300, "Signal worker to wake up\n");
353       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
354          berrno be;
355          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat));
356          return stat;
357       }
358    } else if (jq->num_workers < jq->max_workers) {
359       Dmsg0(2300, "Create worker thread\n");
360       /* No idle threads so create a new one */
361       set_thread_concurrency(jq->max_workers + 1);
362       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
363          berrno be;
364          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat));
365          return stat;
366       }
367    }
368    return stat;
369 }
370
371
372 /*
373  * This is the worker thread that serves the job queue.
374  * When all the resources are acquired for the job,
375  *  it will call the user's engine.
376  */
377 extern "C"
378 void *jobq_server(void *arg)
379 {
380    struct timespec timeout;
381    jobq_t *jq = (jobq_t *)arg;
382    jobq_item_t *je;                   /* job entry in queue */
383    int stat;
384    bool timedout = false;
385    bool work = true;
386
387    Dmsg0(2300, "Start jobq_server\n");
388    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
389       berrno be;
390       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat));
391       return NULL;
392    }
393    jq->num_workers++;
394
395    for (;;) {
396       struct timeval tv;
397       struct timezone tz;
398
399       Dmsg0(2300, "Top of for loop\n");
400       if (!work && !jq->quit) {
401          gettimeofday(&tv, &tz);
402          timeout.tv_nsec = 0;
403          timeout.tv_sec = tv.tv_sec + 4;
404
405          while (!jq->quit) {
406             /*
407              * Wait 4 seconds, then if no more work, exit
408              */
409             Dmsg0(2300, "pthread_cond_timedwait()\n");
410             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
411             if (stat == ETIMEDOUT) {
412                Dmsg0(2300, "timedwait timedout.\n");
413                timedout = true;
414                break;
415             } else if (stat != 0) {
416                /* This shouldn't happen */
417                Dmsg0(2300, "This shouldn't happen\n");
418                jq->num_workers--;
419                pthread_mutex_unlock(&jq->mutex);
420                return NULL;
421             }
422             break;
423          }
424       }
425       /*
426        * If anything is in the ready queue, run it
427        */
428       Dmsg0(2300, "Checking ready queue.\n");
429       while (!jq->ready_jobs->empty() && !jq->quit) {
430          JCR *jcr;
431          je = (jobq_item_t *)jq->ready_jobs->first();
432          jcr = je->jcr;
433          jq->ready_jobs->remove(je);
434          if (!jq->ready_jobs->empty()) {
435             Dmsg0(2300, "ready queue not empty start server\n");
436             if (start_server(jq) != 0) {
437                jq->num_workers--;
438                pthread_mutex_unlock(&jq->mutex);
439                return NULL;
440             }
441          }
442          jq->running_jobs->append(je);
443          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
444
445          /* Release job queue lock */
446          V(jq->mutex);
447
448          /* Call user's routine here */
449          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
450             jcr->use_count());
451          jq->engine(je->jcr);
452
453          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
454             jcr->use_count());
455
456          /* Reacquire job queue lock */
457          P(jq->mutex);
458          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
459          jq->running_jobs->remove(je);
460          /*
461           * Release locks if acquired. Note, they will not have
462           *  been acquired for jobs canceled before they were
463           *  put into the ready queue.
464           */
465          if (jcr->acquired_resource_locks) {
466             if (jcr->rstore) {
467                jcr->rstore->NumConcurrentJobs = 0;
468                Dmsg1(000, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
469             }
470             if (jcr->wstore) {
471                jcr->wstore->NumConcurrentJobs--;
472                Dmsg1(000, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
473             }
474             jcr->client->NumConcurrentJobs--;
475             jcr->job->NumConcurrentJobs--;
476             jcr->acquired_resource_locks = false;
477          }
478
479          /*
480           * Reschedule the job if necessary and requested
481           */
482          if (jcr->job->RescheduleOnError &&
483              jcr->JobStatus != JS_Terminated &&
484              jcr->JobStatus != JS_Canceled &&
485              jcr->job->RescheduleTimes > 0 &&
486              jcr->JobType == JT_BACKUP &&
487              (jcr->job->RescheduleTimes == 0 ||
488               jcr->reschedule_count < jcr->job->RescheduleTimes)) {
489              char dt[50], dt2[50];
490
491              /*
492               * Reschedule this job by cleaning it up, but
493               *  reuse the same JobId if possible.
494               */
495             time_t now = time(NULL);
496             jcr->reschedule_count++;
497             jcr->sched_time = now + jcr->job->RescheduleInterval;
498             bstrftime(dt, sizeof(dt), now);
499             bstrftime(dt2, sizeof(dt2), jcr->sched_time);
500             Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
501                   (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
502             Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
503                  jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
504             dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
505             jcr->JobStatus = -1;
506             set_jcr_job_status(jcr, JS_WaitStartTime);
507             jcr->SDJobStatus = 0;
508             if (jcr->JobBytes == 0) {
509                Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
510                V(jq->mutex);
511                jobq_add(jq, jcr);     /* queue the job to run again */
512                P(jq->mutex);
513                free_jcr(jcr);         /* release jcr */
514                free(je);              /* free the job entry */
515                continue;              /* look for another job to run */
516             }
517             /*
518              * Something was actually backed up, so we cannot reuse
519              *   the old JobId or there will be database record
520              *   conflicts.  We now create a new job, copying the
521              *   appropriate fields.
522              */           
523             JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
524             set_jcr_defaults(njcr, jcr->job);
525             njcr->reschedule_count = jcr->reschedule_count;
526             njcr->sched_time = jcr->sched_time;
527             njcr->JobLevel = jcr->JobLevel;
528             njcr->JobStatus = -1;
529             set_jcr_job_status(njcr, jcr->JobStatus);
530             if (jcr->rstore) {
531                copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
532             } else {
533                free_rstorage(njcr);
534             }
535             if (jcr->wstore) {
536                copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
537             } else {
538                free_wstorage(njcr);
539             }
540             njcr->messages = jcr->messages;
541             Dmsg0(2300, "Call to run new job\n");
542             V(jq->mutex);
543             run_job(njcr);            /* This creates a "new" job */
544             free_jcr(njcr);           /* release "new" jcr */
545             P(jq->mutex);
546             Dmsg0(2300, "Back from running new job.\n");
547          }
548          /* Clean up and release old jcr */
549          if (jcr->db) {
550             db_close_database(jcr, jcr->db);
551             jcr->db = NULL;
552          }
553          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
554          jcr->SDJobStatus = 0;
555          V(jq->mutex);                /* release internal lock */
556          free_jcr(jcr);
557          free(je);                    /* release job entry */
558          P(jq->mutex);                /* reacquire job queue lock */
559       }
560       /*
561        * If any job in the wait queue can be run,
562        *  move it to the ready queue
563        */
564       Dmsg0(2300, "Done check ready, now check wait queue.\n");
565       if (!jq->waiting_jobs->empty() && !jq->quit) {
566          int Priority;
567          je = (jobq_item_t *)jq->waiting_jobs->first();
568          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
569          if (re) {
570             Priority = re->jcr->JobPriority;
571             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
572          } else {
573             Priority = je->jcr->JobPriority;
574             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
575          }
576          /*
577           * Walk down the list of waiting jobs and attempt
578           *   to acquire the resources it needs.
579           */
580          for ( ; je;  ) {
581             /* je is current job item on the queue, jn is the next one */
582             JCR *jcr = je->jcr;
583             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
584
585             Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
586                jcr->JobId, jcr->JobPriority, Priority);
587
588             /* Take only jobs of correct Priority */
589             if (jcr->JobPriority != Priority) {
590                set_jcr_job_status(jcr, JS_WaitPriority);
591                break;
592             }
593
594             if (!acquire_resources(jcr)) {
595                je = jn;            /* point to next waiting job */
596                continue;
597             }
598
599             /* Got all locks, now remove it from wait queue and append it
600              *   to the ready queue
601              */
602             jq->waiting_jobs->remove(je);
603             jq->ready_jobs->append(je);
604             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
605             je = jn;                  /* Point to next waiting job */
606          } /* end for loop */
607
608       } /* end if */
609
610       Dmsg0(2300, "Done checking wait queue.\n");
611       /*
612        * If no more ready work and we are asked to quit, then do it
613        */
614       if (jq->ready_jobs->empty() && jq->quit) {
615          jq->num_workers--;
616          if (jq->num_workers == 0) {
617             Dmsg0(2300, "Wake up destroy routine\n");
618             /* Wake up destroy routine if he is waiting */
619             pthread_cond_broadcast(&jq->work);
620          }
621          break;
622       }
623       Dmsg0(2300, "Check for work request\n");
624       /*
625        * If no more work requests, and we waited long enough, quit
626        */
627       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
628          jq->ready_jobs->empty());
629       if (jq->ready_jobs->empty() && timedout) {
630          Dmsg0(2300, "break big loop\n");
631          jq->num_workers--;
632          break;
633       }
634
635       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
636       if (work) {
637          /*
638           * If a job is waiting on a Resource, don't consume all
639           *   the CPU time looping looking for work, and even more
640           *   important, release the lock so that a job that has
641           *   terminated can give us the resource.
642           */
643          V(jq->mutex);
644          bmicrosleep(2, 0);              /* pause for 2 seconds */
645          P(jq->mutex);
646          /* Recompute work as something may have changed in last 2 secs */
647          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
648       }
649       Dmsg1(2300, "Loop again. work=%d\n", work);
650    } /* end of big for loop */
651
652    Dmsg0(200, "unlock mutex\n");
653    V(jq->mutex);
654    Dmsg0(2300, "End jobq_server\n");
655    return NULL;
656 }
657
658 /*
659  * See if we can acquire all the necessary resources for the job (JCR)
660  *
661  *  Returns: true  if successful
662  *           false if resource failure
663  */
664 static bool acquire_resources(JCR *jcr)
665 {
666    bool skip_this_jcr = false;
667
668    jcr->acquired_resource_locks = false;
669    if (jcr->rstore) {
670       Dmsg1(000, "Rstore=%s\n", jcr->rstore->name());
671       /* 
672        * Let only one Restore/verify job run at a time regardless
673        *   of MaxConcurrentJobs.
674        */ 
675       if (jcr->rstore->NumConcurrentJobs == 0) {
676          jcr->rstore->NumConcurrentJobs = 1;
677          Dmsg0(000, "Set rncj=1\n");
678       } else {
679          Dmsg1(000, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
680          set_jcr_job_status(jcr, JS_WaitStoreRes);
681          return false;
682       }
683    }
684    
685    if (jcr->wstore) {
686       if (jcr->wstore->NumConcurrentJobs == 0 &&
687           jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
688          /* Simple case, first job */
689          jcr->wstore->NumConcurrentJobs = 1;
690          Dmsg0(000, "Set wncj=1\n");
691       } else if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
692          jcr->wstore->NumConcurrentJobs++;
693          Dmsg1(000, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
694       } else if (jcr->rstore) {
695          jcr->rstore->NumConcurrentJobs = 0;      /* back out rstore */
696          Dmsg1(000, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
697          skip_this_jcr = true;
698       } else {
699          Dmsg1(000, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
700          skip_this_jcr = true;
701       }
702    }
703    if (skip_this_jcr) {
704       set_jcr_job_status(jcr, JS_WaitStoreRes);
705       return false;
706    }
707
708    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
709       jcr->client->NumConcurrentJobs++;
710    } else {
711       /* Back out previous locks */
712       if (jcr->wstore) {
713          jcr->wstore->NumConcurrentJobs--;
714          Dmsg1(000, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
715       }
716       if (jcr->rstore) {
717          jcr->rstore->NumConcurrentJobs = 0;
718          Dmsg1(000, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
719       }
720       set_jcr_job_status(jcr, JS_WaitClientRes);
721       return false;
722    }
723    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
724       jcr->job->NumConcurrentJobs++;
725    } else {
726       /* Back out previous locks */
727       if (jcr->wstore) {
728          jcr->wstore->NumConcurrentJobs--;
729          Dmsg1(000, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
730       }
731       if (jcr->rstore) {
732          jcr->rstore->NumConcurrentJobs = 0;
733          Dmsg1(000, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
734       }
735       jcr->client->NumConcurrentJobs--;
736       set_jcr_job_status(jcr, JS_WaitJobRes);
737       return false;
738    }
739    /* Check actual device availability */
740    /* ***FIXME****/
741
742
743    jcr->acquired_resource_locks = true;
744    return true;
745 }