]> git.sur5r.net Git - bacula/bacula/commitdiff
Remove cancelled jobs from workq + add priority
authorKern Sibbald <kern@sibbald.com>
Sat, 1 Mar 2003 13:50:41 +0000 (13:50 +0000)
committerKern Sibbald <kern@sibbald.com>
Sat, 1 Mar 2003 13:50:41 +0000 (13:50 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@361 91ce42f0-d328-0410-95d8-f526ca767f89

bacula/kernstodo
bacula/src/dird/dird_conf.c
bacula/src/dird/fd_cmds.c
bacula/src/dird/job.c
bacula/src/dird/ua_cmds.c
bacula/src/jcr.h
bacula/src/lib/bnet_server.c
bacula/src/lib/workq.c
bacula/src/lib/workq.h

index 1d4112b6df74b947b7e5952f8a4465c37b4df532..1250fa5df4f9c3dcd530ce6ad0e270aec70795ab 100644 (file)
@@ -22,10 +22,6 @@ For 1.30 release:
   the last copy for 1 year.
 - Have Bacula "poll" the tape to see if it is there.
 - Add more of the config info to the tape label.
-- Look for default.bsr, and if not found, create dummy bsr allowing
-  any volume.
-- If config file not found in utility programs, pluge on
-  anyway.
 - Apparently cancel does not work for jobs waiting to be
   scheduled.
 - Implement bar code reader for autochangers
index 640dce9ed57a21a83443f1acb37c94e71e5deee5..1949d77a67128bb75ad66d82a649e3467437b3af 100644 (file)
@@ -1030,7 +1030,7 @@ void save_resource(int type, struct res_items *items, int pass)
       memcpy(res, &res_all, size);
       if (!resources[rindex].res_head) {
         resources[rindex].res_head = (RES *)res; /* store first entry */
-         Dmsg3(000, "Inserting first %s res: %s index=%d\n", res_to_str(type),
+         Dmsg3(200, "Inserting first %s res: %s index=%d\n", res_to_str(type),
               res->res_dir.hdr.name, rindex);
       } else {
         RES *next;
@@ -1038,7 +1038,7 @@ void save_resource(int type, struct res_items *items, int pass)
         for (next=resources[rindex].res_head; next->next; next=next->next)
            { }
         next->next = (RES *)res;
-         Dmsg3(000, "Inserting %s res: %s index=%d\n", res_to_str(type),
+         Dmsg3(200, "Inserting %s res: %s index=%d\n", res_to_str(type),
               res->res_dir.hdr.name, rindex);
       }
    }
@@ -1357,17 +1357,6 @@ static void scan_include_options(LEX *lc, int keyword, char *opts, int optlen)
    }
 }
 
-static void prtmsg(void *sock, char *fmt, ...)
-{
-   va_list arg_ptr;
-
-   va_start(arg_ptr, fmt);
-   vfprintf(stdout, fmt, arg_ptr);
-   va_end(arg_ptr);
-}
-
-
-
 /* Store FileSet Include/Exclude info */
 static void store_inc(LEX *lc, struct res_items *item, int index, int pass)
 {
@@ -1400,9 +1389,6 @@ static void store_inc(LEX *lc, struct res_items *item, int index, int pass)
       if (keyword == INC_KW_FILEOPTIONS) {
         token = lex_get_token(lc, T_NAME);
         if (pass == 2) {
-           for (int i=r_first; i<=r_last; i++) {
-              dump_resource(i, resources[i-r_first].res_head, prtmsg, NULL);
-           }
            res = GetResWithName(R_FILEOPTIONS, lc->str);
            if (res == NULL) {
                scan_err1(lc, _("Could not find specified FileOptions Resource: %s"), lc->str);
index 572b2451636db42b00e56a2d64f2f46e39321803..e4e6d56cf1e44bfbde167269970535957defa3f3 100644 (file)
@@ -219,7 +219,7 @@ int send_exclude_list(JCR *jcr)
    for (i=0; i < fileset->num_excludes; i++) {
       pm_strcpy(&fd->msg, fileset->exclude_array[i]->name);
       fd->msglen = strlen(fd->msg);
-      Dmsg1(000, "dird>filed: exclude file: %s\n", fileset->exclude_array[i]->name);
+      Dmsg1(200, "dird>filed: exclude file: %s\n", fileset->exclude_array[i]->name);
       if (!bnet_send(fd)) {
          Jmsg(jcr, M_FATAL, 0, _(">filed: write error on socket\n"));
         set_jcr_job_status(jcr, JS_ErrorTerminated);
index fc95c6b1a9bd2cb1f7a7481917bd58f1d59e493d..aff54c31751b5938f94cb3f4d9ae453f25338fa1 100644 (file)
@@ -47,7 +47,7 @@ extern int do_verify(JCR *jcr);
 extern void backup_cleanup(void);
 
 /* Queue of jobs to be run */
-static workq_t job_wq;               /* our job work queue */
+workq_t job_wq;                  /* our job work queue */
 
 
 void init_job_server(int max_workers)
@@ -68,6 +68,7 @@ void init_job_server(int max_workers)
 void run_job(JCR *jcr)
 {
    int stat, errstat;
+   workq_ele_t *work_item;
 
    sm_check(__FILE__, __LINE__, True);
    init_msg(jcr, jcr->messages);
@@ -124,9 +125,10 @@ void run_job(JCR *jcr)
 
 
    /* Queue the job to be run */
-   if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
+   if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
       Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
    }
+   jcr->work_item = work_item;
    Dmsg0(200, "Done run_job()\n");
 }
 
@@ -145,7 +147,9 @@ static void job_thread(void *arg)
 
    Dmsg0(100, "=====Start Job=========\n");
    jcr->start_time = now;            /* set the real start time */
-   if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
+   if (job_cancelled(jcr)) {
+      update_job_end_record(jcr);
+   } else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
        (utime_t)(jcr->start_time - jcr->sched_time)) {
       Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max delay time exceeded.\n"));
       set_jcr_job_status(jcr, JS_ErrorTerminated);
@@ -191,14 +195,14 @@ static void job_thread(void *arg)
             Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType);
            break;
         }
-   }
-   if (jcr->job->RunAfterJob) {
-      POOLMEM *after = get_pool_memory(PM_FNAME);
-      int status;
+      if (jcr->job->RunAfterJob) {
+        POOLMEM *after = get_pool_memory(PM_FNAME);
+        int status;
       
-      after = edit_run_codes(jcr, after, jcr->job->RunAfterJob);
-      status = run_program(after, 0, NULL);
-      free_pool_memory(after);
+        after = edit_run_codes(jcr, after, jcr->job->RunAfterJob);
+        status = run_program(after, 0, NULL);
+        free_pool_memory(after);
+      }
    }
    Dmsg0(50, "Before free jcr\n");
    free_jcr(jcr);
index 908f3a47164393dac6052821f52c93d5dc5d9118..82072bee124b3fbe1828993ca027f579383af655 100644 (file)
@@ -39,6 +39,7 @@ extern int r_first;
 extern int r_last;
 extern struct s_res resources[];
 extern char my_name[];
+extern workq_t job_wq;               /* work queue */
 
 /* Imported functions */
 extern int statuscmd(UAContext *ua, char *cmd);
@@ -414,11 +415,11 @@ static int cancelcmd(UAContext *ua, char *cmd)
       set_jcr_job_status(jcr, JS_Cancelled);
       bsendmsg(ua, _("JobId %d, Job %s marked to be cancelled.\n"),
              jcr->JobId, jcr->Job);
+      workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */
       free_jcr(jcr);
       return 1;
         
    default:
-
       set_jcr_job_status(jcr, JS_Cancelled);
       /* Cancel File daemon */
       ua->jcr->client = jcr->client;
index 47f8a0da9f8259f24978b53d84f589b06d0776ee..0669b5e63f37138ddfacb3f33e60490a971404bb 100644 (file)
@@ -117,6 +117,7 @@ struct s_jcr {
    /* Director Daemon specific part of JCR */
    pthread_t SD_msg_chan;             /* Message channel thread id */
    pthread_cond_t term_wait;          /* Wait for job termination */
+   workq_ele_t *work_item;            /* Work queue item if scheduled */
    int msg_thread_done;               /* Set when Storage message thread terms */
    BSOCK *ua;                         /* User agent */
    JOB *job;                          /* Job resource */
index 040a773fb93dffb1b7eab8b5b212e7f88f365e0f..100b15a2eddac4936d81145a31a0a7204edc20d1 100644 (file)
@@ -160,7 +160,7 @@ bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_w
 
       /* Queue client to be served */
       if ((stat = workq_add(client_wq, 
-            (void *)init_bsock(NULL, newsockfd, "client", caller, port))) != 0) {
+            (void *)init_bsock(NULL, newsockfd, "client", caller, port), NULL, 0)) != 0) {
          Jmsg1(NULL, M_ABORT, 0, _("Could not add job to client queue: ERR=%s\n"), strerror(stat));
       }
    }
index b076aca7da3daedce34667427a4ce736ab232750..3ccc663923a54d565280251d839472df2dffb25e 100755 (executable)
@@ -56,12 +56,12 @@ static void *workq_server(void *arg);
  * Initialize a work queue
  *
  *  Returns: 0 on success
- *          errno on failure
+ *           errno on failure
  */
 int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
 {
    int stat;
-                       
+                        
    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
       return stat;
    }
@@ -80,10 +80,10 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
    }
    wq->quit = 0;
    wq->first = wq->last = NULL;
-   wq->max_workers = threads;        /* max threads to create */
-   wq->num_workers = 0;              /* no threads yet */
-   wq->idle_workers = 0;             /* no idle threads */
-   wq->engine = engine;              /* routine to run */
+   wq->max_workers = threads;         /* max threads to create */
+   wq->num_workers = 0;               /* no threads yet */
+   wq->idle_workers = 0;              /* no idle threads */
+   wq->engine = engine;               /* routine to run */
    wq->valid = WORKQ_VALID; 
    return 0;
 }
@@ -92,7 +92,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
  * Destroy a work queue
  *
  * Returns: 0 on success
- *         errno on failure
+ *          errno on failure
  */
 int workq_destroy(workq_t *wq)
 {
@@ -104,7 +104,7 @@ int workq_destroy(workq_t *wq)
   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
      return stat;
   }
-  wq->valid = 0;                     /* prevent any more operations */
+  wq->valid = 0;                      /* prevent any more operations */
 
   /* 
    * If any threads are active, wake them 
@@ -112,22 +112,22 @@ int workq_destroy(workq_t *wq)
   if (wq->num_workers > 0) {
      wq->quit = 1;
      if (wq->idle_workers) {
-       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-          pthread_mutex_unlock(&wq->mutex);
-          return stat;
-       }
+        if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+           pthread_mutex_unlock(&wq->mutex);
+           return stat;
+        }
      }
      while (wq->num_workers > 0) {
-       if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
-          pthread_mutex_unlock(&wq->mutex);
-          return stat;
-       }
+        if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+           pthread_mutex_unlock(&wq->mutex);
+           return stat;
+        }
      }
   }
   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
      return stat;
   }
-  stat = pthread_mutex_destroy(&wq->mutex);
+  stat  = pthread_mutex_destroy(&wq->mutex);
   stat1 = pthread_cond_destroy(&wq->work);
   stat2 = pthread_attr_destroy(&wq->attr);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
@@ -136,8 +136,14 @@ int workq_destroy(workq_t *wq)
 
 /*
  *  Add work to a queue
+ *    wq is a queue that was created with workq_init
+ *    element is a user unique item that will be passed to the 
+ *        processing routine
+ *    work_item will get internal work queue item -- if it is not NULL
+ *    priority if non-zero will cause the item to be placed on the
+ *        head of the list instead of the tail.
  */
-int workq_add(workq_t *wq, void *element)
+int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
 {
    int stat;
    workq_ele_t *item;
@@ -148,7 +154,7 @@ int workq_add(workq_t *wq, void *element)
       return EINVAL;
    }
 
-   if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
+   if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
       return ENOMEM;
    }
    item->data = element;
@@ -159,36 +165,119 @@ int workq_add(workq_t *wq, void *element)
    }
 
    Dmsg0(200, "add item to queue\n");
-   /* Add the new item to the end of the queue */
-   if (wq->first == NULL) {
-      wq->first = item;
+   if (priority) {
+      /* Add to head of queue */
+      if (wq->first == NULL) {
+         wq->first = item;
+         wq->last = item;
+      } else {
+         item->next = wq->first;
+         wq->first = item;
+      }
    } else {
-      wq->last->next = item;
+      /* Add to end of queue */
+      if (wq->first == NULL) {
+         wq->first = item;
+      } else {
+         wq->last->next = item;
+      }
+      wq->last = item;
    }
-   wq->last = item;
 
    /* if any threads are idle, wake one */
    if (wq->idle_workers > 0) {
       Dmsg0(200, "Signal worker\n");
       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
-        pthread_mutex_unlock(&wq->mutex);
-        return stat;
+         pthread_mutex_unlock(&wq->mutex);
+         return stat;
       }
    } else if (wq->num_workers < wq->max_workers) {
       Dmsg0(200, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-        pthread_mutex_unlock(&wq->mutex);
-        return stat;
+         pthread_mutex_unlock(&wq->mutex);
+         return stat;
       }
       wq->num_workers++;
    }
    pthread_mutex_unlock(&wq->mutex);
    Dmsg0(200, "Return workq_add\n");
+   /* Return work_item if requested */
+   if (work_item) {
+      *work_item = item;
+   }
+   return stat;
+}
+
+/*
+ *  Remove work from a queue
+ *    wq is a queue that was created with workq_init
+ *    work_item is an element of work
+ *
+ *   Note, it is "removed" by immediately calling a processing routine.
+ *    if you want to cancel it, you need to provide some external means
+ *    of doing so.
+ */
+int workq_remove(workq_t *wq, workq_ele_t *work_item)
+{
+   int stat, found = 0;
+   pthread_t id;
+   workq_ele_t *item, *prev;
+    
+   Dmsg0(200, "workq_remove\n");
+   if (wq->valid != WORKQ_VALID) {
+      return EINVAL;
+   }
+
+   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+      return stat;
+   }
+
+   for (prev=item=wq->first; item; item=item->next) {
+      if (item == work_item) {
+         found = 1;
+         break;
+      }
+      prev = item;
+   }
+   if (!found) {
+      return EINVAL;
+   }
+   
+   /* Move item to be first on list */
+   if (wq->first != work_item) {
+      prev->next = work_item->next;   
+      if (wq->last == work_item) {
+         wq->last = prev;
+      }
+      work_item->next = wq->first;
+      wq->first = work_item;
+   }
+
+   /* if any threads are idle, wake one */
+   if (wq->idle_workers > 0) {
+      Dmsg0(200, "Signal worker\n");
+      if ((stat = pthread_cond_signal(&wq->work)) != 0) {
+         pthread_mutex_unlock(&wq->mutex);
+         return stat;
+      }
+   } else {
+      Dmsg0(200, "Create worker thread\n");
+      /* No idle threads so create a new one */
+      set_thread_concurrency(wq->max_workers + 1);
+      if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
+         pthread_mutex_unlock(&wq->mutex);
+         return stat;
+      }
+      wq->num_workers++;
+   }
+   pthread_mutex_unlock(&wq->mutex);
+   Dmsg0(200, "Return workq_remove\n");
    return stat;
 }
 
+
 /* 
  * This is the worker thread that serves the work queue.
  * In due course, it will call the user's engine.
@@ -217,66 +306,66 @@ static void *workq_server(void *arg)
       timeout.tv_sec = tv.tv_sec + 2;
 
       while (wq->first == NULL && !wq->quit) {
-        /*
-         * Wait 2 seconds, then if no more work, exit
-         */
+         /*
+          * Wait 2 seconds, then if no more work, exit
+          */
          Dmsg0(200, "pthread_cond_timedwait()\n");
 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
-        /* CYGWIN dies with a page fault the second
-         * time that pthread_cond_timedwait() is called
-         * so fake it out.
-         */
-        pthread_mutex_lock(&wq->mutex);
-        stat = ETIMEDOUT;
+         /* CYGWIN dies with a page fault the second
+          * time that pthread_cond_timedwait() is called
+          * so fake it out.
+          */
+         pthread_mutex_lock(&wq->mutex);
+         stat = ETIMEDOUT;
 #else
-        stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+         stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
 #endif
          Dmsg1(200, "timedwait=%d\n", stat);
-        if (stat == ETIMEDOUT) {
-           timedout = 1;
-           break;
-        } else if (stat != 0) {
+         if (stat == ETIMEDOUT) {
+            timedout = 1;
+            break;
+         } else if (stat != 0) {
             /* This shouldn't happen */
             Dmsg0(200, "This shouldn't happen\n");
-           wq->num_workers--;
-           pthread_mutex_unlock(&wq->mutex);
-           return NULL;
-        }
+            wq->num_workers--;
+            pthread_mutex_unlock(&wq->mutex);
+            return NULL;
+         }
       } 
       we = wq->first;
       if (we != NULL) {
-        wq->first = we->next;
-        if (wq->last == we) {
-           wq->last = NULL;
-        }
-        if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-           return NULL;
-        }
+         wq->first = we->next;
+         if (wq->last == we) {
+            wq->last = NULL;
+         }
+         if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
+            return NULL;
+         }
          /* Call user's routine here */
          Dmsg0(200, "Calling user engine.\n");
-        wq->engine(we->data);
+         wq->engine(we->data);
          Dmsg0(200, "Back from user engine.\n");
-        free(we);                    /* release work entry */
+         free(we);                    /* release work entry */
          Dmsg0(200, "relock mutex\n"); 
-        if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-           return NULL;
-        }
+         if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+            return NULL;
+         }
          Dmsg0(200, "Done lock mutex\n");
       }
       /*
        * If no more work request, and we are asked to quit, then do it
        */
       if (wq->first == NULL && wq->quit) {
-        wq->num_workers--;
-        if (wq->num_workers == 0) {
+         wq->num_workers--;
+         if (wq->num_workers == 0) {
             Dmsg0(200, "Wake up destroy routine\n");
-           /* Wake up destroy routine if he is waiting */
-           pthread_cond_broadcast(&wq->work);
-        }
+            /* Wake up destroy routine if he is waiting */
+            pthread_cond_broadcast(&wq->work);
+         }
          Dmsg0(200, "Unlock mutex\n");
-        pthread_mutex_unlock(&wq->mutex);
+         pthread_mutex_unlock(&wq->mutex);
          Dmsg0(200, "Return from workq_server\n");
-        return NULL;
+         return NULL;
       }
       Dmsg0(200, "Check for work request\n");
       /* 
@@ -286,8 +375,8 @@ static void *workq_server(void *arg)
       Dmsg1(200, "timedout=%d\n", timedout);
       if (wq->first == NULL && timedout) {
          Dmsg0(200, "break big loop\n");
-        wq->num_workers--;
-        break;
+         wq->num_workers--;
+         break;
       }
       Dmsg0(200, "Loop again\n");
    } /* end of big for loop */
index 92c66e1f5003c6b69133076faaf4aac7fecc600a..64ef4834f316403c7828ba9babfa82a7c32c33f1 100644 (file)
  */
 typedef struct workq_ele_tag {
    struct workq_ele_tag *next;
-   void                *data;
+   void                 *data;
 } workq_ele_t;
 
 /* 
  * Structure describing a work queue
  */
 typedef struct workq_tag {
-   pthread_mutex_t   mutex;          /* queue access control */
-   pthread_cond_t    work;           /* wait for work */
-   pthread_attr_t    attr;           /* create detached threads */
-   workq_ele_t      *first, *last;   /* work queue */
-   int              valid;           /* queue initialized */
-   int              quit;            /* workq should quit */
-   int              max_workers;     /* max threads */
-   int              num_workers;     /* current threads */
-   int              idle_workers;    /* idle threads */
-   void             (*engine)(void *arg); /* user engine */
+   pthread_mutex_t   mutex;           /* queue access control */
+   pthread_cond_t    work;            /* wait for work */
+   pthread_attr_t    attr;            /* create detached threads */
+   workq_ele_t       *first, *last;   /* work queue */
+   int               valid;           /* queue initialized */
+   int               quit;            /* workq should quit */
+   int               max_workers;     /* max threads */
+   int               num_workers;     /* current threads */
+   int               idle_workers;    /* idle threads */
+   void              (*engine)(void *arg); /* user engine */
 } workq_t;
 
 #define WORKQ_VALID  0xdec1992
 
 extern int workq_init(
-             workq_t *wq,
-             int     threads,        /* maximum threads */
-             void    (*engine)(void *)   /* engine routine */
-                   );
+              workq_t *wq,
+              int     threads,        /* maximum threads */
+              void    (*engine)(void *)   /* engine routine */
+                    );
 extern int workq_destroy(workq_t *wq);
-extern int workq_add(workq_t *wq, void *data);
+extern int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority);
+extern int workq_remove(workq_t *wq, workq_ele_t *work_item);
 
 #endif /* __WORKQ_H */