]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/jobq.c
kes Pass jcr to tls routines so debug messages can be handled better.
[bacula/bacula] / bacula / src / dird / jobq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula job queue routines.
30  *
31  *  This code consists of three queues, the waiting_jobs
32  *  queue, where jobs are initially queued, the ready_jobs
33  *  queue, where jobs are placed when all the resources are
34  *  allocated and they can immediately be run, and the
35  *  running queue where jobs are placed when they are
36  *  running.
37  *
38  *  Kern Sibbald, July MMIII
39  *
40  *   Version $Id$
41  *
42  *  This code was adapted from the Bacula workq, which was
43  *    adapted from "Programming with POSIX Threads", by
44  *    David R. Butenhof
45  *
46  */
47
48 #include "bacula.h"
49 #include "dird.h"
50
51 extern JCR *jobs;
52
53 /* Forward referenced functions */
54 extern "C" void *jobq_server(void *arg);
55 extern "C" void *sched_wait(void *arg);
56
57 static int  start_server(jobq_t *jq);
58 static bool acquire_resources(JCR *jcr);
59
60
61
62 /*
63  * Initialize a job queue
64  *
65  *  Returns: 0 on success
66  *           errno on failure
67  */
68 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
69 {
70    int stat;
71    jobq_item_t *item = NULL;
72
73    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
74       berrno be;
75       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
76       return stat;
77    }
78    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
79       pthread_attr_destroy(&jq->attr);
80       return stat;
81    }
82    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
83       berrno be;
84       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
85       pthread_attr_destroy(&jq->attr);
86       return stat;
87    }
88    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
89       berrno be;
90       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
91       pthread_mutex_destroy(&jq->mutex);
92       pthread_attr_destroy(&jq->attr);
93       return stat;
94    }
95    jq->quit = false;
96    jq->max_workers = threads;         /* max threads to create */
97    jq->num_workers = 0;               /* no threads yet */
98    jq->idle_workers = 0;              /* no idle threads */
99    jq->engine = engine;               /* routine to run */
100    jq->valid = JOBQ_VALID;
101    /* Initialize the job queues */
102    jq->waiting_jobs = New(dlist(item, &item->link));
103    jq->running_jobs = New(dlist(item, &item->link));
104    jq->ready_jobs = New(dlist(item, &item->link));
105    return 0;
106 }
107
108 /*
109  * Destroy the job queue
110  *
111  * Returns: 0 on success
112  *          errno on failure
113  */
114 int jobq_destroy(jobq_t *jq)
115 {
116    int stat, stat1, stat2;
117
118    if (jq->valid != JOBQ_VALID) {
119       return EINVAL;
120    }
121    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
122       berrno be;
123       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
124       return stat;
125    }
126    jq->valid = 0;                      /* prevent any more operations */
127
128    /* 
129     * If any threads are active, wake them 
130     */
131    if (jq->num_workers > 0) {
132       jq->quit = true;
133       if (jq->idle_workers) {
134          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
135             berrno be;
136             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
137             pthread_mutex_unlock(&jq->mutex);
138             return stat;
139          }
140       }
141       while (jq->num_workers > 0) {
142          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
143             berrno be;
144             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
145             pthread_mutex_unlock(&jq->mutex);
146             return stat;
147          }
148       }
149    }
150    if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
151       berrno be;
152       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat));
153       return stat;
154    }
155    stat  = pthread_mutex_destroy(&jq->mutex);
156    stat1 = pthread_cond_destroy(&jq->work);
157    stat2 = pthread_attr_destroy(&jq->attr);
158    delete jq->waiting_jobs;
159    delete jq->running_jobs;
160    delete jq->ready_jobs;
161    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
162 }
163
164 struct wait_pkt {
165    JCR *jcr;
166    jobq_t *jq;
167 };
168
169 /*
170  * Wait until schedule time arrives before starting. Normally
171  *  this routine is only used for jobs started from the console
172  *  for which the user explicitly specified a start time. Otherwise
173  *  most jobs are put into the job queue only when their
174  *  scheduled time arives.
175  */
176 extern "C"
177 void *sched_wait(void *arg)
178 {
179    JCR *jcr = ((wait_pkt *)arg)->jcr;
180    jobq_t *jq = ((wait_pkt *)arg)->jq;
181
182    Dmsg0(2300, "Enter sched_wait.\n");
183    free(arg);
184    time_t wtime = jcr->sched_time - time(NULL);
185    set_jcr_job_status(jcr, JS_WaitStartTime);
186    /* Wait until scheduled time arrives */
187    if (wtime > 0) {
188       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
189          jcr->Job, wtime);
190    }
191    /* Check every 30 seconds if canceled */
192    while (wtime > 0) {
193       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", 
194          jcr->JobId, wtime, jcr->use_count());
195       if (wtime > 30) {
196          wtime = 30;
197       }
198       bmicrosleep(wtime, 0);
199       if (job_canceled(jcr)) {
200          break;
201       }
202       wtime = jcr->sched_time - time(NULL);
203    }
204    Dmsg1(200, "resched use=%d\n", jcr->use_count());
205    jobq_add(jq, jcr);
206    free_jcr(jcr);                     /* we are done with jcr */
207    Dmsg0(2300, "Exit sched_wait\n");
208    return NULL;
209 }
210
211 /*
212  *  Add a job to the queue
213  *    jq is a queue that was created with jobq_init
214  */
215 int jobq_add(jobq_t *jq, JCR *jcr)
216 {
217    int stat;
218    jobq_item_t *item, *li;
219    bool inserted = false;
220    time_t wtime = jcr->sched_time - time(NULL);
221    pthread_t id;
222    wait_pkt *sched_pkt;
223
224    if (!jcr->term_wait_inited) { 
225       /* Initialize termination condition variable */
226       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
227          berrno be;
228          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
229          return stat;
230       }
231       jcr->term_wait_inited = true;
232    }                           
233                              
234    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
235    if (jq->valid != JOBQ_VALID) {
236       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
237       return EINVAL;
238    }
239
240    jcr->inc_use_count();                 /* mark jcr in use by us */
241    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
242    if (!job_canceled(jcr) && wtime > 0) {
243       set_thread_concurrency(jq->max_workers + 2);
244       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
245       sched_pkt->jcr = jcr;
246       sched_pkt->jq = jq;
247       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);        
248       if (stat != 0) {                /* thread not created */
249          berrno be;
250          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
251       }
252       return stat;
253    }
254
255    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
256       berrno be;
257       Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
258       free_jcr(jcr);                    /* release jcr */
259       return stat;
260    }
261
262    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
263       free_jcr(jcr);                    /* release jcr */
264       return ENOMEM;
265    }
266    item->jcr = jcr;
267
268    if (job_canceled(jcr)) {
269       /* Add job to ready queue so that it is canceled quickly */
270       jq->ready_jobs->prepend(item);
271       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
272    } else {
273       /* Add this job to the wait queue in priority sorted order */
274       foreach_dlist(li, jq->waiting_jobs) {
275          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
276             li->jcr->JobId, li->jcr->JobPriority);
277          if (li->jcr->JobPriority > jcr->JobPriority) {
278             jq->waiting_jobs->insert_before(item, li);
279             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
280                li->jcr->JobId, jcr->JobId);
281             inserted = true;
282             break;
283          }
284       }
285       /* If not jobs in wait queue, append it */
286       if (!inserted) {
287          jq->waiting_jobs->append(item);
288          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
289       }
290    }
291
292    /* Ensure that at least one server looks at the queue. */
293    stat = start_server(jq);
294
295    pthread_mutex_unlock(&jq->mutex);
296    Dmsg0(2300, "Return jobq_add\n");
297    return stat;
298 }
299
300 /*
301  *  Remove a job from the job queue. Used only by cancel_job().
302  *    jq is a queue that was created with jobq_init
303  *    work_item is an element of work
304  *
305  *   Note, it is "removed" from the job queue.
306  *    If you want to cancel it, you need to provide some external means
307  *    of doing so (e.g. pthread_kill()).
308  */
309 int jobq_remove(jobq_t *jq, JCR *jcr)
310 {
311    int stat;
312    bool found = false;
313    jobq_item_t *item;
314
315    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
316    if (jq->valid != JOBQ_VALID) {
317       return EINVAL;
318    }
319
320    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
321       berrno be;
322       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
323       return stat;
324    }
325
326    foreach_dlist(item, jq->waiting_jobs) {
327       if (jcr == item->jcr) {
328          found = true;
329          break;
330       }
331    }
332    if (!found) {
333       pthread_mutex_unlock(&jq->mutex);
334       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
335       return EINVAL;
336    }
337
338    /* Move item to be the first on the list */
339    jq->waiting_jobs->remove(item);
340    jq->ready_jobs->prepend(item);
341    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
342
343    stat = start_server(jq);
344
345    pthread_mutex_unlock(&jq->mutex);
346    Dmsg0(2300, "Return jobq_remove\n");
347    return stat;
348 }
349
350
351 /*
352  * Start the server thread if it isn't already running
353  */
354 static int start_server(jobq_t *jq)
355 {
356    int stat = 0;
357    pthread_t id;
358
359    /*
360     * if any threads are idle, wake one.
361     *   Actually we do a broadcast because on /lib/tls 
362     *   these signals seem to get lost from time to time.
363     */
364    if (jq->idle_workers > 0) {
365       Dmsg0(2300, "Signal worker to wake up\n");
366       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
367          berrno be;
368          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
369          return stat;
370       }
371    } else if (jq->num_workers < jq->max_workers) {
372       Dmsg0(2300, "Create worker thread\n");
373       /* No idle threads so create a new one */
374       set_thread_concurrency(jq->max_workers + 1);
375       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
376          berrno be;
377          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
378          return stat;
379       }
380    }
381    return stat;
382 }
383
384
385 /*
386  * This is the worker thread that serves the job queue.
387  * When all the resources are acquired for the job,
388  *  it will call the user's engine.
389  */
390 extern "C"
391 void *jobq_server(void *arg)
392 {
393    struct timespec timeout;
394    jobq_t *jq = (jobq_t *)arg;
395    jobq_item_t *je;                   /* job entry in queue */
396    int stat;
397    bool timedout = false;
398    bool work = true;
399
400    Dmsg0(2300, "Start jobq_server\n");
401    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
402       berrno be;
403       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
404       return NULL;
405    }
406    jq->num_workers++;
407
408    for (;;) {
409       struct timeval tv;
410       struct timezone tz;
411
412       Dmsg0(2300, "Top of for loop\n");
413       if (!work && !jq->quit) {
414          gettimeofday(&tv, &tz);
415          timeout.tv_nsec = 0;
416          timeout.tv_sec = tv.tv_sec + 4;
417
418          while (!jq->quit) {
419             /*
420              * Wait 4 seconds, then if no more work, exit
421              */
422             Dmsg0(2300, "pthread_cond_timedwait()\n");
423             stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
424             if (stat == ETIMEDOUT) {
425                Dmsg0(2300, "timedwait timedout.\n");
426                timedout = true;
427                break;
428             } else if (stat != 0) {
429                /* This shouldn't happen */
430                Dmsg0(2300, "This shouldn't happen\n");
431                jq->num_workers--;
432                pthread_mutex_unlock(&jq->mutex);
433                return NULL;
434             }
435             break;
436          }
437       }
438       /*
439        * If anything is in the ready queue, run it
440        */
441       Dmsg0(2300, "Checking ready queue.\n");
442       while (!jq->ready_jobs->empty() && !jq->quit) {
443          JCR *jcr;
444          je = (jobq_item_t *)jq->ready_jobs->first();
445          jcr = je->jcr;
446          jq->ready_jobs->remove(je);
447          if (!jq->ready_jobs->empty()) {
448             Dmsg0(2300, "ready queue not empty start server\n");
449             if (start_server(jq) != 0) {
450                jq->num_workers--;
451                pthread_mutex_unlock(&jq->mutex);
452                return NULL;
453             }
454          }
455          jq->running_jobs->append(je);
456          set_jcr_in_tsd(jcr);
457          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
458
459          /* Release job queue lock */
460          V(jq->mutex);
461
462          /* Call user's routine here */
463          Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
464             jcr->use_count());
465          jq->engine(je->jcr);
466
467          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
468             jcr->use_count());
469
470          /* Reacquire job queue lock */
471          P(jq->mutex);
472          Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
473          jq->running_jobs->remove(je);
474          /*
475           * Release locks if acquired. Note, they will not have
476           *  been acquired for jobs canceled before they were
477           *  put into the ready queue.
478           */
479          if (jcr->acquired_resource_locks) {
480             if (jcr->rstore) {
481                jcr->rstore->NumConcurrentJobs--;
482                Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
483                ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
484             }
485             if (jcr->wstore) {
486                jcr->wstore->NumConcurrentJobs--;
487                Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
488                ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
489             }
490             jcr->client->NumConcurrentJobs--;
491             jcr->job->NumConcurrentJobs--;
492             jcr->acquired_resource_locks = false;
493          }
494
495          /*
496           * Reschedule the job if necessary and requested
497           */
498          if (jcr->job->RescheduleOnError &&
499              jcr->JobStatus != JS_Terminated &&
500              jcr->JobStatus != JS_Canceled &&
501              jcr->JobType == JT_BACKUP &&
502              (jcr->job->RescheduleTimes == 0 ||
503               jcr->reschedule_count < jcr->job->RescheduleTimes)) {
504              char dt[50], dt2[50];
505
506              /*
507               * Reschedule this job by cleaning it up, but
508               *  reuse the same JobId if possible.
509               */
510             time_t now = time(NULL);
511             jcr->reschedule_count++;
512             jcr->sched_time = now + jcr->job->RescheduleInterval;
513             bstrftime(dt, sizeof(dt), now);
514             bstrftime(dt2, sizeof(dt2), jcr->sched_time);
515             Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
516                   (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
517             Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
518                  jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
519             dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
520             jcr->JobStatus = -1;
521             set_jcr_job_status(jcr, JS_WaitStartTime);
522             jcr->SDJobStatus = 0;
523             if (jcr->JobBytes == 0) {
524                Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
525                V(jq->mutex);
526                jobq_add(jq, jcr);     /* queue the job to run again */
527                P(jq->mutex);
528                free_jcr(jcr);         /* release jcr */
529                free(je);              /* free the job entry */
530                continue;              /* look for another job to run */
531             }
532             /*
533              * Something was actually backed up, so we cannot reuse
534              *   the old JobId or there will be database record
535              *   conflicts.  We now create a new job, copying the
536              *   appropriate fields.
537              */           
538             JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
539             set_jcr_defaults(njcr, jcr->job);
540             njcr->reschedule_count = jcr->reschedule_count;
541             njcr->sched_time = jcr->sched_time;
542             njcr->JobLevel = jcr->JobLevel;
543             njcr->JobStatus = -1;
544             set_jcr_job_status(njcr, jcr->JobStatus);
545             if (jcr->rstore) {
546                copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
547             } else {
548                free_rstorage(njcr);
549             }
550             if (jcr->wstore) {
551                copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
552             } else {
553                free_wstorage(njcr);
554             }
555             njcr->messages = jcr->messages;
556             Dmsg0(2300, "Call to run new job\n");
557             V(jq->mutex);
558             run_job(njcr);            /* This creates a "new" job */
559             free_jcr(njcr);           /* release "new" jcr */
560             P(jq->mutex);
561             Dmsg0(2300, "Back from running new job.\n");
562          }
563          /* Clean up and release old jcr */
564          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
565          jcr->SDJobStatus = 0;
566          V(jq->mutex);                /* release internal lock */
567          free_jcr(jcr);
568          free(je);                    /* release job entry */
569          P(jq->mutex);                /* reacquire job queue lock */
570       }
571       /*
572        * If any job in the wait queue can be run,
573        *  move it to the ready queue
574        */
575       Dmsg0(2300, "Done check ready, now check wait queue.\n");
576       if (!jq->waiting_jobs->empty() && !jq->quit) {
577          int Priority;
578          je = (jobq_item_t *)jq->waiting_jobs->first();
579          jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
580          if (re) {
581             Priority = re->jcr->JobPriority;
582             Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId, Priority);
583          } else {
584             Priority = je->jcr->JobPriority;
585             Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
586          }
587          /*
588           * Walk down the list of waiting jobs and attempt
589           *   to acquire the resources it needs.
590           */
591          for ( ; je;  ) {
592             /* je is current job item on the queue, jn is the next one */
593             JCR *jcr = je->jcr;
594             jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
595
596             Dmsg3(2300, "Examining Job=%d JobPri=%d want Pri=%d\n",
597                jcr->JobId, jcr->JobPriority, Priority);
598
599             /* Take only jobs of correct Priority */
600             if (jcr->JobPriority != Priority) {
601                set_jcr_job_status(jcr, JS_WaitPriority);
602                break;
603             }
604
605             if (!acquire_resources(jcr)) {
606                /* If resource conflict, job is canceled */
607                if (!job_canceled(jcr)) {
608                   je = jn;            /* point to next waiting job */
609                   continue;
610                }
611             }
612
613             /*
614              * Got all locks, now remove it from wait queue and append it
615              *   to the ready queue.  Note, we may also get here if the
616              *    job was canceled.  Once it is "run", it will quickly
617              *    terminate.
618              */
619             jq->waiting_jobs->remove(je);
620             jq->ready_jobs->append(je);
621             Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
622             je = jn;                  /* Point to next waiting job */
623          } /* end for loop */
624
625       } /* end if */
626
627       Dmsg0(2300, "Done checking wait queue.\n");
628       /*
629        * If no more ready work and we are asked to quit, then do it
630        */
631       if (jq->ready_jobs->empty() && jq->quit) {
632          jq->num_workers--;
633          if (jq->num_workers == 0) {
634             Dmsg0(2300, "Wake up destroy routine\n");
635             /* Wake up destroy routine if he is waiting */
636             pthread_cond_broadcast(&jq->work);
637          }
638          break;
639       }
640       Dmsg0(2300, "Check for work request\n");
641       /*
642        * If no more work requests, and we waited long enough, quit
643        */
644       Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
645          jq->ready_jobs->empty());
646       if (jq->ready_jobs->empty() && timedout) {
647          Dmsg0(2300, "break big loop\n");
648          jq->num_workers--;
649          break;
650       }
651
652       work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
653       if (work) {
654          /*
655           * If a job is waiting on a Resource, don't consume all
656           *   the CPU time looping looking for work, and even more
657           *   important, release the lock so that a job that has
658           *   terminated can give us the resource.
659           */
660          V(jq->mutex);
661          bmicrosleep(2, 0);              /* pause for 2 seconds */
662          P(jq->mutex);
663          /* Recompute work as something may have changed in last 2 secs */
664          work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
665       }
666       Dmsg1(2300, "Loop again. work=%d\n", work);
667    } /* end of big for loop */
668
669    Dmsg0(200, "unlock mutex\n");
670    V(jq->mutex);
671    Dmsg0(2300, "End jobq_server\n");
672    return NULL;
673 }
674
675 /*
676  * See if we can acquire all the necessary resources for the job (JCR)
677  *
678  *  Returns: true  if successful
679  *           false if resource failure
680  */
681 static bool acquire_resources(JCR *jcr)
682 {
683    bool skip_this_jcr = false;
684
685    jcr->acquired_resource_locks = false;
686    if (jcr->rstore == jcr->wstore) {           /* deadlock */
687       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
688          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), 
689          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
690       set_jcr_job_status(jcr, JS_Canceled);
691       return false;
692    }
693    if (jcr->rstore) {
694       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
695       if (jcr->rstore->NumConcurrentJobs == 0 &&
696           jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
697          /* Simple case, first job */
698          jcr->rstore->NumConcurrentJobs = 1;
699          Dmsg0(200, "Set rncj=1\n");
700       /* We can do this only if multi-drive autochanger */
701 //    } else if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
702 //       jcr->rstore->NumConcurrentJobs++;
703 //       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
704       } else {
705          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
706          set_jcr_job_status(jcr, JS_WaitStoreRes);
707          return false;
708       }
709    }
710    
711    if (jcr->wstore) {
712       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
713       if (jcr->wstore->NumConcurrentJobs == 0 &&
714           jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
715          /* Simple case, first job */
716          jcr->wstore->NumConcurrentJobs = 1;
717          Dmsg0(200, "Set wncj=1\n");
718       } else if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
719          jcr->wstore->NumConcurrentJobs++;
720          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
721       } else if (jcr->rstore) {
722          jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
723          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
724          ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
725          skip_this_jcr = true;
726       } else {
727          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
728          skip_this_jcr = true;
729       }
730    }
731    if (skip_this_jcr) {
732       set_jcr_job_status(jcr, JS_WaitStoreRes);
733       return false;
734    }
735
736    if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
737       jcr->client->NumConcurrentJobs++;
738    } else {
739       /* Back out previous locks */
740       if (jcr->wstore) {
741          jcr->wstore->NumConcurrentJobs--;
742          Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
743          ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
744       }
745       if (jcr->rstore) {
746          jcr->rstore->NumConcurrentJobs--;  
747          Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
748          ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
749       }
750       set_jcr_job_status(jcr, JS_WaitClientRes);
751       return false;
752    }
753    if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
754       jcr->job->NumConcurrentJobs++;
755    } else {
756       /* Back out previous locks */
757       if (jcr->wstore) {
758          jcr->wstore->NumConcurrentJobs--;
759          Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
760          ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
761       }
762       if (jcr->rstore) {
763          jcr->rstore->NumConcurrentJobs--;
764          Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
765          ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
766       }
767       jcr->client->NumConcurrentJobs--;
768       set_jcr_job_status(jcr, JS_WaitJobRes);
769       return false;
770    }
771
772    jcr->acquired_resource_locks = true;
773    return true;
774 }