]> git.sur5r.net Git - bacula/bacula/commitdiff
Fix priority scheduling bugs
authorKern Sibbald <kern@sibbald.com>
Sun, 20 Jul 2003 20:46:35 +0000 (20:46 +0000)
committerKern Sibbald <kern@sibbald.com>
Sun, 20 Jul 2003 20:46:35 +0000 (20:46 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@638 91ce42f0-d328-0410-95d8-f526ca767f89

bacula/src/dird/jobq.c
bacula/src/version.h

index cc9502df1be70a4adbdfcb7dea290206394c6a9a..bff56d1ecde27b8d0824741e2c42d74212cf5e7e 100755 (executable)
@@ -189,7 +189,7 @@ int jobq_add(jobq_t *jq, JCR *jcr)
    wait_pkt *sched_pkt;
     
     
-   Dmsg0(100, "jobq_add\n");
+   Dmsg1(100, "jobq_add jobid=%d\n", jcr->JobId);
    if (jq->valid != JOBQ_VALID) {
       return EINVAL;
    }
@@ -213,25 +213,28 @@ int jobq_add(jobq_t *jq, JCR *jcr)
    }
    item->jcr = jcr;
 
-   Dmsg1(100, "add 0x%x to queue\n", (unsigned)item);
    if (job_canceled(jcr)) {
       /* Add job to ready queue so that it is canceled quickly */
       jq->ready_jobs->prepend(item);
+      Dmsg1(100, "Prepended job=%d to ready queue\n", jcr->JobId);
    } else {
       /* Add this job to the wait queue in priority sorted order */
       for (li=NULL; (li=(jobq_item_t *)jq->waiting_jobs->next(li)); ) {
-        if (li->jcr->JobPriority < jcr->JobPriority) {
+         Dmsg2(100, "waiting item jobid=%d priority=%d\n",
+           li->jcr->JobId, li->jcr->JobPriority);
+        if (li->jcr->JobPriority > jcr->JobPriority) {
            jq->waiting_jobs->insert_before(item, li);
-            Dmsg1(100, "insert_before 0x%x\n", (unsigned)li);
+            Dmsg2(100, "insert_before jobid=%d before %d\n", 
+              li->jcr->JobId, jcr->JobId);
            inserted = true;
+           break;
         }
       }
       /* If not jobs in wait queue, append it */
       if (!inserted) {
         jq->waiting_jobs->append(item);
-         Dmsg0(100, "Appended item.\n");
+         Dmsg1(100, "Appended item jobid=%d\n", jcr->JobId);
       }
-      Dmsg1(100, "Next=0x%x\n", (unsigned)jq->waiting_jobs->next(item));
    }
 
    stat = start_server(jq);
@@ -256,10 +259,9 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
 {
    int stat;
    bool found = false;
-   pthread_t id;
    jobq_item_t *item;
     
-   Dmsg0(100, "jobq_remove\n");
+   Dmsg1(100, "jobq_remove jobid=%d\n", jcr->JobId);
    if (jq->valid != JOBQ_VALID) {
       return EINVAL;
    }
@@ -282,22 +284,9 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
    jq->waiting_jobs->remove(item);
    jq->ready_jobs->prepend(item);
    
-   /* if any threads are idle, wake one */
-   if (jq->idle_workers > 0) {
-      Dmsg0(100, "Signal worker\n");
-      if ((stat = pthread_cond_signal(&jq->work)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
-        return stat;
-      }
-   } else {
-      Dmsg0(100, "Create worker thread\n");
-      /* No idle threads so create a new one */
-      set_thread_concurrency(jq->max_workers + 1);
-      if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
-        return stat;
-      }
-      jq->num_workers++;
+   stat = start_server(jq);
+   if (stat != 0) {
+      return stat;
    }
    pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_remove\n");
@@ -315,7 +304,7 @@ static int start_server(jobq_t *jq)
 
    /* if any threads are idle, wake one */
    if (jq->idle_workers > 0) {
-      Dmsg0(100, "Signal worker\n");
+      Dmsg0(100, "Signal worker to wake up\n");
       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
         pthread_mutex_unlock(&jq->mutex);
         return stat;
@@ -346,6 +335,7 @@ static void *jobq_server(void *arg)
    jobq_item_t *je;                  /* job entry in queue */
    int stat;
    bool timedout;
+   bool work = true;
 
    Dmsg0(100, "Start jobq_server\n");
    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
@@ -363,11 +353,11 @@ static void *jobq_server(void *arg)
       timeout.tv_nsec = 0;
       timeout.tv_sec = tv.tv_sec + 4;
 
-      while (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && !jq->quit) {
+      while (!work && !jq->quit) {
         /*
          * Wait 4 seconds, then if no more work, exit
          */
-         Dmsg0(100, "pthread_cond_timedwait()\n");
+         Dmsg0(200, "pthread_cond_timedwait()\n");
         stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
          Dmsg1(100, "timedwait=%d\n", stat);
         if (stat == ETIMEDOUT) {
@@ -389,23 +379,25 @@ static void *jobq_server(void *arg)
         je = (jobq_item_t *)jq->ready_jobs->first(); 
         jq->ready_jobs->remove(je);
         if (!jq->ready_jobs->empty()) {
+            Dmsg0(100, "ready queue not empty start server\n");
            if (start_server(jq) != 0) {
               return NULL;
            }
         }
         jq->running_jobs->append(je);
+         Dmsg1(100, "Took jobid=%d from ready and appended to run\n", je->jcr->JobId);
         if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
            return NULL;
         }
          /* Call user's routine here */
-         Dmsg0(100, "Calling user engine.\n");
+         Dmsg1(100, "Calling user engine for jobid=%d\n", je->jcr->JobId);
         jq->engine(je->jcr);
-         Dmsg0(100, "Back from user engine.\n");
+         Dmsg1(100, "Back from user engine jobid=%d.\n", je->jcr->JobId);
         if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
            free(je);                 /* release job entry */
            return NULL;
         }
-         Dmsg0(100, "Done lock mutex\n");
+         Dmsg0(200, "Done lock mutex\n");
         jq->running_jobs->remove(je);
         /* 
          * Release locks if acquired. Note, they will not have
@@ -442,7 +434,6 @@ static void *jobq_server(void *arg)
         for ( ; je;  ) {
            JCR *jcr = je->jcr;
            jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
-            Dmsg2(100, "je=0x%x jn=0x%x\n", (unsigned)je, (unsigned)jn);
             Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n",
               jcr->JobId, jcr->JobPriority, Priority);
            /* Take only jobs of correct Priority */
@@ -477,16 +468,16 @@ static void *jobq_server(void *arg)
            jcr->acquired_resource_locks = true;
            jq->waiting_jobs->remove(je);
            jq->ready_jobs->append(je);
-            Dmsg1(100, "moved JobId=%d from wait to ready queue\n", 
-             je->jcr->JobId);
+            Dmsg1(100, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
            je = jn;
         } /* end for loop */
+        break;
       } /* end while loop */
       Dmsg0(100, "Done checking wait queue.\n");
       /*
-       * If no more work request, and we are asked to quit, then do it
+       * If no more ready work and we are asked to quit, then do it
        */
-      if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && jq->quit) {
+      if (jq->ready_jobs->empty() && jq->quit) {
         jq->num_workers--;
         if (jq->num_workers == 0) {
             Dmsg0(100, "Wake up destroy routine\n");
@@ -499,17 +490,18 @@ static void *jobq_server(void *arg)
       /* 
        * If no more work requests, and we waited long enough, quit
        */
-      Dmsg1(100, "jq empty = %d\n", jq->waiting_jobs->empty());
-      Dmsg1(100, "timedout=%d\n", timedout);
-      if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && timedout) {
+      Dmsg2(100, "timedout=%d read empty=%d\n", timedout,
+        jq->ready_jobs->empty());
+      if (jq->ready_jobs->empty() && timedout) {
          Dmsg0(100, "break big loop\n");
         jq->num_workers--;
         break;
       }
       Dmsg0(100, "Loop again\n");
+      work = false;
    } /* end of big for loop */
 
-   Dmsg0(100, "unlock mutex\n");
+   Dmsg0(200, "unlock mutex\n");
    pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "End jobq_server\n");
    return NULL;
index d490acd64751d769cc943dca07ec1a3f41a8667d..80409f7401f8877ced73207a2422441c993c93ec 100644 (file)
 #define USE_SEMAPHORE
 
 /* Turn this on if you want to use the new Job scheduling code */
-#ifdef xxx
 #undef USE_SEMAPHORE
 #define JOB_QUEUE 1
-#endif
 
 
 /* #define NO_ATTRIBUTES_TEST 1 */