]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
dd92d6ff319993633d7422b2532d44ff90a71a0e
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2  * Bacula job queue routines.
3  *
4  *  This code consists of three queues, the waiting_jobs
5  *  queue, where jobs are initially queued, the ready_jobs
6  *  queue, where jobs are placed when all the resources are
7  *  allocated and they can immediately be run, and the
8  *  running queue where jobs are placed when they are
9  *  running.
10  *
11  *  Kern Sibbald, July MMIII
12  *
13  *   Version $Id$
14  *
15  *  This code was adapted from the Bacula workq, which was
16  *    adapted from "Programming with POSIX Threads", by
17  *    David R. Butenhof
18  *
19  */
20 /*
21    Copyright (C) 2003-2004 Kern Sibbald and John Walker
22
23    This program is free software; you can redistribute it and/or
24    modify it under the terms of the GNU General Public License as
25    published by the Free Software Foundation; either version 2 of
26    the License, or (at your option) any later version.
27
28    This program is distributed in the hope that it will be useful,
29    but WITHOUT ANY WARRANTY; without even the implied warranty of
30    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
31    General Public License for more details.
32
33    You should have received a copy of the GNU General Public
34    License along with this program; if not, write to the Free
35    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
36    MA 02111-1307, USA.
37
38  */
39
40 #include "bacula.h"
41 #include "dird.h"
42
43 extern JCR *jobs;
44
45 /* Forward referenced functions */
46 extern "C" void *jobq_server(void *arg);
47 extern "C" void *sched_wait(void *arg);
48
49 static int   start_server(jobq_t *jq);
50
51
52
53 /*   
54  * Initialize a job queue
55  *
56  *  Returns: 0 on success
57  *           errno on failure
58  */
59 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
60 {
61    int stat;
62    jobq_item_t *item = NULL;
63                         
64    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
65       Jmsg1(NULL, M_ERROR, 0, "pthread_attr_init: ERR=%s\n", strerror(stat));
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&jq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
73       Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_init: ERR=%s\n", strerror(stat));
74       pthread_attr_destroy(&jq->attr);
75       return stat;
76    }
77    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
78       Jmsg1(NULL, M_ERROR, 0, "pthread_cond_init: ERR=%s\n", strerror(stat));
79       pthread_mutex_destroy(&jq->mutex);
80       pthread_attr_destroy(&jq->attr);
81       return stat;
82    }
83    jq->quit = false;
84    jq->max_workers = threads;         /* max threads to create */
85    jq->num_workers = 0;               /* no threads yet */
86    jq->idle_workers = 0;              /* no idle threads */
87    jq->engine = engine;               /* routine to run */
88    jq->valid = JOBQ_VALID; 
89    /* Initialize the job queues */
90    jq->waiting_jobs = New(dlist(item, &item->link));
91    jq->running_jobs = New(dlist(item, &item->link));
92    jq->ready_jobs = New(dlist(item, &item->link));
93    return 0;
94 }
95
96 /*
97  * Destroy the job queue
98  *
99  * Returns: 0 on success
100  *          errno on failure
101  */
102 int jobq_destroy(jobq_t *jq)
103 {
104    int stat, stat1, stat2;
105
106   if (jq->valid != JOBQ_VALID) {
107      return EINVAL;
108   }
109   if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
110      Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
111      return stat;
112   }
113   jq->valid = 0;                      /* prevent any more operations */
114
115   /* 
116    * If any threads are active, wake them 
117    */
118   if (jq->num_workers > 0) {
119      jq->quit = true;
120      if (jq->idle_workers) {
121         if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
122            Jmsg1(NULL, M_ERROR, 0, "pthread_cond_broadcast: ERR=%s\n", strerror(stat));
123            pthread_mutex_unlock(&jq->mutex);
124            return stat;
125         }
126      }
127      while (jq->num_workers > 0) {
128         if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
129            Jmsg1(NULL, M_ERROR, 0, "pthread_cond_wait: ERR=%s\n", strerror(stat));
130            pthread_mutex_unlock(&jq->mutex);
131            return stat;
132         }
133      }
134   }
135   if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
136      Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat));
137      return stat;
138   }
139   stat  = pthread_mutex_destroy(&jq->mutex);
140   stat1 = pthread_cond_destroy(&jq->work);
141   stat2 = pthread_attr_destroy(&jq->attr);
142   delete jq->waiting_jobs;
143   delete jq->running_jobs;
144   delete jq->ready_jobs;
145   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
146 }
147
148 struct wait_pkt {
149    JCR *jcr;
150    jobq_t *jq;
151 };
152
153 /*
154  * Wait until schedule time arrives before starting. Normally
155  *  this routine is only used for jobs started from the console
156  *  for which the user explicitly specified a start time. Otherwise
157  *  most jobs are put into the job queue only when their
158  *  scheduled time arives.
159  */
160 extern "C"  
161 void *sched_wait(void *arg)
162 {
163    JCR *jcr = ((wait_pkt *)arg)->jcr;
164    jobq_t *jq = ((wait_pkt *)arg)->jq;
165
166    Dmsg0(300, "Enter sched_wait.\n");
167    free(arg);
168    time_t wtime = jcr->sched_time - time(NULL);
169    set_jcr_job_status(jcr, JS_WaitStartTime);
170    /* Wait until scheduled time arrives */
171    if (wtime > 0) {
172       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"), 
173          jcr->Job, wtime);
174    }
175    /* Check every 30 seconds if canceled */ 
176    while (wtime > 0) {
177       Dmsg2(300, "Waiting on sched time, jobid=%d secs=%d\n", jcr->JobId, wtime);
178       if (wtime > 30) {
179          wtime = 30;
180       }
181       bmicrosleep(wtime, 0);
182       if (job_canceled(jcr)) {
183          break;
184       }
185       wtime = jcr->sched_time - time(NULL);
186    }
187    P(jcr->mutex);                     /* lock jcr */
188    jobq_add(jq, jcr);
189    V(jcr->mutex);
190    free_jcr(jcr);                     /* we are done with jcr */
191    Dmsg0(300, "Exit sched_wait\n");
192    return NULL;
193 }
194
195 /*
196  *  Add a job to the queue
197  *    jq is a queue that was created with jobq_init
198  * 
199  *  On entry jcr->mutex must be locked.
200  *   
201  */
202 int jobq_add(jobq_t *jq, JCR *jcr)
203 {
204    int stat;
205    jobq_item_t *item, *li;
206    bool inserted = false;
207    time_t wtime = jcr->sched_time - time(NULL);
208    pthread_t id;
209    wait_pkt *sched_pkt;
210     
211    Dmsg3(300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count);
212    if (jq->valid != JOBQ_VALID) {
213       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
214       return EINVAL;
215    }
216
217    jcr->use_count++;                  /* mark jcr in use by us */
218    Dmsg3(300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count);
219    if (!job_canceled(jcr) && wtime > 0) {
220       set_thread_concurrency(jq->max_workers + 2);
221       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
222       sched_pkt->jcr = jcr;
223       sched_pkt->jq = jq;
224       jcr->use_count--;            /* release our use of jcr */
225       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
226       if (stat != 0) {                /* thread not created */
227          Jmsg1(jcr, M_ERROR, 0, "pthread_thread_create: ERR=%s\n", strerror(stat));
228       }
229       return stat;
230    }
231
232    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
233       Jmsg1(jcr, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
234       jcr->use_count--;               /* release jcr */
235       return stat;
236    }
237
238    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
239       jcr->use_count--;               /* release jcr */
240       return ENOMEM;
241    }
242    item->jcr = jcr;
243
244    if (job_canceled(jcr)) {
245       /* Add job to ready queue so that it is canceled quickly */
246       jq->ready_jobs->prepend(item);
247       Dmsg1(300, "Prepended job=%d to ready queue\n", jcr->JobId);
248    } else {
249       /* Add this job to the wait queue in priority sorted order */
250       foreach_dlist(li, jq->waiting_jobs) {
251          Dmsg2(300, "waiting item jobid=%d priority=%d\n",
252             li->jcr->JobId, li->jcr->JobPriority);
253          if (li->jcr->JobPriority > jcr->JobPriority) {
254             jq->waiting_jobs->insert_before(item, li);
255             Dmsg2(300, "insert_before jobid=%d before waiting job=%d\n", 
256                li->jcr->JobId, jcr->JobId);
257             inserted = true;
258             break;
259          }
260       }
261       /* If not jobs in wait queue, append it */
262       if (!inserted) {
263          jq->waiting_jobs->append(item);
264          Dmsg1(300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
265       }
266    }
267
268    /* Ensure that at least one server looks at the queue. */
269    stat = start_server(jq);
270
271    pthread_mutex_unlock(&jq->mutex);
272    Dmsg0(300, "Return jobq_add\n");
273    return stat;
274 }
275
276 /*
277  *  Remove a job from the job queue. Used only by cancel_job().
278  *    jq is a queue that was created with jobq_init
279  *    work_item is an element of work
280  *
281  *   Note, it is "removed" from the job queue.
282  *    If you want to cancel it, you need to provide some external means
283  *    of doing so (e.g. pthread_kill()).
284  */
285 int jobq_remove(jobq_t *jq, JCR *jcr)
286 {
287    int stat;
288    bool found = false;
289    jobq_item_t *item;
290     
291    Dmsg2(300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
292    if (jq->valid != JOBQ_VALID) {
293       return EINVAL;
294    }
295
296    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
297       Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
298       return stat;
299    }
300
301    foreach_dlist(item, jq->waiting_jobs) {
302       if (jcr == item->jcr) {
303          found = true;
304          break;
305       }
306    }
307    if (!found) {
308       pthread_mutex_unlock(&jq->mutex);
309       Dmsg2(300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
310       return EINVAL;
311    }
312
313    /* Move item to be the first on the list */
314    jq->waiting_jobs->remove(item);
315    jq->ready_jobs->prepend(item);
316    Dmsg2(300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
317    
318    stat = start_server(jq);
319
320    pthread_mutex_unlock(&jq->mutex);
321    Dmsg0(300, "Return jobq_remove\n");
322    return stat;
323 }
324
325
326 /*
327  * Start the server thread if it isn't already running
328  */
329 static int start_server(jobq_t *jq)
330 {
331    int stat = 0;
332    pthread_t id;
333
334    /* if any threads are idle, wake one */
335    if (jq->idle_workers > 0) {
336       Dmsg0(300, "Signal worker to wake up\n");
337       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
338          Jmsg1(NULL, M_ERROR, 0, "pthread_cond_signal: ERR=%s\n", strerror(stat));
339          return stat;
340       }
341    } else if (jq->num_workers < jq->max_workers) {
342       Dmsg0(300, "Create worker thread\n");
343       /* No idle threads so create a new one */
344       set_thread_concurrency(jq->max_workers + 1);
345       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
346          Jmsg1(NULL, M_ERROR, 0, "pthread_create: ERR=%s\n", strerror(stat));
347          return stat;
348       }
349    }
350    return stat;
351 }
352
353
354 /* 
355  * This is the worker thread that serves the job queue.
356  * When all the resources are acquired for the job, 
357  *  it will call the user's engine.
358  */
359 extern "C"  
360 void *jobq_server(void *arg)
361 {
362    struct timespec timeout;
363    jobq_t *jq = (jobq_t *)arg;
364    jobq_item_t *je;                   /* job entry in queue */
365    int stat;
366    bool timedout = false;
367    bool work = true;
368
369    Dmsg0(300, "Start jobq_server\n");
370    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
371       Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
372       return NULL;
373    }
374    jq->num_workers++;
375
376    for (;;) {
377       struct timeval tv;
378       struct timezone tz;
379
380       Dmsg0(300, "Top of for loop\n");
381       if (!work && !jq->quit) {
382          gettimeofday(&tv, &tz);
383          timeout.tv_nsec = 0;
384          timeout.tv_sec = tv.tv_sec + 4;
385
386          while (!jq->quit) {
387             /*
388              * Wait 4 seconds, then if no more work, exit
389              */
390             Dmsg0(300, "pthread_cond_timedwait()\n");
391             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
392             if (stat == ETIMEDOUT) {
393                Dmsg0(300, "timedwait timedout.\n");
394                timedout = true;
395                break;
396             } else if (stat != 0) {
397                /* This shouldn't happen */
398                Dmsg0(300, "This shouldn't happen\n");
399                jq->num_workers--;
400                pthread_mutex_unlock(&jq->mutex);
401                return NULL;
402             }
403             break;
404          } 
405       }
406       /* 
407        * If anything is in the ready queue, run it
408        */
409       Dmsg0(300, "Checking ready queue.\n");
410       while (!jq->ready_jobs->empty() && !jq->quit) {
411          JCR *jcr;
412          je = (jobq_item_t *)jq->ready_jobs->first(); 
413          jcr = je->jcr;
414          jq->ready_jobs->remove(je);
415          if (!jq->ready_jobs->empty()) {
416             Dmsg0(300, "ready queue not empty start server\n");
417             if (start_server(jq) != 0) {
418                jq->num_workers--;
419                pthread_mutex_unlock(&jq->mutex);
420                return NULL;
421             }
422          }
423          jq->running_jobs->append(je);
424          Dmsg1(300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
425
426          /* Release job queue lock */
427          if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
428             Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat));
429             jq->num_workers--;
430             return NULL;
431          }
432
433          /* Call user's routine here */
434          Dmsg1(300, "Calling user engine for jobid=%d\n", jcr->JobId);
435          jq->engine(je->jcr);
436
437          Dmsg1(300, "Back from user engine jobid=%d.\n", jcr->JobId);
438
439          /* Reacquire job queue lock */
440          if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
441             Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
442             jq->num_workers--;
443             free(je);                 /* release job entry */
444             return NULL;
445          }
446          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
447          jq->running_jobs->remove(je);
448          /* 
449           * Release locks if acquired. Note, they will not have
450           *  been acquired for jobs canceled before they were
451           *  put into the ready queue.
452           */
453          if (jcr->acquired_resource_locks) {
454             jcr->store->NumConcurrentJobs--;
455             if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
456                jcr->store->MaxConcurrentJobs = jcr->saveMaxConcurrentJobs;  
457             }
458             jcr->client->NumConcurrentJobs--;
459             jcr->job->NumConcurrentJobs--;
460          }
461
462          /*
463           * Reschedule the job if necessary and requested
464           */
465          if (jcr->job->RescheduleOnError && 
466              jcr->JobStatus != JS_Terminated &&
467              jcr->JobStatus != JS_Canceled && 
468              jcr->job->RescheduleTimes > 0 && 
469              jcr->reschedule_count < jcr->job->RescheduleTimes) {
470              char dt[50];
471
472              /*
473               * Reschedule this job by cleaning it up, but
474               *  reuse the same JobId if possible.
475               */
476             jcr->reschedule_count++;
477             jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval;
478             Dmsg2(300, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job,
479                (int)jcr->job->RescheduleInterval);
480             bstrftime(dt, sizeof(dt), time(NULL));
481             Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds.\n"),
482                jcr->Job, dt, (int)jcr->job->RescheduleInterval);
483             dird_free_jcr(jcr);          /* partial cleanup old stuff */
484             jcr->JobStatus = JS_WaitStartTime;
485             jcr->SDJobStatus = 0;
486             if (jcr->JobBytes == 0) {
487                Dmsg1(300, "Requeue job=%d\n", jcr->JobId);
488                jcr->JobStatus = JS_WaitStartTime;
489                V(jq->mutex);
490                jobq_add(jq, jcr);     /* queue the job to run again */
491                P(jq->mutex);
492                free(je);              /* free the job entry */
493                continue;              /* look for another job to run */
494             }
495             /* 
496              * Something was actually backed up, so we cannot reuse
497              *   the old JobId or there will be database record
498              *   conflicts.  We now create a new job, copying the
499              *   appropriate fields.
500              */
501             JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
502             set_jcr_defaults(njcr, jcr->job);
503             njcr->reschedule_count = jcr->reschedule_count;
504             njcr->JobLevel = jcr->JobLevel;
505             njcr->JobStatus = jcr->JobStatus;
506             copy_storage(njcr, jcr);
507             njcr->messages = jcr->messages;
508             Dmsg0(300, "Call to run new job\n");
509             V(jq->mutex);
510             run_job(njcr);            /* This creates a "new" job */
511             free_jcr(njcr);           /* release "new" jcr */
512             P(jq->mutex);
513             Dmsg0(300, "Back from running new job.\n");
514          }
515          /* Clean up and release old jcr */
516          if (jcr->db) {
517             db_close_database(jcr, jcr->db);
518             jcr->db = NULL;
519          }
520          Dmsg2(300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count);
521          jcr->SDJobStatus = 0;
522          V(jq->mutex);                /* release internal lock */
523          free_jcr(jcr);
524          free(je);                    /* release job entry */
525          P(jq->mutex);                /* reacquire job queue lock */
526       }
527       /*
528        * If any job in the wait queue can be run,
529        *  move it to the ready queue
530        */
531       Dmsg0(300, "Done check ready, now check wait queue.\n");
532       while (!jq->waiting_jobs->empty() && !jq->quit) {
533          int Priority;
534          je = (jobq_item_t *)jq->waiting_jobs->first(); 
535          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
536          if (re) {
537             Priority = re->jcr->JobPriority;
538             Dmsg2(300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
539          } else {
540             Priority = je->jcr->JobPriority;
541             Dmsg1(300, "No job running. Look for Job pri=%d\n", Priority);
542          }
543          /*
544           * Walk down the list of waiting jobs and attempt
545           *   to acquire the resources it needs.
546           */
547          for ( ; je;  ) {
548             /* je is current job item on the queue, jn is the next one */
549             JCR *jcr = je->jcr;
550             bool skip_this_jcr = false;
551             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
552             Dmsg3(300, "Examining Job=%d JobPri=%d want Pri=%d\n",
553                jcr->JobId, jcr->JobPriority, Priority);
554             /* Take only jobs of correct Priority */
555             if (jcr->JobPriority != Priority) {
556                set_jcr_job_status(jcr, JS_WaitPriority);
557                break;
558             }
559             if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
560                /* Let only one Restore/verify job run at a time regardless of MaxConcurrentJobs */
561                if (jcr->store->NumConcurrentJobs == 0) {
562                   jcr->store->NumConcurrentJobs++;
563                   jcr->saveMaxConcurrentJobs = jcr->store->MaxConcurrentJobs;
564                   jcr->store->MaxConcurrentJobs = 1;
565                } else {
566                   set_jcr_job_status(jcr, JS_WaitStoreRes);
567                   je = jn;            /* point to next waiting job */
568                   continue;
569                }
570             /* We are not doing a Restore or Verify */
571             } else if (jcr->store->NumConcurrentJobs == 0 &&
572                        jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
573                 /* Simple case, first job */
574                 jcr->store->NumConcurrentJobs = 1;  
575             } else if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
576                /*
577                 * At this point, we already have at least one Job running 
578                 *  for this Storage daemon, so we must ensure that there
579                 *  is no Volume conflict. In general, it should be OK, if
580                 *  all Jobs pull from the same Pool, so we check the Pools.
581                 */
582                 JCR *njcr;
583                 lock_jcr_chain();
584                 for (njcr=jobs; njcr; njcr=njcr->next) {
585                    if (njcr->JobId == 0 || njcr == jcr) {
586                       continue;
587                    }
588                    if (njcr->pool != jcr->pool) {
589                       skip_this_jcr = true;
590                       break;
591                    }
592                 }  
593                 unlock_jcr_chain();
594                 if (!skip_this_jcr) {
595                    jcr->store->NumConcurrentJobs++;    
596                 }
597             } 
598             if (skip_this_jcr) {
599                set_jcr_job_status(jcr, JS_WaitStoreRes);
600                je = jn;               /* point to next waiting job */
601                continue;
602             }
603
604             if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
605                jcr->client->NumConcurrentJobs++;
606             } else {
607                /* Back out previous locks */
608                jcr->store->NumConcurrentJobs--;
609                if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
610                   jcr->store->MaxConcurrentJobs = jcr->saveMaxConcurrentJobs;  
611                }
612                set_jcr_job_status(jcr, JS_WaitClientRes);
613                je = jn;               /* point to next waiting job */
614                continue;
615             }
616             if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
617                jcr->job->NumConcurrentJobs++;
618             } else {
619                /* Back out previous locks */
620                jcr->store->NumConcurrentJobs--;
621                if (jcr->JobType == JT_RESTORE || jcr->JobType == JT_VERIFY) {
622                   jcr->store->MaxConcurrentJobs = jcr->saveMaxConcurrentJobs;  
623                }
624                jcr->client->NumConcurrentJobs--;
625                set_jcr_job_status(jcr, JS_WaitJobRes);
626                je = jn;               /* Point to next waiting job */
627                continue;
628             }
629             /* Got all locks, now remove it from wait queue and append it
630              *   to the ready queue  
631              */
632             jcr->acquired_resource_locks = true;
633             jq->waiting_jobs->remove(je);
634             jq->ready_jobs->append(je);
635             Dmsg1(300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
636             je = jn;                  /* Point to next waiting job */
637          } /* end for loop */
638          break;
639       } /* end while loop */
640       Dmsg0(300, "Done checking wait queue.\n");
641       /*
642        * If no more ready work and we are asked to quit, then do it
643        */
644       if (jq->ready_jobs->empty() && jq->quit) {
645          jq->num_workers--;
646          if (jq->num_workers == 0) {
647             Dmsg0(300, "Wake up destroy routine\n");
648             /* Wake up destroy routine if he is waiting */
649             pthread_cond_broadcast(&jq->work);
650          }
651          break;
652       }
653       Dmsg0(300, "Check for work request\n");
654       /* 
655        * If no more work requests, and we waited long enough, quit
656        */
657       Dmsg2(300, "timedout=%d read empty=%d\n", timedout,
658          jq->ready_jobs->empty());
659       if (jq->ready_jobs->empty() && timedout) {
660          Dmsg0(300, "break big loop\n");
661          jq->num_workers--;
662          break;
663       }
664
665       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
666       if (work) {
667          /*          
668           * If a job is waiting on a Resource, don't consume all
669           *   the CPU time looping looking for work, and even more
670           *   important, release the lock so that a job that has
671           *   terminated can give us the resource.
672           */
673          if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
674             Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat));
675             jq->num_workers--;
676             return NULL;
677          }
678          bmicrosleep(2, 0);              /* pause for 2 seconds */
679          if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
680             Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat));
681             jq->num_workers--;
682             return NULL;
683          }
684          /* Recompute work as something may have changed in last 2 secs */
685          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
686       }
687       Dmsg1(300, "Loop again. work=%d\n", work);
688    } /* end of big for loop */
689
690    Dmsg0(200, "unlock mutex\n");
691    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
692       Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat));
693    }
694    Dmsg0(300, "End jobq_server\n");
695    return NULL;
696 }