]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/workq.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / lib / workq.c
index bfed633a8d4be8fe80ae2f9b703ba69e300b8f07..115db7ebdcd03e70bf3b4179afce099d4052ea2e 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2016 Kern Sibbald
+   Copyright (C) 2000-2017 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
  *   }
  *
+ *  Wait for all queued work to be completed
+ *  if ((stat = workq_wait_idle(&job_wq, (void *)jcr)) != 0) {
+ *     berrno be;
+ *     Emsg1(M_ABORT, 0, "Could not wait for idle: ERR=%s\n", be.bstrerror(errno));
+ *  }
+ *
  *  Terminate the queue
  *  workq_destroy(workq_t *wq);
  *
@@ -78,17 +84,24 @@ int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
       pthread_attr_destroy(&wq->attr);
       return stat;
    }
+   if ((stat = pthread_cond_init(&wq->idle, NULL)) != 0) {
+      pthread_mutex_destroy(&wq->mutex);
+      pthread_attr_destroy(&wq->attr);
+      pthread_cond_destroy(&wq->work);
+      return stat;
+   }
    wq->quit = 0;
    wq->first = wq->last = NULL;
    wq->max_workers = threads;         /* max threads to create */
    wq->num_workers = 0;               /* no threads yet */
+   wq->num_running = 0;               /* no running threads */
    wq->idle_workers = 0;              /* no idle threads */
    wq->engine = engine;               /* routine to run */
    wq->valid = WORKQ_VALID;
    return 0;
 }
 
-/*
+/*        [B
  * Destroy a work queue
  *
  * Returns: 0 on success
@@ -96,7 +109,7 @@ int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
  */
 int workq_destroy(workq_t *wq)
 {
-   int stat, stat1, stat2;
+   int stat, stat1, stat2, stat3;
 
   if (wq->valid != WORKQ_VALID) {
      return EINVAL;
@@ -126,9 +139,41 @@ int workq_destroy(workq_t *wq)
   stat  = pthread_mutex_destroy(&wq->mutex);
   stat1 = pthread_cond_destroy(&wq->work);
   stat2 = pthread_attr_destroy(&wq->attr);
-  return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
+  stat3 = pthread_cond_destroy(&wq->idle);
+  if (stat != 0) return stat;
+  if (stat1 != 0) return stat1;
+  if (stat2 != 0) return stat2;
+  if (stat3 != 0) return stat3;
+  return 0;
 }
 
+/*
+ * Wait for work to terminate
+ *
+ * Returns: 0 on success
+ *          errno on failure
+ */
+int workq_wait_idle(workq_t *wq)
+{
+   int stat;
+
+  if (wq->valid != WORKQ_VALID) {
+     return EINVAL;
+  }
+  P(wq->mutex);
+
+  /* While there is work, wait */
+  while (wq->num_running || wq->first != NULL) {
+     if ((stat = pthread_cond_wait(&wq->idle, &wq->mutex)) != 0) {
+        V(wq->mutex);
+        return stat;
+     }
+  }
+  V(wq->mutex);
+  return 0;
+}
+
+
 
 /*
  *  Add work to a queue
@@ -320,6 +365,7 @@ void *workq_server(void *arg)
          if (wq->last == we) {
             wq->last = NULL;
          }
+         wq->num_running++;
          V(wq->mutex);
          /* Call user's routine here */
          Dmsg0(1400, "Calling user engine.\n");
@@ -328,8 +374,12 @@ void *workq_server(void *arg)
          free(we);                    /* release work entry */
          Dmsg0(1400, "relock mutex\n");
          P(wq->mutex);
+         wq->num_running--;
          Dmsg0(1400, "Done lock mutex\n");
       }
+      if (wq->first == NULL && !wq->num_running) {
+          pthread_cond_broadcast(&wq->idle);
+      }
       /*
        * If no more work request, and we are asked to quit, then do it
        */
@@ -364,3 +414,109 @@ void *workq_server(void *arg)
    Dmsg0(1400, "End workq_server\n");
    return NULL;
 }
+
+
+//=================================================
+#ifdef TEST_PROGRAM
+
+#define TEST_SLEEP_TIME_IN_SECONDS 3
+#define TEST_MAX_NUM_WORKERS 5
+#define TEST_NUM_WORKS 10
+
+
+void *callback(void *ctx)
+{
+   JCR* jcr = (JCR*)ctx;
+
+   if (jcr)
+   {
+      Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : now starting work....\n"), (int)pthread_self());
+      sleep(TEST_SLEEP_TIME_IN_SECONDS);
+      Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : ...work completed.\n"), (int)pthread_self());
+   }
+   return NULL;
+}
+
+
+char *configfile = NULL;
+//STORES *me = NULL;                    /* our Global resource */
+bool forge_on = false;                /* proceed inspite of I/O errors */
+pthread_mutex_t device_release_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t wait_device_release = PTHREAD_COND_INITIALIZER;
+
+int main (int argc, char *argv[])
+{
+   pthread_attr_t attr;
+
+   void * start_heap = sbrk(0);
+   (void)start_heap;
+
+   setlocale(LC_ALL, "");
+   bindtextdomain("bacula", LOCALEDIR);
+   textdomain("bacula");
+   init_stack_dump();
+   my_name_is(argc, argv, "workq_test");
+   init_msg(NULL, NULL);
+   daemon_start_time = time(NULL);
+   set_thread_concurrency(150);
+   lmgr_init_thread(); /* initialize the lockmanager stack */
+   pthread_attr_init(&attr);
+
+   int stat(-1);
+   berrno be;
+
+   workq_t queue;
+   /* Start work queues */
+   if ((stat = workq_init(&queue, TEST_MAX_NUM_WORKERS, callback)) != 0)
+   {
+      be.set_errno(stat);
+      Emsg1(M_ABORT, 0, _("Could not init work queue: ERR=%s\n"), be.bstrerror());
+   }
+
+   /* job1 is created and pseudo-submits some work to the work queue*/
+   JCR *jcr1 = new_jcr(sizeof(JCR), NULL);
+   jcr1->JobId = 1;
+   workq_ele_t * ret(0);
+   for (int w=0; w<TEST_NUM_WORKS; ++w)
+   {
+      if ((stat = workq_add(&queue, jcr1, &ret, 0)) != 0)
+      {
+         be.set_errno(stat);
+         Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
+      }
+   }
+
+   JCR *jcr2 = new_jcr(sizeof(JCR), NULL);
+   jcr2->JobId = 2;
+   for (int w=0; w<TEST_NUM_WORKS; ++w)
+   {
+      if ((stat = workq_add(&queue, jcr2, &ret, 0)) != 0)
+      {
+         be.set_errno(stat);
+         Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
+      }
+   }
+
+   printf("--------------------------------------------------------------\n");
+   printf("Start workq_wait_idle ....\n");
+   if ((stat = workq_wait_idle(&queue)) != 0)
+   {
+      be.set_errno(stat);
+      Emsg1(M_ABORT, 0, _("Waiting for workq to be empty: ERR=%s\n"), be.bstrerror());
+   }
+   printf("... workq_wait_idle completed.\n");
+   printf("--------------------------------------------------------------\n");
+
+   printf("Start workq_destroy ....\n");
+   if ((stat = workq_destroy(&queue)) != 0)
+   {
+      be.set_errno(stat);
+      Emsg1(M_ABORT, 0, _("Error in workq_destroy: ERR=%s\n"), be.bstrerror());
+   }
+   printf("... workq_destroy completed.\n");
+
+   return 0;
+
+}
+
+#endif