From 9a464ac0bc26a48b417b02099b6ee508ef4842fc Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Thu, 17 Jul 2003 20:43:34 +0000 Subject: [PATCH] add jobq+serial.h+priorities+recycling git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@636 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/ChangeLog | 17 ++ bacula/kernstodo | 33 +++- bacula/src/cats/protos.h | 6 +- bacula/src/cats/sql_get.c | 7 +- bacula/src/dird/Makefile.in | 4 +- bacula/src/dird/autoprune.c | 11 +- bacula/src/dird/catreq.c | 58 +++++- bacula/src/dird/dird.h | 1 + bacula/src/dird/dird_conf.c | 223 +++++++++++----------- bacula/src/dird/dird_conf.h | 25 ++- bacula/src/dird/job.c | 78 +++++--- bacula/src/dird/jobq.c | 358 +++++++++++++++++++++++++++++++++++ bacula/src/dird/jobq.h | 70 +++++++ bacula/src/dird/protos.h | 3 +- bacula/src/dird/recycle.c | 18 +- bacula/src/dird/run_conf.c | 12 +- bacula/src/dird/scheduler.c | 3 + bacula/src/dird/ua_prune.c | 7 +- bacula/src/jcr.h | 3 +- bacula/src/lib/bnet_server.c | 25 ++- bacula/src/lib/dlist.c | 54 +++++- bacula/src/lib/dlist.h | 23 ++- bacula/src/lib/serial.h | 90 ++++----- bacula/src/lib/workq.h | 2 +- bacula/src/version.h | 4 +- 25 files changed, 890 insertions(+), 245 deletions(-) create mode 100755 bacula/src/dird/jobq.c create mode 100644 bacula/src/dird/jobq.h diff --git a/bacula/ChangeLog b/bacula/ChangeLog index 3f0f48720c..c0848f4018 100644 --- a/bacula/ChangeLog +++ b/bacula/ChangeLog @@ -1,4 +1,21 @@ +2003-07-xx Version 1.31 Beta xxJul03 +- Take serial.h provided by David Craigon, which corrects differences in + prototypes between serial.h and serial.c. +- Make db_get_media_ids() return Media Ids only for the current pool. +- Add new jobq.h and jobq.c drived from workq. +- Add JobPriority to jcr, and Priority to Job resource as well as + to the run line in a Schedule. +- Remove unused pool record from autoprune.c. +- Implement Nic Bellamy's RecycleCurrentVolume. +- Implement RecycleOldestVolume. +- Begin adding new JOB_QUEUE code to the Director. +- Create a single routine recycle_volume(). +- Retry accept(), bind() and socket() if EINTR occurs. +- Implement insert_before(), insert_after(), and empty() for dlist class. + Also require offset to be given by giving item and link address. +- Make error some messages in smtp.c a bit more explicit. + 2003-07-12 Version 1.31 Beta 14Jul03 - Marc Brueckner reported a crash during restore (a missing tree->) - Moved host.h.in file from filed to src. diff --git a/bacula/kernstodo b/bacula/kernstodo index ae110d00ce..d7bc5160ee 100644 --- a/bacula/kernstodo +++ b/bacula/kernstodo @@ -18,7 +18,37 @@ Documentation to do: (any release a little bit at a time) (./create_mys... ./make_my...). - Document all the status codes JobLevel, JobType, JobStatus. - Document dynamic DNS - +- Document using multiple Pools for daily rotating tapes. +- Fix default time in seconds without qualifier -- bacula-dir.conf +- From Marc Brueckner der Megauser + Backup OS Restore OS Result + ------------------------------------------------------------------------------- + WinME WinME Works + WinME WinNT Works (minor problems with rights) + WinME WINXP Works (minor problems with rights) + WinME Linux Works (minor problems with rights) + + + WinXP WinXP Works + WinXP WinNT Works * + WinXP WinME Error: Win32 data stream not supported on this Client + WinXP Linux Error: Win32 data stream not supported on this Client + + WinNT WinNT Works + WinNT WinXP Works + WinNT WinME Error: Win32 data stream not supported on this Client + WinNT Linux Error: Win32 data stream not supported on this Client + + Linux Linux Works + Linux WinNT Works (minor problems with rights) + Linux WinME Client hangs + * When restoring files on WinNT previously backed up on XP I get the following + Message for some files. + But any file was restored correctly ! + + Write error on c:/tmp/transfer/WinSCP2.exe: ERR=Die Daten sind unzulassig. (the + data is invalid). +- Add Nic Bellamy's backup scheme. Testing to do: (painful) - that ALL console command line options work and are always implemented @@ -1047,4 +1077,3 @@ Done: (see kernsdone for more) - Update Automatic Volume Labeling in disk.wml - Figure out how to handle DHCP IP addresses -- use dynamic DNS. - Add OS, GCC version to traceback output. - diff --git a/bacula/src/cats/protos.h b/bacula/src/cats/protos.h index ff1ccede2c..6ce3efee0c 100644 --- a/bacula/src/cats/protos.h +++ b/bacula/src/cats/protos.h @@ -31,7 +31,7 @@ /* sql.c */ B_DB *db_init_database(JCR *jcr, char *db_name, char *db_user, char *db_password, - char *db_address, int db_port, char *db_socket); + char *db_address, int db_port, char *db_socket); int db_open_database(JCR *jcr, B_DB *db); void db_close_database(JCR *jcr, B_DB *db); void db_escape_string(char *snew, char *old, int len); @@ -51,7 +51,7 @@ int db_create_job_record(JCR *jcr, B_DB *db, JOB_DBR *jr); int db_create_media_record(JCR *jcr, B_DB *db, MEDIA_DBR *media_dbr); int db_create_client_record(JCR *jcr, B_DB *db, CLIENT_DBR *cr); int db_create_fileset_record(JCR *jcr, B_DB *db, FILESET_DBR *fsr); -int db_create_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr); +int db_create_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr); int db_create_jobmedia_record(JCR *jcr, B_DB *mdb, JOBMEDIA_DBR *jr); int db_create_counter_record(JCR *jcr, B_DB *mdb, COUNTER_DBR *cr); @@ -76,7 +76,7 @@ int db_get_num_media_records(JCR *jcr, B_DB *mdb); int db_get_num_pool_records(JCR *jcr, B_DB *mdb); int db_get_pool_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t **ids); int db_get_client_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t **ids); -int db_get_media_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t **ids); +int db_get_media_ids(JCR *jcr, B_DB *mdb, uint32_t PoolId, int *num_ids, uint32_t **ids); int db_get_job_volume_parameters(JCR *jcr, B_DB *mdb, uint32_t JobId, VOL_PARAMS **VolParams); int db_get_client_record(JCR *jcr, B_DB *mdb, CLIENT_DBR *cdbr); int db_get_counter_record(JCR *jcr, B_DB *mdb, COUNTER_DBR *cr); diff --git a/bacula/src/cats/sql_get.c b/bacula/src/cats/sql_get.c index 0663905db8..285558286c 100644 --- a/bacula/src/cats/sql_get.c +++ b/bacula/src/cats/sql_get.c @@ -723,13 +723,14 @@ int db_get_num_media_records(JCR *jcr, B_DB *mdb) /* - * This function returns a list of all the Media record ids. + * This function returns a list of all the Media record ids for + * the current Pool. * The caller must free ids if non-NULL. * * Returns 0: on failure * 1: on success */ -int db_get_media_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t *ids[]) +int db_get_media_ids(JCR *jcr, B_DB *mdb, uint32_t PoolId, int *num_ids, uint32_t *ids[]) { SQL_ROW row; int stat = 0; @@ -738,7 +739,7 @@ int db_get_media_ids(JCR *jcr, B_DB *mdb, int *num_ids, uint32_t *ids[]) db_lock(mdb); *ids = NULL; - Mmsg(&mdb->cmd, "SELECT MediaId FROM Media"); + Mmsg(&mdb->cmd, "SELECT MediaId FROM Media WHERE PoolId=%u", PoolId); if (QUERY_DB(jcr, mdb, mdb->cmd)) { *num_ids = sql_num_rows(mdb); if (*num_ids > 0) { diff --git a/bacula/src/dird/Makefile.in b/bacula/src/dird/Makefile.in index 2b0cafa68a..8ad21fbb0d 100644 --- a/bacula/src/dird/Makefile.in +++ b/bacula/src/dird/Makefile.in @@ -26,7 +26,7 @@ SVRSRCS = dird.c admin.c authenticate.c \ autoprune.c backup.c bsr.c \ catreq.c dird_conf.c expand.c \ fd_cmds.c getmsg.c inc_conf.c job.c \ - mountreq.c msgchan.c newvol.c \ + jobq.c mountreq.c msgchan.c newvol.c \ recycle.c restore.c run_conf.c \ scheduler.c sql_cmds.c \ ua_cmds.c ua_dotcmds.c \ @@ -39,7 +39,7 @@ SVROBJS = dird.o admin.o authenticate.o \ autoprune.o backup.o bsr.o \ catreq.o dird_conf.o expand.o \ fd_cmds.o getmsg.o inc_conf.o job.o \ - mountreq.o msgchan.o newvol.o \ + jobq.o mountreq.o msgchan.o newvol.o \ recycle.o restore.o run_conf.o \ scheduler.o sql_cmds.o \ ua_cmds.o ua_dotcmds.o \ diff --git a/bacula/src/dird/autoprune.c b/bacula/src/dird/autoprune.c index 46107d7426..a7fb458a59 100644 --- a/bacula/src/dird/autoprune.c +++ b/bacula/src/dird/autoprune.c @@ -89,7 +89,6 @@ int prune_volumes(JCR *jcr) uint32_t *ids = NULL; int num_ids = 0; MEDIA_DBR mr; - POOL_DBR pr; UAContext *ua; if (!jcr->job->PruneVolumes && !jcr->pool->AutoPrune) { @@ -97,14 +96,12 @@ int prune_volumes(JCR *jcr) return 0; } memset(&mr, 0, sizeof(mr)); - memset(&pr, 0, sizeof(pr)); ua = new_ua_context(jcr); db_lock(jcr->db); - /* Get the Pool Record and a list of Media Id's in the Pool */ - pr.PoolId = jcr->PoolId; - if (!db_get_pool_record(jcr, jcr->db, &pr) || !db_get_media_ids(jcr, jcr->db, &num_ids, &ids)) { + /* Get the List of all media ids in the current Pool */ + if (!db_get_media_ids(jcr, jcr->db, jcr->PoolId, &num_ids, &ids)) { Jmsg(jcr, M_ERROR, 0, "%s", db_strerror(jcr->db)); goto bail_out; } @@ -117,7 +114,7 @@ int prune_volumes(JCR *jcr) continue; } /* Prune only Volumes from current Pool */ - if (pr.PoolId != mr.PoolId) { + if (jcr->PoolId != mr.PoolId) { continue; } /* Prune only Volumes with status "Full", "Used", or "Append" */ @@ -125,7 +122,7 @@ int prune_volumes(JCR *jcr) strcmp(mr.VolStatus, "Append") == 0 || strcmp(mr.VolStatus, "Used") == 0) { Dmsg1(200, "Prune Volume %s\n", mr.VolumeName); - stat += prune_volume(ua, &pr, &mr); + stat += prune_volume(ua, &mr); Dmsg1(200, "Num pruned = %d\n", stat); } } diff --git a/bacula/src/dird/catreq.c b/bacula/src/dird/catreq.c index 5d3c306fd3..dc08a48291 100644 --- a/bacula/src/dird/catreq.c +++ b/bacula/src/dird/catreq.c @@ -108,9 +108,10 @@ void catalog_request(JCR *jcr, BSOCK *bs, char *msg) } } - if (!ok && jcr->pool->purge_oldest_volume) { - Dmsg1(200, "No next volume found. PurgeOldest=%d\n", - jcr->pool->purge_oldest_volume); + if (!ok && (jcr->pool->purge_oldest_volume || + jcr->pool->recycle_oldest_volume)) { + Dmsg2(200, "No next volume found. PurgeOldest=%d\n RecyleOldest=%d", + jcr->pool->purge_oldest_volume, jcr->pool->recycle_oldest_volume); /* Find oldest volume to recycle */ ok = db_find_next_volume(jcr, jcr->db, -1, &mr); Dmsg1(400, "Find oldest=%d\n", ok); @@ -119,12 +120,17 @@ void catalog_request(JCR *jcr, BSOCK *bs, char *msg) Dmsg0(400, "Try purge.\n"); /* Try to purge oldest volume */ ua = new_ua_context(jcr); - Jmsg(jcr, M_INFO, 0, _("Purging oldest volume \"%s\"\n"), mr.VolumeName); - ok = purge_jobs_from_volume(ua, &mr); + if (jcr->pool->purge_oldest_volume) { + Jmsg(jcr, M_INFO, 0, _("Purging oldest volume \"%s\"\n"), mr.VolumeName); + ok = purge_jobs_from_volume(ua, &mr); + } else { + Jmsg(jcr, M_INFO, 0, _("Pruning oldest volume \"%s\"\n"), mr.VolumeName); + ok = prune_volume(ua, &mr); + } free_ua_context(ua); if (ok) { - ok = recycle_oldest_purged_volume(jcr, &mr); - Dmsg1(400, "Recycle after recycle oldest=%d\n", ok); + ok = recycle_volume(jcr, &mr); + Dmsg1(400, "Recycle after purge oldest=%d\n", ok); } } } @@ -187,13 +193,13 @@ void catalog_request(JCR *jcr, BSOCK *bs, char *msg) */ unbash_spaces(mr.VolumeName); if (db_get_media_record(jcr, jcr->db, &mr)) { - int VolSuitable = 0; + bool VolSuitable = false; char *reason = ""; /* detailed reason for rejection */ jcr->MediaId = mr.MediaId; Dmsg1(120, "VolumeInfo MediaId=%d\n", jcr->MediaId); pm_strcpy(&jcr->VolumeName, mr.VolumeName); if (!writing) { - VolSuitable = 1; /* accept anything for read */ + VolSuitable = true; /* accept anything for read */ } else { /* * SD wants to write this Volume, so make @@ -206,12 +212,44 @@ void catalog_request(JCR *jcr, BSOCK *bs, char *msg) } else if (strcmp(mr.VolStatus, "Append") != 0 && strcmp(mr.VolStatus, "Recycle") != 0) { reason = "not Append or Recycle"; + /* XXX nicb start */ + /* What we're trying to do here is see if the current volume is + * "recycleable" - ie. if we prune all expired jobs off it, is + * it now possible to reuse it for the job that it is currently + * needed for? + */ + if ((mr.LastWritten + mr.VolRetention) < (utime_t)time(NULL) + && mr.Recycle && jcr->pool->recycle_current_volume + && (strcmp(mr.VolStatus, "Full") == 0 || + strcmp(mr.VolStatus, "Used") == 0)) { + /* + * Attempt prune of current volume to see if we can + * recycle it for use. + */ + UAContext *ua; + + reason = "not Append or Recycle (auto recycle failed)"; + + ua = new_ua_context(jcr); + ok = prune_volume(ua, &mr); + free_ua_context(ua); + + if (ok) { + /* If fully purged, recycle current volume */ + if (recycle_volume(jcr, &mr)) { + Jmsg(jcr, M_INFO, 0, "Recycled current " + "volume \"%s\"\n", mr.VolumeName); + VolSuitable = true; + } + } + } + /* XXX nicb end */ } else if (strcmp(mr.MediaType, jcr->store->media_type) != 0) { reason = "not correct MediaType"; } else if (!jcr->pool->accept_any_volume) { reason = "Volume not in sequence"; } else { - VolSuitable = 1; + VolSuitable = true; } } if (VolSuitable) { diff --git a/bacula/src/dird/dird.h b/bacula/src/dird/dird.h index ec6f1ebb68..81fd847f40 100644 --- a/bacula/src/dird/dird.h +++ b/bacula/src/dird/dird.h @@ -42,6 +42,7 @@ #include "bsr.h" #include "ua.h" #include "protos.h" +#include "jobq.h" /* Globals that dird.c exports */ extern int debug_level; diff --git a/bacula/src/dird/dird_conf.c b/bacula/src/dird/dird_conf.c index 7b8b14aece..99c8577b9f 100644 --- a/bacula/src/dird/dird_conf.c +++ b/bacula/src/dird/dird_conf.c @@ -1,7 +1,7 @@ /* * Main configuration file parser for Bacula Directors, * some parts may be split into separate files such as - * the schedule configuration (sch_config.c). + * the schedule configuration (run_config.c). * * Note, the configuration file parser consists of three parts * @@ -50,6 +50,7 @@ */ int r_first = R_FIRST; int r_last = R_LAST; + pthread_mutex_t res_mutex = PTHREAD_MUTEX_INITIALIZER; /* Imported subroutines */ @@ -213,6 +214,7 @@ static struct res_items job_items[] = { {"rescheduleonerror", store_yesno, ITEM(res_job.RescheduleOnError), 1, ITEM_DEFAULT, 0}, {"rescheduleinterval", store_time, ITEM(res_job.RescheduleInterval), 0, ITEM_DEFAULT, 60 * 30}, {"rescheduletimes", store_pint, ITEM(res_job.RescheduleTimes), 0, 0, 0}, + {"priority", store_pint, ITEM(res_job.Priority), 0, ITEM_DEFAULT, 10}, {NULL, NULL, NULL, 0, 0, 0} }; @@ -265,6 +267,8 @@ static struct res_items pool_items[] = { {"usecatalog", store_yesno, ITEM(res_pool.use_catalog), 1, ITEM_DEFAULT, 1}, {"usevolumeonce", store_yesno, ITEM(res_pool.use_volume_once), 1, 0, 0}, {"purgeoldestvolume", store_yesno, ITEM(res_pool.purge_oldest_volume), 1, 0, 0}, + {"recycleoldestvolume", store_yesno, ITEM(res_pool.recycle_oldest_volume), 1, 0, 0}, + {"recyclecurrentvolume", store_yesno, ITEM(res_pool.recycle_current_volume), 1, 0, 0}, {"maximumvolumes", store_pint, ITEM(res_pool.max_volumes), 0, 0, 0}, {"maximumvolumejobs", store_pint, ITEM(res_pool.MaxVolJobs), 0, 0, 0}, {"maximumvolumefiles", store_pint, ITEM(res_pool.MaxVolFiles), 0, 0, 0}, @@ -438,10 +442,8 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ... sendit(sock, " --> "); dump_resource(-R_COUNTER, (RES *)res->res_counter.WrapCounter, sendit, sock); } + break; - - - break; case R_CLIENT: sendit(sock, "Client: name=%s address=%s FDport=%d MaxJobs=%u\n", res->res_client.hdr.name, res->res_client.address, res->res_client.FDport, @@ -468,9 +470,10 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ... res->res_cat.db_port, res->res_cat.db_name, NPRT(res->res_cat.db_user)); break; case R_JOB: - sendit(sock, "Job: name=%s JobType=%d level=%s MaxJobs=%u\n", + sendit(sock, "Job: name=%s JobType=%d level=%s Priority=%d MaxJobs=%u\n", res->res_job.hdr.name, res->res_job.JobType, - level_to_str(res->res_job.level), res->res_job.MaxConcurrentJobs); + level_to_str(res->res_job.level), res->res_job.Priority, + res->res_job.MaxConcurrentJobs); sendit(sock, " Resched=%d Times=%d Interval=%s\n", res->res_job.RescheduleOnError, res->res_job.RescheduleTimes, edit_uint64_with_commas(res->res_job.RescheduleInterval, ed1)); @@ -1115,7 +1118,7 @@ static void store_backup(LEX *lc, struct res_items *item, int index, int pass) ((JOB *)(item->value))->JobType = item->code; while ((token = lex_get_token(lc, T_ALL)) != T_EOL) { - int found; + bool found = false; Dmsg1(150, "store_backup got token=%s\n", lex_tok_to_str(token)); @@ -1123,52 +1126,51 @@ static void store_backup(LEX *lc, struct res_items *item, int index, int pass) scan_err1(lc, "Expected a backup/verify keyword, got: %s", lc->str); } Dmsg1(190, "Got keyword: %s\n", lc->str); - found = FALSE; for (i=0; BakVerFields[i].name; i++) { if (strcasecmp(lc->str, BakVerFields[i].name) == 0) { - found = TRUE; + found = true; if (lex_get_token(lc, T_ALL) != T_EQUALS) { scan_err1(lc, "Expected an equals, got: %s", lc->str); } token = lex_get_token(lc, T_NAME); Dmsg1(190, "Got value: %s\n", lc->str); switch (BakVerFields[i].token) { - case 'C': - /* Find Client Resource */ - if (pass == 2) { - res = GetResWithName(R_CLIENT, lc->str); - if (res == NULL) { - scan_err1(lc, "Could not find specified Client Resource: %s", - lc->str); - } - res_all.res_job.client = (CLIENT *)res; + case 'C': + /* Find Client Resource */ + if (pass == 2) { + res = GetResWithName(R_CLIENT, lc->str); + if (res == NULL) { + scan_err1(lc, "Could not find specified Client Resource: %s", + lc->str); } - break; - case 'F': - /* Find FileSet Resource */ - if (pass == 2) { - res = GetResWithName(R_FILESET, lc->str); - if (res == NULL) { - scan_err1(lc, "Could not find specified FileSet Resource: %s\n", - lc->str); - } - res_all.res_job.fileset = (FILESET *)res; - } - break; - case 'L': - /* Get level */ - for (i=0; joblevels[i].level_name; i++) { - if (joblevels[i].job_type == item->code && - strcasecmp(lc->str, joblevels[i].level_name) == 0) { - ((JOB *)(item->value))->level = joblevels[i].level; - i = 0; - break; - } + res_all.res_job.client = (CLIENT *)res; + } + break; + case 'F': + /* Find FileSet Resource */ + if (pass == 2) { + res = GetResWithName(R_FILESET, lc->str); + if (res == NULL) { + scan_err1(lc, "Could not find specified FileSet Resource: %s\n", + lc->str); } - if (i != 0) { - scan_err1(lc, "Expected a Job Level keyword, got: %s", lc->str); + res_all.res_job.fileset = (FILESET *)res; + } + break; + case 'L': + /* Get level */ + for (i=0; joblevels[i].level_name; i++) { + if (joblevels[i].job_type == item->code && + strcasecmp(lc->str, joblevels[i].level_name) == 0) { + ((JOB *)(item->value))->level = joblevels[i].level; + i = 0; + break; } - break; + } + if (i != 0) { + scan_err1(lc, "Expected a Job Level keyword, got: %s", lc->str); + } + break; } /* end switch */ break; } /* end if strcmp() */ @@ -1199,91 +1201,90 @@ static void store_restore(LEX *lc, struct res_items *item, int index, int pass) ((JOB *)(item->value))->JobType = item->code; while ((token = lex_get_token(lc, T_ALL)) != T_EOL) { - int found; + bool found = false; if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { scan_err1(lc, "expected a name, got: %s", lc->str); } - found = FALSE; for (i=0; RestoreFields[i].name; i++) { Dmsg1(190, "Restore kw=%s\n", lc->str); if (strcasecmp(lc->str, RestoreFields[i].name) == 0) { - found = TRUE; + found = true; if (lex_get_token(lc, T_ALL) != T_EQUALS) { scan_err1(lc, "Expected an equals, got: %s", lc->str); } token = lex_get_token(lc, T_ALL); Dmsg1(190, "Restore value=%s\n", lc->str); switch (RestoreFields[i].token) { - case 'B': - /* Bootstrap */ - if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { - scan_err1(lc, "Expected a Restore bootstrap file, got: %s", lc->str); - } - if (pass == 1) { - res_all.res_job.RestoreBootstrap = bstrdup(lc->str); - } - break; - case 'C': - /* Find Client Resource */ - if (pass == 2) { - res = GetResWithName(R_CLIENT, lc->str); - if (res == NULL) { - scan_err1(lc, "Could not find specified Client Resource: %s", - lc->str); - } - res_all.res_job.client = (CLIENT *)res; - } - break; - case 'F': - /* Find FileSet Resource */ - if (pass == 2) { - res = GetResWithName(R_FILESET, lc->str); - if (res == NULL) { - scan_err1(lc, "Could not find specified FileSet Resource: %s\n", - lc->str); - } - res_all.res_job.fileset = (FILESET *)res; - } - break; - case 'J': - /* JobId */ - if (token != T_NUMBER) { - scan_err1(lc, "expected an integer number, got: %s", lc->str); - } - errno = 0; - res_all.res_job.RestoreJobId = strtol(lc->str, NULL, 0); - Dmsg1(190, "RestorJobId=%d\n", res_all.res_job.RestoreJobId); - if (errno != 0) { - scan_err1(lc, "expected an integer number, got: %s", lc->str); - } - break; - case 'W': - /* Where */ - if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { - scan_err1(lc, "Expected a Restore root directory, got: %s", lc->str); - } - if (pass == 1) { - res_all.res_job.RestoreWhere = bstrdup(lc->str); - } - break; - case 'R': - /* Replacement options */ - if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { - scan_err1(lc, "Expected a keyword name, got: %s", lc->str); + case 'B': + /* Bootstrap */ + if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { + scan_err1(lc, "Expected a Restore bootstrap file, got: %s", lc->str); + } + if (pass == 1) { + res_all.res_job.RestoreBootstrap = bstrdup(lc->str); + } + break; + case 'C': + /* Find Client Resource */ + if (pass == 2) { + res = GetResWithName(R_CLIENT, lc->str); + if (res == NULL) { + scan_err1(lc, "Could not find specified Client Resource: %s", + lc->str); } - /* Fix to scan Replacement options */ - for (i=0; ReplaceOptions[i].name; i++) { - if (strcasecmp(lc->str, ReplaceOptions[i].name) == 0) { - ((JOB *)(item->value))->replace = ReplaceOptions[i].token; - i = 0; - break; - } + res_all.res_job.client = (CLIENT *)res; + } + break; + case 'F': + /* Find FileSet Resource */ + if (pass == 2) { + res = GetResWithName(R_FILESET, lc->str); + if (res == NULL) { + scan_err1(lc, "Could not find specified FileSet Resource: %s\n", + lc->str); } - if (i != 0) { - scan_err1(lc, "Expected a Restore replacement option, got: %s", lc->str); + res_all.res_job.fileset = (FILESET *)res; + } + break; + case 'J': + /* JobId */ + if (token != T_NUMBER) { + scan_err1(lc, "expected an integer number, got: %s", lc->str); + } + errno = 0; + res_all.res_job.RestoreJobId = strtol(lc->str, NULL, 0); + Dmsg1(190, "RestorJobId=%d\n", res_all.res_job.RestoreJobId); + if (errno != 0) { + scan_err1(lc, "expected an integer number, got: %s", lc->str); + } + break; + case 'W': + /* Where */ + if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { + scan_err1(lc, "Expected a Restore root directory, got: %s", lc->str); + } + if (pass == 1) { + res_all.res_job.RestoreWhere = bstrdup(lc->str); + } + break; + case 'R': + /* Replacement options */ + if (token != T_IDENTIFIER && token != T_UNQUOTED_STRING && token != T_QUOTED_STRING) { + scan_err1(lc, "Expected a keyword name, got: %s", lc->str); + } + /* Fix to scan Replacement options */ + for (i=0; ReplaceOptions[i].name; i++) { + if (strcasecmp(lc->str, ReplaceOptions[i].name) == 0) { + ((JOB *)(item->value))->replace = ReplaceOptions[i].token; + i = 0; + break; } - break; + } + if (i != 0) { + scan_err1(lc, "Expected a Restore replacement option, got: %s", lc->str); + } + break; } /* end switch */ break; } /* end if strcmp() */ diff --git a/bacula/src/dird/dird_conf.h b/bacula/src/dird/dird_conf.h index 146fb29a0d..5853832688 100644 --- a/bacula/src/dird/dird_conf.h +++ b/bacula/src/dird/dird_conf.h @@ -146,7 +146,12 @@ struct CLIENT { char *password; CAT *catalog; /* Catalog resource */ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ - semlock_t sem; /* client semaphore */ +#ifdef USE_SEMAPHORE + semlock_t sem; /* storage semaphore */ +#endif +#ifdef JOB_QUEUE + uint32_t NumConcurrentJobs; /* number of concurrent jobs running */ +#endif int enable_ssl; /* Use SSL */ }; @@ -165,7 +170,12 @@ struct STORE { char *dev_name; int autochanger; /* set if autochanger */ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ +#ifdef USE_SEMAPHORE semlock_t sem; /* storage semaphore */ +#endif +#ifdef JOB_QUEUE + uint32_t NumConcurrentJobs; /* number of concurrent jobs running */ +#endif int enable_ssl; /* Use SSL */ }; @@ -179,6 +189,7 @@ struct JOB { int JobType; /* job type (backup, verify, restore */ int level; /* default backup/verify level */ + int Priority; /* Job priority */ int RestoreJobId; /* What -- JobId to restore */ char *RestoreWhere; /* Where on disk to restore -- directory */ char *RestoreBootstrap; /* Bootstrap file */ @@ -205,7 +216,12 @@ struct JOB { STORE *storage; /* Where is device -- Storage daemon */ POOL *pool; /* Where is media -- Media Pool */ - semlock_t sem; /* Job semaphore */ +#ifdef USE_SEMAPHORE + semlock_t sem; /* storage semaphore */ +#endif +#ifdef JOB_QUEUE + uint32_t NumConcurrentJobs; /* number of concurrent jobs running */ +#endif }; #define MAX_FOPTS 30 @@ -292,7 +308,9 @@ struct POOL { int catalog_files; /* maintain file entries in catalog */ int use_volume_once; /* write on volume only once */ int accept_any_volume; /* accept any volume */ - int purge_oldest_volume; /* purge oldest volume */ + int purge_oldest_volume; /* purge oldest volume */ + int recycle_oldest_volume; /* attempt to recycle oldest volume */ + int recycle_current_volume; /* attempt recycle of current volume */ uint32_t max_volumes; /* max number of volumes */ utime_t VolRetention; /* volume retention period in seconds */ utime_t VolUseDuration; /* duration volume can be used */ @@ -329,6 +347,7 @@ union URES { struct RUN { RUN *next; /* points to next run record */ int level; /* level override */ + int Priority; /* priority override */ int job_type; POOL *pool; /* Pool override */ STORE *storage; /* Storage override */ diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index f63929a6f4..c41af8325b 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -56,9 +56,13 @@ static pthread_mutex_t mutex; static pthread_cond_t resource_wait; static int waiting = 0; /* count of waiting threads */ #else +#ifdef JOB_QUEUE +jobq_t job_queue; +#else /* Queue of jobs to be run */ workq_t job_wq; /* our job work queue */ #endif +#endif void init_job_server(int max_workers) { @@ -75,9 +79,16 @@ void init_job_server(int max_workers) } #else +#ifdef JOB_QUEUE + if ((stat = job_init(&job_queue, max_workers, job_thread)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat)); + } +#else + /* This is the OLD work queue code to go away */ if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) { Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat)); } +#endif #endif return; } @@ -93,7 +104,9 @@ void run_job(JCR *jcr) #ifdef USE_SEMAPHORE pthread_t tid; #else +#ifndef JOB_QUEUE workq_ele_t *work_item; +#endif #endif sm_check(__FILE__, __LINE__, True); @@ -152,12 +165,19 @@ void run_job(JCR *jcr) if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) { Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat)); } +#else +#ifdef JOB_QUEUE + /* Queue the job to be run */ + if ((stat = jobq_add(&job_queue, jcr)) != 0) { + Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat)); + } #else /* Queue the job to be run */ if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) { Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat)); } jcr->work_item = work_item; +#endif #endif Dmsg0(200, "Done run_job()\n"); } @@ -215,34 +235,34 @@ static void *job_thread(void *arg) } } switch (jcr->JobType) { - case JT_BACKUP: - do_backup(jcr); - if (jcr->JobStatus == JS_Terminated) { - do_autoprune(jcr); - } - break; - case JT_VERIFY: - do_verify(jcr); - if (jcr->JobStatus == JS_Terminated) { - do_autoprune(jcr); - } - break; - case JT_RESTORE: - do_restore(jcr); - if (jcr->JobStatus == JS_Terminated) { - do_autoprune(jcr); - } - break; - case JT_ADMIN: - do_admin(jcr); - if (jcr->JobStatus == JS_Terminated) { - do_autoprune(jcr); - } - break; - default: - Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType); - break; + case JT_BACKUP: + do_backup(jcr); + if (jcr->JobStatus == JS_Terminated) { + do_autoprune(jcr); } + break; + case JT_VERIFY: + do_verify(jcr); + if (jcr->JobStatus == JS_Terminated) { + do_autoprune(jcr); + } + break; + case JT_RESTORE: + do_restore(jcr); + if (jcr->JobStatus == JS_Terminated) { + do_autoprune(jcr); + } + break; + case JT_ADMIN: + do_admin(jcr); + if (jcr->JobStatus == JS_Terminated) { + do_autoprune(jcr); + } + break; + default: + Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType); + break; + } if (jcr->job->RunAfterJob) { POOLMEM *after = get_pool_memory(PM_FNAME); int status; @@ -415,7 +435,7 @@ wait: V(mutex); /* Try again */ } - jcr->acquired_resource_locks = 1; + jcr->acquired_resource_locks = true; #endif return 1; } @@ -472,7 +492,7 @@ static void release_resource_locks(JCR *jcr) if (waiting > 0) { pthread_cond_broadcast(&resource_wait); } - jcr->acquired_resource_locks = 0; + jcr->acquired_resource_locks = false; V(mutex); #endif } diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c new file mode 100755 index 0000000000..c3f2cb7b0d --- /dev/null +++ b/bacula/src/dird/jobq.c @@ -0,0 +1,358 @@ +/* + * Bacula job queue routines. + * + * Kern Sibbald, July MMIII + * + * Version $Id$ + * + * This code was adapted from the Bacula workq, which was + * adapted from "Programming with POSIX Threads", by + * David R. Butenhof + * + * Example: + * + * static jobq_t jq; define job queue + * + * Initialize queue + * if ((stat = jobq_init(&jq, max_workers, job_thread)) != 0) { + * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno)); + * } + * + * Add an item to the queue + * if ((stat = jobq_add(&jq, jcr)) != 0) { + * Emsg1(M_ABORT, 0, "Could not add job to queue: ERR=%s\n", strerror(errno)); + * } + * + * Terminate the queue + * jobq_destroy(jobq_t *jq); + * + */ +/* + Copyright (C) 2000-2003 Kern Sibbald and John Walker + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this program; if not, write to the Free + Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, + MA 02111-1307, USA. + + */ + +#include "bacula.h" +#include "dird.h" + +/* Forward referenced functions */ +static void *jobq_server(void *arg); + +/* + * Initialize a job queue + * + * Returns: 0 on success + * errno on failure + */ +int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg)) +{ + int stat; + jobq_item_t *item = NULL; + + if ((stat = pthread_attr_init(&jq->attr)) != 0) { + return stat; + } + if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) { + pthread_attr_destroy(&jq->attr); + return stat; + } + if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) { + pthread_attr_destroy(&jq->attr); + return stat; + } + if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) { + pthread_mutex_destroy(&jq->mutex); + pthread_attr_destroy(&jq->attr); + return stat; + } + jq->quit = false; + jq->max_workers = threads; /* max threads to create */ + jq->num_workers = 0; /* no threads yet */ + jq->idle_workers = 0; /* no idle threads */ + jq->engine = engine; /* routine to run */ + jq->valid = JOBQ_VALID; + jq->list.init(item, &item->link); + return 0; +} + +/* + * Destroy the job queue + * + * Returns: 0 on success + * errno on failure + */ +int jobq_destroy(jobq_t *jq) +{ + int stat, stat1, stat2; + + if (jq->valid != JOBQ_VALID) { + return EINVAL; + } + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + return stat; + } + jq->valid = 0; /* prevent any more operations */ + + /* + * If any threads are active, wake them + */ + if (jq->num_workers > 0) { + jq->quit = true; + if (jq->idle_workers) { + if ((stat = pthread_cond_broadcast(&jq->work)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } + while (jq->num_workers > 0) { + if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } + } + if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { + return stat; + } + stat = pthread_mutex_destroy(&jq->mutex); + stat1 = pthread_cond_destroy(&jq->work); + stat2 = pthread_attr_destroy(&jq->attr); + jq->list.destroy(); + return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); +} + + +/* + * Add a job to the queue + * jq is a queue that was created with jobq_init + * + */ +int jobq_add(jobq_t *jq, JCR *jcr) +{ + int stat; + jobq_item_t *item, *li; + pthread_t id; + bool inserted = false; + + Dmsg0(200, "jobq_add\n"); + if (jq->valid != JOBQ_VALID) { + return EINVAL; + } + + if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) { + return ENOMEM; + } + item->jcr = jcr; + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + free(item); + return stat; + } + + Dmsg0(200, "add item to queue\n"); + for (li=NULL; (li=(jobq_item_t *)jq->list.next(li)); ) { + if (li->jcr->JobPriority < jcr->JobPriority) { + jq->list.insert_before(item, li); + inserted = true; + } + } + if (!inserted) { + jq->list.append(item); + } + + /* if any threads are idle, wake one */ + if (jq->idle_workers > 0) { + Dmsg0(200, "Signal worker\n"); + if ((stat = pthread_cond_signal(&jq->work)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } else if (jq->num_workers < jq->max_workers) { + Dmsg0(200, "Create worker thread\n"); + /* No idle threads so create a new one */ + set_thread_concurrency(jq->max_workers + 1); + if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + jq->num_workers++; + } + pthread_mutex_unlock(&jq->mutex); + Dmsg0(200, "Return jobq_add\n"); + return stat; +} + +/* + * Remove a job from the job queue + * jq is a queue that was created with jobq_init + * work_item is an element of work + * + * Note, it is "removed" by immediately calling a processing routine. + * if you want to cancel it, you need to provide some external means + * of doing so. + */ +int jobq_remove(jobq_t *jq, JCR *jcr) +{ + int stat; + bool found = false; + pthread_t id; + jobq_item_t *item; + + Dmsg0(200, "jobq_remove\n"); + if (jq->valid != JOBQ_VALID) { + return EINVAL; + } + + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + return stat; + } + + for (item=NULL; (item=(jobq_item_t *)jq->list.next(item)); ) { + if (jcr == item->jcr) { + found = true; + break; + } + } + if (!found) { + return EINVAL; + } + + /* Move item to be the first on the list */ + jq->list.remove(item); + jq->list.prepend(item); + + /* if any threads are idle, wake one */ + if (jq->idle_workers > 0) { + Dmsg0(200, "Signal worker\n"); + if ((stat = pthread_cond_signal(&jq->work)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } else { + Dmsg0(200, "Create worker thread\n"); + /* No idle threads so create a new one */ + set_thread_concurrency(jq->max_workers + 1); + if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { + pthread_mutex_unlock(&jq->mutex); + return stat; + } + jq->num_workers++; + } + pthread_mutex_unlock(&jq->mutex); + Dmsg0(200, "Return jobq_remove\n"); + return stat; +} + + +/* + * This is the worker thread that serves the job queue. + * When all the resources are acquired for the job, + * it will call the user's engine. + */ +static void *jobq_server(void *arg) +{ + struct timespec timeout; + jobq_t *jq = (jobq_t *)arg; + jobq_item_t *je; /* job entry in queue */ + int stat; + bool timedout; + + Dmsg0(200, "Start jobq_server\n"); + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + return NULL; + } + + for (;;) { + struct timeval tv; + struct timezone tz; + + Dmsg0(200, "Top of for loop\n"); + timedout = false; + Dmsg0(200, "gettimeofday()\n"); + gettimeofday(&tv, &tz); + timeout.tv_nsec = 0; + timeout.tv_sec = tv.tv_sec + 2; + + while (jq->list.empty() && !jq->quit) { + /* + * Wait 2 seconds, then if no more work, exit + */ + Dmsg0(200, "pthread_cond_timedwait()\n"); + stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout); + Dmsg1(200, "timedwait=%d\n", stat); + if (stat == ETIMEDOUT) { + timedout = true; + break; + } else if (stat != 0) { + /* This shouldn't happen */ + Dmsg0(200, "This shouldn't happen\n"); + jq->num_workers--; + pthread_mutex_unlock(&jq->mutex); + return NULL; + } + } + je = (jobq_item_t *)jq->list.first(); + if (je != NULL) { + jq->list.remove(je); + if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { + return NULL; + } + /* Call user's routine here */ + Dmsg0(200, "Calling user engine.\n"); + jq->engine(je->jcr); + Dmsg0(200, "Back from user engine.\n"); + free(je); /* release job entry */ + Dmsg0(200, "relock mutex\n"); + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + return NULL; + } + Dmsg0(200, "Done lock mutex\n"); + } + /* + * If no more work request, and we are asked to quit, then do it + */ + if (jq->list.empty() && jq->quit) { + jq->num_workers--; + if (jq->num_workers == 0) { + Dmsg0(200, "Wake up destroy routine\n"); + /* Wake up destroy routine if he is waiting */ + pthread_cond_broadcast(&jq->work); + } + Dmsg0(200, "Unlock mutex\n"); + pthread_mutex_unlock(&jq->mutex); + Dmsg0(200, "Return from jobq_server\n"); + return NULL; + } + Dmsg0(200, "Check for work request\n"); + /* + * If no more work requests, and we waited long enough, quit + */ + Dmsg1(200, "jq empty = %d\n", jq->list.empty()); + Dmsg1(200, "timedout=%d\n", timedout); + if (jq->list.empty() && timedout) { + Dmsg0(200, "break big loop\n"); + jq->num_workers--; + break; + } + Dmsg0(200, "Loop again\n"); + } /* end of big for loop */ + + Dmsg0(200, "unlock mutex\n"); + pthread_mutex_unlock(&jq->mutex); + Dmsg0(200, "End jobq_server\n"); + return NULL; +} diff --git a/bacula/src/dird/jobq.h b/bacula/src/dird/jobq.h new file mode 100644 index 0000000000..47922ef283 --- /dev/null +++ b/bacula/src/dird/jobq.h @@ -0,0 +1,70 @@ +/* + * Bacula job queue routines. + * + * Kern Sibbald, July MMIII + * + * This code adapted from Bacula work queue code, which was + * adapted from "Programming with POSIX Threads", by + * David R. Butenhof + * + * Version $Id$ + */ +/* + Copyright (C) 2000-2003 Kern Sibbald and John Walker + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this program; if not, write to the Free + Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, + MA 02111-1307, USA. + + */ + +#ifndef __JOBQ_H +#define __JOBQ_H 1 + +/* + * Structure to keep track of job queue request + */ +struct jobq_item_t { + dlink link; + JCR *jcr; +}; + +/* + * Structure describing a work queue + */ +struct jobq_t { + pthread_mutex_t mutex; /* queue access control */ + pthread_cond_t work; /* wait for work */ + pthread_attr_t attr; /* create detached threads */ + dlist list; /* list of jobs */ + int valid; /* queue initialized */ + bool quit; /* jobq should quit */ + int max_workers; /* max threads */ + int num_workers; /* current threads */ + int idle_workers; /* idle threads */ + void *(*engine)(void *arg); /* user engine */ +}; + +#define JOBQ_VALID 0xdec1993 + +extern int jobq_init( + jobq_t *wq, + int threads, /* maximum threads */ + void *(*engine)(void *) /* engine routine */ + ); +extern int jobq_destroy(jobq_t *wq); +extern int jobq_add(jobq_t *wq, JCR *jcr); +extern int jobq_remove(jobq_t *wq, JCR *jcr); + +#endif /* __JOBQ_H */ diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index 48891b55a9..6614050f80 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -34,6 +34,7 @@ extern int prune_volumes(JCR *jcr); /* autorecycle.c */ extern int recycle_oldest_purged_volume(JCR *jcr, MEDIA_DBR *mr); +extern int recycle_volume(JCR *jcr, MEDIA_DBR *mr); extern int find_recycled_volume(JCR *jcr, MEDIA_DBR *mr); /* backup.c */ @@ -166,7 +167,7 @@ int insert_tree_handler(void *ctx, int num_fields, char **row); /* ua_prune.c */ int prune_files(UAContext *ua, CLIENT *client); int prune_jobs(UAContext *ua, CLIENT *client, int JobType); -int prune_volume(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr); +int prune_volume(UAContext *ua, MEDIA_DBR *mr); /* ua_purge.c */ int purge_jobs_from_volume(UAContext *ua, MEDIA_DBR *mr); diff --git a/bacula/src/dird/recycle.c b/bacula/src/dird/recycle.c index 6da0b13369..778d82468b 100644 --- a/bacula/src/dird/recycle.c +++ b/bacula/src/dird/recycle.c @@ -95,11 +95,7 @@ int recycle_oldest_purged_volume(JCR *jcr, MEDIA_DBR *mr) if (oldest.MediaId != 0) { mr->MediaId = oldest.MediaId; if (db_get_media_record(jcr, jcr->db, mr)) { - strcpy(mr->VolStatus, "Recycle"); - mr->VolJobs = mr->VolFiles = mr->VolBlocks = mr->VolErrors = 0; - mr->VolBytes = 0; - mr->FirstWritten = mr->LastWritten = 0; - if (db_update_media_record(jcr, jcr->db, mr)) { + if (recycle_volume(jcr, mr)) { Jmsg(jcr, M_INFO, 0, "Recycled volume \"%s\"\n", mr->VolumeName); Dmsg1(100, "Exit 1 recycle_oldest_purged_volume Vol=%s\n", mr->VolumeName); return 1; @@ -110,3 +106,15 @@ int recycle_oldest_purged_volume(JCR *jcr, MEDIA_DBR *mr) Dmsg0(100, "Exit 0 recycle_oldest_purged_volume end\n"); return 0; } + +/* + * Recycle the specified volume + */ +int recycle_volume(JCR *jcr, MEDIA_DBR *mr) +{ + strcpy(mr->VolStatus, "Recycle"); + mr->VolJobs = mr->VolFiles = mr->VolBlocks = mr->VolErrors = 0; + mr->VolBytes = 0; + mr->FirstWritten = mr->LastWritten = 0; + return db_update_media_record(jcr, jcr->db, mr); +} diff --git a/bacula/src/dird/run_conf.c b/bacula/src/dird/run_conf.c index 032ce1037f..d08d9e315b 100644 --- a/bacula/src/dird/run_conf.c +++ b/bacula/src/dird/run_conf.c @@ -164,6 +164,7 @@ static struct s_kw RunFields[] = { {"level", 'L'}, {"storage", 'S'}, {"messages", 'M'}, + {"priority", 'p'}, {NULL, 0} }; @@ -207,9 +208,9 @@ void store_run(LEX *lc, struct res_items *item, int index, int pass) scan_err1(lc, "Expected an equals, got: %s", lc->str); /* NOT REACHED */ } - token = lex_get_token(lc, T_NAME); switch (RunFields[i].token) { case 'L': /* level */ + token = lex_get_token(lc, T_NAME); for (j=0; joblevels[j].level_name; j++) { if (strcasecmp(lc->str, joblevels[j].level_name) == 0) { lrun.level = joblevels[j].level; @@ -223,7 +224,14 @@ void store_run(LEX *lc, struct res_items *item, int index, int pass) /* NOT REACHED */ } break; + case 'p': /* Priority */ + token = lex_get_token(lc, T_PINT32); + if (pass == 2) { + lrun.Priority = lc->pint32_val; + } + break; case 'P': /* Pool */ + token = lex_get_token(lc, T_NAME); if (pass == 2) { res = GetResWithName(R_POOL, lc->str); if (res == NULL) { @@ -235,6 +243,7 @@ void store_run(LEX *lc, struct res_items *item, int index, int pass) } break; case 'S': /* storage */ + token = lex_get_token(lc, T_NAME); if (pass == 2) { res = GetResWithName(R_STORAGE, lc->str); if (res == NULL) { @@ -246,6 +255,7 @@ void store_run(LEX *lc, struct res_items *item, int index, int pass) } break; case 'M': /* messages */ + token = lex_get_token(lc, T_NAME); if (pass == 2) { res = GetResWithName(R_MSGS, lc->str); if (res == NULL) { diff --git a/bacula/src/dird/scheduler.c b/bacula/src/dird/scheduler.c index 8ec070d77a..5633fb3294 100644 --- a/bacula/src/dird/scheduler.c +++ b/bacula/src/dird/scheduler.c @@ -157,6 +157,9 @@ JCR *wait_for_next_job(char *job_to_run) if (run->msgs) { jcr->messages = run->msgs; /* override messages */ } + if (run->Priority) { + jcr->JobPriority = run->Priority; + } Dmsg0(200, "Leave wait_for_next_job()\n"); return jcr; } diff --git a/bacula/src/dird/ua_prune.c b/bacula/src/dird/ua_prune.c index 447f468dc7..cd6684a3f1 100644 --- a/bacula/src/dird/ua_prune.c +++ b/bacula/src/dird/ua_prune.c @@ -35,9 +35,6 @@ int mark_media_purged(UAContext *ua, MEDIA_DBR *mr); /* Forward referenced functions */ -int prune_files(UAContext *ua, CLIENT *client); -int prune_jobs(UAContext *ua, CLIENT *client, int JobType); -int prune_volume(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr); #define MAX_DEL_LIST_LEN 1000000 @@ -205,7 +202,7 @@ int prunecmd(UAContext *ua, char *cmd) if (!confirm_retention(ua, &mr.VolRetention, "Volume")) { return 0; } - prune_volume(ua, &pr, &mr); + prune_volume(ua, &mr); return 1; default: break; @@ -477,7 +474,7 @@ bail_out: /* * Prune a given Volume */ -int prune_volume(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr) +int prune_volume(UAContext *ua, MEDIA_DBR *mr) { char *query = (char *)get_pool_memory(PM_MESSAGE); struct s_count_ctx cnt; diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 4e8159172a..f6e2163652 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -106,6 +106,7 @@ struct JCR { volatile int JobStatus; /* ready, running, blocked, terminated */ int JobType; /* backup, restore, verify ... */ int JobLevel; /* Job level */ + int JobPriority; /* Job priority */ int authenticated; /* set when client authenticated */ time_t sched_time; /* job schedule time, i.e. when it should start */ time_t start_time; /* when job actually started */ @@ -156,7 +157,7 @@ struct JCR { uint32_t RestoreJobId; /* Id specified by UA */ POOLMEM *client_uname; /* client uname */ int replace; /* Replace option */ - int acquired_resource_locks; /* set if resource locks acquired */ + bool acquired_resource_locks; /* set if resource locks acquired */ int NumVols; /* Number of Volume used in pool */ int reschedule_count; /* Number of times rescheduled */ #endif /* DIRECTOR_DAEMON */ diff --git a/bacula/src/lib/bnet_server.c b/bacula/src/lib/bnet_server.c index 22505364d4..cc6e9a3b50 100644 --- a/bacula/src/lib/bnet_server.c +++ b/bacula/src/lib/bnet_server.c @@ -137,8 +137,13 @@ bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_w Emsg1(M_FATAL, 0, _("Error in select: %s\n"), strerror(errno)); break; } - clilen = sizeof(cli_addr); - newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen); + do { + clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen); + } while (newsockfd < 0 && errno == EINTR); + if (newsockfd < 0) { + continue; + } #ifdef HAVE_LIBWRAP P(mutex); /* hosts_access is not thread safe */ @@ -196,6 +201,9 @@ bnet_bind(int port) * Open a TCP socket */ for (tlog=0; (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0; tlog -= 10 ) { + if (errno == EINTR) { + continue; + } if (tlog <= 0) { tlog = 2*60; Emsg1(M_ERROR, 0, _("Cannot open stream socket: %s\n"), strerror(errno)); @@ -219,6 +227,9 @@ bnet_bind(int port) serv_addr.sin_port = htons(port); for (tlog=0; bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0; tlog -= 5 ) { + if (errno == EINTR) { + continue; + } if (tlog <= 0) { tlog = 2*60; Emsg2(M_WARNING, 0, _("Cannot bind port %d: %s: retrying ...\n"), port, strerror(errno)); @@ -266,9 +277,13 @@ bnet_accept(BSOCK *bsock, char *who) newsockfd = -1; break; } - clilen = sizeof(cli_addr); - newsockfd = accept(bsock->fd, (struct sockaddr *)&cli_addr, &clilen); - break; + do { + clilen = sizeof(cli_addr); + newsockfd = accept(bsock->fd, (struct sockaddr *)&cli_addr, &clilen); + } while (newsockfd < 0 && errno == EINTR); + if (newsockfd >= 0) { + break; + } } #ifdef HAVE_LIBWRAP diff --git a/bacula/src/lib/dlist.c b/bacula/src/lib/dlist.c index cd905e6d19..f356658c90 100644 --- a/bacula/src/lib/dlist.c +++ b/bacula/src/lib/dlist.c @@ -66,6 +66,39 @@ void dlist::prepend(void *item) } } +void dlist::insert_before(void *item, void *where) +{ + dlink *where_link = (dlink *)((char *)where+loffset); + + ((dlink *)((char *)item+loffset))->next = where; + ((dlink *)((char *)item+loffset))->prev = where_link->prev; + + if (where_link->prev) { + ((dlink *)((char *)(where_link->prev)+loffset))->next = item; + where_link->prev = item; + } + if (head == where) { + head = item; + } +} + +void dlist::insert_after(void *item, void *where) +{ + dlink *where_link = (dlink *)((char *)where+loffset); + + ((dlink *)((char *)item+loffset))->next = where_link->next; + ((dlink *)((char *)item+loffset))->prev = where; + + if (where_link->next) { + ((dlink *)((char *)(where_link->next)+loffset))->prev = item; + where_link->next = item; + } + if (tail == where) { + tail = item; + } +} + + void dlist::remove(void *item) { void *xitem; @@ -93,6 +126,15 @@ void * dlist::next(void *item) return ((dlink *)((char *)item+loffset))->next; } +void * dlist::prev(void *item) +{ + if (item == NULL) { + return tail; + } + return ((dlink *)((char *)item+loffset))->prev; +} + + /* Destroy the list and its contents */ void dlist::destroy() { @@ -116,14 +158,15 @@ int main() { char buf[30]; dlist *jcr_chain; + MYJCR *jcr = NULL; MYJCR *save_jcr = NULL; + MYJCR *next_jcr; jcr_chain = (dlist *)malloc(sizeof(dlist)); - jcr_chain->init((int)&MYJCR::link); + jcr_chain->init(jcr, &jcr->link); printf("Prepend 20 items 0-19\n"); for (int i=0; i<20; i++) { - MYJCR *jcr; sprintf(buf, "This is dlist item %d", i); jcr = (MYJCR *)malloc(sizeof(MYJCR)); jcr->buf = bstrdup(buf); @@ -133,9 +176,14 @@ int main() } } + next_jcr = (MYJCR *)jcr_chain->next(save_jcr); + printf("11th item=%s\n", next_jcr->buf); + jcr = (MYJCR *)malloc(sizeof(MYJCR)); + jcr->buf = save_jcr->buf; printf("Remove 10th item\n"); - free(save_jcr->buf); jcr_chain->remove(save_jcr); + printf("Re-insert 10th item\n"); + jcr_chain->insert_before(jcr, next_jcr); printf("Print remaining list.\n"); for (MYJCR *jcr=NULL; (jcr=(MYJCR *)jcr_chain->next(jcr)); ) { diff --git a/bacula/src/lib/dlist.h b/bacula/src/lib/dlist.h index b698256d69..84121b1c2c 100644 --- a/bacula/src/lib/dlist.h +++ b/bacula/src/lib/dlist.h @@ -40,12 +40,16 @@ class dlist { void *tail; int loffset; public: - dlist(int offset); - void init(int offset); + dlist(void *item, void *link); + void init(void *item, void *link); void prepend(void *item); void append(void *item); + void insert_before(void *item, void *where); + void insert_after(void *item, void *where); void remove(void *item); + bool empty(); void *next(void *item); + void *prev(void *item); void destroy(); void *first(); void *last(); @@ -58,14 +62,21 @@ public: * allowing us to mix C++ classes inside malloc'ed * C structures. Define before called in constructor. */ -inline void dlist::init(int offset) { +inline void dlist::init(void *item, void *link) +{ head = tail = NULL; - loffset = (int)offset; + loffset = (char *)link - (char *)item; } /* Constructor */ -inline dlist::dlist(int offset) { - this->init(offset); +inline dlist::dlist(void *item, void *link) +{ + this->init(item, link); +} + +inline bool dlist::empty() +{ + return head == NULL; } inline void * dlist::operator new(size_t) diff --git a/bacula/src/lib/serial.h b/bacula/src/lib/serial.h index a7a5c52572..22bea5d2db 100644 --- a/bacula/src/lib/serial.h +++ b/bacula/src/lib/serial.h @@ -25,28 +25,28 @@ /* Serialisation support functions from serial.c. */ -extern void serial_int16(uint8_t * * ptr, int16_t v); -extern void serial_uint16(uint8_t * * ptr, uint16_t v); -extern void serial_int32(uint8_t * * ptr, int32_t v); -extern void serial_uint32(uint8_t * * ptr, uint32_t v); +extern void serial_int16(uint8_t * * const ptr, const int16_t v); +extern void serial_uint16(uint8_t * * const ptr, const uint16_t v); +extern void serial_int32(uint8_t * * const ptr, const int32_t v); +extern void serial_uint32(uint8_t * * const ptr, const uint32_t v); extern void serial_int64(uint8_t * * ptr, int64_t v); -extern void serial_uint64(uint8_t * * ptr, uint64_t v); -extern void serial_btime(uint8_t * * ptr, btime_t v); -extern void serial_float64(uint8_t * * ptr, float64_t v); -extern int serial_string(uint8_t * ptr, char * str); -extern int16_t unserial_int16(uint8_t * * ptr); -extern uint16_t unserial_uint16(uint8_t * * ptr); -extern int32_t unserial_int32(uint8_t * * ptr); -extern uint32_t unserial_uint32(uint8_t * * ptr); -extern int64_t unserial_int64(uint8_t * * ptr); -extern uint64_t unserial_uint64(uint8_t * * ptr); -extern btime_t unserial_btime(uint8_t * * ptr); -extern float64_t unserial_float64(uint8_t * * ptr); -extern int unserial_string(uint8_t * ptr, char * str); +extern void serial_uint64(uint8_t * * const ptr, const uint64_t v); +extern void serial_btime(uint8_t * * const ptr, const btime_t v); +extern void serial_float64(uint8_t * * const ptr, const float64_t v); +extern int serial_string(uint8_t * const ptr, char * const str); +extern int16_t unserial_int16(uint8_t * * const ptr); +extern uint16_t unserial_uint16(uint8_t * * const ptr); +extern int32_t unserial_int32(uint8_t * * const ptr); +extern uint32_t unserial_uint32(uint8_t * * const ptr); +extern int64_t unserial_int64(uint8_t * * const ptr); +extern uint64_t unserial_uint64(uint8_t * * const ptr); +extern btime_t unserial_btime(uint8_t * * const ptr); +extern float64_t unserial_float64(uint8_t * * const ptr); +extern int unserial_string(uint8_t * const ptr, char * const str); /* - Serialisation Macros + Serialisation Macros These macros use a uint8_t pointer, ser_ptr, which must be defined by the code which uses them. @@ -57,86 +57,86 @@ extern int unserial_string(uint8_t * ptr, char * str); #define __SERIAL_H_ 1 /* ser_declare -- Declare ser_ptr locally within a function. */ -#define ser_declare uint8_t *ser_ptr -#define unser_declare uint8_t *ser_ptr +#define ser_declare uint8_t *ser_ptr +#define unser_declare uint8_t *ser_ptr /* ser_begin(x, s) -- Begin serialisation into a buffer x of size s. */ #define ser_begin(x, s) ser_ptr = ((uint8_t *)(x)) #define unser_begin(x, s) ser_ptr = ((uint8_t *)(x)) -/* ser_length -- Determine length in bytes of serialised into a - buffer x. */ +/* ser_length -- Determine length in bytes of serialised into a + buffer x. */ #define ser_length(x) (ser_ptr - (uint8_t *)(x)) #define unser_length(x) (ser_ptr - (uint8_t *)(x)) /* ser_end(x, s) -- End serialisation into a buffer x of size s. */ -#define ser_end(x, s) ASSERT(ser_length(x) <= (s)) +#define ser_end(x, s) ASSERT(ser_length(x) <= (s)) #define unser_end(x, s) ASSERT(ser_length(x) <= (s)) /* ser_check(x, s) -- Verify length of serialised data in buffer x is - expected length s. */ + expected length s. */ #define ser_check(x, s) ASSERT(ser_length(x) == (s)) -/* Serialisation */ +/* Serialisation */ /* 8 bit signed integer */ -#define ser_int8(x) *ser_ptr++ = (x) +#define ser_int8(x) *ser_ptr++ = (x) /* 8 bit unsigned integer */ -#define ser_uint8(x) *ser_ptr++ = (x) +#define ser_uint8(x) *ser_ptr++ = (x) /* 16 bit signed integer */ -#define ser_int16(x) serial_int16(&ser_ptr, x) +#define ser_int16(x) serial_int16(&ser_ptr, x) /* 16 bit unsigned integer */ -#define ser_uint16(x) serial_uint16(&ser_ptr, x) +#define ser_uint16(x) serial_uint16(&ser_ptr, x) /* 32 bit signed integer */ -#define ser_int32(x) serial_int32(&ser_ptr, x) +#define ser_int32(x) serial_int32(&ser_ptr, x) /* 32 bit unsigned integer */ -#define ser_uint32(x) serial_uint32(&ser_ptr, x) +#define ser_uint32(x) serial_uint32(&ser_ptr, x) /* 64 bit signed integer */ -#define ser_int64(x) serial_int64(&ser_ptr, x) +#define ser_int64(x) serial_int64(&ser_ptr, x) /* 64 bit unsigned integer */ -#define ser_uint64(x) serial_uint64(&ser_ptr, x) +#define ser_uint64(x) serial_uint64(&ser_ptr, x) /* btime -- 64 bit unsigned integer */ #define ser_btime(x) serial_btime(&ser_ptr, x) /* 64 bit IEEE floating point number */ -#define ser_float64(x) serial_float64(&ser_ptr, x) +#define ser_float64(x) serial_float64(&ser_ptr, x) /* 128 bit signed integer */ -#define ser_int128(x) memcpy(ser_ptr, x, sizeof(int128_t)), ser_ptr += sizeof(int128_t) +#define ser_int128(x) memcpy(ser_ptr, x, sizeof(int128_t)), ser_ptr += sizeof(int128_t) /* Binary byte stream len bytes not requiring serialisation */ #define ser_bytes(x, len) memcpy(ser_ptr, (x), (len)), ser_ptr += (len) -/* Binary byte stream not requiring serialisation (length obtained by sizeof) */ -#define ser_buffer(x) ser_bytes((x), (sizeof (x))) +/* Binary byte stream not requiring serialisation (length obtained by sizeof) */ +#define ser_buffer(x) ser_bytes((x), (sizeof (x))) /* Binary string not requiring serialization */ -#define ser_string(x) ser_ptr += serial_string(ser_ptr, (x)) +#define ser_string(x) ser_ptr += serial_string(ser_ptr, (x)) -/* Unserialisation */ +/* Unserialisation */ /* 8 bit signed integer */ -#define unser_int8(x) (x) = *ser_ptr++ +#define unser_int8(x) (x) = *ser_ptr++ /* 8 bit unsigned integer */ -#define unser_uint8(x) (x) = *ser_ptr++ +#define unser_uint8(x) (x) = *ser_ptr++ /* 16 bit signed integer */ -#define unser_int16(x) (x) = unserial_int16(&ser_ptr) +#define unser_int16(x) (x) = unserial_int16(&ser_ptr) /* 16 bit unsigned integer */ #define unser_uint16(x) (x) = unserial_uint16(&ser_ptr) /* 32 bit signed integer */ -#define unser_int32(x) (x) = unserial_int32(&ser_ptr) +#define unser_int32(x) (x) = unserial_int32(&ser_ptr) /* 32 bit unsigned integer */ #define unser_uint32(x) (x) = unserial_uint32(&ser_ptr) /* 64 bit signed integer */ -#define unser_int64(x) (x) = unserial_int64(&ser_ptr) +#define unser_int64(x) (x) = unserial_int64(&ser_ptr) /* 64 bit unsigned integer */ #define unser_uint64(x) (x) = unserial_uint64(&ser_ptr) @@ -152,7 +152,7 @@ extern int unserial_string(uint8_t * ptr, char * str); /* Binary byte stream len bytes not requiring serialisation */ #define unser_bytes(x, len) memcpy((x), ser_ptr, (len)), ser_ptr += (len) -/* Binary byte stream not requiring serialisation (length obtained by sizeof) */ +/* Binary byte stream not requiring serialisation (length obtained by sizeof) */ #define unser_buffer(x) unser_bytes((x), (sizeof (x))) /* Binary string not requiring serialization */ diff --git a/bacula/src/lib/workq.h b/bacula/src/lib/workq.h index dc95fccdb6..d0a8443482 100644 --- a/bacula/src/lib/workq.h +++ b/bacula/src/lib/workq.h @@ -10,7 +10,7 @@ * Version $Id$ */ /* - Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker + Copyright (C) 2000-2003 Kern Sibbald and John Walker This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as diff --git a/bacula/src/version.h b/bacula/src/version.h index 551a570df5..cae6f023ec 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -1,8 +1,8 @@ /* */ #define VERSION "1.31" #define VSTRING "1" -#define BDATE "14 Jul 2003" -#define LSMDATE "14Jul03" +#define BDATE "17 Jul 2003" +#define LSMDATE "17Jul03" /* Debug flags */ #define DEBUG 1 -- 2.39.5