From: Kern Sibbald Date: Sun, 20 Jul 2003 20:46:35 +0000 (+0000) Subject: Fix priority scheduling bugs X-Git-Tag: Release-7.0.0~10056 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=136516ad03f486471779affa0084cc1fe15dd588;p=bacula%2Fbacula Fix priority scheduling bugs git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@638 91ce42f0-d328-0410-95d8-f526ca767f89 --- diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index cc9502df1b..bff56d1ecd 100755 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -189,7 +189,7 @@ int jobq_add(jobq_t *jq, JCR *jcr) wait_pkt *sched_pkt; - Dmsg0(100, "jobq_add\n"); + Dmsg1(100, "jobq_add jobid=%d\n", jcr->JobId); if (jq->valid != JOBQ_VALID) { return EINVAL; } @@ -213,25 +213,28 @@ int jobq_add(jobq_t *jq, JCR *jcr) } item->jcr = jcr; - Dmsg1(100, "add 0x%x to queue\n", (unsigned)item); if (job_canceled(jcr)) { /* Add job to ready queue so that it is canceled quickly */ jq->ready_jobs->prepend(item); + Dmsg1(100, "Prepended job=%d to ready queue\n", jcr->JobId); } else { /* Add this job to the wait queue in priority sorted order */ for (li=NULL; (li=(jobq_item_t *)jq->waiting_jobs->next(li)); ) { - if (li->jcr->JobPriority < jcr->JobPriority) { + Dmsg2(100, "waiting item jobid=%d priority=%d\n", + li->jcr->JobId, li->jcr->JobPriority); + if (li->jcr->JobPriority > jcr->JobPriority) { jq->waiting_jobs->insert_before(item, li); - Dmsg1(100, "insert_before 0x%x\n", (unsigned)li); + Dmsg2(100, "insert_before jobid=%d before %d\n", + li->jcr->JobId, jcr->JobId); inserted = true; + break; } } /* If not jobs in wait queue, append it */ if (!inserted) { jq->waiting_jobs->append(item); - Dmsg0(100, "Appended item.\n"); + Dmsg1(100, "Appended item jobid=%d\n", jcr->JobId); } - Dmsg1(100, "Next=0x%x\n", (unsigned)jq->waiting_jobs->next(item)); } stat = start_server(jq); @@ -256,10 +259,9 @@ int jobq_remove(jobq_t *jq, JCR *jcr) { int stat; bool found = false; - pthread_t id; jobq_item_t *item; - Dmsg0(100, "jobq_remove\n"); + Dmsg1(100, "jobq_remove jobid=%d\n", jcr->JobId); if (jq->valid != JOBQ_VALID) { return EINVAL; } @@ -282,22 +284,9 @@ int jobq_remove(jobq_t *jq, JCR *jcr) jq->waiting_jobs->remove(item); jq->ready_jobs->prepend(item); - /* if any threads are idle, wake one */ - if (jq->idle_workers > 0) { - Dmsg0(100, "Signal worker\n"); - if ((stat = pthread_cond_signal(&jq->work)) != 0) { - pthread_mutex_unlock(&jq->mutex); - return stat; - } - } else { - Dmsg0(100, "Create worker thread\n"); - /* No idle threads so create a new one */ - set_thread_concurrency(jq->max_workers + 1); - if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { - pthread_mutex_unlock(&jq->mutex); - return stat; - } - jq->num_workers++; + stat = start_server(jq); + if (stat != 0) { + return stat; } pthread_mutex_unlock(&jq->mutex); Dmsg0(100, "Return jobq_remove\n"); @@ -315,7 +304,7 @@ static int start_server(jobq_t *jq) /* if any threads are idle, wake one */ if (jq->idle_workers > 0) { - Dmsg0(100, "Signal worker\n"); + Dmsg0(100, "Signal worker to wake up\n"); if ((stat = pthread_cond_signal(&jq->work)) != 0) { pthread_mutex_unlock(&jq->mutex); return stat; @@ -346,6 +335,7 @@ static void *jobq_server(void *arg) jobq_item_t *je; /* job entry in queue */ int stat; bool timedout; + bool work = true; Dmsg0(100, "Start jobq_server\n"); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { @@ -363,11 +353,11 @@ static void *jobq_server(void *arg) timeout.tv_nsec = 0; timeout.tv_sec = tv.tv_sec + 4; - while (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && !jq->quit) { + while (!work && !jq->quit) { /* * Wait 4 seconds, then if no more work, exit */ - Dmsg0(100, "pthread_cond_timedwait()\n"); + Dmsg0(200, "pthread_cond_timedwait()\n"); stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout); Dmsg1(100, "timedwait=%d\n", stat); if (stat == ETIMEDOUT) { @@ -389,23 +379,25 @@ static void *jobq_server(void *arg) je = (jobq_item_t *)jq->ready_jobs->first(); jq->ready_jobs->remove(je); if (!jq->ready_jobs->empty()) { + Dmsg0(100, "ready queue not empty start server\n"); if (start_server(jq) != 0) { return NULL; } } jq->running_jobs->append(je); + Dmsg1(100, "Took jobid=%d from ready and appended to run\n", je->jcr->JobId); if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { return NULL; } /* Call user's routine here */ - Dmsg0(100, "Calling user engine.\n"); + Dmsg1(100, "Calling user engine for jobid=%d\n", je->jcr->JobId); jq->engine(je->jcr); - Dmsg0(100, "Back from user engine.\n"); + Dmsg1(100, "Back from user engine jobid=%d.\n", je->jcr->JobId); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { free(je); /* release job entry */ return NULL; } - Dmsg0(100, "Done lock mutex\n"); + Dmsg0(200, "Done lock mutex\n"); jq->running_jobs->remove(je); /* * Release locks if acquired. Note, they will not have @@ -442,7 +434,6 @@ static void *jobq_server(void *arg) for ( ; je; ) { JCR *jcr = je->jcr; jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je); - Dmsg2(100, "je=0x%x jn=0x%x\n", (unsigned)je, (unsigned)jn); Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n", jcr->JobId, jcr->JobPriority, Priority); /* Take only jobs of correct Priority */ @@ -477,16 +468,16 @@ static void *jobq_server(void *arg) jcr->acquired_resource_locks = true; jq->waiting_jobs->remove(je); jq->ready_jobs->append(je); - Dmsg1(100, "moved JobId=%d from wait to ready queue\n", - je->jcr->JobId); + Dmsg1(100, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId); je = jn; } /* end for loop */ + break; } /* end while loop */ Dmsg0(100, "Done checking wait queue.\n"); /* - * If no more work request, and we are asked to quit, then do it + * If no more ready work and we are asked to quit, then do it */ - if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && jq->quit) { + if (jq->ready_jobs->empty() && jq->quit) { jq->num_workers--; if (jq->num_workers == 0) { Dmsg0(100, "Wake up destroy routine\n"); @@ -499,17 +490,18 @@ static void *jobq_server(void *arg) /* * If no more work requests, and we waited long enough, quit */ - Dmsg1(100, "jq empty = %d\n", jq->waiting_jobs->empty()); - Dmsg1(100, "timedout=%d\n", timedout); - if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && timedout) { + Dmsg2(100, "timedout=%d read empty=%d\n", timedout, + jq->ready_jobs->empty()); + if (jq->ready_jobs->empty() && timedout) { Dmsg0(100, "break big loop\n"); jq->num_workers--; break; } Dmsg0(100, "Loop again\n"); + work = false; } /* end of big for loop */ - Dmsg0(100, "unlock mutex\n"); + Dmsg0(200, "unlock mutex\n"); pthread_mutex_unlock(&jq->mutex); Dmsg0(100, "End jobq_server\n"); return NULL; diff --git a/bacula/src/version.h b/bacula/src/version.h index d490acd647..80409f7401 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -21,10 +21,8 @@ #define USE_SEMAPHORE /* Turn this on if you want to use the new Job scheduling code */ -#ifdef xxx #undef USE_SEMAPHORE #define JOB_QUEUE 1 -#endif /* #define NO_ATTRIBUTES_TEST 1 */