memcpy(res, &res_all, size);
if (!resources[rindex].res_head) {
resources[rindex].res_head = (RES *)res; /* store first entry */
- Dmsg3(000, "Inserting first %s res: %s index=%d\n", res_to_str(type),
+ Dmsg3(200, "Inserting first %s res: %s index=%d\n", res_to_str(type),
res->res_dir.hdr.name, rindex);
} else {
RES *next;
for (next=resources[rindex].res_head; next->next; next=next->next)
{ }
next->next = (RES *)res;
- Dmsg3(000, "Inserting %s res: %s index=%d\n", res_to_str(type),
+ Dmsg3(200, "Inserting %s res: %s index=%d\n", res_to_str(type),
res->res_dir.hdr.name, rindex);
}
}
}
}
-static void prtmsg(void *sock, char *fmt, ...)
-{
- va_list arg_ptr;
-
- va_start(arg_ptr, fmt);
- vfprintf(stdout, fmt, arg_ptr);
- va_end(arg_ptr);
-}
-
-
-
/* Store FileSet Include/Exclude info */
static void store_inc(LEX *lc, struct res_items *item, int index, int pass)
{
if (keyword == INC_KW_FILEOPTIONS) {
token = lex_get_token(lc, T_NAME);
if (pass == 2) {
- for (int i=r_first; i<=r_last; i++) {
- dump_resource(i, resources[i-r_first].res_head, prtmsg, NULL);
- }
res = GetResWithName(R_FILEOPTIONS, lc->str);
if (res == NULL) {
scan_err1(lc, _("Could not find specified FileOptions Resource: %s"), lc->str);
extern void backup_cleanup(void);
/* Queue of jobs to be run */
-static workq_t job_wq; /* our job work queue */
+workq_t job_wq; /* our job work queue */
void init_job_server(int max_workers)
void run_job(JCR *jcr)
{
int stat, errstat;
+ workq_ele_t *work_item;
sm_check(__FILE__, __LINE__, True);
init_msg(jcr, jcr->messages);
/* Queue the job to be run */
- if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
+ if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
}
+ jcr->work_item = work_item;
Dmsg0(200, "Done run_job()\n");
}
Dmsg0(100, "=====Start Job=========\n");
jcr->start_time = now; /* set the real start time */
- if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
+ if (job_cancelled(jcr)) {
+ update_job_end_record(jcr);
+ } else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
(utime_t)(jcr->start_time - jcr->sched_time)) {
Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max delay time exceeded.\n"));
set_jcr_job_status(jcr, JS_ErrorTerminated);
Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType);
break;
}
- }
- if (jcr->job->RunAfterJob) {
- POOLMEM *after = get_pool_memory(PM_FNAME);
- int status;
+ if (jcr->job->RunAfterJob) {
+ POOLMEM *after = get_pool_memory(PM_FNAME);
+ int status;
- after = edit_run_codes(jcr, after, jcr->job->RunAfterJob);
- status = run_program(after, 0, NULL);
- free_pool_memory(after);
+ after = edit_run_codes(jcr, after, jcr->job->RunAfterJob);
+ status = run_program(after, 0, NULL);
+ free_pool_memory(after);
+ }
}
Dmsg0(50, "Before free jcr\n");
free_jcr(jcr);
* Initialize a work queue
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
{
int stat;
-
+
if ((stat = pthread_attr_init(&wq->attr)) != 0) {
return stat;
}
}
wq->quit = 0;
wq->first = wq->last = NULL;
- wq->max_workers = threads; /* max threads to create */
- wq->num_workers = 0; /* no threads yet */
- wq->idle_workers = 0; /* no idle threads */
- wq->engine = engine; /* routine to run */
+ wq->max_workers = threads; /* max threads to create */
+ wq->num_workers = 0; /* no threads yet */
+ wq->idle_workers = 0; /* no idle threads */
+ wq->engine = engine; /* routine to run */
wq->valid = WORKQ_VALID;
return 0;
}
* Destroy a work queue
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int workq_destroy(workq_t *wq)
{
if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
return stat;
}
- wq->valid = 0; /* prevent any more operations */
+ wq->valid = 0; /* prevent any more operations */
/*
* If any threads are active, wake them
if (wq->num_workers > 0) {
wq->quit = 1;
if (wq->idle_workers) {
- if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
- }
+ if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
}
while (wq->num_workers > 0) {
- if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
- }
+ if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
}
}
if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
return stat;
}
- stat = pthread_mutex_destroy(&wq->mutex);
+ stat = pthread_mutex_destroy(&wq->mutex);
stat1 = pthread_cond_destroy(&wq->work);
stat2 = pthread_attr_destroy(&wq->attr);
return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
/*
* Add work to a queue
+ * wq is a queue that was created with workq_init
+ * element is a user unique item that will be passed to the
+ * processing routine
+ * work_item will get internal work queue item -- if it is not NULL
+ * priority if non-zero will cause the item to be placed on the
+ * head of the list instead of the tail.
*/
-int workq_add(workq_t *wq, void *element)
+int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
{
int stat;
workq_ele_t *item;
return EINVAL;
}
- if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
+ if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
return ENOMEM;
}
item->data = element;
}
Dmsg0(200, "add item to queue\n");
- /* Add the new item to the end of the queue */
- if (wq->first == NULL) {
- wq->first = item;
+ if (priority) {
+ /* Add to head of queue */
+ if (wq->first == NULL) {
+ wq->first = item;
+ wq->last = item;
+ } else {
+ item->next = wq->first;
+ wq->first = item;
+ }
} else {
- wq->last->next = item;
+ /* Add to end of queue */
+ if (wq->first == NULL) {
+ wq->first = item;
+ } else {
+ wq->last->next = item;
+ }
+ wq->last = item;
}
- wq->last = item;
/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
Dmsg0(200, "Signal worker\n");
if ((stat = pthread_cond_signal(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
} else if (wq->num_workers < wq->max_workers) {
Dmsg0(200, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
wq->num_workers++;
}
pthread_mutex_unlock(&wq->mutex);
Dmsg0(200, "Return workq_add\n");
+ /* Return work_item if requested */
+ if (work_item) {
+ *work_item = item;
+ }
+ return stat;
+}
+
+/*
+ * Remove work from a queue
+ * wq is a queue that was created with workq_init
+ * work_item is an element of work
+ *
+ * Note, it is "removed" by immediately calling a processing routine.
+ * if you want to cancel it, you need to provide some external means
+ * of doing so.
+ */
+int workq_remove(workq_t *wq, workq_ele_t *work_item)
+{
+ int stat, found = 0;
+ pthread_t id;
+ workq_ele_t *item, *prev;
+
+ Dmsg0(200, "workq_remove\n");
+ if (wq->valid != WORKQ_VALID) {
+ return EINVAL;
+ }
+
+ if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+ return stat;
+ }
+
+ for (prev=item=wq->first; item; item=item->next) {
+ if (item == work_item) {
+ found = 1;
+ break;
+ }
+ prev = item;
+ }
+ if (!found) {
+ return EINVAL;
+ }
+
+ /* Move item to be first on list */
+ if (wq->first != work_item) {
+ prev->next = work_item->next;
+ if (wq->last == work_item) {
+ wq->last = prev;
+ }
+ work_item->next = wq->first;
+ wq->first = work_item;
+ }
+
+ /* if any threads are idle, wake one */
+ if (wq->idle_workers > 0) {
+ Dmsg0(200, "Signal worker\n");
+ if ((stat = pthread_cond_signal(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
+ } else {
+ Dmsg0(200, "Create worker thread\n");
+ /* No idle threads so create a new one */
+ set_thread_concurrency(wq->max_workers + 1);
+ if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
+ wq->num_workers++;
+ }
+ pthread_mutex_unlock(&wq->mutex);
+ Dmsg0(200, "Return workq_remove\n");
return stat;
}
+
/*
* This is the worker thread that serves the work queue.
* In due course, it will call the user's engine.
timeout.tv_sec = tv.tv_sec + 2;
while (wq->first == NULL && !wq->quit) {
- /*
- * Wait 2 seconds, then if no more work, exit
- */
+ /*
+ * Wait 2 seconds, then if no more work, exit
+ */
Dmsg0(200, "pthread_cond_timedwait()\n");
#ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
- /* CYGWIN dies with a page fault the second
- * time that pthread_cond_timedwait() is called
- * so fake it out.
- */
- pthread_mutex_lock(&wq->mutex);
- stat = ETIMEDOUT;
+ /* CYGWIN dies with a page fault the second
+ * time that pthread_cond_timedwait() is called
+ * so fake it out.
+ */
+ pthread_mutex_lock(&wq->mutex);
+ stat = ETIMEDOUT;
#else
- stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+ stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
#endif
Dmsg1(200, "timedwait=%d\n", stat);
- if (stat == ETIMEDOUT) {
- timedout = 1;
- break;
- } else if (stat != 0) {
+ if (stat == ETIMEDOUT) {
+ timedout = 1;
+ break;
+ } else if (stat != 0) {
/* This shouldn't happen */
Dmsg0(200, "This shouldn't happen\n");
- wq->num_workers--;
- pthread_mutex_unlock(&wq->mutex);
- return NULL;
- }
+ wq->num_workers--;
+ pthread_mutex_unlock(&wq->mutex);
+ return NULL;
+ }
}
we = wq->first;
if (we != NULL) {
- wq->first = we->next;
- if (wq->last == we) {
- wq->last = NULL;
- }
- if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
- return NULL;
- }
+ wq->first = we->next;
+ if (wq->last == we) {
+ wq->last = NULL;
+ }
+ if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
+ return NULL;
+ }
/* Call user's routine here */
Dmsg0(200, "Calling user engine.\n");
- wq->engine(we->data);
+ wq->engine(we->data);
Dmsg0(200, "Back from user engine.\n");
- free(we); /* release work entry */
+ free(we); /* release work entry */
Dmsg0(200, "relock mutex\n");
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return NULL;
- }
+ if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+ return NULL;
+ }
Dmsg0(200, "Done lock mutex\n");
}
/*
* If no more work request, and we are asked to quit, then do it
*/
if (wq->first == NULL && wq->quit) {
- wq->num_workers--;
- if (wq->num_workers == 0) {
+ wq->num_workers--;
+ if (wq->num_workers == 0) {
Dmsg0(200, "Wake up destroy routine\n");
- /* Wake up destroy routine if he is waiting */
- pthread_cond_broadcast(&wq->work);
- }
+ /* Wake up destroy routine if he is waiting */
+ pthread_cond_broadcast(&wq->work);
+ }
Dmsg0(200, "Unlock mutex\n");
- pthread_mutex_unlock(&wq->mutex);
+ pthread_mutex_unlock(&wq->mutex);
Dmsg0(200, "Return from workq_server\n");
- return NULL;
+ return NULL;
}
Dmsg0(200, "Check for work request\n");
/*
Dmsg1(200, "timedout=%d\n", timedout);
if (wq->first == NULL && timedout) {
Dmsg0(200, "break big loop\n");
- wq->num_workers--;
- break;
+ wq->num_workers--;
+ break;
}
Dmsg0(200, "Loop again\n");
} /* end of big for loop */
*/
typedef struct workq_ele_tag {
struct workq_ele_tag *next;
- void *data;
+ void *data;
} workq_ele_t;
/*
* Structure describing a work queue
*/
typedef struct workq_tag {
- pthread_mutex_t mutex; /* queue access control */
- pthread_cond_t work; /* wait for work */
- pthread_attr_t attr; /* create detached threads */
- workq_ele_t *first, *last; /* work queue */
- int valid; /* queue initialized */
- int quit; /* workq should quit */
- int max_workers; /* max threads */
- int num_workers; /* current threads */
- int idle_workers; /* idle threads */
- void (*engine)(void *arg); /* user engine */
+ pthread_mutex_t mutex; /* queue access control */
+ pthread_cond_t work; /* wait for work */
+ pthread_attr_t attr; /* create detached threads */
+ workq_ele_t *first, *last; /* work queue */
+ int valid; /* queue initialized */
+ int quit; /* workq should quit */
+ int max_workers; /* max threads */
+ int num_workers; /* current threads */
+ int idle_workers; /* idle threads */
+ void (*engine)(void *arg); /* user engine */
} workq_t;
#define WORKQ_VALID 0xdec1992
extern int workq_init(
- workq_t *wq,
- int threads, /* maximum threads */
- void (*engine)(void *) /* engine routine */
- );
+ workq_t *wq,
+ int threads, /* maximum threads */
+ void (*engine)(void *) /* engine routine */
+ );
extern int workq_destroy(workq_t *wq);
-extern int workq_add(workq_t *wq, void *data);
+extern int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority);
+extern int workq_remove(workq_t *wq, workq_ele_t *work_item);
#endif /* __WORKQ_H */