]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/workq.c
First cut of bat rerun a Job from Jobs Run
[bacula/bacula] / bacula / src / lib / workq.c
old mode 100755 (executable)
new mode 100644 (file)
index f8b0106..1f957d5
@@ -1,3 +1,30 @@
+/*
+   Bacula® - The Network Backup Solution
+
+   Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
+
+   The main author of Bacula is Kern Sibbald, with contributions from
+   many others, a complete list can be found in the file AUTHORS.
+   This program is Free Software; you can redistribute it and/or
+   modify it under the terms of version three of the GNU Affero General Public
+   License as published by the Free Software Foundation and included
+   in the file LICENSE.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU Affero General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA.
+
+   Bacula® is a registered trademark of Kern Sibbald.
+   The licensor of Bacula is the Free Software Foundation Europe
+   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
+   Switzerland, email:ftf@fsfeurope.org.
+*/
 /*
  * Bacula work queue routines. Permits passing work to
  *  multiple threads.
  *
  *  Initialize queue
  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
- *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
+ *     berrno be;
+ *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
  *   }
  *
  *  Add an item to the queue
  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
- *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
+ *      berrno be;
+ *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
  *   }
  *
  *  Terminate the queue
  *  workq_destroy(workq_t *wq);
  *
  */
-/*
-   Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
-
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of
-   the License, or (at your option) any later version.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   General Public License for more details.
-
-   You should have received a copy of the GNU General Public
-   License along with this program; if not, write to the Free
-   Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
-   MA 02111-1307, USA.
-
- */
 
 #include "bacula.h"
+#include "jcr.h"
 
 /* Forward referenced functions */
-static void *workq_server(void *arg);
+extern "C" void *workq_server(void *arg);
 
-/*   
+/*
  * Initialize a work queue
  *
  *  Returns: 0 on success
- *          errno on failure
+ *           errno on failure
  */
-int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
+int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
 {
    int stat;
-                       
+
    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
       return stat;
    }
@@ -80,11 +91,11 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
    }
    wq->quit = 0;
    wq->first = wq->last = NULL;
-   wq->max_workers = threads;        /* max threads to create */
-   wq->num_workers = 0;              /* no threads yet */
-   wq->idle_workers = 0;             /* no idle threads */
-   wq->engine = engine;              /* routine to run */
-   wq->valid = WORKQ_VALID; 
+   wq->max_workers = threads;         /* max threads to create */
+   wq->num_workers = 0;               /* no threads yet */
+   wq->idle_workers = 0;              /* no idle threads */
+   wq->engine = engine;               /* routine to run */
+   wq->valid = WORKQ_VALID;
    return 0;
 }
 
@@ -92,7 +103,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
  * Destroy a work queue
  *
  * Returns: 0 on success
- *         errno on failure
+ *          errno on failure
  */
 int workq_destroy(workq_t *wq)
 {
@@ -101,33 +112,29 @@ int workq_destroy(workq_t *wq)
   if (wq->valid != WORKQ_VALID) {
      return EINVAL;
   }
-  if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-     return stat;
-  }
-  wq->valid = 0;                     /* prevent any more operations */
+  P(wq->mutex);
+  wq->valid = 0;                      /* prevent any more operations */
 
-  /* 
-   * If any threads are active, wake them 
+  /*
+   * If any threads are active, wake them
    */
   if (wq->num_workers > 0) {
      wq->quit = 1;
      if (wq->idle_workers) {
-       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-          pthread_mutex_unlock(&wq->mutex);
-          return stat;
-       }
+        if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+           V(wq->mutex);
+           return stat;
+        }
      }
      while (wq->num_workers > 0) {
-       if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
-          pthread_mutex_unlock(&wq->mutex);
-          return stat;
-       }
+        if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+           V(wq->mutex);
+           return stat;
+        }
      }
   }
-  if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-     return stat;
-  }
-  stat = pthread_mutex_destroy(&wq->mutex);
+  V(wq->mutex);
+  stat  = pthread_mutex_destroy(&wq->mutex);
   stat1 = pthread_cond_destroy(&wq->work);
   stat2 = pthread_attr_destroy(&wq->attr);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
@@ -136,164 +143,244 @@ int workq_destroy(workq_t *wq)
 
 /*
  *  Add work to a queue
+ *    wq is a queue that was created with workq_init
+ *    element is a user unique item that will be passed to the
+ *        processing routine
+ *    work_item will get internal work queue item -- if it is not NULL
+ *    priority if non-zero will cause the item to be placed on the
+ *        head of the list instead of the tail.
  */
-int workq_add(workq_t *wq, void *element)
+int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
 {
-   int stat;
+   int stat=0;
    workq_ele_t *item;
    pthread_t id;
-    
-   Dmsg0(200, "workq_add\n");
+
+   Dmsg0(1400, "workq_add\n");
    if (wq->valid != WORKQ_VALID) {
       return EINVAL;
    }
 
-   if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
+   if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
       return ENOMEM;
    }
    item->data = element;
    item->next = NULL;
-   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-      free(item);
-      return stat;
-   }
+   P(wq->mutex);
 
-   Dmsg0(200, "add item to queue\n");
-   /* Add the new item to the end of the queue */
-   if (wq->first == NULL) {
-      wq->first = item;
+   Dmsg0(1400, "add item to queue\n");
+   if (priority) {
+      /* Add to head of queue */
+      if (wq->first == NULL) {
+         wq->first = item;
+         wq->last = item;
+      } else {
+         item->next = wq->first;
+         wq->first = item;
+      }
    } else {
-      wq->last->next = item;
+      /* Add to end of queue */
+      if (wq->first == NULL) {
+         wq->first = item;
+      } else {
+         wq->last->next = item;
+      }
+      wq->last = item;
    }
-   wq->last = item;
 
    /* if any threads are idle, wake one */
    if (wq->idle_workers > 0) {
-      Dmsg0(200, "Signal worker\n");
-      if ((stat = pthread_cond_signal(&wq->work)) != 0) {
-        pthread_mutex_unlock(&wq->mutex);
-        return stat;
+      Dmsg0(1400, "Signal worker\n");
+      if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+         V(wq->mutex);
+         return stat;
       }
    } else if (wq->num_workers < wq->max_workers) {
-      Dmsg0(200, "Create worker thread\n");
+      Dmsg0(1400, "Create worker thread\n");
+      /* No idle threads so create a new one */
+      set_thread_concurrency(wq->max_workers + 1);
+      if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
+         V(wq->mutex);
+         return stat;
+      }
+      wq->num_workers++;
+   }
+   V(wq->mutex);
+   Dmsg0(1400, "Return workq_add\n");
+   /* Return work_item if requested */
+   if (work_item) {
+      *work_item = item;
+   }
+   return stat;
+}
+
+/*
+ *  Remove work from a queue
+ *    wq is a queue that was created with workq_init
+ *    work_item is an element of work
+ *
+ *   Note, it is "removed" by immediately calling a processing routine.
+ *    if you want to cancel it, you need to provide some external means
+ *    of doing so.
+ */
+int workq_remove(workq_t *wq, workq_ele_t *work_item)
+{
+   int stat, found = 0;
+   pthread_t id;
+   workq_ele_t *item, *prev;
+
+   Dmsg0(1400, "workq_remove\n");
+   if (wq->valid != WORKQ_VALID) {
+      return EINVAL;
+   }
+
+   P(wq->mutex);
+
+   for (prev=item=wq->first; item; item=item->next) {
+      if (item == work_item) {
+         found = 1;
+         break;
+      }
+      prev = item;
+   }
+   if (!found) {
+      return EINVAL;
+   }
+
+   /* Move item to be first on list */
+   if (wq->first != work_item) {
+      prev->next = work_item->next;
+      if (wq->last == work_item) {
+         wq->last = prev;
+      }
+      work_item->next = wq->first;
+      wq->first = work_item;
+   }
+
+   /* if any threads are idle, wake one */
+   if (wq->idle_workers > 0) {
+      Dmsg0(1400, "Signal worker\n");
+      if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+         V(wq->mutex);
+         return stat;
+      }
+   } else {
+      Dmsg0(1400, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-        pthread_mutex_unlock(&wq->mutex);
-        return stat;
+         V(wq->mutex);
+         return stat;
       }
       wq->num_workers++;
    }
-   pthread_mutex_unlock(&wq->mutex);
-   Dmsg0(200, "Return workq_add\n");
+   V(wq->mutex);
+   Dmsg0(1400, "Return workq_remove\n");
    return stat;
 }
 
-/* 
+
+/*
  * This is the worker thread that serves the work queue.
  * In due course, it will call the user's engine.
  */
-static void *workq_server(void *arg)
+extern "C"
+void *workq_server(void *arg)
 {
    struct timespec timeout;
    workq_t *wq = (workq_t *)arg;
    workq_ele_t *we;
    int stat, timedout;
 
-   Dmsg0(200, "Start workq_server\n");
-   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-      return NULL;
-   }
+   Dmsg0(1400, "Start workq_server\n");
+   P(wq->mutex);
+   set_jcr_in_tsd(INVALID_JCR);
 
    for (;;) {
       struct timeval tv;
       struct timezone tz;
 
-      Dmsg0(200, "Top of for loop\n");
+      Dmsg0(1400, "Top of for loop\n");
       timedout = 0;
-      Dmsg0(200, "gettimeofday()\n");
+      Dmsg0(1400, "gettimeofday()\n");
       gettimeofday(&tv, &tz);
       timeout.tv_nsec = 0;
       timeout.tv_sec = tv.tv_sec + 2;
 
       while (wq->first == NULL && !wq->quit) {
-        /*
-         * Wait 2 seconds, then if no more work, exit
-         */
-         Dmsg0(200, "pthread_cond_timedwait()\n");
+         /*
+          * Wait 2 seconds, then if no more work, exit
+          */
+         Dmsg0(1400, "pthread_cond_timedwait()\n");
 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
-        /* CYGWIN dies with a page fault the second
-         * time that pthread_cond_timedwait() is called
-         * so fake it out.
-         */
-        pthread_mutex_lock(&wq->mutex);
-        stat = ETIMEDOUT;
+         /* CYGWIN dies with a page fault the second
+          * time that pthread_cond_timedwait() is called
+          * so fake it out.
+          */
+         P(wq->mutex);
+         stat = ETIMEDOUT;
 #else
-        stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+         stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
 #endif
-         Dmsg1(200, "timedwait=%d\n", stat);
-        if (stat == ETIMEDOUT) {
-           timedout = 1;
-           break;
-        } else if (stat != 0) {
+         Dmsg1(1400, "timedwait=%d\n", stat);
+         if (stat == ETIMEDOUT) {
+            timedout = 1;
+            break;
+         } else if (stat != 0) {
             /* This shouldn't happen */
-            Dmsg0(200, "This shouldn't happen\n");
-           wq->num_workers--;
-           pthread_mutex_unlock(&wq->mutex);
-           return NULL;
-        }
-      } 
+            Dmsg0(1400, "This shouldn't happen\n");
+            wq->num_workers--;
+            V(wq->mutex);
+            return NULL;
+         }
+      }
       we = wq->first;
       if (we != NULL) {
-        wq->first = we->next;
-        if (wq->last == we) {
-           wq->last = NULL;
-        }
-        if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-           return NULL;
-        }
+         wq->first = we->next;
+         if (wq->last == we) {
+            wq->last = NULL;
+         }
+         V(wq->mutex);
          /* Call user's routine here */
-         Dmsg0(200, "Calling user engine.\n");
-        wq->engine(we->data);
-         Dmsg0(200, "Back from user engine.\n");
-        free(we);                    /* release work entry */
-         Dmsg0(200, "relock mutex\n"); 
-        if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-           return NULL;
-        }
-         Dmsg0(200, "Done lock mutex\n");
+         Dmsg0(1400, "Calling user engine.\n");
+         wq->engine(we->data);
+         Dmsg0(1400, "Back from user engine.\n");
+         free(we);                    /* release work entry */
+         Dmsg0(1400, "relock mutex\n");
+         P(wq->mutex);
+         Dmsg0(1400, "Done lock mutex\n");
       }
       /*
        * If no more work request, and we are asked to quit, then do it
        */
       if (wq->first == NULL && wq->quit) {
-        wq->num_workers--;
-        if (wq->num_workers == 0) {
-            Dmsg0(200, "Wake up destroy routine\n");
-           /* Wake up destroy routine if he is waiting */
-           pthread_cond_broadcast(&wq->work);
-        }
-         Dmsg0(200, "Unlock mutex\n");
-        pthread_mutex_unlock(&wq->mutex);
-         Dmsg0(200, "Return from workq_server\n");
-        return NULL;
+         wq->num_workers--;
+         if (wq->num_workers == 0) {
+            Dmsg0(1400, "Wake up destroy routine\n");
+            /* Wake up destroy routine if he is waiting */
+            pthread_cond_broadcast(&wq->work);
+         }
+         Dmsg0(1400, "Unlock mutex\n");
+         V(wq->mutex);
+         Dmsg0(1400, "Return from workq_server\n");
+         return NULL;
       }
-      Dmsg0(200, "Check for work request\n");
-      /* 
+      Dmsg0(1400, "Check for work request\n");
+      /*
        * If no more work requests, and we waited long enough, quit
        */
-      Dmsg1(200, "wq->first==NULL = %d\n", wq->first==NULL);
-      Dmsg1(200, "timedout=%d\n", timedout);
+      Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
+      Dmsg1(1400, "timedout=%d\n", timedout);
       if (wq->first == NULL && timedout) {
-         Dmsg0(200, "break big loop\n");
-        wq->num_workers--;
-        break;
+         Dmsg0(1400, "break big loop\n");
+         wq->num_workers--;
+         break;
       }
-      Dmsg0(200, "Loop again\n");
+      Dmsg0(1400, "Loop again\n");
    } /* end of big for loop */
 
-   Dmsg0(200, "unlock mutex\n");
-   pthread_mutex_unlock(&wq->mutex);
-   Dmsg0(200, "End workq_server\n");
+   Dmsg0(1400, "unlock mutex\n");
+   V(wq->mutex);
+   Dmsg0(1400, "End workq_server\n");
    return NULL;
 }