From 2543fafe18dd9c060a58c6c18cd34fcb432de86c Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Sun, 20 Jul 2003 11:25:50 +0000 Subject: [PATCH] Complete new job scheduler + fix from Nic Bellamy git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@637 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/ChangeLog | 6 + bacula/src/dird/dird.h | 11 +- bacula/src/dird/job.c | 7 +- bacula/src/dird/jobq.c | 336 ++++++++++++++++++++++++++---------- bacula/src/dird/jobq.h | 4 +- bacula/src/dird/ua_cmds.c | 9 + bacula/src/dird/ua_status.c | 3 + bacula/src/jcr.h | 219 +++++++++++------------ bacula/src/lib/dlist.c | 41 ++++- bacula/src/stored/mount.c | 3 + bacula/src/version.h | 13 +- 11 files changed, 437 insertions(+), 215 deletions(-) diff --git a/bacula/ChangeLog b/bacula/ChangeLog index c0848f4018..6f39db0f06 100644 --- a/bacula/ChangeLog +++ b/bacula/ChangeLog @@ -1,5 +1,11 @@ 2003-07-xx Version 1.31 Beta xxJul03 +- Complete implementation of new job scheduler. jobq.h jobq.c + This code is turned off unless specifically enabled in src/version.h +- Integrate code from Nic Bellamy to check for recycled volume in + mount.c in SD. +- Fix a couple of bugs in dlist.c +- Begin implementation of new job scheduler. - Take serial.h provided by David Craigon, which corrects differences in prototypes between serial.h and serial.c. - Make db_get_media_ids() return Media Ids only for the current pool. diff --git a/bacula/src/dird/dird.h b/bacula/src/dird/dird.h index 81fd847f40..8cc1b004f3 100644 --- a/bacula/src/dird/dird.h +++ b/bacula/src/dird/dird.h @@ -29,26 +29,17 @@ #define DIRECTOR_DAEMON 1 -/* The following includes are at the bottom of - * this file rather than at the top because the - * #include "jcr.h" uses the definition of JOB - * as supplied above. - */ - #include "cats/cats.h" #include "jcr.h" - #include "bsr.h" #include "ua.h" #include "protos.h" + #include "jobq.h" /* Globals that dird.c exports */ -extern int debug_level; -extern time_t start_time; extern DIRRES *director; /* Director resource */ -extern char *working_directory; /* export our working directory */ extern int FDConnectTimeout; extern int SDConnectTimeout; diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index c41af8325b..e026dffbc6 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -80,7 +80,7 @@ void init_job_server(int max_workers) #else #ifdef JOB_QUEUE - if ((stat = job_init(&job_queue, max_workers, job_thread)) != 0) { + if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) { Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat)); } #else @@ -329,7 +329,9 @@ bail_out: db_close_database(jcr, jcr->db); jcr->db = NULL; } +#ifndef JOB_QUEUE free_jcr(jcr); +#endif Dmsg0(50, "======== End Job ==========\n"); sm_check(__FILE__, __LINE__, True); return NULL; @@ -342,6 +344,7 @@ bail_out: */ static int acquire_resource_locks(JCR *jcr) { +#ifndef JOB_QUEUE time_t now = time(NULL); time_t wtime = jcr->sched_time - now; @@ -363,6 +366,7 @@ static int acquire_resource_locks(JCR *jcr) } wtime = jcr->sched_time - time(NULL); } +#endif #ifdef USE_SEMAPHORE @@ -657,6 +661,7 @@ void set_jcr_defaults(JCR *jcr, JOB *job) jcr->job = job; jcr->JobType = job->JobType; jcr->JobLevel = job->level; + jcr->JobPriority = job->Priority; jcr->store = job->storage; jcr->client = job->client; if (!jcr->client_name) { diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index c3f2cb7b0d..cc9502df1b 100755 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -1,6 +1,13 @@ /* * Bacula job queue routines. * + * This code consists of three queues, the waiting_jobs + * queue, where jobs are initially queued, the ready_jobs + * queue, where jobs are placed when all the resources are + * allocated and they can immediately be run, and the + * running queue where jobs are placed when they are + * running. + * * Kern Sibbald, July MMIII * * Version $Id$ @@ -9,23 +16,6 @@ * adapted from "Programming with POSIX Threads", by * David R. Butenhof * - * Example: - * - * static jobq_t jq; define job queue - * - * Initialize queue - * if ((stat = jobq_init(&jq, max_workers, job_thread)) != 0) { - * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno)); - * } - * - * Add an item to the queue - * if ((stat = jobq_add(&jq, jcr)) != 0) { - * Emsg1(M_ABORT, 0, "Could not add job to queue: ERR=%s\n", strerror(errno)); - * } - * - * Terminate the queue - * jobq_destroy(jobq_t *jq); - * */ /* Copyright (C) 2000-2003 Kern Sibbald and John Walker @@ -50,8 +40,11 @@ #include "bacula.h" #include "dird.h" +#ifdef JOB_QUEUE + /* Forward referenced functions */ static void *jobq_server(void *arg); +static int start_server(jobq_t *jq); /* * Initialize a job queue @@ -86,7 +79,10 @@ int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg)) jq->idle_workers = 0; /* no idle threads */ jq->engine = engine; /* routine to run */ jq->valid = JOBQ_VALID; - jq->list.init(item, &item->link); + /* Initialize the job queues */ + jq->waiting_jobs = new dlist(item, &item->link); + jq->running_jobs = new dlist(item, &item->link); + jq->ready_jobs = new dlist(item, &item->link); return 0; } @@ -132,10 +128,51 @@ int jobq_destroy(jobq_t *jq) stat = pthread_mutex_destroy(&jq->mutex); stat1 = pthread_cond_destroy(&jq->work); stat2 = pthread_attr_destroy(&jq->attr); - jq->list.destroy(); + delete jq->waiting_jobs; + delete jq->running_jobs; + delete jq->ready_jobs; return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); } +struct wait_pkt { + JCR *jcr; + jobq_t *jq; +}; + +/* + * Wait until schedule time arrives before starting + */ +static void *sched_wait(void *arg) +{ + JCR *jcr = ((wait_pkt *)arg)->jcr; + jobq_t *jq = ((wait_pkt *)arg)->jq; + + Dmsg0(100, "Enter sched_wait.\n"); + free(arg); + time_t wtime = jcr->sched_time - time(NULL); + /* Wait until scheduled time arrives */ + if (wtime > 0 && verbose) { + Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"), + jcr->Job, wtime); + set_jcr_job_status(jcr, JS_WaitStartTime); + } + /* Check every 30 seconds if canceled */ + while (wtime > 0) { + Dmsg2(100, "Waiting on sched time, jobid=%d secs=%d\n", jcr->JobId, wtime); + if (wtime > 30) { + wtime = 30; + } + bmicrosleep(wtime, 0); + if (job_canceled(jcr)) { + break; + } + wtime = jcr->sched_time - time(NULL); + } + jobq_add(jq, jcr); + Dmsg0(100, "Exit sched_wait\n"); + return NULL; +} + /* * Add a job to the queue @@ -146,53 +183,63 @@ int jobq_add(jobq_t *jq, JCR *jcr) { int stat; jobq_item_t *item, *li; - pthread_t id; bool inserted = false; + time_t wtime = jcr->sched_time - time(NULL); + pthread_t id; + wait_pkt *sched_pkt; + - Dmsg0(200, "jobq_add\n"); + Dmsg0(100, "jobq_add\n"); if (jq->valid != JOBQ_VALID) { return EINVAL; } - if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) { - return ENOMEM; + if (!job_canceled(jcr) && wtime > 0) { + set_thread_concurrency(jq->max_workers + 2); + sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt)); + sched_pkt->jcr = jcr; + sched_pkt->jq = jq; + if ((stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt)) != 0) { + return stat; + } } - item->jcr = jcr; + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - free(item); return stat; } - Dmsg0(200, "add item to queue\n"); - for (li=NULL; (li=(jobq_item_t *)jq->list.next(li)); ) { - if (li->jcr->JobPriority < jcr->JobPriority) { - jq->list.insert_before(item, li); - inserted = true; - } - } - if (!inserted) { - jq->list.append(item); + if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) { + return ENOMEM; } + item->jcr = jcr; - /* if any threads are idle, wake one */ - if (jq->idle_workers > 0) { - Dmsg0(200, "Signal worker\n"); - if ((stat = pthread_cond_signal(&jq->work)) != 0) { - pthread_mutex_unlock(&jq->mutex); - return stat; + Dmsg1(100, "add 0x%x to queue\n", (unsigned)item); + if (job_canceled(jcr)) { + /* Add job to ready queue so that it is canceled quickly */ + jq->ready_jobs->prepend(item); + } else { + /* Add this job to the wait queue in priority sorted order */ + for (li=NULL; (li=(jobq_item_t *)jq->waiting_jobs->next(li)); ) { + if (li->jcr->JobPriority < jcr->JobPriority) { + jq->waiting_jobs->insert_before(item, li); + Dmsg1(100, "insert_before 0x%x\n", (unsigned)li); + inserted = true; + } } - } else if (jq->num_workers < jq->max_workers) { - Dmsg0(200, "Create worker thread\n"); - /* No idle threads so create a new one */ - set_thread_concurrency(jq->max_workers + 1); - if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { - pthread_mutex_unlock(&jq->mutex); - return stat; + /* If not jobs in wait queue, append it */ + if (!inserted) { + jq->waiting_jobs->append(item); + Dmsg0(100, "Appended item.\n"); } - jq->num_workers++; + Dmsg1(100, "Next=0x%x\n", (unsigned)jq->waiting_jobs->next(item)); } - pthread_mutex_unlock(&jq->mutex); - Dmsg0(200, "Return jobq_add\n"); + + stat = start_server(jq); + + if (stat == 0) { + pthread_mutex_unlock(&jq->mutex); + } + Dmsg0(100, "Return jobq_add\n"); return stat; } @@ -212,7 +259,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr) pthread_t id; jobq_item_t *item; - Dmsg0(200, "jobq_remove\n"); + Dmsg0(100, "jobq_remove\n"); if (jq->valid != JOBQ_VALID) { return EINVAL; } @@ -221,7 +268,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr) return stat; } - for (item=NULL; (item=(jobq_item_t *)jq->list.next(item)); ) { + for (item=NULL; (item=(jobq_item_t *)jq->waiting_jobs->next(item)); ) { if (jcr == item->jcr) { found = true; break; @@ -232,18 +279,18 @@ int jobq_remove(jobq_t *jq, JCR *jcr) } /* Move item to be the first on the list */ - jq->list.remove(item); - jq->list.prepend(item); + jq->waiting_jobs->remove(item); + jq->ready_jobs->prepend(item); /* if any threads are idle, wake one */ if (jq->idle_workers > 0) { - Dmsg0(200, "Signal worker\n"); + Dmsg0(100, "Signal worker\n"); if ((stat = pthread_cond_signal(&jq->work)) != 0) { pthread_mutex_unlock(&jq->mutex); return stat; } } else { - Dmsg0(200, "Create worker thread\n"); + Dmsg0(100, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(jq->max_workers + 1); if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { @@ -253,7 +300,36 @@ int jobq_remove(jobq_t *jq, JCR *jcr) jq->num_workers++; } pthread_mutex_unlock(&jq->mutex); - Dmsg0(200, "Return jobq_remove\n"); + Dmsg0(100, "Return jobq_remove\n"); + return stat; +} + + +/* + * Start the server thread + */ +static int start_server(jobq_t *jq) +{ + int stat = 0; + pthread_t id; + + /* if any threads are idle, wake one */ + if (jq->idle_workers > 0) { + Dmsg0(100, "Signal worker\n"); + if ((stat = pthread_cond_signal(&jq->work)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } else if (jq->num_workers < jq->max_workers) { + Dmsg0(100, "Create worker thread\n"); + /* No idle threads so create a new one */ + set_thread_concurrency(jq->max_workers + 1); + if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + jq->num_workers++; + } return stat; } @@ -271,7 +347,7 @@ static void *jobq_server(void *arg) int stat; bool timedout; - Dmsg0(200, "Start jobq_server\n"); + Dmsg0(100, "Start jobq_server\n"); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { return NULL; } @@ -280,79 +356,163 @@ static void *jobq_server(void *arg) struct timeval tv; struct timezone tz; - Dmsg0(200, "Top of for loop\n"); + Dmsg0(100, "Top of for loop\n"); timedout = false; - Dmsg0(200, "gettimeofday()\n"); + Dmsg0(100, "gettimeofday()\n"); gettimeofday(&tv, &tz); timeout.tv_nsec = 0; - timeout.tv_sec = tv.tv_sec + 2; + timeout.tv_sec = tv.tv_sec + 4; - while (jq->list.empty() && !jq->quit) { + while (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && !jq->quit) { /* - * Wait 2 seconds, then if no more work, exit + * Wait 4 seconds, then if no more work, exit */ - Dmsg0(200, "pthread_cond_timedwait()\n"); + Dmsg0(100, "pthread_cond_timedwait()\n"); stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout); - Dmsg1(200, "timedwait=%d\n", stat); + Dmsg1(100, "timedwait=%d\n", stat); if (stat == ETIMEDOUT) { timedout = true; break; } else if (stat != 0) { /* This shouldn't happen */ - Dmsg0(200, "This shouldn't happen\n"); + Dmsg0(100, "This shouldn't happen\n"); jq->num_workers--; pthread_mutex_unlock(&jq->mutex); return NULL; } } - je = (jobq_item_t *)jq->list.first(); - if (je != NULL) { - jq->list.remove(je); + /* + * If anything is in the ready queue, run it + */ + Dmsg0(100, "Checking ready queue.\n"); + while (!jq->ready_jobs->empty() && !jq->quit) { + je = (jobq_item_t *)jq->ready_jobs->first(); + jq->ready_jobs->remove(je); + if (!jq->ready_jobs->empty()) { + if (start_server(jq) != 0) { + return NULL; + } + } + jq->running_jobs->append(je); if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { return NULL; } /* Call user's routine here */ - Dmsg0(200, "Calling user engine.\n"); + Dmsg0(100, "Calling user engine.\n"); jq->engine(je->jcr); - Dmsg0(200, "Back from user engine.\n"); - free(je); /* release job entry */ - Dmsg0(200, "relock mutex\n"); + Dmsg0(100, "Back from user engine.\n"); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + free(je); /* release job entry */ return NULL; } - Dmsg0(200, "Done lock mutex\n"); + Dmsg0(100, "Done lock mutex\n"); + jq->running_jobs->remove(je); + /* + * Release locks if acquired. Note, they will not have + * been acquired for jobs canceled before they were + * put into the ready queue. + */ + if (je->jcr->acquired_resource_locks) { + je->jcr->store->NumConcurrentJobs--; + je->jcr->client->NumConcurrentJobs--; + je->jcr->job->NumConcurrentJobs--; + } + free_jcr(je->jcr); + free(je); /* release job entry */ } + /* + * If any job in the wait queue can be run, + * move it to the ready queue + */ + Dmsg0(100, "Done check ready, now check wait queue.\n"); + while (!jq->waiting_jobs->empty() && !jq->quit) { + int Priority; + je = (jobq_item_t *)jq->waiting_jobs->first(); + jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first(); + if (re) { + Priority = re->jcr->JobPriority; + Dmsg1(100, "Set Run pri=%d\n", Priority); + } else { + Priority = je->jcr->JobPriority; + Dmsg1(100, "Set Job pri=%d\n", Priority); + } + /* + * Acquire locks + */ + for ( ; je; ) { + JCR *jcr = je->jcr; + jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je); + Dmsg2(100, "je=0x%x jn=0x%x\n", (unsigned)je, (unsigned)jn); + Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n", + jcr->JobId, jcr->JobPriority, Priority); + /* Take only jobs of correct Priority */ + if (jcr->JobPriority != Priority) { + set_jcr_job_status(jcr, JS_WaitPriority); + break; + } + if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) { + jcr->store->NumConcurrentJobs++; + } else { + set_jcr_job_status(jcr, JS_WaitStoreRes); + je = jn; + continue; + } + if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) { + jcr->client->NumConcurrentJobs++; + } else { + jcr->store->NumConcurrentJobs--; + set_jcr_job_status(jcr, JS_WaitClientRes); + je = jn; + continue; + } + if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) { + jcr->job->NumConcurrentJobs++; + } else { + jcr->store->NumConcurrentJobs--; + jcr->client->NumConcurrentJobs--; + set_jcr_job_status(jcr, JS_WaitJobRes); + je = jn; + continue; + } + jcr->acquired_resource_locks = true; + jq->waiting_jobs->remove(je); + jq->ready_jobs->append(je); + Dmsg1(100, "moved JobId=%d from wait to ready queue\n", + je->jcr->JobId); + je = jn; + } /* end for loop */ + } /* end while loop */ + Dmsg0(100, "Done checking wait queue.\n"); /* * If no more work request, and we are asked to quit, then do it */ - if (jq->list.empty() && jq->quit) { + if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && jq->quit) { jq->num_workers--; if (jq->num_workers == 0) { - Dmsg0(200, "Wake up destroy routine\n"); + Dmsg0(100, "Wake up destroy routine\n"); /* Wake up destroy routine if he is waiting */ pthread_cond_broadcast(&jq->work); } - Dmsg0(200, "Unlock mutex\n"); - pthread_mutex_unlock(&jq->mutex); - Dmsg0(200, "Return from jobq_server\n"); - return NULL; + break; } - Dmsg0(200, "Check for work request\n"); + Dmsg0(100, "Check for work request\n"); /* * If no more work requests, and we waited long enough, quit */ - Dmsg1(200, "jq empty = %d\n", jq->list.empty()); - Dmsg1(200, "timedout=%d\n", timedout); - if (jq->list.empty() && timedout) { - Dmsg0(200, "break big loop\n"); + Dmsg1(100, "jq empty = %d\n", jq->waiting_jobs->empty()); + Dmsg1(100, "timedout=%d\n", timedout); + if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && timedout) { + Dmsg0(100, "break big loop\n"); jq->num_workers--; break; } - Dmsg0(200, "Loop again\n"); + Dmsg0(100, "Loop again\n"); } /* end of big for loop */ - Dmsg0(200, "unlock mutex\n"); + Dmsg0(100, "unlock mutex\n"); pthread_mutex_unlock(&jq->mutex); - Dmsg0(200, "End jobq_server\n"); + Dmsg0(100, "End jobq_server\n"); return NULL; } + +#endif /* JOB_QUEUE */ diff --git a/bacula/src/dird/jobq.h b/bacula/src/dird/jobq.h index 47922ef283..cab04b7cf8 100644 --- a/bacula/src/dird/jobq.h +++ b/bacula/src/dird/jobq.h @@ -47,7 +47,9 @@ struct jobq_t { pthread_mutex_t mutex; /* queue access control */ pthread_cond_t work; /* wait for work */ pthread_attr_t attr; /* create detached threads */ - dlist list; /* list of jobs */ + dlist *waiting_jobs; /* list of jobs waiting */ + dlist *running_jobs; /* jobs running */ + dlist *ready_jobs; /* jobs ready to run */ int valid; /* queue initialized */ bool quit; /* jobq should quit */ int max_workers; /* max threads */ diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index fd27b201e6..01099a1060 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -38,8 +38,12 @@ extern int r_last; extern struct s_res resources[]; extern char my_name[]; #ifndef USE_SEMAPHORE +#ifdef JOB_QUEUE +extern jobq_t job_queue; /* job queue */ +#else extern workq_t job_wq; /* work queue */ #endif +#endif extern char *list_pool; @@ -425,13 +429,18 @@ static int cancelcmd(UAContext *ua, char *cmd) case JS_WaitJobRes: case JS_WaitClientRes: case JS_WaitStoreRes: + case JS_WaitPriority: case JS_WaitMaxJobs: case JS_WaitStartTime: set_jcr_job_status(jcr, JS_Canceled); bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"), jcr->JobId, jcr->Job); #ifndef USE_SEMAPHORE +#ifdef JOB_QUEUE + jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */ +#else workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */ +#endif #endif free_jcr(jcr); return 1; diff --git a/bacula/src/dird/ua_status.c b/bacula/src/dird/ua_status.c index 56a7e0301e..32472f5bd9 100644 --- a/bacula/src/dird/ua_status.c +++ b/bacula/src/dird/ua_status.c @@ -260,6 +260,9 @@ static void do_director_status(UAContext *ua, char *cmd) case JS_WaitJobRes: msg = _("is waiting on max Job jobs"); break; + case JS_WaitPriority: + msg = _("is waiting on for higher priority jobs to finish"); + break; case JS_WaitMaxJobs: msg = _("is waiting on max total jobs"); break; diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index f6e2163652..9ba836f3b6 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -71,6 +71,7 @@ #define JS_WaitClientRes 'c' /* Waiting for Client resource */ #define JS_WaitMaxJobs 'd' /* Waiting for maximum jobs */ #define JS_WaitStartTime 't' /* Waiting for start time */ +#define JS_WaitPriority 'p' /* Waiting for higher priority jobs to finish */ #define job_canceled(jcr) \ (jcr->JobStatus == JS_Canceled || \ @@ -86,150 +87,150 @@ struct JCR { /* Global part of JCR common to all daemons */ JCR *next; JCR *prev; - volatile int use_count; /* use count */ - pthread_t my_thread_id; /* id of thread controlling jcr */ - pthread_mutex_t mutex; /* jcr mutex */ - BSOCK *dir_bsock; /* Director bsock or NULL if we are him */ - BSOCK *store_bsock; /* Storage connection socket */ - BSOCK *file_bsock; /* File daemon connection socket */ + volatile int use_count; /* use count */ + pthread_t my_thread_id; /* id of thread controlling jcr */ + pthread_mutex_t mutex; /* jcr mutex */ + BSOCK *dir_bsock; /* Director bsock or NULL if we are him */ + BSOCK *store_bsock; /* Storage connection socket */ + BSOCK *file_bsock; /* File daemon connection socket */ JCR_free_HANDLER *daemon_free_jcr; /* Local free routine */ - POOLMEM *errmsg; /* edited error message */ - char Job[MAX_NAME_LENGTH]; /* Unique name of this Job */ + POOLMEM *errmsg; /* edited error message */ + char Job[MAX_NAME_LENGTH]; /* Unique name of this Job */ uint32_t JobId; /* Director's JobId */ uint32_t VolSessionId; uint32_t VolSessionTime; - uint32_t JobFiles; /* Number of files written, this job */ - uint32_t JobErrors; /* */ - uint64_t JobBytes; /* Number of bytes processed this job */ - uint64_t ReadBytes; /* Bytes read -- before compression */ - uint32_t Errors; /* Number of non-fatal errors */ - volatile int JobStatus; /* ready, running, blocked, terminated */ - int JobType; /* backup, restore, verify ... */ - int JobLevel; /* Job level */ - int JobPriority; /* Job priority */ - int authenticated; /* set when client authenticated */ - time_t sched_time; /* job schedule time, i.e. when it should start */ - time_t start_time; /* when job actually started */ - time_t run_time; /* used for computing speed */ - time_t end_time; /* job end time */ - POOLMEM *VolumeName; /* Volume name desired -- pool_memory */ - POOLMEM *client_name; /* client name */ - POOLMEM *RestoreBootstrap; /* Bootstrap file to restore */ - char *sd_auth_key; /* SD auth key */ - MSGS *jcr_msgs; /* Copy of message resource -- actually used */ - uint32_t ClientId; /* Client associated with Job */ - char *where; /* prefix to restore files to */ - int prefix_links; /* Prefix links with Where path */ - int cached_pnl; /* cached path length */ - POOLMEM *cached_path; /* cached path */ + uint32_t JobFiles; /* Number of files written, this job */ + uint32_t JobErrors; /* */ + uint64_t JobBytes; /* Number of bytes processed this job */ + uint64_t ReadBytes; /* Bytes read -- before compression */ + uint32_t Errors; /* Number of non-fatal errors */ + volatile int JobStatus; /* ready, running, blocked, terminated */ + int JobType; /* backup, restore, verify ... */ + int JobLevel; /* Job level */ + int JobPriority; /* Job priority */ + int authenticated; /* set when client authenticated */ + time_t sched_time; /* job schedule time, i.e. when it should start */ + time_t start_time; /* when job actually started */ + time_t run_time; /* used for computing speed */ + time_t end_time; /* job end time */ + POOLMEM *VolumeName; /* Volume name desired -- pool_memory */ + POOLMEM *client_name; /* client name */ + POOLMEM *RestoreBootstrap; /* Bootstrap file to restore */ + char *sd_auth_key; /* SD auth key */ + MSGS *jcr_msgs; /* Copy of message resource -- actually used */ + uint32_t ClientId; /* Client associated with Job */ + char *where; /* prefix to restore files to */ + int prefix_links; /* Prefix links with Where path */ + int cached_pnl; /* cached path length */ + POOLMEM *cached_path; /* cached path */ /* Daemon specific part of JCR */ /* This should be empty in the library */ #ifdef DIRECTOR_DAEMON /* Director Daemon specific part of JCR */ - pthread_t SD_msg_chan; /* Message channel thread id */ - pthread_cond_t term_wait; /* Wait for job termination */ - workq_ele_t *work_item; /* Work queue item if scheduled */ + pthread_t SD_msg_chan; /* Message channel thread id */ + pthread_cond_t term_wait; /* Wait for job termination */ + workq_ele_t *work_item; /* Work queue item if scheduled */ volatile bool sd_msg_thread_done; /* Set when Storage message thread terms */ - BSOCK *ua; /* User agent */ - JOB *job; /* Job resource */ - STORE *store; /* Storage resource */ - CLIENT *client; /* Client resource */ - POOL *pool; /* Pool resource */ - FILESET *fileset; /* FileSet resource */ - CAT *catalog; /* Catalog resource */ - MSGS *messages; /* Default message handler */ - uint32_t SDJobFiles; /* Number of files written, this job */ - uint64_t SDJobBytes; /* Number of bytes processed this job */ - uint32_t SDErrors; /* Number of non-fatal errors */ - volatile int SDJobStatus; /* Storage Job Status */ - volatile int FDJobStatus; /* File daemon Job Status */ - B_DB *db; /* database pointer */ - uint32_t MediaId; /* DB record IDs associated with this job */ - uint32_t PoolId; /* Pool record id */ - FileId_t FileId; /* Last file id inserted */ - uint32_t FileIndex; /* Last FileIndex processed */ - POOLMEM *fname; /* name to put into catalog */ - int fn_printed; /* printed filename */ - POOLMEM *stime; /* start time for incremental/differential */ - JOB_DBR jr; /* Job record in Database */ - uint32_t RestoreJobId; /* Id specified by UA */ - POOLMEM *client_uname; /* client uname */ - int replace; /* Replace option */ + BSOCK *ua; /* User agent */ + JOB *job; /* Job resource */ + STORE *store; /* Storage resource */ + CLIENT *client; /* Client resource */ + POOL *pool; /* Pool resource */ + FILESET *fileset; /* FileSet resource */ + CAT *catalog; /* Catalog resource */ + MSGS *messages; /* Default message handler */ + uint32_t SDJobFiles; /* Number of files written, this job */ + uint64_t SDJobBytes; /* Number of bytes processed this job */ + uint32_t SDErrors; /* Number of non-fatal errors */ + volatile int SDJobStatus; /* Storage Job Status */ + volatile int FDJobStatus; /* File daemon Job Status */ + B_DB *db; /* database pointer */ + uint32_t MediaId; /* DB record IDs associated with this job */ + uint32_t PoolId; /* Pool record id */ + FileId_t FileId; /* Last file id inserted */ + uint32_t FileIndex; /* Last FileIndex processed */ + POOLMEM *fname; /* name to put into catalog */ + int fn_printed; /* printed filename */ + POOLMEM *stime; /* start time for incremental/differential */ + JOB_DBR jr; /* Job record in Database */ + uint32_t RestoreJobId; /* Id specified by UA */ + POOLMEM *client_uname; /* client uname */ + int replace; /* Replace option */ bool acquired_resource_locks; /* set if resource locks acquired */ - int NumVols; /* Number of Volume used in pool */ - int reschedule_count; /* Number of times rescheduled */ + int NumVols; /* Number of Volume used in pool */ + int reschedule_count; /* Number of times rescheduled */ #endif /* DIRECTOR_DAEMON */ #ifdef FILE_DAEMON /* File Daemon specific part of JCR */ uint32_t num_files_examined; /* files examined this job */ - POOLMEM *last_fname; /* last file saved/verified */ + POOLMEM *last_fname; /* last file saved/verified */ /*********FIXME********* add missing files and files to be retried */ - int incremental; /* set if incremental for SINCE */ - time_t mtime; /* begin time for SINCE */ - int mtime_only; /* compare only mtime and not ctime as well */ - int listing; /* job listing in estimate */ - long Ticket; /* Ticket */ - int save_level; /* save level */ - char *big_buf; /* I/O buffer */ - POOLMEM *compress_buf; /* Compression buffer */ - int32_t compress_buf_size; /* Length of compression buffer */ - int replace; /* Replace options */ - int buf_size; /* length of buffer */ - void *ff; /* Find Files packet */ + int incremental; /* set if incremental for SINCE */ + time_t mtime; /* begin time for SINCE */ + int mtime_only; /* compare only mtime and not ctime as well */ + int listing; /* job listing in estimate */ + long Ticket; /* Ticket */ + int save_level; /* save level */ + char *big_buf; /* I/O buffer */ + POOLMEM *compress_buf; /* Compression buffer */ + int32_t compress_buf_size; /* Length of compression buffer */ + int replace; /* Replace options */ + int buf_size; /* length of buffer */ + void *ff; /* Find Files packet */ char stored_addr[MAX_NAME_LENGTH]; /* storage daemon address */ uint32_t StartFile; uint32_t EndFile; uint32_t StartBlock; uint32_t EndBlock; - pthread_t heartbeat_id; /* id of heartbeat thread */ - volatile BSOCK *hb_bsock; /* duped SD socket */ + pthread_t heartbeat_id; /* id of heartbeat thread */ + volatile BSOCK *hb_bsock; /* duped SD socket */ #endif /* FILE_DAEMON */ #ifdef STORAGE_DAEMON /* Storage Daemon specific part of JCR */ - JCR *next_dev; /* next JCR attached to device */ - JCR *prev_dev; /* previous JCR attached to device */ + JCR *next_dev; /* next JCR attached to device */ + JCR *prev_dev; /* previous JCR attached to device */ pthread_cond_t job_start_wait; /* Wait for FD to start Job */ int type; - DEVRES *device; /* device to use */ - VOLUME_CAT_INFO VolCatInfo; /* Catalog info for desired volume */ - POOLMEM *job_name; /* base Job name (not unique) */ - POOLMEM *fileset_name; /* FileSet */ - POOLMEM *fileset_md5; /* MD5 for FileSet */ - POOLMEM *pool_name; /* pool to use */ - POOLMEM *pool_type; /* pool type to use */ - POOLMEM *media_type; /* media type */ - POOLMEM *dev_name; /* device name */ - VOL_LIST *VolList; /* list to read */ - int32_t NumVolumes; /* number of volumes used */ - int32_t CurVolume; /* current volume number */ - int spool_attributes; /* set if spooling attributes */ - int no_attributes; /* set if no attributes wanted */ - int label_status; /* device volume label status */ - int label_errors; /* count of label errors */ + DEVRES *device; /* device to use */ + VOLUME_CAT_INFO VolCatInfo; /* Catalog info for desired volume */ + POOLMEM *job_name; /* base Job name (not unique) */ + POOLMEM *fileset_name; /* FileSet */ + POOLMEM *fileset_md5; /* MD5 for FileSet */ + POOLMEM *pool_name; /* pool to use */ + POOLMEM *pool_type; /* pool type to use */ + POOLMEM *media_type; /* media type */ + POOLMEM *dev_name; /* device name */ + VOL_LIST *VolList; /* list to read */ + int32_t NumVolumes; /* number of volumes used */ + int32_t CurVolume; /* current volume number */ + int spool_attributes; /* set if spooling attributes */ + int no_attributes; /* set if no attributes wanted */ + int label_status; /* device volume label status */ + int label_errors; /* count of label errors */ int session_opened; - DEV_RECORD rec; /* Read/Write record */ - long Ticket; /* ticket for this job */ - uint32_t VolFirstIndex; /* First file index this Volume */ - uint32_t VolLastIndex; /* Last file index this Volume */ - uint32_t FileIndex; /* Current File Index */ - uint32_t EndFile; /* End file written */ - uint32_t StartFile; /* Start write file */ - uint32_t StartBlock; /* Start write block */ - uint32_t EndBlock; /* Ending block written */ - bool NewVol; /* set when new Volume mounted */ - bool WroteVol; /* set when Volume written */ - int CurVol; /* Current Volume count */ + DEV_RECORD rec; /* Read/Write record */ + long Ticket; /* ticket for this job */ + uint32_t VolFirstIndex; /* First file index this Volume */ + uint32_t VolLastIndex; /* Last file index this Volume */ + uint32_t FileIndex; /* Current File Index */ + uint32_t EndFile; /* End file written */ + uint32_t StartFile; /* Start write file */ + uint32_t StartBlock; /* Start write block */ + uint32_t EndBlock; /* Ending block written */ + bool NewVol; /* set when new Volume mounted */ + bool WroteVol; /* set when Volume written */ + int CurVol; /* Current Volume count */ - uint32_t FileId; /* Last file id inserted */ + uint32_t FileId; /* Last file id inserted */ /* Parmaters for Open Read Session */ - BSR *bsr; /* Bootstrap record -- has everything */ + BSR *bsr; /* Bootstrap record -- has everything */ uint32_t read_VolSessionId; uint32_t read_VolSessionTime; uint32_t read_StartFile; diff --git a/bacula/src/lib/dlist.c b/bacula/src/lib/dlist.c index f356658c90..15307bc3b3 100644 --- a/bacula/src/lib/dlist.c +++ b/bacula/src/lib/dlist.c @@ -105,17 +105,23 @@ void dlist::remove(void *item) dlink *ilink = (dlink *)((char *)item+loffset); /* item's link */ if (item == head) { head = ilink->next; - ((dlink *)((char *)head+loffset))->prev = NULL; + if (head) { + ((dlink *)((char *)head+loffset))->prev = NULL; + } + if (item == tail) { + tail = ilink->prev; + } } else if (item == tail) { tail = ilink->prev; - ((dlink *)((char *)tail+loffset))->next = NULL; + if (tail) { + ((dlink *)((char *)tail+loffset))->next = NULL; + } } else { xitem = ilink->next; ((dlink *)((char *)xitem+loffset))->prev = ilink->prev; xitem = ilink->prev; ((dlink *)((char *)xitem+loffset))->next = ilink->next; } - free(item); } void * dlist::next(void *item) @@ -193,6 +199,35 @@ int main() jcr_chain->destroy(); free(jcr_chain); + jcr_chain = new dlist(jcr, &jcr->link); + printf("append 20 items 0-19\n"); + for (int i=0; i<20; i++) { + sprintf(buf, "This is dlist item %d", i); + jcr = (MYJCR *)malloc(sizeof(MYJCR)); + jcr->buf = bstrdup(buf); + jcr_chain->append(jcr); + if (i == 10) { + save_jcr = jcr; + } + } + + next_jcr = (MYJCR *)jcr_chain->next(save_jcr); + printf("11th item=%s\n", next_jcr->buf); + jcr = (MYJCR *)malloc(sizeof(MYJCR)); + jcr->buf = save_jcr->buf; + printf("Remove 10th item\n"); + jcr_chain->remove(save_jcr); + printf("Re-insert 10th item\n"); + jcr_chain->insert_before(jcr, next_jcr); + + printf("Print remaining list.\n"); + for (MYJCR *jcr=NULL; (jcr=(MYJCR *)jcr_chain->next(jcr)); ) { + printf("Dlist item = %s\n", jcr->buf); + free(jcr->buf); + } + + delete jcr_chain; + sm_dump(False); } diff --git a/bacula/src/stored/mount.c b/bacula/src/stored/mount.c index 816c830e45..40fdc4dd9d 100644 --- a/bacula/src/stored/mount.c +++ b/bacula/src/stored/mount.c @@ -185,6 +185,9 @@ read_volume: } Dmsg1(100, "want new name=%s\n", jcr->VolumeName); memcpy(&dev->VolCatInfo, &jcr->VolCatInfo, sizeof(jcr->VolCatInfo)); + if (strcmp(dev->VolCatInfo.VolCatStatus, "Recycle") == 0) { + recycle = 1; + } break; /* got a Volume */ case VOL_NO_LABEL: diff --git a/bacula/src/version.h b/bacula/src/version.h index cae6f023ec..d490acd647 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -1,8 +1,8 @@ /* */ #define VERSION "1.31" #define VSTRING "1" -#define BDATE "17 Jul 2003" -#define LSMDATE "17Jul03" +#define BDATE "20 Jul 2003" +#define LSMDATE "20Jul03" /* Debug flags */ #define DEBUG 1 @@ -17,9 +17,16 @@ */ /* #define SEND_DMSG_TO_FILE 1 */ -/* Turn this on if you want to try the new Job semaphore code */ +/* Turn this on if you want to use the Job semaphore code */ #define USE_SEMAPHORE +/* Turn this on if you want to use the new Job scheduling code */ +#ifdef xxx +#undef USE_SEMAPHORE +#define JOB_QUEUE 1 +#endif + + /* #define NO_ATTRIBUTES_TEST 1 */ /* #define NO_TAPE_WRITE_TEST 1 */ /* #define FD_NO_SEND TEST 1 */ -- 2.39.5