2003-07-xx Version 1.31 Beta xxJul03
+- Complete implementation of new job scheduler. jobq.h jobq.c
+ This code is turned off unless specifically enabled in src/version.h
+- Integrate code from Nic Bellamy to check for recycled volume in
+ mount.c in SD.
+- Fix a couple of bugs in dlist.c
+- Begin implementation of new job scheduler.
- Take serial.h provided by David Craigon, which corrects differences in
prototypes between serial.h and serial.c.
- Make db_get_media_ids() return Media Ids only for the current pool.
#define DIRECTOR_DAEMON 1
-/* The following includes are at the bottom of
- * this file rather than at the top because the
- * #include "jcr.h" uses the definition of JOB
- * as supplied above.
- */
-
#include "cats/cats.h"
#include "jcr.h"
-
#include "bsr.h"
#include "ua.h"
#include "protos.h"
+
#include "jobq.h"
/* Globals that dird.c exports */
-extern int debug_level;
-extern time_t start_time;
extern DIRRES *director; /* Director resource */
-extern char *working_directory; /* export our working directory */
extern int FDConnectTimeout;
extern int SDConnectTimeout;
#else
#ifdef JOB_QUEUE
- if ((stat = job_init(&job_queue, max_workers, job_thread)) != 0) {
+ if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) {
Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat));
}
#else
db_close_database(jcr, jcr->db);
jcr->db = NULL;
}
+#ifndef JOB_QUEUE
free_jcr(jcr);
+#endif
Dmsg0(50, "======== End Job ==========\n");
sm_check(__FILE__, __LINE__, True);
return NULL;
*/
static int acquire_resource_locks(JCR *jcr)
{
+#ifndef JOB_QUEUE
time_t now = time(NULL);
time_t wtime = jcr->sched_time - now;
}
wtime = jcr->sched_time - time(NULL);
}
+#endif
#ifdef USE_SEMAPHORE
jcr->job = job;
jcr->JobType = job->JobType;
jcr->JobLevel = job->level;
+ jcr->JobPriority = job->Priority;
jcr->store = job->storage;
jcr->client = job->client;
if (!jcr->client_name) {
/*
* Bacula job queue routines.
*
+ * This code consists of three queues, the waiting_jobs
+ * queue, where jobs are initially queued, the ready_jobs
+ * queue, where jobs are placed when all the resources are
+ * allocated and they can immediately be run, and the
+ * running queue where jobs are placed when they are
+ * running.
+ *
* Kern Sibbald, July MMIII
*
* Version $Id$
* adapted from "Programming with POSIX Threads", by
* David R. Butenhof
*
- * Example:
- *
- * static jobq_t jq; define job queue
- *
- * Initialize queue
- * if ((stat = jobq_init(&jq, max_workers, job_thread)) != 0) {
- * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
- * }
- *
- * Add an item to the queue
- * if ((stat = jobq_add(&jq, jcr)) != 0) {
- * Emsg1(M_ABORT, 0, "Could not add job to queue: ERR=%s\n", strerror(errno));
- * }
- *
- * Terminate the queue
- * jobq_destroy(jobq_t *jq);
- *
*/
/*
Copyright (C) 2000-2003 Kern Sibbald and John Walker
#include "bacula.h"
#include "dird.h"
+#ifdef JOB_QUEUE
+
/* Forward referenced functions */
static void *jobq_server(void *arg);
+static int start_server(jobq_t *jq);
/*
* Initialize a job queue
jq->idle_workers = 0; /* no idle threads */
jq->engine = engine; /* routine to run */
jq->valid = JOBQ_VALID;
- jq->list.init(item, &item->link);
+ /* Initialize the job queues */
+ jq->waiting_jobs = new dlist(item, &item->link);
+ jq->running_jobs = new dlist(item, &item->link);
+ jq->ready_jobs = new dlist(item, &item->link);
return 0;
}
stat = pthread_mutex_destroy(&jq->mutex);
stat1 = pthread_cond_destroy(&jq->work);
stat2 = pthread_attr_destroy(&jq->attr);
- jq->list.destroy();
+ delete jq->waiting_jobs;
+ delete jq->running_jobs;
+ delete jq->ready_jobs;
return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
}
+struct wait_pkt {
+ JCR *jcr;
+ jobq_t *jq;
+};
+
+/*
+ * Wait until schedule time arrives before starting
+ */
+static void *sched_wait(void *arg)
+{
+ JCR *jcr = ((wait_pkt *)arg)->jcr;
+ jobq_t *jq = ((wait_pkt *)arg)->jq;
+
+ Dmsg0(100, "Enter sched_wait.\n");
+ free(arg);
+ time_t wtime = jcr->sched_time - time(NULL);
+ /* Wait until scheduled time arrives */
+ if (wtime > 0 && verbose) {
+ Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
+ jcr->Job, wtime);
+ set_jcr_job_status(jcr, JS_WaitStartTime);
+ }
+ /* Check every 30 seconds if canceled */
+ while (wtime > 0) {
+ Dmsg2(100, "Waiting on sched time, jobid=%d secs=%d\n", jcr->JobId, wtime);
+ if (wtime > 30) {
+ wtime = 30;
+ }
+ bmicrosleep(wtime, 0);
+ if (job_canceled(jcr)) {
+ break;
+ }
+ wtime = jcr->sched_time - time(NULL);
+ }
+ jobq_add(jq, jcr);
+ Dmsg0(100, "Exit sched_wait\n");
+ return NULL;
+}
+
/*
* Add a job to the queue
{
int stat;
jobq_item_t *item, *li;
- pthread_t id;
bool inserted = false;
+ time_t wtime = jcr->sched_time - time(NULL);
+ pthread_t id;
+ wait_pkt *sched_pkt;
+
- Dmsg0(200, "jobq_add\n");
+ Dmsg0(100, "jobq_add\n");
if (jq->valid != JOBQ_VALID) {
return EINVAL;
}
- if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
- return ENOMEM;
+ if (!job_canceled(jcr) && wtime > 0) {
+ set_thread_concurrency(jq->max_workers + 2);
+ sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
+ sched_pkt->jcr = jcr;
+ sched_pkt->jq = jq;
+ if ((stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt)) != 0) {
+ return stat;
+ }
}
- item->jcr = jcr;
+
if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
- free(item);
return stat;
}
- Dmsg0(200, "add item to queue\n");
- for (li=NULL; (li=(jobq_item_t *)jq->list.next(li)); ) {
- if (li->jcr->JobPriority < jcr->JobPriority) {
- jq->list.insert_before(item, li);
- inserted = true;
- }
- }
- if (!inserted) {
- jq->list.append(item);
+ if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
+ return ENOMEM;
}
+ item->jcr = jcr;
- /* if any threads are idle, wake one */
- if (jq->idle_workers > 0) {
- Dmsg0(200, "Signal worker\n");
- if ((stat = pthread_cond_signal(&jq->work)) != 0) {
- pthread_mutex_unlock(&jq->mutex);
- return stat;
+ Dmsg1(100, "add 0x%x to queue\n", (unsigned)item);
+ if (job_canceled(jcr)) {
+ /* Add job to ready queue so that it is canceled quickly */
+ jq->ready_jobs->prepend(item);
+ } else {
+ /* Add this job to the wait queue in priority sorted order */
+ for (li=NULL; (li=(jobq_item_t *)jq->waiting_jobs->next(li)); ) {
+ if (li->jcr->JobPriority < jcr->JobPriority) {
+ jq->waiting_jobs->insert_before(item, li);
+ Dmsg1(100, "insert_before 0x%x\n", (unsigned)li);
+ inserted = true;
+ }
}
- } else if (jq->num_workers < jq->max_workers) {
- Dmsg0(200, "Create worker thread\n");
- /* No idle threads so create a new one */
- set_thread_concurrency(jq->max_workers + 1);
- if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
- pthread_mutex_unlock(&jq->mutex);
- return stat;
+ /* If not jobs in wait queue, append it */
+ if (!inserted) {
+ jq->waiting_jobs->append(item);
+ Dmsg0(100, "Appended item.\n");
}
- jq->num_workers++;
+ Dmsg1(100, "Next=0x%x\n", (unsigned)jq->waiting_jobs->next(item));
}
- pthread_mutex_unlock(&jq->mutex);
- Dmsg0(200, "Return jobq_add\n");
+
+ stat = start_server(jq);
+
+ if (stat == 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ }
+ Dmsg0(100, "Return jobq_add\n");
return stat;
}
pthread_t id;
jobq_item_t *item;
- Dmsg0(200, "jobq_remove\n");
+ Dmsg0(100, "jobq_remove\n");
if (jq->valid != JOBQ_VALID) {
return EINVAL;
}
return stat;
}
- for (item=NULL; (item=(jobq_item_t *)jq->list.next(item)); ) {
+ for (item=NULL; (item=(jobq_item_t *)jq->waiting_jobs->next(item)); ) {
if (jcr == item->jcr) {
found = true;
break;
}
/* Move item to be the first on the list */
- jq->list.remove(item);
- jq->list.prepend(item);
+ jq->waiting_jobs->remove(item);
+ jq->ready_jobs->prepend(item);
/* if any threads are idle, wake one */
if (jq->idle_workers > 0) {
- Dmsg0(200, "Signal worker\n");
+ Dmsg0(100, "Signal worker\n");
if ((stat = pthread_cond_signal(&jq->work)) != 0) {
pthread_mutex_unlock(&jq->mutex);
return stat;
}
} else {
- Dmsg0(200, "Create worker thread\n");
+ Dmsg0(100, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(jq->max_workers + 1);
if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
jq->num_workers++;
}
pthread_mutex_unlock(&jq->mutex);
- Dmsg0(200, "Return jobq_remove\n");
+ Dmsg0(100, "Return jobq_remove\n");
+ return stat;
+}
+
+
+/*
+ * Start the server thread
+ */
+static int start_server(jobq_t *jq)
+{
+ int stat = 0;
+ pthread_t id;
+
+ /* if any threads are idle, wake one */
+ if (jq->idle_workers > 0) {
+ Dmsg0(100, "Signal worker\n");
+ if ((stat = pthread_cond_signal(&jq->work)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ } else if (jq->num_workers < jq->max_workers) {
+ Dmsg0(100, "Create worker thread\n");
+ /* No idle threads so create a new one */
+ set_thread_concurrency(jq->max_workers + 1);
+ if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ jq->num_workers++;
+ }
return stat;
}
int stat;
bool timedout;
- Dmsg0(200, "Start jobq_server\n");
+ Dmsg0(100, "Start jobq_server\n");
if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
return NULL;
}
struct timeval tv;
struct timezone tz;
- Dmsg0(200, "Top of for loop\n");
+ Dmsg0(100, "Top of for loop\n");
timedout = false;
- Dmsg0(200, "gettimeofday()\n");
+ Dmsg0(100, "gettimeofday()\n");
gettimeofday(&tv, &tz);
timeout.tv_nsec = 0;
- timeout.tv_sec = tv.tv_sec + 2;
+ timeout.tv_sec = tv.tv_sec + 4;
- while (jq->list.empty() && !jq->quit) {
+ while (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && !jq->quit) {
/*
- * Wait 2 seconds, then if no more work, exit
+ * Wait 4 seconds, then if no more work, exit
*/
- Dmsg0(200, "pthread_cond_timedwait()\n");
+ Dmsg0(100, "pthread_cond_timedwait()\n");
stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
- Dmsg1(200, "timedwait=%d\n", stat);
+ Dmsg1(100, "timedwait=%d\n", stat);
if (stat == ETIMEDOUT) {
timedout = true;
break;
} else if (stat != 0) {
/* This shouldn't happen */
- Dmsg0(200, "This shouldn't happen\n");
+ Dmsg0(100, "This shouldn't happen\n");
jq->num_workers--;
pthread_mutex_unlock(&jq->mutex);
return NULL;
}
}
- je = (jobq_item_t *)jq->list.first();
- if (je != NULL) {
- jq->list.remove(je);
+ /*
+ * If anything is in the ready queue, run it
+ */
+ Dmsg0(100, "Checking ready queue.\n");
+ while (!jq->ready_jobs->empty() && !jq->quit) {
+ je = (jobq_item_t *)jq->ready_jobs->first();
+ jq->ready_jobs->remove(je);
+ if (!jq->ready_jobs->empty()) {
+ if (start_server(jq) != 0) {
+ return NULL;
+ }
+ }
+ jq->running_jobs->append(je);
if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
return NULL;
}
/* Call user's routine here */
- Dmsg0(200, "Calling user engine.\n");
+ Dmsg0(100, "Calling user engine.\n");
jq->engine(je->jcr);
- Dmsg0(200, "Back from user engine.\n");
- free(je); /* release job entry */
- Dmsg0(200, "relock mutex\n");
+ Dmsg0(100, "Back from user engine.\n");
if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ free(je); /* release job entry */
return NULL;
}
- Dmsg0(200, "Done lock mutex\n");
+ Dmsg0(100, "Done lock mutex\n");
+ jq->running_jobs->remove(je);
+ /*
+ * Release locks if acquired. Note, they will not have
+ * been acquired for jobs canceled before they were
+ * put into the ready queue.
+ */
+ if (je->jcr->acquired_resource_locks) {
+ je->jcr->store->NumConcurrentJobs--;
+ je->jcr->client->NumConcurrentJobs--;
+ je->jcr->job->NumConcurrentJobs--;
+ }
+ free_jcr(je->jcr);
+ free(je); /* release job entry */
}
+ /*
+ * If any job in the wait queue can be run,
+ * move it to the ready queue
+ */
+ Dmsg0(100, "Done check ready, now check wait queue.\n");
+ while (!jq->waiting_jobs->empty() && !jq->quit) {
+ int Priority;
+ je = (jobq_item_t *)jq->waiting_jobs->first();
+ jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
+ if (re) {
+ Priority = re->jcr->JobPriority;
+ Dmsg1(100, "Set Run pri=%d\n", Priority);
+ } else {
+ Priority = je->jcr->JobPriority;
+ Dmsg1(100, "Set Job pri=%d\n", Priority);
+ }
+ /*
+ * Acquire locks
+ */
+ for ( ; je; ) {
+ JCR *jcr = je->jcr;
+ jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
+ Dmsg2(100, "je=0x%x jn=0x%x\n", (unsigned)je, (unsigned)jn);
+ Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n",
+ jcr->JobId, jcr->JobPriority, Priority);
+ /* Take only jobs of correct Priority */
+ if (jcr->JobPriority != Priority) {
+ set_jcr_job_status(jcr, JS_WaitPriority);
+ break;
+ }
+ if (jcr->store->NumConcurrentJobs < jcr->store->MaxConcurrentJobs) {
+ jcr->store->NumConcurrentJobs++;
+ } else {
+ set_jcr_job_status(jcr, JS_WaitStoreRes);
+ je = jn;
+ continue;
+ }
+ if (jcr->client->NumConcurrentJobs < jcr->client->MaxConcurrentJobs) {
+ jcr->client->NumConcurrentJobs++;
+ } else {
+ jcr->store->NumConcurrentJobs--;
+ set_jcr_job_status(jcr, JS_WaitClientRes);
+ je = jn;
+ continue;
+ }
+ if (jcr->job->NumConcurrentJobs < jcr->job->MaxConcurrentJobs) {
+ jcr->job->NumConcurrentJobs++;
+ } else {
+ jcr->store->NumConcurrentJobs--;
+ jcr->client->NumConcurrentJobs--;
+ set_jcr_job_status(jcr, JS_WaitJobRes);
+ je = jn;
+ continue;
+ }
+ jcr->acquired_resource_locks = true;
+ jq->waiting_jobs->remove(je);
+ jq->ready_jobs->append(je);
+ Dmsg1(100, "moved JobId=%d from wait to ready queue\n",
+ je->jcr->JobId);
+ je = jn;
+ } /* end for loop */
+ } /* end while loop */
+ Dmsg0(100, "Done checking wait queue.\n");
/*
* If no more work request, and we are asked to quit, then do it
*/
- if (jq->list.empty() && jq->quit) {
+ if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && jq->quit) {
jq->num_workers--;
if (jq->num_workers == 0) {
- Dmsg0(200, "Wake up destroy routine\n");
+ Dmsg0(100, "Wake up destroy routine\n");
/* Wake up destroy routine if he is waiting */
pthread_cond_broadcast(&jq->work);
}
- Dmsg0(200, "Unlock mutex\n");
- pthread_mutex_unlock(&jq->mutex);
- Dmsg0(200, "Return from jobq_server\n");
- return NULL;
+ break;
}
- Dmsg0(200, "Check for work request\n");
+ Dmsg0(100, "Check for work request\n");
/*
* If no more work requests, and we waited long enough, quit
*/
- Dmsg1(200, "jq empty = %d\n", jq->list.empty());
- Dmsg1(200, "timedout=%d\n", timedout);
- if (jq->list.empty() && timedout) {
- Dmsg0(200, "break big loop\n");
+ Dmsg1(100, "jq empty = %d\n", jq->waiting_jobs->empty());
+ Dmsg1(100, "timedout=%d\n", timedout);
+ if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && timedout) {
+ Dmsg0(100, "break big loop\n");
jq->num_workers--;
break;
}
- Dmsg0(200, "Loop again\n");
+ Dmsg0(100, "Loop again\n");
} /* end of big for loop */
- Dmsg0(200, "unlock mutex\n");
+ Dmsg0(100, "unlock mutex\n");
pthread_mutex_unlock(&jq->mutex);
- Dmsg0(200, "End jobq_server\n");
+ Dmsg0(100, "End jobq_server\n");
return NULL;
}
+
+#endif /* JOB_QUEUE */
pthread_mutex_t mutex; /* queue access control */
pthread_cond_t work; /* wait for work */
pthread_attr_t attr; /* create detached threads */
- dlist list; /* list of jobs */
+ dlist *waiting_jobs; /* list of jobs waiting */
+ dlist *running_jobs; /* jobs running */
+ dlist *ready_jobs; /* jobs ready to run */
int valid; /* queue initialized */
bool quit; /* jobq should quit */
int max_workers; /* max threads */
extern struct s_res resources[];
extern char my_name[];
#ifndef USE_SEMAPHORE
+#ifdef JOB_QUEUE
+extern jobq_t job_queue; /* job queue */
+#else
extern workq_t job_wq; /* work queue */
#endif
+#endif
extern char *list_pool;
case JS_WaitJobRes:
case JS_WaitClientRes:
case JS_WaitStoreRes:
+ case JS_WaitPriority:
case JS_WaitMaxJobs:
case JS_WaitStartTime:
set_jcr_job_status(jcr, JS_Canceled);
bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"),
jcr->JobId, jcr->Job);
#ifndef USE_SEMAPHORE
+#ifdef JOB_QUEUE
+ jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */
+#else
workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */
+#endif
#endif
free_jcr(jcr);
return 1;
case JS_WaitJobRes:
msg = _("is waiting on max Job jobs");
break;
+ case JS_WaitPriority:
+ msg = _("is waiting on for higher priority jobs to finish");
+ break;
case JS_WaitMaxJobs:
msg = _("is waiting on max total jobs");
break;
#define JS_WaitClientRes 'c' /* Waiting for Client resource */
#define JS_WaitMaxJobs 'd' /* Waiting for maximum jobs */
#define JS_WaitStartTime 't' /* Waiting for start time */
+#define JS_WaitPriority 'p' /* Waiting for higher priority jobs to finish */
#define job_canceled(jcr) \
(jcr->JobStatus == JS_Canceled || \
/* Global part of JCR common to all daemons */
JCR *next;
JCR *prev;
- volatile int use_count; /* use count */
- pthread_t my_thread_id; /* id of thread controlling jcr */
- pthread_mutex_t mutex; /* jcr mutex */
- BSOCK *dir_bsock; /* Director bsock or NULL if we are him */
- BSOCK *store_bsock; /* Storage connection socket */
- BSOCK *file_bsock; /* File daemon connection socket */
+ volatile int use_count; /* use count */
+ pthread_t my_thread_id; /* id of thread controlling jcr */
+ pthread_mutex_t mutex; /* jcr mutex */
+ BSOCK *dir_bsock; /* Director bsock or NULL if we are him */
+ BSOCK *store_bsock; /* Storage connection socket */
+ BSOCK *file_bsock; /* File daemon connection socket */
JCR_free_HANDLER *daemon_free_jcr; /* Local free routine */
- POOLMEM *errmsg; /* edited error message */
- char Job[MAX_NAME_LENGTH]; /* Unique name of this Job */
+ POOLMEM *errmsg; /* edited error message */
+ char Job[MAX_NAME_LENGTH]; /* Unique name of this Job */
uint32_t JobId; /* Director's JobId */
uint32_t VolSessionId;
uint32_t VolSessionTime;
- uint32_t JobFiles; /* Number of files written, this job */
- uint32_t JobErrors; /* */
- uint64_t JobBytes; /* Number of bytes processed this job */
- uint64_t ReadBytes; /* Bytes read -- before compression */
- uint32_t Errors; /* Number of non-fatal errors */
- volatile int JobStatus; /* ready, running, blocked, terminated */
- int JobType; /* backup, restore, verify ... */
- int JobLevel; /* Job level */
- int JobPriority; /* Job priority */
- int authenticated; /* set when client authenticated */
- time_t sched_time; /* job schedule time, i.e. when it should start */
- time_t start_time; /* when job actually started */
- time_t run_time; /* used for computing speed */
- time_t end_time; /* job end time */
- POOLMEM *VolumeName; /* Volume name desired -- pool_memory */
- POOLMEM *client_name; /* client name */
- POOLMEM *RestoreBootstrap; /* Bootstrap file to restore */
- char *sd_auth_key; /* SD auth key */
- MSGS *jcr_msgs; /* Copy of message resource -- actually used */
- uint32_t ClientId; /* Client associated with Job */
- char *where; /* prefix to restore files to */
- int prefix_links; /* Prefix links with Where path */
- int cached_pnl; /* cached path length */
- POOLMEM *cached_path; /* cached path */
+ uint32_t JobFiles; /* Number of files written, this job */
+ uint32_t JobErrors; /* */
+ uint64_t JobBytes; /* Number of bytes processed this job */
+ uint64_t ReadBytes; /* Bytes read -- before compression */
+ uint32_t Errors; /* Number of non-fatal errors */
+ volatile int JobStatus; /* ready, running, blocked, terminated */
+ int JobType; /* backup, restore, verify ... */
+ int JobLevel; /* Job level */
+ int JobPriority; /* Job priority */
+ int authenticated; /* set when client authenticated */
+ time_t sched_time; /* job schedule time, i.e. when it should start */
+ time_t start_time; /* when job actually started */
+ time_t run_time; /* used for computing speed */
+ time_t end_time; /* job end time */
+ POOLMEM *VolumeName; /* Volume name desired -- pool_memory */
+ POOLMEM *client_name; /* client name */
+ POOLMEM *RestoreBootstrap; /* Bootstrap file to restore */
+ char *sd_auth_key; /* SD auth key */
+ MSGS *jcr_msgs; /* Copy of message resource -- actually used */
+ uint32_t ClientId; /* Client associated with Job */
+ char *where; /* prefix to restore files to */
+ int prefix_links; /* Prefix links with Where path */
+ int cached_pnl; /* cached path length */
+ POOLMEM *cached_path; /* cached path */
/* Daemon specific part of JCR */
/* This should be empty in the library */
#ifdef DIRECTOR_DAEMON
/* Director Daemon specific part of JCR */
- pthread_t SD_msg_chan; /* Message channel thread id */
- pthread_cond_t term_wait; /* Wait for job termination */
- workq_ele_t *work_item; /* Work queue item if scheduled */
+ pthread_t SD_msg_chan; /* Message channel thread id */
+ pthread_cond_t term_wait; /* Wait for job termination */
+ workq_ele_t *work_item; /* Work queue item if scheduled */
volatile bool sd_msg_thread_done; /* Set when Storage message thread terms */
- BSOCK *ua; /* User agent */
- JOB *job; /* Job resource */
- STORE *store; /* Storage resource */
- CLIENT *client; /* Client resource */
- POOL *pool; /* Pool resource */
- FILESET *fileset; /* FileSet resource */
- CAT *catalog; /* Catalog resource */
- MSGS *messages; /* Default message handler */
- uint32_t SDJobFiles; /* Number of files written, this job */
- uint64_t SDJobBytes; /* Number of bytes processed this job */
- uint32_t SDErrors; /* Number of non-fatal errors */
- volatile int SDJobStatus; /* Storage Job Status */
- volatile int FDJobStatus; /* File daemon Job Status */
- B_DB *db; /* database pointer */
- uint32_t MediaId; /* DB record IDs associated with this job */
- uint32_t PoolId; /* Pool record id */
- FileId_t FileId; /* Last file id inserted */
- uint32_t FileIndex; /* Last FileIndex processed */
- POOLMEM *fname; /* name to put into catalog */
- int fn_printed; /* printed filename */
- POOLMEM *stime; /* start time for incremental/differential */
- JOB_DBR jr; /* Job record in Database */
- uint32_t RestoreJobId; /* Id specified by UA */
- POOLMEM *client_uname; /* client uname */
- int replace; /* Replace option */
+ BSOCK *ua; /* User agent */
+ JOB *job; /* Job resource */
+ STORE *store; /* Storage resource */
+ CLIENT *client; /* Client resource */
+ POOL *pool; /* Pool resource */
+ FILESET *fileset; /* FileSet resource */
+ CAT *catalog; /* Catalog resource */
+ MSGS *messages; /* Default message handler */
+ uint32_t SDJobFiles; /* Number of files written, this job */
+ uint64_t SDJobBytes; /* Number of bytes processed this job */
+ uint32_t SDErrors; /* Number of non-fatal errors */
+ volatile int SDJobStatus; /* Storage Job Status */
+ volatile int FDJobStatus; /* File daemon Job Status */
+ B_DB *db; /* database pointer */
+ uint32_t MediaId; /* DB record IDs associated with this job */
+ uint32_t PoolId; /* Pool record id */
+ FileId_t FileId; /* Last file id inserted */
+ uint32_t FileIndex; /* Last FileIndex processed */
+ POOLMEM *fname; /* name to put into catalog */
+ int fn_printed; /* printed filename */
+ POOLMEM *stime; /* start time for incremental/differential */
+ JOB_DBR jr; /* Job record in Database */
+ uint32_t RestoreJobId; /* Id specified by UA */
+ POOLMEM *client_uname; /* client uname */
+ int replace; /* Replace option */
bool acquired_resource_locks; /* set if resource locks acquired */
- int NumVols; /* Number of Volume used in pool */
- int reschedule_count; /* Number of times rescheduled */
+ int NumVols; /* Number of Volume used in pool */
+ int reschedule_count; /* Number of times rescheduled */
#endif /* DIRECTOR_DAEMON */
#ifdef FILE_DAEMON
/* File Daemon specific part of JCR */
uint32_t num_files_examined; /* files examined this job */
- POOLMEM *last_fname; /* last file saved/verified */
+ POOLMEM *last_fname; /* last file saved/verified */
/*********FIXME********* add missing files and files to be retried */
- int incremental; /* set if incremental for SINCE */
- time_t mtime; /* begin time for SINCE */
- int mtime_only; /* compare only mtime and not ctime as well */
- int listing; /* job listing in estimate */
- long Ticket; /* Ticket */
- int save_level; /* save level */
- char *big_buf; /* I/O buffer */
- POOLMEM *compress_buf; /* Compression buffer */
- int32_t compress_buf_size; /* Length of compression buffer */
- int replace; /* Replace options */
- int buf_size; /* length of buffer */
- void *ff; /* Find Files packet */
+ int incremental; /* set if incremental for SINCE */
+ time_t mtime; /* begin time for SINCE */
+ int mtime_only; /* compare only mtime and not ctime as well */
+ int listing; /* job listing in estimate */
+ long Ticket; /* Ticket */
+ int save_level; /* save level */
+ char *big_buf; /* I/O buffer */
+ POOLMEM *compress_buf; /* Compression buffer */
+ int32_t compress_buf_size; /* Length of compression buffer */
+ int replace; /* Replace options */
+ int buf_size; /* length of buffer */
+ void *ff; /* Find Files packet */
char stored_addr[MAX_NAME_LENGTH]; /* storage daemon address */
uint32_t StartFile;
uint32_t EndFile;
uint32_t StartBlock;
uint32_t EndBlock;
- pthread_t heartbeat_id; /* id of heartbeat thread */
- volatile BSOCK *hb_bsock; /* duped SD socket */
+ pthread_t heartbeat_id; /* id of heartbeat thread */
+ volatile BSOCK *hb_bsock; /* duped SD socket */
#endif /* FILE_DAEMON */
#ifdef STORAGE_DAEMON
/* Storage Daemon specific part of JCR */
- JCR *next_dev; /* next JCR attached to device */
- JCR *prev_dev; /* previous JCR attached to device */
+ JCR *next_dev; /* next JCR attached to device */
+ JCR *prev_dev; /* previous JCR attached to device */
pthread_cond_t job_start_wait; /* Wait for FD to start Job */
int type;
- DEVRES *device; /* device to use */
- VOLUME_CAT_INFO VolCatInfo; /* Catalog info for desired volume */
- POOLMEM *job_name; /* base Job name (not unique) */
- POOLMEM *fileset_name; /* FileSet */
- POOLMEM *fileset_md5; /* MD5 for FileSet */
- POOLMEM *pool_name; /* pool to use */
- POOLMEM *pool_type; /* pool type to use */
- POOLMEM *media_type; /* media type */
- POOLMEM *dev_name; /* device name */
- VOL_LIST *VolList; /* list to read */
- int32_t NumVolumes; /* number of volumes used */
- int32_t CurVolume; /* current volume number */
- int spool_attributes; /* set if spooling attributes */
- int no_attributes; /* set if no attributes wanted */
- int label_status; /* device volume label status */
- int label_errors; /* count of label errors */
+ DEVRES *device; /* device to use */
+ VOLUME_CAT_INFO VolCatInfo; /* Catalog info for desired volume */
+ POOLMEM *job_name; /* base Job name (not unique) */
+ POOLMEM *fileset_name; /* FileSet */
+ POOLMEM *fileset_md5; /* MD5 for FileSet */
+ POOLMEM *pool_name; /* pool to use */
+ POOLMEM *pool_type; /* pool type to use */
+ POOLMEM *media_type; /* media type */
+ POOLMEM *dev_name; /* device name */
+ VOL_LIST *VolList; /* list to read */
+ int32_t NumVolumes; /* number of volumes used */
+ int32_t CurVolume; /* current volume number */
+ int spool_attributes; /* set if spooling attributes */
+ int no_attributes; /* set if no attributes wanted */
+ int label_status; /* device volume label status */
+ int label_errors; /* count of label errors */
int session_opened;
- DEV_RECORD rec; /* Read/Write record */
- long Ticket; /* ticket for this job */
- uint32_t VolFirstIndex; /* First file index this Volume */
- uint32_t VolLastIndex; /* Last file index this Volume */
- uint32_t FileIndex; /* Current File Index */
- uint32_t EndFile; /* End file written */
- uint32_t StartFile; /* Start write file */
- uint32_t StartBlock; /* Start write block */
- uint32_t EndBlock; /* Ending block written */
- bool NewVol; /* set when new Volume mounted */
- bool WroteVol; /* set when Volume written */
- int CurVol; /* Current Volume count */
+ DEV_RECORD rec; /* Read/Write record */
+ long Ticket; /* ticket for this job */
+ uint32_t VolFirstIndex; /* First file index this Volume */
+ uint32_t VolLastIndex; /* Last file index this Volume */
+ uint32_t FileIndex; /* Current File Index */
+ uint32_t EndFile; /* End file written */
+ uint32_t StartFile; /* Start write file */
+ uint32_t StartBlock; /* Start write block */
+ uint32_t EndBlock; /* Ending block written */
+ bool NewVol; /* set when new Volume mounted */
+ bool WroteVol; /* set when Volume written */
+ int CurVol; /* Current Volume count */
- uint32_t FileId; /* Last file id inserted */
+ uint32_t FileId; /* Last file id inserted */
/* Parmaters for Open Read Session */
- BSR *bsr; /* Bootstrap record -- has everything */
+ BSR *bsr; /* Bootstrap record -- has everything */
uint32_t read_VolSessionId;
uint32_t read_VolSessionTime;
uint32_t read_StartFile;
dlink *ilink = (dlink *)((char *)item+loffset); /* item's link */
if (item == head) {
head = ilink->next;
- ((dlink *)((char *)head+loffset))->prev = NULL;
+ if (head) {
+ ((dlink *)((char *)head+loffset))->prev = NULL;
+ }
+ if (item == tail) {
+ tail = ilink->prev;
+ }
} else if (item == tail) {
tail = ilink->prev;
- ((dlink *)((char *)tail+loffset))->next = NULL;
+ if (tail) {
+ ((dlink *)((char *)tail+loffset))->next = NULL;
+ }
} else {
xitem = ilink->next;
((dlink *)((char *)xitem+loffset))->prev = ilink->prev;
xitem = ilink->prev;
((dlink *)((char *)xitem+loffset))->next = ilink->next;
}
- free(item);
}
void * dlist::next(void *item)
jcr_chain->destroy();
free(jcr_chain);
+ jcr_chain = new dlist(jcr, &jcr->link);
+ printf("append 20 items 0-19\n");
+ for (int i=0; i<20; i++) {
+ sprintf(buf, "This is dlist item %d", i);
+ jcr = (MYJCR *)malloc(sizeof(MYJCR));
+ jcr->buf = bstrdup(buf);
+ jcr_chain->append(jcr);
+ if (i == 10) {
+ save_jcr = jcr;
+ }
+ }
+
+ next_jcr = (MYJCR *)jcr_chain->next(save_jcr);
+ printf("11th item=%s\n", next_jcr->buf);
+ jcr = (MYJCR *)malloc(sizeof(MYJCR));
+ jcr->buf = save_jcr->buf;
+ printf("Remove 10th item\n");
+ jcr_chain->remove(save_jcr);
+ printf("Re-insert 10th item\n");
+ jcr_chain->insert_before(jcr, next_jcr);
+
+ printf("Print remaining list.\n");
+ for (MYJCR *jcr=NULL; (jcr=(MYJCR *)jcr_chain->next(jcr)); ) {
+ printf("Dlist item = %s\n", jcr->buf);
+ free(jcr->buf);
+ }
+
+ delete jcr_chain;
+
sm_dump(False);
}
}
Dmsg1(100, "want new name=%s\n", jcr->VolumeName);
memcpy(&dev->VolCatInfo, &jcr->VolCatInfo, sizeof(jcr->VolCatInfo));
+ if (strcmp(dev->VolCatInfo.VolCatStatus, "Recycle") == 0) {
+ recycle = 1;
+ }
break; /* got a Volume */
case VOL_NO_LABEL:
/* */
#define VERSION "1.31"
#define VSTRING "1"
-#define BDATE "17 Jul 2003"
-#define LSMDATE "17Jul03"
+#define BDATE "20 Jul 2003"
+#define LSMDATE "20Jul03"
/* Debug flags */
#define DEBUG 1
*/
/* #define SEND_DMSG_TO_FILE 1 */
-/* Turn this on if you want to try the new Job semaphore code */
+/* Turn this on if you want to use the Job semaphore code */
#define USE_SEMAPHORE
+/* Turn this on if you want to use the new Job scheduling code */
+#ifdef xxx
+#undef USE_SEMAPHORE
+#define JOB_QUEUE 1
+#endif
+
+
/* #define NO_ATTRIBUTES_TEST 1 */
/* #define NO_TAPE_WRITE_TEST 1 */
/* #define FD_NO_SEND TEST 1 */