/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2016 Kern Sibbald
+ Copyright (C) 2000-2017 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
* Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
* }
*
+ * Wait for all queued work to be completed
+ * if ((stat = workq_wait_idle(&job_wq, (void *)jcr)) != 0) {
+ * berrno be;
+ * Emsg1(M_ABORT, 0, "Could not wait for idle: ERR=%s\n", be.bstrerror(errno));
+ * }
+ *
* Terminate the queue
* workq_destroy(workq_t *wq);
*
pthread_attr_destroy(&wq->attr);
return stat;
}
+ if ((stat = pthread_cond_init(&wq->idle, NULL)) != 0) {
+ pthread_mutex_destroy(&wq->mutex);
+ pthread_attr_destroy(&wq->attr);
+ pthread_cond_destroy(&wq->work);
+ return stat;
+ }
wq->quit = 0;
wq->first = wq->last = NULL;
wq->max_workers = threads; /* max threads to create */
wq->num_workers = 0; /* no threads yet */
+ wq->num_running = 0; /* no running threads */
wq->idle_workers = 0; /* no idle threads */
wq->engine = engine; /* routine to run */
wq->valid = WORKQ_VALID;
return 0;
}
-/*
+/* [B
* Destroy a work queue
*
* Returns: 0 on success
*/
int workq_destroy(workq_t *wq)
{
- int stat, stat1, stat2;
+ int stat, stat1, stat2, stat3;
if (wq->valid != WORKQ_VALID) {
return EINVAL;
stat = pthread_mutex_destroy(&wq->mutex);
stat1 = pthread_cond_destroy(&wq->work);
stat2 = pthread_attr_destroy(&wq->attr);
- return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
+ stat3 = pthread_cond_destroy(&wq->idle);
+ if (stat != 0) return stat;
+ if (stat1 != 0) return stat1;
+ if (stat2 != 0) return stat2;
+ if (stat3 != 0) return stat3;
+ return 0;
}
+/*
+ * Wait for work to terminate
+ *
+ * Returns: 0 on success
+ * errno on failure
+ */
+int workq_wait_idle(workq_t *wq)
+{
+ int stat;
+
+ if (wq->valid != WORKQ_VALID) {
+ return EINVAL;
+ }
+ P(wq->mutex);
+
+ /* While there is work, wait */
+ while (wq->num_running || wq->first != NULL) {
+ if ((stat = pthread_cond_wait(&wq->idle, &wq->mutex)) != 0) {
+ V(wq->mutex);
+ return stat;
+ }
+ }
+ V(wq->mutex);
+ return 0;
+}
+
+
/*
* Add work to a queue
if (wq->last == we) {
wq->last = NULL;
}
+ wq->num_running++;
V(wq->mutex);
/* Call user's routine here */
Dmsg0(1400, "Calling user engine.\n");
free(we); /* release work entry */
Dmsg0(1400, "relock mutex\n");
P(wq->mutex);
+ wq->num_running--;
Dmsg0(1400, "Done lock mutex\n");
}
+ if (wq->first == NULL && !wq->num_running) {
+ pthread_cond_broadcast(&wq->idle);
+ }
/*
* If no more work request, and we are asked to quit, then do it
*/
Dmsg0(1400, "End workq_server\n");
return NULL;
}
+
+
+//=================================================
+#ifdef TEST_PROGRAM
+
+#define TEST_SLEEP_TIME_IN_SECONDS 3
+#define TEST_MAX_NUM_WORKERS 5
+#define TEST_NUM_WORKS 10
+
+
+void *callback(void *ctx)
+{
+ JCR* jcr = (JCR*)ctx;
+
+ if (jcr)
+ {
+ Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : now starting work....\n"), (int)pthread_self());
+ sleep(TEST_SLEEP_TIME_IN_SECONDS);
+ Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : ...work completed.\n"), (int)pthread_self());
+ }
+ return NULL;
+}
+
+
+char *configfile = NULL;
+//STORES *me = NULL; /* our Global resource */
+bool forge_on = false; /* proceed inspite of I/O errors */
+pthread_mutex_t device_release_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t wait_device_release = PTHREAD_COND_INITIALIZER;
+
+int main (int argc, char *argv[])
+{
+ pthread_attr_t attr;
+
+ void * start_heap = sbrk(0);
+ (void)start_heap;
+
+ setlocale(LC_ALL, "");
+ bindtextdomain("bacula", LOCALEDIR);
+ textdomain("bacula");
+ init_stack_dump();
+ my_name_is(argc, argv, "workq_test");
+ init_msg(NULL, NULL);
+ daemon_start_time = time(NULL);
+ set_thread_concurrency(150);
+ lmgr_init_thread(); /* initialize the lockmanager stack */
+ pthread_attr_init(&attr);
+
+ int stat(-1);
+ berrno be;
+
+ workq_t queue;
+ /* Start work queues */
+ if ((stat = workq_init(&queue, TEST_MAX_NUM_WORKERS, callback)) != 0)
+ {
+ be.set_errno(stat);
+ Emsg1(M_ABORT, 0, _("Could not init work queue: ERR=%s\n"), be.bstrerror());
+ }
+
+ /* job1 is created and pseudo-submits some work to the work queue*/
+ JCR *jcr1 = new_jcr(sizeof(JCR), NULL);
+ jcr1->JobId = 1;
+ workq_ele_t * ret(0);
+ for (int w=0; w<TEST_NUM_WORKS; ++w)
+ {
+ if ((stat = workq_add(&queue, jcr1, &ret, 0)) != 0)
+ {
+ be.set_errno(stat);
+ Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
+ }
+ }
+
+ JCR *jcr2 = new_jcr(sizeof(JCR), NULL);
+ jcr2->JobId = 2;
+ for (int w=0; w<TEST_NUM_WORKS; ++w)
+ {
+ if ((stat = workq_add(&queue, jcr2, &ret, 0)) != 0)
+ {
+ be.set_errno(stat);
+ Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
+ }
+ }
+
+ printf("--------------------------------------------------------------\n");
+ printf("Start workq_wait_idle ....\n");
+ if ((stat = workq_wait_idle(&queue)) != 0)
+ {
+ be.set_errno(stat);
+ Emsg1(M_ABORT, 0, _("Waiting for workq to be empty: ERR=%s\n"), be.bstrerror());
+ }
+ printf("... workq_wait_idle completed.\n");
+ printf("--------------------------------------------------------------\n");
+
+ printf("Start workq_destroy ....\n");
+ if ((stat = workq_destroy(&queue)) != 0)
+ {
+ be.set_errno(stat);
+ Emsg1(M_ABORT, 0, _("Error in workq_destroy: ERR=%s\n"), be.bstrerror());
+ }
+ printf("... workq_destroy completed.\n");
+
+ return 0;
+
+}
+
+#endif