+2003-07-xx Version 1.31 Beta xxJul03
+- Take serial.h provided by David Craigon, which corrects differences in
+ prototypes between serial.h and serial.c.
+- Make db_get_media_ids() return Media Ids only for the current pool.
+- Add new jobq.h and jobq.c drived from workq.
+- Add JobPriority to jcr, and Priority to Job resource as well as
+ to the run line in a Schedule.
+- Remove unused pool record from autoprune.c.
+- Implement Nic Bellamy's RecycleCurrentVolume.
+- Implement RecycleOldestVolume.
+- Begin adding new JOB_QUEUE code to the Director.
+- Create a single routine recycle_volume().
+- Retry accept(), bind() and socket() if EINTR occurs.
+- Implement insert_before(), insert_after(), and empty() for dlist class.
+ Also require offset to be given by giving item and link address.
+- Make error some messages in smtp.c a bit more explicit.
+
2003-07-12 Version 1.31 Beta 14Jul03
- Marc Brueckner reported a crash during restore (a missing tree->)
- Moved host.h.in file from filed to src.
(./create_mys... ./make_my...).
- Document all the status codes JobLevel, JobType, JobStatus.
- Document dynamic DNS
-
+- Document using multiple Pools for daily rotating tapes.
+- Fix default time in seconds without qualifier -- bacula-dir.conf
+- From Marc Brueckner der Megauser
+ Backup OS Restore OS Result
+ -------------------------------------------------------------------------------
+ WinME WinME Works
+ WinME WinNT Works (minor problems with rights)
+ WinME WINXP Works (minor problems with rights)
+ WinME Linux Works (minor problems with rights)
+
+
+ WinXP WinXP Works
+ WinXP WinNT Works *
+ WinXP WinME Error: Win32 data stream not supported on this Client
+ WinXP Linux Error: Win32 data stream not supported on this Client
+
+ WinNT WinNT Works
+ WinNT WinXP Works
+ WinNT WinME Error: Win32 data stream not supported on this Client
+ WinNT Linux Error: Win32 data stream not supported on this Client
+
+ Linux Linux Works
+ Linux WinNT Works (minor problems with rights)
+ Linux WinME Client hangs
+ * When restoring files on WinNT previously backed up on XP I get the following
+ Message for some files.
+ But any file was restored correctly !
+
+ Write error on c:/tmp/transfer/WinSCP2.exe: ERR=Die Daten sind unzulassig. (the
+ data is invalid).
+- Add Nic Bellamy's backup scheme.
Testing to do: (painful)
- that ALL console command line options work and are always implemented
- Update Automatic Volume Labeling in disk.wml
- Figure out how to handle DHCP IP addresses -- use dynamic DNS.
- Add OS, GCC version to traceback output.
-
/* sql.c */
B_DB *db_init_database(JCR *jcr, char *db_name, char *db_user, char *db_password,
- char *db_address, int db_port, char *db_socket);
+ char *db_address, int db_port, char *db_socket);
int db_open_database(JCR *jcr, B_DB *db);
void db_close_database(JCR *jcr, B_DB *db);
void db_escape_string(char *snew, char *old, int len);
int db_create_media_record(JCR *jcr, B_DB *db, MEDIA_DBR *media_dbr);
int db_create_client_record(JCR *jcr, B_DB *db, CLIENT_DBR *cr);
int db_create_fileset_record(JCR *jcr, B_DB *db, FILESET_DBR *fsr);
-int db_create_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr);
+int db_create_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr);
int db_create_jobmedia_record(JCR *jcr, B_DB *mdb, JOBMEDIA_DBR *jr);
int db_create_counter_record(JCR *jcr, B_DB *mdb, COUNTER_DBR *cr);
int db_get_num_pool_records(JCR *jcr, B_DB *mdb);
int db_get_pool_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t **ids);
int db_get_client_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t **ids);
-int db_get_media_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t **ids);
+int db_get_media_ids(JCR *jcr, B_DB *mdb, uint32_t PoolId, int *num_ids, uint32_t **ids);
int db_get_job_volume_parameters(JCR *jcr, B_DB *mdb, uint32_t JobId, VOL_PARAMS **VolParams);
int db_get_client_record(JCR *jcr, B_DB *mdb, CLIENT_DBR *cdbr);
int db_get_counter_record(JCR *jcr, B_DB *mdb, COUNTER_DBR *cr);
/*
- * This function returns a list of all the Media record ids.
+ * This function returns a list of all the Media record ids for
+ * the current Pool.
* The caller must free ids if non-NULL.
*
* Returns 0: on failure
* 1: on success
*/
-int db_get_media_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t *ids[])
+int db_get_media_ids(JCR *jcr, B_DB *mdb, uint32_t PoolId, int *num_ids, uint32_t *ids[])
{
SQL_ROW row;
int stat = 0;
db_lock(mdb);
*ids = NULL;
- Mmsg(&mdb->cmd, "SELECT MediaId FROM Media");
+ Mmsg(&mdb->cmd, "SELECT MediaId FROM Media WHERE PoolId=%u", PoolId);
if (QUERY_DB(jcr, mdb, mdb->cmd)) {
*num_ids = sql_num_rows(mdb);
if (*num_ids > 0) {
autoprune.c backup.c bsr.c \
catreq.c dird_conf.c expand.c \
fd_cmds.c getmsg.c inc_conf.c job.c \
- mountreq.c msgchan.c newvol.c \
+ jobq.c mountreq.c msgchan.c newvol.c \
recycle.c restore.c run_conf.c \
scheduler.c sql_cmds.c \
ua_cmds.c ua_dotcmds.c \
autoprune.o backup.o bsr.o \
catreq.o dird_conf.o expand.o \
fd_cmds.o getmsg.o inc_conf.o job.o \
- mountreq.o msgchan.o newvol.o \
+ jobq.o mountreq.o msgchan.o newvol.o \
recycle.o restore.o run_conf.o \
scheduler.o sql_cmds.o \
ua_cmds.o ua_dotcmds.o \
uint32_t *ids = NULL;
int num_ids = 0;
MEDIA_DBR mr;
- POOL_DBR pr;
UAContext *ua;
if (!jcr->job->PruneVolumes && !jcr->pool->AutoPrune) {
return 0;
}
memset(&mr, 0, sizeof(mr));
- memset(&pr, 0, sizeof(pr));
ua = new_ua_context(jcr);
db_lock(jcr->db);
- /* Get the Pool Record and a list of Media Id's in the Pool */
- pr.PoolId = jcr->PoolId;
- if (!db_get_pool_record(jcr, jcr->db, &pr) || !db_get_media_ids(jcr, jcr->db, &num_ids, &ids)) {
+ /* Get the List of all media ids in the current Pool */
+ if (!db_get_media_ids(jcr, jcr->db, jcr->PoolId, &num_ids, &ids)) {
Jmsg(jcr, M_ERROR, 0, "%s", db_strerror(jcr->db));
goto bail_out;
}
continue;
}
/* Prune only Volumes from current Pool */
- if (pr.PoolId != mr.PoolId) {
+ if (jcr->PoolId != mr.PoolId) {
continue;
}
/* Prune only Volumes with status "Full", "Used", or "Append" */
strcmp(mr.VolStatus, "Append") == 0 ||
strcmp(mr.VolStatus, "Used") == 0) {
Dmsg1(200, "Prune Volume %s\n", mr.VolumeName);
- stat += prune_volume(ua, &pr, &mr);
+ stat += prune_volume(ua, &mr);
Dmsg1(200, "Num pruned = %d\n", stat);
}
}
}
}
- if (!ok && jcr->pool->purge_oldest_volume) {
- Dmsg1(200, "No next volume found. PurgeOldest=%d\n",
- jcr->pool->purge_oldest_volume);
+ if (!ok && (jcr->pool->purge_oldest_volume ||
+ jcr->pool->recycle_oldest_volume)) {
+ Dmsg2(200, "No next volume found. PurgeOldest=%d\n RecyleOldest=%d",
+ jcr->pool->purge_oldest_volume, jcr->pool->recycle_oldest_volume);
/* Find oldest volume to recycle */
ok = db_find_next_volume(jcr, jcr->db, -1, &mr);
Dmsg1(400, "Find oldest=%d\n", ok);
Dmsg0(400, "Try purge.\n");
/* Try to purge oldest volume */
ua = new_ua_context(jcr);
- Jmsg(jcr, M_INFO, 0, _("Purging oldest volume \"%s\"\n"), mr.VolumeName);
- ok = purge_jobs_from_volume(ua, &mr);
+ if (jcr->pool->purge_oldest_volume) {
+ Jmsg(jcr, M_INFO, 0, _("Purging oldest volume \"%s\"\n"), mr.VolumeName);
+ ok = purge_jobs_from_volume(ua, &mr);
+ } else {
+ Jmsg(jcr, M_INFO, 0, _("Pruning oldest volume \"%s\"\n"), mr.VolumeName);
+ ok = prune_volume(ua, &mr);
+ }
free_ua_context(ua);
if (ok) {
- ok = recycle_oldest_purged_volume(jcr, &mr);
- Dmsg1(400, "Recycle after recycle oldest=%d\n", ok);
+ ok = recycle_volume(jcr, &mr);
+ Dmsg1(400, "Recycle after purge oldest=%d\n", ok);
}
}
}
*/
unbash_spaces(mr.VolumeName);
if (db_get_media_record(jcr, jcr->db, &mr)) {
- int VolSuitable = 0;
+ bool VolSuitable = false;
char *reason = ""; /* detailed reason for rejection */
jcr->MediaId = mr.MediaId;
Dmsg1(120, "VolumeInfo MediaId=%d\n", jcr->MediaId);
pm_strcpy(&jcr->VolumeName, mr.VolumeName);
if (!writing) {
- VolSuitable = 1; /* accept anything for read */
+ VolSuitable = true; /* accept anything for read */
} else {
/*
* SD wants to write this Volume, so make
} else if (strcmp(mr.VolStatus, "Append") != 0 &&
strcmp(mr.VolStatus, "Recycle") != 0) {
reason = "not Append or Recycle";
+ /* XXX nicb start */
+ /* What we're trying to do here is see if the current volume is
+ * "recycleable" - ie. if we prune all expired jobs off it, is
+ * it now possible to reuse it for the job that it is currently
+ * needed for?
+ */
+ if ((mr.LastWritten + mr.VolRetention) < (utime_t)time(NULL)
+ && mr.Recycle && jcr->pool->recycle_current_volume
+ && (strcmp(mr.VolStatus, "Full") == 0 ||
+ strcmp(mr.VolStatus, "Used") == 0)) {
+ /*
+ * Attempt prune of current volume to see if we can
+ * recycle it for use.
+ */
+ UAContext *ua;
+
+ reason = "not Append or Recycle (auto recycle failed)";
+
+ ua = new_ua_context(jcr);
+ ok = prune_volume(ua, &mr);
+ free_ua_context(ua);
+
+ if (ok) {
+ /* If fully purged, recycle current volume */
+ if (recycle_volume(jcr, &mr)) {
+ Jmsg(jcr, M_INFO, 0, "Recycled current "
+ "volume \"%s\"\n", mr.VolumeName);
+ VolSuitable = true;
+ }
+ }
+ }
+ /* XXX nicb end */
} else if (strcmp(mr.MediaType, jcr->store->media_type) != 0) {
reason = "not correct MediaType";
} else if (!jcr->pool->accept_any_volume) {
reason = "Volume not in sequence";
} else {
- VolSuitable = 1;
+ VolSuitable = true;
}
}
if (VolSuitable) {
#include "bsr.h"
#include "ua.h"
#include "protos.h"
+#include "jobq.h"
/* Globals that dird.c exports */
extern int debug_level;
/*
* Main configuration file parser for Bacula Directors,
* some parts may be split into separate files such as
- * the schedule configuration (sch_config.c).
+ * the schedule configuration (run_config.c).
*
* Note, the configuration file parser consists of three parts
*
*/
int r_first = R_FIRST;
int r_last = R_LAST;
+
pthread_mutex_t res_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Imported subroutines */
{"rescheduleonerror", store_yesno, ITEM(res_job.RescheduleOnError), 1, ITEM_DEFAULT, 0},
{"rescheduleinterval", store_time, ITEM(res_job.RescheduleInterval), 0, ITEM_DEFAULT, 60 * 30},
{"rescheduletimes", store_pint, ITEM(res_job.RescheduleTimes), 0, 0, 0},
+ {"priority", store_pint, ITEM(res_job.Priority), 0, ITEM_DEFAULT, 10},
{NULL, NULL, NULL, 0, 0, 0}
};
{"usecatalog", store_yesno, ITEM(res_pool.use_catalog), 1, ITEM_DEFAULT, 1},
{"usevolumeonce", store_yesno, ITEM(res_pool.use_volume_once), 1, 0, 0},
{"purgeoldestvolume", store_yesno, ITEM(res_pool.purge_oldest_volume), 1, 0, 0},
+ {"recycleoldestvolume", store_yesno, ITEM(res_pool.recycle_oldest_volume), 1, 0, 0},
+ {"recyclecurrentvolume", store_yesno, ITEM(res_pool.recycle_current_volume), 1, 0, 0},
{"maximumvolumes", store_pint, ITEM(res_pool.max_volumes), 0, 0, 0},
{"maximumvolumejobs", store_pint, ITEM(res_pool.MaxVolJobs), 0, 0, 0},
{"maximumvolumefiles", store_pint, ITEM(res_pool.MaxVolFiles), 0, 0, 0},
sendit(sock, " --> ");
dump_resource(-R_COUNTER, (RES *)res->res_counter.WrapCounter, sendit, sock);
}
+ break;
-
-
- break;
case R_CLIENT:
sendit(sock, "Client: name=%s address=%s FDport=%d MaxJobs=%u\n",
res->res_client.hdr.name, res->res_client.address, res->res_client.FDport,
res->res_cat.db_port, res->res_cat.db_name, NPRT(res->res_cat.db_user));
break;
case R_JOB:
- sendit(sock, "Job: name=%s JobType=%d level=%s MaxJobs=%u\n",
+ sendit(sock, "Job: name=%s JobType=%d level=%s Priority=%d MaxJobs=%u\n",
res->res_job.hdr.name, res->res_job.JobType,
- level_to_str(res->res_job.level), res->res_job.MaxConcurrentJobs);
+ level_to_str(res->res_job.level), res->res_job.Priority,
+ res->res_job.MaxConcurrentJobs);
sendit(sock, " Resched=%d Times=%d Interval=%s\n",
res->res_job.RescheduleOnError, res->res_job.RescheduleTimes,
edit_uint64_with_commas(res->res_job.RescheduleInterval, ed1));
((JOB *)(item->value))->JobType = item->code;
while ((token = lex_get_token(lc, T_ALL)) != T_EOL) {
- int found;
+ bool found = false;
Dmsg1(150, "store_backup got token=%s\n", lex_tok_to_str(token));
scan_err1(lc, "Expected a backup/verify keyword, got: %s", lc->str);
}
Dmsg1(190, "Got keyword: %s\n", lc->str);
- found = FALSE;
for (i=0; BakVerFields[i].name; i++) {
if (strcasecmp(lc->str, BakVerFields[i].name) == 0) {
- found = TRUE;
+ found = true;
if (lex_get_token(lc, T_ALL) != T_EQUALS) {
scan_err1(lc, "Expected an equals, got: %s", lc->str);
}
token = lex_get_token(lc, T_NAME);
Dmsg1(190, "Got value: %s\n", lc->str);
switch (BakVerFields[i].token) {
- case 'C':
- /* Find Client Resource */
- if (pass == 2) {
- res = GetResWithName(R_CLIENT, lc->str);
- if (res == NULL) {
- scan_err1(lc, "Could not find specified Client Resource: %s",
- lc->str);
- }
- res_all.res_job.client = (CLIENT *)res;
+ case 'C':
+ /* Find Client Resource */
+ if (pass == 2) {
+ res = GetResWithName(R_CLIENT, lc->str);
+ if (res == NULL) {
+ scan_err1(lc, "Could not find specified Client Resource: %s",
+ lc->str);
}
- break;
- case 'F':
- /* Find FileSet Resource */
- if (pass == 2) {
- res = GetResWithName(R_FILESET, lc->str);
- if (res == NULL) {
- scan_err1(lc, "Could not find specified FileSet Resource: %s\n",
- lc->str);
- }
- res_all.res_job.fileset = (FILESET *)res;
- }
- break;
- case 'L':
- /* Get level */
- for (i=0; joblevels[i].level_name; i++) {
- if (joblevels[i].job_type == item->code &&
- strcasecmp(lc->str, joblevels[i].level_name) == 0) {
- ((JOB *)(item->value))->level = joblevels[i].level;
- i = 0;
- break;
- }
+ res_all.res_job.client = (CLIENT *)res;
+ }
+ break;
+ case 'F':
+ /* Find FileSet Resource */
+ if (pass == 2) {
+ res = GetResWithName(R_FILESET, lc->str);
+ if (res == NULL) {
+ scan_err1(lc, "Could not find specified FileSet Resource: %s\n",
+ lc->str);
}
- if (i != 0) {
- scan_err1(lc, "Expected a Job Level keyword, got: %s", lc->str);
+ res_all.res_job.fileset = (FILESET *)res;
+ }
+ break;
+ case 'L':
+ /* Get level */
+ for (i=0; joblevels[i].level_name; i++) {
+ if (joblevels[i].job_type == item->code &&
+ strcasecmp(lc->str, joblevels[i].level_name) == 0) {
+ ((JOB *)(item->value))->level = joblevels[i].level;
+ i = 0;
+ break;
}
- break;
+ }
+ if (i != 0) {
+ scan_err1(lc, "Expected a Job Level keyword, got: %s", lc->str);
+ }
+ break;
} /* end switch */
break;
} /* end if strcmp() */
((JOB *)(item->value))->JobType = item->code;
while ((token = lex_get_token(lc, T_ALL)) != T_EOL) {
- int found;
+ bool found = false;
if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
scan_err1(lc, "expected a name, got: %s", lc->str);
}
- found = FALSE;
for (i=0; RestoreFields[i].name; i++) {
Dmsg1(190, "Restore kw=%s\n", lc->str);
if (strcasecmp(lc->str, RestoreFields[i].name) == 0) {
- found = TRUE;
+ found = true;
if (lex_get_token(lc, T_ALL) != T_EQUALS) {
scan_err1(lc, "Expected an equals, got: %s", lc->str);
}
token = lex_get_token(lc, T_ALL);
Dmsg1(190, "Restore value=%s\n", lc->str);
switch (RestoreFields[i].token) {
- case 'B':
- /* Bootstrap */
- if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
- scan_err1(lc, "Expected a Restore bootstrap file, got: %s", lc->str);
- }
- if (pass == 1) {
- res_all.res_job.RestoreBootstrap = bstrdup(lc->str);
- }
- break;
- case 'C':
- /* Find Client Resource */
- if (pass == 2) {
- res = GetResWithName(R_CLIENT, lc->str);
- if (res == NULL) {
- scan_err1(lc, "Could not find specified Client Resource: %s",
- lc->str);
- }
- res_all.res_job.client = (CLIENT *)res;
- }
- break;
- case 'F':
- /* Find FileSet Resource */
- if (pass == 2) {
- res = GetResWithName(R_FILESET, lc->str);
- if (res == NULL) {
- scan_err1(lc, "Could not find specified FileSet Resource: %s\n",
- lc->str);
- }
- res_all.res_job.fileset = (FILESET *)res;
- }
- break;
- case 'J':
- /* JobId */
- if (token != T_NUMBER) {
- scan_err1(lc, "expected an integer number, got: %s", lc->str);
- }
- errno = 0;
- res_all.res_job.RestoreJobId = strtol(lc->str, NULL, 0);
- Dmsg1(190, "RestorJobId=%d\n", res_all.res_job.RestoreJobId);
- if (errno != 0) {
- scan_err1(lc, "expected an integer number, got: %s", lc->str);
- }
- break;
- case 'W':
- /* Where */
- if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
- scan_err1(lc, "Expected a Restore root directory, got: %s", lc->str);
- }
- if (pass == 1) {
- res_all.res_job.RestoreWhere = bstrdup(lc->str);
- }
- break;
- case 'R':
- /* Replacement options */
- if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
- scan_err1(lc, "Expected a keyword name, got: %s", lc->str);
+ case 'B':
+ /* Bootstrap */
+ if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
+ scan_err1(lc, "Expected a Restore bootstrap file, got: %s", lc->str);
+ }
+ if (pass == 1) {
+ res_all.res_job.RestoreBootstrap = bstrdup(lc->str);
+ }
+ break;
+ case 'C':
+ /* Find Client Resource */
+ if (pass == 2) {
+ res = GetResWithName(R_CLIENT, lc->str);
+ if (res == NULL) {
+ scan_err1(lc, "Could not find specified Client Resource: %s",
+ lc->str);
}
- /* Fix to scan Replacement options */
- for (i=0; ReplaceOptions[i].name; i++) {
- if (strcasecmp(lc->str, ReplaceOptions[i].name) == 0) {
- ((JOB *)(item->value))->replace = ReplaceOptions[i].token;
- i = 0;
- break;
- }
+ res_all.res_job.client = (CLIENT *)res;
+ }
+ break;
+ case 'F':
+ /* Find FileSet Resource */
+ if (pass == 2) {
+ res = GetResWithName(R_FILESET, lc->str);
+ if (res == NULL) {
+ scan_err1(lc, "Could not find specified FileSet Resource: %s\n",
+ lc->str);
}
- if (i != 0) {
- scan_err1(lc, "Expected a Restore replacement option, got: %s", lc->str);
+ res_all.res_job.fileset = (FILESET *)res;
+ }
+ break;
+ case 'J':
+ /* JobId */
+ if (token != T_NUMBER) {
+ scan_err1(lc, "expected an integer number, got: %s", lc->str);
+ }
+ errno = 0;
+ res_all.res_job.RestoreJobId = strtol(lc->str, NULL, 0);
+ Dmsg1(190, "RestorJobId=%d\n", res_all.res_job.RestoreJobId);
+ if (errno != 0) {
+ scan_err1(lc, "expected an integer number, got: %s", lc->str);
+ }
+ break;
+ case 'W':
+ /* Where */
+ if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
+ scan_err1(lc, "Expected a Restore root directory, got: %s", lc->str);
+ }
+ if (pass == 1) {
+ res_all.res_job.RestoreWhere = bstrdup(lc->str);
+ }
+ break;
+ case 'R':
+ /* Replacement options */
+ if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) {
+ scan_err1(lc, "Expected a keyword name, got: %s", lc->str);
+ }
+ /* Fix to scan Replacement options */
+ for (i=0; ReplaceOptions[i].name; i++) {
+ if (strcasecmp(lc->str, ReplaceOptions[i].name) == 0) {
+ ((JOB *)(item->value))->replace = ReplaceOptions[i].token;
+ i = 0;
+ break;
}
- break;
+ }
+ if (i != 0) {
+ scan_err1(lc, "Expected a Restore replacement option, got: %s", lc->str);
+ }
+ break;
} /* end switch */
break;
} /* end if strcmp() */
char *password;
CAT *catalog; /* Catalog resource */
uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */
- semlock_t sem; /* client semaphore */
+#ifdef USE_SEMAPHORE
+ semlock_t sem; /* storage semaphore */
+#endif
+#ifdef JOB_QUEUE
+ uint32_t NumConcurrentJobs; /* number of concurrent jobs running */
+#endif
int enable_ssl; /* Use SSL */
};
char *dev_name;
int autochanger; /* set if autochanger */
uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */
+#ifdef USE_SEMAPHORE
semlock_t sem; /* storage semaphore */
+#endif
+#ifdef JOB_QUEUE
+ uint32_t NumConcurrentJobs; /* number of concurrent jobs running */
+#endif
int enable_ssl; /* Use SSL */
};
int JobType; /* job type (backup, verify, restore */
int level; /* default backup/verify level */
+ int Priority; /* Job priority */
int RestoreJobId; /* What -- JobId to restore */
char *RestoreWhere; /* Where on disk to restore -- directory */
char *RestoreBootstrap; /* Bootstrap file */
STORE *storage; /* Where is device -- Storage daemon */
POOL *pool; /* Where is media -- Media Pool */
- semlock_t sem; /* Job semaphore */
+#ifdef USE_SEMAPHORE
+ semlock_t sem; /* storage semaphore */
+#endif
+#ifdef JOB_QUEUE
+ uint32_t NumConcurrentJobs; /* number of concurrent jobs running */
+#endif
};
#define MAX_FOPTS 30
int catalog_files; /* maintain file entries in catalog */
int use_volume_once; /* write on volume only once */
int accept_any_volume; /* accept any volume */
- int purge_oldest_volume; /* purge oldest volume */
+ int purge_oldest_volume; /* purge oldest volume */
+ int recycle_oldest_volume; /* attempt to recycle oldest volume */
+ int recycle_current_volume; /* attempt recycle of current volume */
uint32_t max_volumes; /* max number of volumes */
utime_t VolRetention; /* volume retention period in seconds */
utime_t VolUseDuration; /* duration volume can be used */
struct RUN {
RUN *next; /* points to next run record */
int level; /* level override */
+ int Priority; /* priority override */
int job_type;
POOL *pool; /* Pool override */
STORE *storage; /* Storage override */
static pthread_cond_t resource_wait;
static int waiting = 0; /* count of waiting threads */
#else
+#ifdef JOB_QUEUE
+jobq_t job_queue;
+#else
/* Queue of jobs to be run */
workq_t job_wq; /* our job work queue */
#endif
+#endif
void init_job_server(int max_workers)
{
}
#else
+#ifdef JOB_QUEUE
+ if ((stat = job_init(&job_queue, max_workers, job_thread)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat));
+ }
+#else
+ /* This is the OLD work queue code to go away */
if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat));
}
+#endif
#endif
return;
}
#ifdef USE_SEMAPHORE
pthread_t tid;
#else
+#ifndef JOB_QUEUE
workq_ele_t *work_item;
+#endif
#endif
sm_check(__FILE__, __LINE__, True);
if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) {
Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat));
}
+#else
+#ifdef JOB_QUEUE
+ /* Queue the job to be run */
+ if ((stat = jobq_add(&job_queue, jcr)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat));
+ }
#else
/* Queue the job to be run */
if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
}
jcr->work_item = work_item;
+#endif
#endif
Dmsg0(200, "Done run_job()\n");
}
}
}
switch (jcr->JobType) {
- case JT_BACKUP:
- do_backup(jcr);
- if (jcr->JobStatus == JS_Terminated) {
- do_autoprune(jcr);
- }
- break;
- case JT_VERIFY:
- do_verify(jcr);
- if (jcr->JobStatus == JS_Terminated) {
- do_autoprune(jcr);
- }
- break;
- case JT_RESTORE:
- do_restore(jcr);
- if (jcr->JobStatus == JS_Terminated) {
- do_autoprune(jcr);
- }
- break;
- case JT_ADMIN:
- do_admin(jcr);
- if (jcr->JobStatus == JS_Terminated) {
- do_autoprune(jcr);
- }
- break;
- default:
- Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType);
- break;
+ case JT_BACKUP:
+ do_backup(jcr);
+ if (jcr->JobStatus == JS_Terminated) {
+ do_autoprune(jcr);
}
+ break;
+ case JT_VERIFY:
+ do_verify(jcr);
+ if (jcr->JobStatus == JS_Terminated) {
+ do_autoprune(jcr);
+ }
+ break;
+ case JT_RESTORE:
+ do_restore(jcr);
+ if (jcr->JobStatus == JS_Terminated) {
+ do_autoprune(jcr);
+ }
+ break;
+ case JT_ADMIN:
+ do_admin(jcr);
+ if (jcr->JobStatus == JS_Terminated) {
+ do_autoprune(jcr);
+ }
+ break;
+ default:
+ Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType);
+ break;
+ }
if (jcr->job->RunAfterJob) {
POOLMEM *after = get_pool_memory(PM_FNAME);
int status;
V(mutex);
/* Try again */
}
- jcr->acquired_resource_locks = 1;
+ jcr->acquired_resource_locks = true;
#endif
return 1;
}
if (waiting > 0) {
pthread_cond_broadcast(&resource_wait);
}
- jcr->acquired_resource_locks = 0;
+ jcr->acquired_resource_locks = false;
V(mutex);
#endif
}
--- /dev/null
+/*
+ * Bacula job queue routines.
+ *
+ * Kern Sibbald, July MMIII
+ *
+ * Version $Id$
+ *
+ * This code was adapted from the Bacula workq, which was
+ * adapted from "Programming with POSIX Threads", by
+ * David R. Butenhof
+ *
+ * Example:
+ *
+ * static jobq_t jq; define job queue
+ *
+ * Initialize queue
+ * if ((stat = jobq_init(&jq, max_workers, job_thread)) != 0) {
+ * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
+ * }
+ *
+ * Add an item to the queue
+ * if ((stat = jobq_add(&jq, jcr)) != 0) {
+ * Emsg1(M_ABORT, 0, "Could not add job to queue: ERR=%s\n", strerror(errno));
+ * }
+ *
+ * Terminate the queue
+ * jobq_destroy(jobq_t *jq);
+ *
+ */
+/*
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public
+ License along with this program; if not, write to the Free
+ Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ MA 02111-1307, USA.
+
+ */
+
+#include "bacula.h"
+#include "dird.h"
+
+/* Forward referenced functions */
+static void *jobq_server(void *arg);
+
+/*
+ * Initialize a job queue
+ *
+ * Returns: 0 on success
+ * errno on failure
+ */
+int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
+{
+ int stat;
+ jobq_item_t *item = NULL;
+
+ if ((stat = pthread_attr_init(&jq->attr)) != 0) {
+ return stat;
+ }
+ if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
+ pthread_attr_destroy(&jq->attr);
+ return stat;
+ }
+ if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
+ pthread_attr_destroy(&jq->attr);
+ return stat;
+ }
+ if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
+ pthread_mutex_destroy(&jq->mutex);
+ pthread_attr_destroy(&jq->attr);
+ return stat;
+ }
+ jq->quit = false;
+ jq->max_workers = threads; /* max threads to create */
+ jq->num_workers = 0; /* no threads yet */
+ jq->idle_workers = 0; /* no idle threads */
+ jq->engine = engine; /* routine to run */
+ jq->valid = JOBQ_VALID;
+ jq->list.init(item, &item->link);
+ return 0;
+}
+
+/*
+ * Destroy the job queue
+ *
+ * Returns: 0 on success
+ * errno on failure
+ */
+int jobq_destroy(jobq_t *jq)
+{
+ int stat, stat1, stat2;
+
+ if (jq->valid != JOBQ_VALID) {
+ return EINVAL;
+ }
+ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ return stat;
+ }
+ jq->valid = 0; /* prevent any more operations */
+
+ /*
+ * If any threads are active, wake them
+ */
+ if (jq->num_workers > 0) {
+ jq->quit = true;
+ if (jq->idle_workers) {
+ if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ }
+ while (jq->num_workers > 0) {
+ if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ }
+ }
+ if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
+ return stat;
+ }
+ stat = pthread_mutex_destroy(&jq->mutex);
+ stat1 = pthread_cond_destroy(&jq->work);
+ stat2 = pthread_attr_destroy(&jq->attr);
+ jq->list.destroy();
+ return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
+}
+
+
+/*
+ * Add a job to the queue
+ * jq is a queue that was created with jobq_init
+ *
+ */
+int jobq_add(jobq_t *jq, JCR *jcr)
+{
+ int stat;
+ jobq_item_t *item, *li;
+ pthread_t id;
+ bool inserted = false;
+
+ Dmsg0(200, "jobq_add\n");
+ if (jq->valid != JOBQ_VALID) {
+ return EINVAL;
+ }
+
+ if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
+ return ENOMEM;
+ }
+ item->jcr = jcr;
+ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ free(item);
+ return stat;
+ }
+
+ Dmsg0(200, "add item to queue\n");
+ for (li=NULL; (li=(jobq_item_t *)jq->list.next(li)); ) {
+ if (li->jcr->JobPriority < jcr->JobPriority) {
+ jq->list.insert_before(item, li);
+ inserted = true;
+ }
+ }
+ if (!inserted) {
+ jq->list.append(item);
+ }
+
+ /* if any threads are idle, wake one */
+ if (jq->idle_workers > 0) {
+ Dmsg0(200, "Signal worker\n");
+ if ((stat = pthread_cond_signal(&jq->work)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ } else if (jq->num_workers < jq->max_workers) {
+ Dmsg0(200, "Create worker thread\n");
+ /* No idle threads so create a new one */
+ set_thread_concurrency(jq->max_workers + 1);
+ if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ jq->num_workers++;
+ }
+ pthread_mutex_unlock(&jq->mutex);
+ Dmsg0(200, "Return jobq_add\n");
+ return stat;
+}
+
+/*
+ * Remove a job from the job queue
+ * jq is a queue that was created with jobq_init
+ * work_item is an element of work
+ *
+ * Note, it is "removed" by immediately calling a processing routine.
+ * if you want to cancel it, you need to provide some external means
+ * of doing so.
+ */
+int jobq_remove(jobq_t *jq, JCR *jcr)
+{
+ int stat;
+ bool found = false;
+ pthread_t id;
+ jobq_item_t *item;
+
+ Dmsg0(200, "jobq_remove\n");
+ if (jq->valid != JOBQ_VALID) {
+ return EINVAL;
+ }
+
+ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ return stat;
+ }
+
+ for (item=NULL; (item=(jobq_item_t *)jq->list.next(item)); ) {
+ if (jcr == item->jcr) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ return EINVAL;
+ }
+
+ /* Move item to be the first on the list */
+ jq->list.remove(item);
+ jq->list.prepend(item);
+
+ /* if any threads are idle, wake one */
+ if (jq->idle_workers > 0) {
+ Dmsg0(200, "Signal worker\n");
+ if ((stat = pthread_cond_signal(&jq->work)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ } else {
+ Dmsg0(200, "Create worker thread\n");
+ /* No idle threads so create a new one */
+ set_thread_concurrency(jq->max_workers + 1);
+ if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
+ pthread_mutex_unlock(&jq->mutex);
+ return stat;
+ }
+ jq->num_workers++;
+ }
+ pthread_mutex_unlock(&jq->mutex);
+ Dmsg0(200, "Return jobq_remove\n");
+ return stat;
+}
+
+
+/*
+ * This is the worker thread that serves the job queue.
+ * When all the resources are acquired for the job,
+ * it will call the user's engine.
+ */
+static void *jobq_server(void *arg)
+{
+ struct timespec timeout;
+ jobq_t *jq = (jobq_t *)arg;
+ jobq_item_t *je; /* job entry in queue */
+ int stat;
+ bool timedout;
+
+ Dmsg0(200, "Start jobq_server\n");
+ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ return NULL;
+ }
+
+ for (;;) {
+ struct timeval tv;
+ struct timezone tz;
+
+ Dmsg0(200, "Top of for loop\n");
+ timedout = false;
+ Dmsg0(200, "gettimeofday()\n");
+ gettimeofday(&tv, &tz);
+ timeout.tv_nsec = 0;
+ timeout.tv_sec = tv.tv_sec + 2;
+
+ while (jq->list.empty() && !jq->quit) {
+ /*
+ * Wait 2 seconds, then if no more work, exit
+ */
+ Dmsg0(200, "pthread_cond_timedwait()\n");
+ stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
+ Dmsg1(200, "timedwait=%d\n", stat);
+ if (stat == ETIMEDOUT) {
+ timedout = true;
+ break;
+ } else if (stat != 0) {
+ /* This shouldn't happen */
+ Dmsg0(200, "This shouldn't happen\n");
+ jq->num_workers--;
+ pthread_mutex_unlock(&jq->mutex);
+ return NULL;
+ }
+ }
+ je = (jobq_item_t *)jq->list.first();
+ if (je != NULL) {
+ jq->list.remove(je);
+ if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
+ return NULL;
+ }
+ /* Call user's routine here */
+ Dmsg0(200, "Calling user engine.\n");
+ jq->engine(je->jcr);
+ Dmsg0(200, "Back from user engine.\n");
+ free(je); /* release job entry */
+ Dmsg0(200, "relock mutex\n");
+ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ return NULL;
+ }
+ Dmsg0(200, "Done lock mutex\n");
+ }
+ /*
+ * If no more work request, and we are asked to quit, then do it
+ */
+ if (jq->list.empty() && jq->quit) {
+ jq->num_workers--;
+ if (jq->num_workers == 0) {
+ Dmsg0(200, "Wake up destroy routine\n");
+ /* Wake up destroy routine if he is waiting */
+ pthread_cond_broadcast(&jq->work);
+ }
+ Dmsg0(200, "Unlock mutex\n");
+ pthread_mutex_unlock(&jq->mutex);
+ Dmsg0(200, "Return from jobq_server\n");
+ return NULL;
+ }
+ Dmsg0(200, "Check for work request\n");
+ /*
+ * If no more work requests, and we waited long enough, quit
+ */
+ Dmsg1(200, "jq empty = %d\n", jq->list.empty());
+ Dmsg1(200, "timedout=%d\n", timedout);
+ if (jq->list.empty() && timedout) {
+ Dmsg0(200, "break big loop\n");
+ jq->num_workers--;
+ break;
+ }
+ Dmsg0(200, "Loop again\n");
+ } /* end of big for loop */
+
+ Dmsg0(200, "unlock mutex\n");
+ pthread_mutex_unlock(&jq->mutex);
+ Dmsg0(200, "End jobq_server\n");
+ return NULL;
+}
--- /dev/null
+/*
+ * Bacula job queue routines.
+ *
+ * Kern Sibbald, July MMIII
+ *
+ * This code adapted from Bacula work queue code, which was
+ * adapted from "Programming with POSIX Threads", by
+ * David R. Butenhof
+ *
+ * Version $Id$
+ */
+/*
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public
+ License along with this program; if not, write to the Free
+ Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ MA 02111-1307, USA.
+
+ */
+
+#ifndef __JOBQ_H
+#define __JOBQ_H 1
+
+/*
+ * Structure to keep track of job queue request
+ */
+struct jobq_item_t {
+ dlink link;
+ JCR *jcr;
+};
+
+/*
+ * Structure describing a work queue
+ */
+struct jobq_t {
+ pthread_mutex_t mutex; /* queue access control */
+ pthread_cond_t work; /* wait for work */
+ pthread_attr_t attr; /* create detached threads */
+ dlist list; /* list of jobs */
+ int valid; /* queue initialized */
+ bool quit; /* jobq should quit */
+ int max_workers; /* max threads */
+ int num_workers; /* current threads */
+ int idle_workers; /* idle threads */
+ void *(*engine)(void *arg); /* user engine */
+};
+
+#define JOBQ_VALID 0xdec1993
+
+extern int jobq_init(
+ jobq_t *wq,
+ int threads, /* maximum threads */
+ void *(*engine)(void *) /* engine routine */
+ );
+extern int jobq_destroy(jobq_t *wq);
+extern int jobq_add(jobq_t *wq, JCR *jcr);
+extern int jobq_remove(jobq_t *wq, JCR *jcr);
+
+#endif /* __JOBQ_H */
/* autorecycle.c */
extern int recycle_oldest_purged_volume(JCR *jcr, MEDIA_DBR *mr);
+extern int recycle_volume(JCR *jcr, MEDIA_DBR *mr);
extern int find_recycled_volume(JCR *jcr, MEDIA_DBR *mr);
/* backup.c */
/* ua_prune.c */
int prune_files(UAContext *ua, CLIENT *client);
int prune_jobs(UAContext *ua, CLIENT *client, int JobType);
-int prune_volume(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr);
+int prune_volume(UAContext *ua, MEDIA_DBR *mr);
/* ua_purge.c */
int purge_jobs_from_volume(UAContext *ua, MEDIA_DBR *mr);
if (oldest.MediaId != 0) {
mr->MediaId = oldest.MediaId;
if (db_get_media_record(jcr, jcr->db, mr)) {
- strcpy(mr->VolStatus, "Recycle");
- mr->VolJobs = mr->VolFiles = mr->VolBlocks = mr->VolErrors = 0;
- mr->VolBytes = 0;
- mr->FirstWritten = mr->LastWritten = 0;
- if (db_update_media_record(jcr, jcr->db, mr)) {
+ if (recycle_volume(jcr, mr)) {
Jmsg(jcr, M_INFO, 0, "Recycled volume \"%s\"\n", mr->VolumeName);
Dmsg1(100, "Exit 1 recycle_oldest_purged_volume Vol=%s\n", mr->VolumeName);
return 1;
Dmsg0(100, "Exit 0 recycle_oldest_purged_volume end\n");
return 0;
}
+
+/*
+ * Recycle the specified volume
+ */
+int recycle_volume(JCR *jcr, MEDIA_DBR *mr)
+{
+ strcpy(mr->VolStatus, "Recycle");
+ mr->VolJobs = mr->VolFiles = mr->VolBlocks = mr->VolErrors = 0;
+ mr->VolBytes = 0;
+ mr->FirstWritten = mr->LastWritten = 0;
+ return db_update_media_record(jcr, jcr->db, mr);
+}
{"level", 'L'},
{"storage", 'S'},
{"messages", 'M'},
+ {"priority", 'p'},
{NULL, 0}
};
scan_err1(lc, "Expected an equals, got: %s", lc->str);
/* NOT REACHED */
}
- token = lex_get_token(lc, T_NAME);
switch (RunFields[i].token) {
case 'L': /* level */
+ token = lex_get_token(lc, T_NAME);
for (j=0; joblevels[j].level_name; j++) {
if (strcasecmp(lc->str, joblevels[j].level_name) == 0) {
lrun.level = joblevels[j].level;
/* NOT REACHED */
}
break;
+ case 'p': /* Priority */
+ token = lex_get_token(lc, T_PINT32);
+ if (pass == 2) {
+ lrun.Priority = lc->pint32_val;
+ }
+ break;
case 'P': /* Pool */
+ token = lex_get_token(lc, T_NAME);
if (pass == 2) {
res = GetResWithName(R_POOL, lc->str);
if (res == NULL) {
}
break;
case 'S': /* storage */
+ token = lex_get_token(lc, T_NAME);
if (pass == 2) {
res = GetResWithName(R_STORAGE, lc->str);
if (res == NULL) {
}
break;
case 'M': /* messages */
+ token = lex_get_token(lc, T_NAME);
if (pass == 2) {
res = GetResWithName(R_MSGS, lc->str);
if (res == NULL) {
if (run->msgs) {
jcr->messages = run->msgs; /* override messages */
}
+ if (run->Priority) {
+ jcr->JobPriority = run->Priority;
+ }
Dmsg0(200, "Leave wait_for_next_job()\n");
return jcr;
}
int mark_media_purged(UAContext *ua, MEDIA_DBR *mr);
/* Forward referenced functions */
-int prune_files(UAContext *ua, CLIENT *client);
-int prune_jobs(UAContext *ua, CLIENT *client, int JobType);
-int prune_volume(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr);
#define MAX_DEL_LIST_LEN 1000000
if (!confirm_retention(ua, &mr.VolRetention, "Volume")) {
return 0;
}
- prune_volume(ua, &pr, &mr);
+ prune_volume(ua, &mr);
return 1;
default:
break;
/*
* Prune a given Volume
*/
-int prune_volume(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr)
+int prune_volume(UAContext *ua, MEDIA_DBR *mr)
{
char *query = (char *)get_pool_memory(PM_MESSAGE);
struct s_count_ctx cnt;
volatile int JobStatus; /* ready, running, blocked, terminated */
int JobType; /* backup, restore, verify ... */
int JobLevel; /* Job level */
+ int JobPriority; /* Job priority */
int authenticated; /* set when client authenticated */
time_t sched_time; /* job schedule time, i.e. when it should start */
time_t start_time; /* when job actually started */
uint32_t RestoreJobId; /* Id specified by UA */
POOLMEM *client_uname; /* client uname */
int replace; /* Replace option */
- int acquired_resource_locks; /* set if resource locks acquired */
+ bool acquired_resource_locks; /* set if resource locks acquired */
int NumVols; /* Number of Volume used in pool */
int reschedule_count; /* Number of times rescheduled */
#endif /* DIRECTOR_DAEMON */
Emsg1(M_FATAL, 0, _("Error in select: %s\n"), strerror(errno));
break;
}
- clilen = sizeof(cli_addr);
- newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);
+ do {
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);
+ } while (newsockfd < 0 && errno == EINTR);
+ if (newsockfd < 0) {
+ continue;
+ }
#ifdef HAVE_LIBWRAP
P(mutex); /* hosts_access is not thread safe */
* Open a TCP socket
*/
for (tlog=0; (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0; tlog -= 10 ) {
+ if (errno == EINTR) {
+ continue;
+ }
if (tlog <= 0) {
tlog = 2*60;
Emsg1(M_ERROR, 0, _("Cannot open stream socket: %s\n"), strerror(errno));
serv_addr.sin_port = htons(port);
for (tlog=0; bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0; tlog -= 5 ) {
+ if (errno == EINTR) {
+ continue;
+ }
if (tlog <= 0) {
tlog = 2*60;
Emsg2(M_WARNING, 0, _("Cannot bind port %d: %s: retrying ...\n"), port, strerror(errno));
newsockfd = -1;
break;
}
- clilen = sizeof(cli_addr);
- newsockfd = accept(bsock->fd, (struct sockaddr *)&cli_addr, &clilen);
- break;
+ do {
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(bsock->fd, (struct sockaddr *)&cli_addr, &clilen);
+ } while (newsockfd < 0 && errno == EINTR);
+ if (newsockfd >= 0) {
+ break;
+ }
}
#ifdef HAVE_LIBWRAP
}
}
+void dlist::insert_before(void *item, void *where)
+{
+ dlink *where_link = (dlink *)((char *)where+loffset);
+
+ ((dlink *)((char *)item+loffset))->next = where;
+ ((dlink *)((char *)item+loffset))->prev = where_link->prev;
+
+ if (where_link->prev) {
+ ((dlink *)((char *)(where_link->prev)+loffset))->next = item;
+ where_link->prev = item;
+ }
+ if (head == where) {
+ head = item;
+ }
+}
+
+void dlist::insert_after(void *item, void *where)
+{
+ dlink *where_link = (dlink *)((char *)where+loffset);
+
+ ((dlink *)((char *)item+loffset))->next = where_link->next;
+ ((dlink *)((char *)item+loffset))->prev = where;
+
+ if (where_link->next) {
+ ((dlink *)((char *)(where_link->next)+loffset))->prev = item;
+ where_link->next = item;
+ }
+ if (tail == where) {
+ tail = item;
+ }
+}
+
+
void dlist::remove(void *item)
{
void *xitem;
return ((dlink *)((char *)item+loffset))->next;
}
+void * dlist::prev(void *item)
+{
+ if (item == NULL) {
+ return tail;
+ }
+ return ((dlink *)((char *)item+loffset))->prev;
+}
+
+
/* Destroy the list and its contents */
void dlist::destroy()
{
{
char buf[30];
dlist *jcr_chain;
+ MYJCR *jcr = NULL;
MYJCR *save_jcr = NULL;
+ MYJCR *next_jcr;
jcr_chain = (dlist *)malloc(sizeof(dlist));
- jcr_chain->init((int)&MYJCR::link);
+ jcr_chain->init(jcr, &jcr->link);
printf("Prepend 20 items 0-19\n");
for (int i=0; i<20; i++) {
- MYJCR *jcr;
sprintf(buf, "This is dlist item %d", i);
jcr = (MYJCR *)malloc(sizeof(MYJCR));
jcr->buf = bstrdup(buf);
}
}
+ next_jcr = (MYJCR *)jcr_chain->next(save_jcr);
+ printf("11th item=%s\n", next_jcr->buf);
+ jcr = (MYJCR *)malloc(sizeof(MYJCR));
+ jcr->buf = save_jcr->buf;
printf("Remove 10th item\n");
- free(save_jcr->buf);
jcr_chain->remove(save_jcr);
+ printf("Re-insert 10th item\n");
+ jcr_chain->insert_before(jcr, next_jcr);
printf("Print remaining list.\n");
for (MYJCR *jcr=NULL; (jcr=(MYJCR *)jcr_chain->next(jcr)); ) {
void *tail;
int loffset;
public:
- dlist(int offset);
- void init(int offset);
+ dlist(void *item, void *link);
+ void init(void *item, void *link);
void prepend(void *item);
void append(void *item);
+ void insert_before(void *item, void *where);
+ void insert_after(void *item, void *where);
void remove(void *item);
+ bool empty();
void *next(void *item);
+ void *prev(void *item);
void destroy();
void *first();
void *last();
* allowing us to mix C++ classes inside malloc'ed
* C structures. Define before called in constructor.
*/
-inline void dlist::init(int offset) {
+inline void dlist::init(void *item, void *link)
+{
head = tail = NULL;
- loffset = (int)offset;
+ loffset = (char *)link - (char *)item;
}
/* Constructor */
-inline dlist::dlist(int offset) {
- this->init(offset);
+inline dlist::dlist(void *item, void *link)
+{
+ this->init(item, link);
+}
+
+inline bool dlist::empty()
+{
+ return head == NULL;
}
inline void * dlist::operator new(size_t)
/* Serialisation support functions from serial.c. */
-extern void serial_int16(uint8_t * * ptr, int16_t v);
-extern void serial_uint16(uint8_t * * ptr, uint16_t v);
-extern void serial_int32(uint8_t * * ptr, int32_t v);
-extern void serial_uint32(uint8_t * * ptr, uint32_t v);
+extern void serial_int16(uint8_t * * const ptr, const int16_t v);
+extern void serial_uint16(uint8_t * * const ptr, const uint16_t v);
+extern void serial_int32(uint8_t * * const ptr, const int32_t v);
+extern void serial_uint32(uint8_t * * const ptr, const uint32_t v);
extern void serial_int64(uint8_t * * ptr, int64_t v);
-extern void serial_uint64(uint8_t * * ptr, uint64_t v);
-extern void serial_btime(uint8_t * * ptr, btime_t v);
-extern void serial_float64(uint8_t * * ptr, float64_t v);
-extern int serial_string(uint8_t * ptr, char * str);
-extern int16_t unserial_int16(uint8_t * * ptr);
-extern uint16_t unserial_uint16(uint8_t * * ptr);
-extern int32_t unserial_int32(uint8_t * * ptr);
-extern uint32_t unserial_uint32(uint8_t * * ptr);
-extern int64_t unserial_int64(uint8_t * * ptr);
-extern uint64_t unserial_uint64(uint8_t * * ptr);
-extern btime_t unserial_btime(uint8_t * * ptr);
-extern float64_t unserial_float64(uint8_t * * ptr);
-extern int unserial_string(uint8_t * ptr, char * str);
+extern void serial_uint64(uint8_t * * const ptr, const uint64_t v);
+extern void serial_btime(uint8_t * * const ptr, const btime_t v);
+extern void serial_float64(uint8_t * * const ptr, const float64_t v);
+extern int serial_string(uint8_t * const ptr, char * const str);
+extern int16_t unserial_int16(uint8_t * * const ptr);
+extern uint16_t unserial_uint16(uint8_t * * const ptr);
+extern int32_t unserial_int32(uint8_t * * const ptr);
+extern uint32_t unserial_uint32(uint8_t * * const ptr);
+extern int64_t unserial_int64(uint8_t * * const ptr);
+extern uint64_t unserial_uint64(uint8_t * * const ptr);
+extern btime_t unserial_btime(uint8_t * * const ptr);
+extern float64_t unserial_float64(uint8_t * * const ptr);
+extern int unserial_string(uint8_t * const ptr, char * const str);
/*
- Serialisation Macros
+ Serialisation Macros
These macros use a uint8_t pointer, ser_ptr, which must be
defined by the code which uses them.
#define __SERIAL_H_ 1
/* ser_declare -- Declare ser_ptr locally within a function. */
-#define ser_declare uint8_t *ser_ptr
-#define unser_declare uint8_t *ser_ptr
+#define ser_declare uint8_t *ser_ptr
+#define unser_declare uint8_t *ser_ptr
/* ser_begin(x, s) -- Begin serialisation into a buffer x of size s. */
#define ser_begin(x, s) ser_ptr = ((uint8_t *)(x))
#define unser_begin(x, s) ser_ptr = ((uint8_t *)(x))
-/* ser_length -- Determine length in bytes of serialised into a
- buffer x. */
+/* ser_length -- Determine length in bytes of serialised into a
+ buffer x. */
#define ser_length(x) (ser_ptr - (uint8_t *)(x))
#define unser_length(x) (ser_ptr - (uint8_t *)(x))
/* ser_end(x, s) -- End serialisation into a buffer x of size s. */
-#define ser_end(x, s) ASSERT(ser_length(x) <= (s))
+#define ser_end(x, s) ASSERT(ser_length(x) <= (s))
#define unser_end(x, s) ASSERT(ser_length(x) <= (s))
/* ser_check(x, s) -- Verify length of serialised data in buffer x is
- expected length s. */
+ expected length s. */
#define ser_check(x, s) ASSERT(ser_length(x) == (s))
-/* Serialisation */
+/* Serialisation */
/* 8 bit signed integer */
-#define ser_int8(x) *ser_ptr++ = (x)
+#define ser_int8(x) *ser_ptr++ = (x)
/* 8 bit unsigned integer */
-#define ser_uint8(x) *ser_ptr++ = (x)
+#define ser_uint8(x) *ser_ptr++ = (x)
/* 16 bit signed integer */
-#define ser_int16(x) serial_int16(&ser_ptr, x)
+#define ser_int16(x) serial_int16(&ser_ptr, x)
/* 16 bit unsigned integer */
-#define ser_uint16(x) serial_uint16(&ser_ptr, x)
+#define ser_uint16(x) serial_uint16(&ser_ptr, x)
/* 32 bit signed integer */
-#define ser_int32(x) serial_int32(&ser_ptr, x)
+#define ser_int32(x) serial_int32(&ser_ptr, x)
/* 32 bit unsigned integer */
-#define ser_uint32(x) serial_uint32(&ser_ptr, x)
+#define ser_uint32(x) serial_uint32(&ser_ptr, x)
/* 64 bit signed integer */
-#define ser_int64(x) serial_int64(&ser_ptr, x)
+#define ser_int64(x) serial_int64(&ser_ptr, x)
/* 64 bit unsigned integer */
-#define ser_uint64(x) serial_uint64(&ser_ptr, x)
+#define ser_uint64(x) serial_uint64(&ser_ptr, x)
/* btime -- 64 bit unsigned integer */
#define ser_btime(x) serial_btime(&ser_ptr, x)
/* 64 bit IEEE floating point number */
-#define ser_float64(x) serial_float64(&ser_ptr, x)
+#define ser_float64(x) serial_float64(&ser_ptr, x)
/* 128 bit signed integer */
-#define ser_int128(x) memcpy(ser_ptr, x, sizeof(int128_t)), ser_ptr += sizeof(int128_t)
+#define ser_int128(x) memcpy(ser_ptr, x, sizeof(int128_t)), ser_ptr += sizeof(int128_t)
/* Binary byte stream len bytes not requiring serialisation */
#define ser_bytes(x, len) memcpy(ser_ptr, (x), (len)), ser_ptr += (len)
-/* Binary byte stream not requiring serialisation (length obtained by sizeof) */
-#define ser_buffer(x) ser_bytes((x), (sizeof (x)))
+/* Binary byte stream not requiring serialisation (length obtained by sizeof) */
+#define ser_buffer(x) ser_bytes((x), (sizeof (x)))
/* Binary string not requiring serialization */
-#define ser_string(x) ser_ptr += serial_string(ser_ptr, (x))
+#define ser_string(x) ser_ptr += serial_string(ser_ptr, (x))
-/* Unserialisation */
+/* Unserialisation */
/* 8 bit signed integer */
-#define unser_int8(x) (x) = *ser_ptr++
+#define unser_int8(x) (x) = *ser_ptr++
/* 8 bit unsigned integer */
-#define unser_uint8(x) (x) = *ser_ptr++
+#define unser_uint8(x) (x) = *ser_ptr++
/* 16 bit signed integer */
-#define unser_int16(x) (x) = unserial_int16(&ser_ptr)
+#define unser_int16(x) (x) = unserial_int16(&ser_ptr)
/* 16 bit unsigned integer */
#define unser_uint16(x) (x) = unserial_uint16(&ser_ptr)
/* 32 bit signed integer */
-#define unser_int32(x) (x) = unserial_int32(&ser_ptr)
+#define unser_int32(x) (x) = unserial_int32(&ser_ptr)
/* 32 bit unsigned integer */
#define unser_uint32(x) (x) = unserial_uint32(&ser_ptr)
/* 64 bit signed integer */
-#define unser_int64(x) (x) = unserial_int64(&ser_ptr)
+#define unser_int64(x) (x) = unserial_int64(&ser_ptr)
/* 64 bit unsigned integer */
#define unser_uint64(x) (x) = unserial_uint64(&ser_ptr)
/* Binary byte stream len bytes not requiring serialisation */
#define unser_bytes(x, len) memcpy((x), ser_ptr, (len)), ser_ptr += (len)
-/* Binary byte stream not requiring serialisation (length obtained by sizeof) */
+/* Binary byte stream not requiring serialisation (length obtained by sizeof) */
#define unser_buffer(x) unser_bytes((x), (sizeof (x)))
/* Binary string not requiring serialization */
* Version $Id$
*/
/*
- Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
/* */
#define VERSION "1.31"
#define VSTRING "1"
-#define BDATE "14 Jul 2003"
-#define LSMDATE "14Jul03"
+#define BDATE "17 Jul 2003"
+#define LSMDATE "17Jul03"
/* Debug flags */
#define DEBUG 1