From 82151d934f8b46ff7133b084808f288c81edd372 Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Sat, 1 Mar 2003 13:50:41 +0000 Subject: [PATCH] Remove cancelled jobs from workq + add priority git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@361 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/kernstodo | 4 - bacula/src/dird/dird_conf.c | 18 +-- bacula/src/dird/fd_cmds.c | 2 +- bacula/src/dird/job.c | 24 ++-- bacula/src/dird/ua_cmds.c | 3 +- bacula/src/jcr.h | 1 + bacula/src/lib/bnet_server.c | 2 +- bacula/src/lib/workq.c | 223 ++++++++++++++++++++++++----------- bacula/src/lib/workq.h | 33 +++--- 9 files changed, 194 insertions(+), 116 deletions(-) diff --git a/bacula/kernstodo b/bacula/kernstodo index 1d4112b6df..1250fa5df4 100644 --- a/bacula/kernstodo +++ b/bacula/kernstodo @@ -22,10 +22,6 @@ For 1.30 release: the last copy for 1 year. - Have Bacula "poll" the tape to see if it is there. - Add more of the config info to the tape label. -- Look for default.bsr, and if not found, create dummy bsr allowing - any volume. -- If config file not found in utility programs, pluge on - anyway. - Apparently cancel does not work for jobs waiting to be scheduled. - Implement bar code reader for autochangers diff --git a/bacula/src/dird/dird_conf.c b/bacula/src/dird/dird_conf.c index 640dce9ed5..1949d77a67 100644 --- a/bacula/src/dird/dird_conf.c +++ b/bacula/src/dird/dird_conf.c @@ -1030,7 +1030,7 @@ void save_resource(int type, struct res_items *items, int pass) memcpy(res, &res_all, size); if (!resources[rindex].res_head) { resources[rindex].res_head = (RES *)res; /* store first entry */ - Dmsg3(000, "Inserting first %s res: %s index=%d\n", res_to_str(type), + Dmsg3(200, "Inserting first %s res: %s index=%d\n", res_to_str(type), res->res_dir.hdr.name, rindex); } else { RES *next; @@ -1038,7 +1038,7 @@ void save_resource(int type, struct res_items *items, int pass) for (next=resources[rindex].res_head; next->next; next=next->next) { } next->next = (RES *)res; - Dmsg3(000, "Inserting %s res: %s index=%d\n", res_to_str(type), + Dmsg3(200, "Inserting %s res: %s index=%d\n", res_to_str(type), res->res_dir.hdr.name, rindex); } } @@ -1357,17 +1357,6 @@ static void scan_include_options(LEX *lc, int keyword, char *opts, int optlen) } } -static void prtmsg(void *sock, char *fmt, ...) -{ - va_list arg_ptr; - - va_start(arg_ptr, fmt); - vfprintf(stdout, fmt, arg_ptr); - va_end(arg_ptr); -} - - - /* Store FileSet Include/Exclude info */ static void store_inc(LEX *lc, struct res_items *item, int index, int pass) { @@ -1400,9 +1389,6 @@ static void store_inc(LEX *lc, struct res_items *item, int index, int pass) if (keyword == INC_KW_FILEOPTIONS) { token = lex_get_token(lc, T_NAME); if (pass == 2) { - for (int i=r_first; i<=r_last; i++) { - dump_resource(i, resources[i-r_first].res_head, prtmsg, NULL); - } res = GetResWithName(R_FILEOPTIONS, lc->str); if (res == NULL) { scan_err1(lc, _("Could not find specified FileOptions Resource: %s"), lc->str); diff --git a/bacula/src/dird/fd_cmds.c b/bacula/src/dird/fd_cmds.c index 572b245163..e4e6d56cf1 100644 --- a/bacula/src/dird/fd_cmds.c +++ b/bacula/src/dird/fd_cmds.c @@ -219,7 +219,7 @@ int send_exclude_list(JCR *jcr) for (i=0; i < fileset->num_excludes; i++) { pm_strcpy(&fd->msg, fileset->exclude_array[i]->name); fd->msglen = strlen(fd->msg); - Dmsg1(000, "dird>filed: exclude file: %s\n", fileset->exclude_array[i]->name); + Dmsg1(200, "dird>filed: exclude file: %s\n", fileset->exclude_array[i]->name); if (!bnet_send(fd)) { Jmsg(jcr, M_FATAL, 0, _(">filed: write error on socket\n")); set_jcr_job_status(jcr, JS_ErrorTerminated); diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index fc95c6b1a9..aff54c3175 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -47,7 +47,7 @@ extern int do_verify(JCR *jcr); extern void backup_cleanup(void); /* Queue of jobs to be run */ -static workq_t job_wq; /* our job work queue */ +workq_t job_wq; /* our job work queue */ void init_job_server(int max_workers) @@ -68,6 +68,7 @@ void init_job_server(int max_workers) void run_job(JCR *jcr) { int stat, errstat; + workq_ele_t *work_item; sm_check(__FILE__, __LINE__, True); init_msg(jcr, jcr->messages); @@ -124,9 +125,10 @@ void run_job(JCR *jcr) /* Queue the job to be run */ - if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) { + if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) { Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat)); } + jcr->work_item = work_item; Dmsg0(200, "Done run_job()\n"); } @@ -145,7 +147,9 @@ static void job_thread(void *arg) Dmsg0(100, "=====Start Job=========\n"); jcr->start_time = now; /* set the real start time */ - if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay < + if (job_cancelled(jcr)) { + update_job_end_record(jcr); + } else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay < (utime_t)(jcr->start_time - jcr->sched_time)) { Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max delay time exceeded.\n")); set_jcr_job_status(jcr, JS_ErrorTerminated); @@ -191,14 +195,14 @@ static void job_thread(void *arg) Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType); break; } - } - if (jcr->job->RunAfterJob) { - POOLMEM *after = get_pool_memory(PM_FNAME); - int status; + if (jcr->job->RunAfterJob) { + POOLMEM *after = get_pool_memory(PM_FNAME); + int status; - after = edit_run_codes(jcr, after, jcr->job->RunAfterJob); - status = run_program(after, 0, NULL); - free_pool_memory(after); + after = edit_run_codes(jcr, after, jcr->job->RunAfterJob); + status = run_program(after, 0, NULL); + free_pool_memory(after); + } } Dmsg0(50, "Before free jcr\n"); free_jcr(jcr); diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index 908f3a4716..82072bee12 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -39,6 +39,7 @@ extern int r_first; extern int r_last; extern struct s_res resources[]; extern char my_name[]; +extern workq_t job_wq; /* work queue */ /* Imported functions */ extern int statuscmd(UAContext *ua, char *cmd); @@ -414,11 +415,11 @@ static int cancelcmd(UAContext *ua, char *cmd) set_jcr_job_status(jcr, JS_Cancelled); bsendmsg(ua, _("JobId %d, Job %s marked to be cancelled.\n"), jcr->JobId, jcr->Job); + workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */ free_jcr(jcr); return 1; default: - set_jcr_job_status(jcr, JS_Cancelled); /* Cancel File daemon */ ua->jcr->client = jcr->client; diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 47f8a0da9f..0669b5e63f 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -117,6 +117,7 @@ struct s_jcr { /* Director Daemon specific part of JCR */ pthread_t SD_msg_chan; /* Message channel thread id */ pthread_cond_t term_wait; /* Wait for job termination */ + workq_ele_t *work_item; /* Work queue item if scheduled */ int msg_thread_done; /* Set when Storage message thread terms */ BSOCK *ua; /* User agent */ JOB *job; /* Job resource */ diff --git a/bacula/src/lib/bnet_server.c b/bacula/src/lib/bnet_server.c index 040a773fb9..100b15a2ed 100644 --- a/bacula/src/lib/bnet_server.c +++ b/bacula/src/lib/bnet_server.c @@ -160,7 +160,7 @@ bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_w /* Queue client to be served */ if ((stat = workq_add(client_wq, - (void *)init_bsock(NULL, newsockfd, "client", caller, port))) != 0) { + (void *)init_bsock(NULL, newsockfd, "client", caller, port), NULL, 0)) != 0) { Jmsg1(NULL, M_ABORT, 0, _("Could not add job to client queue: ERR=%s\n"), strerror(stat)); } } diff --git a/bacula/src/lib/workq.c b/bacula/src/lib/workq.c index b076aca7da..3ccc663923 100755 --- a/bacula/src/lib/workq.c +++ b/bacula/src/lib/workq.c @@ -56,12 +56,12 @@ static void *workq_server(void *arg); * Initialize a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) { int stat; - + if ((stat = pthread_attr_init(&wq->attr)) != 0) { return stat; } @@ -80,10 +80,10 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) } wq->quit = 0; wq->first = wq->last = NULL; - wq->max_workers = threads; /* max threads to create */ - wq->num_workers = 0; /* no threads yet */ - wq->idle_workers = 0; /* no idle threads */ - wq->engine = engine; /* routine to run */ + wq->max_workers = threads; /* max threads to create */ + wq->num_workers = 0; /* no threads yet */ + wq->idle_workers = 0; /* no idle threads */ + wq->engine = engine; /* routine to run */ wq->valid = WORKQ_VALID; return 0; } @@ -92,7 +92,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) * Destroy a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ int workq_destroy(workq_t *wq) { @@ -104,7 +104,7 @@ int workq_destroy(workq_t *wq) if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { return stat; } - wq->valid = 0; /* prevent any more operations */ + wq->valid = 0; /* prevent any more operations */ /* * If any threads are active, wake them @@ -112,22 +112,22 @@ int workq_destroy(workq_t *wq) if (wq->num_workers > 0) { wq->quit = 1; if (wq->idle_workers) { - if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } while (wq->num_workers > 0) { - if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } } if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { return stat; } - stat = pthread_mutex_destroy(&wq->mutex); + stat = pthread_mutex_destroy(&wq->mutex); stat1 = pthread_cond_destroy(&wq->work); stat2 = pthread_attr_destroy(&wq->attr); return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); @@ -136,8 +136,14 @@ int workq_destroy(workq_t *wq) /* * Add work to a queue + * wq is a queue that was created with workq_init + * element is a user unique item that will be passed to the + * processing routine + * work_item will get internal work queue item -- if it is not NULL + * priority if non-zero will cause the item to be placed on the + * head of the list instead of the tail. */ -int workq_add(workq_t *wq, void *element) +int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) { int stat; workq_ele_t *item; @@ -148,7 +154,7 @@ int workq_add(workq_t *wq, void *element) return EINVAL; } - if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) { + if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) { return ENOMEM; } item->data = element; @@ -159,36 +165,119 @@ int workq_add(workq_t *wq, void *element) } Dmsg0(200, "add item to queue\n"); - /* Add the new item to the end of the queue */ - if (wq->first == NULL) { - wq->first = item; + if (priority) { + /* Add to head of queue */ + if (wq->first == NULL) { + wq->first = item; + wq->last = item; + } else { + item->next = wq->first; + wq->first = item; + } } else { - wq->last->next = item; + /* Add to end of queue */ + if (wq->first == NULL) { + wq->first = item; + } else { + wq->last->next = item; + } + wq->last = item; } - wq->last = item; /* if any threads are idle, wake one */ if (wq->idle_workers > 0) { Dmsg0(200, "Signal worker\n"); if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } } else if (wq->num_workers < wq->max_workers) { Dmsg0(200, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } pthread_mutex_unlock(&wq->mutex); Dmsg0(200, "Return workq_add\n"); + /* Return work_item if requested */ + if (work_item) { + *work_item = item; + } + return stat; +} + +/* + * Remove work from a queue + * wq is a queue that was created with workq_init + * work_item is an element of work + * + * Note, it is "removed" by immediately calling a processing routine. + * if you want to cancel it, you need to provide some external means + * of doing so. + */ +int workq_remove(workq_t *wq, workq_ele_t *work_item) +{ + int stat, found = 0; + pthread_t id; + workq_ele_t *item, *prev; + + Dmsg0(200, "workq_remove\n"); + if (wq->valid != WORKQ_VALID) { + return EINVAL; + } + + if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { + return stat; + } + + for (prev=item=wq->first; item; item=item->next) { + if (item == work_item) { + found = 1; + break; + } + prev = item; + } + if (!found) { + return EINVAL; + } + + /* Move item to be first on list */ + if (wq->first != work_item) { + prev->next = work_item->next; + if (wq->last == work_item) { + wq->last = prev; + } + work_item->next = wq->first; + wq->first = work_item; + } + + /* if any threads are idle, wake one */ + if (wq->idle_workers > 0) { + Dmsg0(200, "Signal worker\n"); + if ((stat = pthread_cond_signal(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } + } else { + Dmsg0(200, "Create worker thread\n"); + /* No idle threads so create a new one */ + set_thread_concurrency(wq->max_workers + 1); + if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } + wq->num_workers++; + } + pthread_mutex_unlock(&wq->mutex); + Dmsg0(200, "Return workq_remove\n"); return stat; } + /* * This is the worker thread that serves the work queue. * In due course, it will call the user's engine. @@ -217,66 +306,66 @@ static void *workq_server(void *arg) timeout.tv_sec = tv.tv_sec + 2; while (wq->first == NULL && !wq->quit) { - /* - * Wait 2 seconds, then if no more work, exit - */ + /* + * Wait 2 seconds, then if no more work, exit + */ Dmsg0(200, "pthread_cond_timedwait()\n"); #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN - /* CYGWIN dies with a page fault the second - * time that pthread_cond_timedwait() is called - * so fake it out. - */ - pthread_mutex_lock(&wq->mutex); - stat = ETIMEDOUT; + /* CYGWIN dies with a page fault the second + * time that pthread_cond_timedwait() is called + * so fake it out. + */ + pthread_mutex_lock(&wq->mutex); + stat = ETIMEDOUT; #else - stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); + stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); #endif Dmsg1(200, "timedwait=%d\n", stat); - if (stat == ETIMEDOUT) { - timedout = 1; - break; - } else if (stat != 0) { + if (stat == ETIMEDOUT) { + timedout = 1; + break; + } else if (stat != 0) { /* This shouldn't happen */ Dmsg0(200, "This shouldn't happen\n"); - wq->num_workers--; - pthread_mutex_unlock(&wq->mutex); - return NULL; - } + wq->num_workers--; + pthread_mutex_unlock(&wq->mutex); + return NULL; + } } we = wq->first; if (we != NULL) { - wq->first = we->next; - if (wq->last == we) { - wq->last = NULL; - } - if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { - return NULL; - } + wq->first = we->next; + if (wq->last == we) { + wq->last = NULL; + } + if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { + return NULL; + } /* Call user's routine here */ Dmsg0(200, "Calling user engine.\n"); - wq->engine(we->data); + wq->engine(we->data); Dmsg0(200, "Back from user engine.\n"); - free(we); /* release work entry */ + free(we); /* release work entry */ Dmsg0(200, "relock mutex\n"); - if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { - return NULL; - } + if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { + return NULL; + } Dmsg0(200, "Done lock mutex\n"); } /* * If no more work request, and we are asked to quit, then do it */ if (wq->first == NULL && wq->quit) { - wq->num_workers--; - if (wq->num_workers == 0) { + wq->num_workers--; + if (wq->num_workers == 0) { Dmsg0(200, "Wake up destroy routine\n"); - /* Wake up destroy routine if he is waiting */ - pthread_cond_broadcast(&wq->work); - } + /* Wake up destroy routine if he is waiting */ + pthread_cond_broadcast(&wq->work); + } Dmsg0(200, "Unlock mutex\n"); - pthread_mutex_unlock(&wq->mutex); + pthread_mutex_unlock(&wq->mutex); Dmsg0(200, "Return from workq_server\n"); - return NULL; + return NULL; } Dmsg0(200, "Check for work request\n"); /* @@ -286,8 +375,8 @@ static void *workq_server(void *arg) Dmsg1(200, "timedout=%d\n", timedout); if (wq->first == NULL && timedout) { Dmsg0(200, "break big loop\n"); - wq->num_workers--; - break; + wq->num_workers--; + break; } Dmsg0(200, "Loop again\n"); } /* end of big for loop */ diff --git a/bacula/src/lib/workq.h b/bacula/src/lib/workq.h index 92c66e1f50..64ef4834f3 100644 --- a/bacula/src/lib/workq.h +++ b/bacula/src/lib/workq.h @@ -37,33 +37,34 @@ */ typedef struct workq_ele_tag { struct workq_ele_tag *next; - void *data; + void *data; } workq_ele_t; /* * Structure describing a work queue */ typedef struct workq_tag { - pthread_mutex_t mutex; /* queue access control */ - pthread_cond_t work; /* wait for work */ - pthread_attr_t attr; /* create detached threads */ - workq_ele_t *first, *last; /* work queue */ - int valid; /* queue initialized */ - int quit; /* workq should quit */ - int max_workers; /* max threads */ - int num_workers; /* current threads */ - int idle_workers; /* idle threads */ - void (*engine)(void *arg); /* user engine */ + pthread_mutex_t mutex; /* queue access control */ + pthread_cond_t work; /* wait for work */ + pthread_attr_t attr; /* create detached threads */ + workq_ele_t *first, *last; /* work queue */ + int valid; /* queue initialized */ + int quit; /* workq should quit */ + int max_workers; /* max threads */ + int num_workers; /* current threads */ + int idle_workers; /* idle threads */ + void (*engine)(void *arg); /* user engine */ } workq_t; #define WORKQ_VALID 0xdec1992 extern int workq_init( - workq_t *wq, - int threads, /* maximum threads */ - void (*engine)(void *) /* engine routine */ - ); + workq_t *wq, + int threads, /* maximum threads */ + void (*engine)(void *) /* engine routine */ + ); extern int workq_destroy(workq_t *wq); -extern int workq_add(workq_t *wq, void *data); +extern int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority); +extern int workq_remove(workq_t *wq, workq_ele_t *work_item); #endif /* __WORKQ_H */ -- 2.39.5