From: Kern Sibbald Date: Tue, 18 Mar 2003 19:51:58 +0000 (+0000) Subject: New semaphore job scheduling code X-Git-Tag: Release-1.30~83 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=6a41e15944691f6428fdf667afd80ffb0e63e1c5;p=bacula%2Fbacula New semaphore job scheduling code git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@384 91ce42f0-d328-0410-95d8-f526ca767f89 --- diff --git a/bacula/kernstodo b/bacula/kernstodo index 0a4d5ab4f7..eb15b2f1ea 100644 --- a/bacula/kernstodo +++ b/bacula/kernstodo @@ -21,19 +21,56 @@ Testing to do: (painful) - ***test GetFileAttributexEx, and remove MessageBox at 335 of winservice.cpp **** For 1.30 release: -- Cancelling of a queued job does NOT work!!!!!! +- Refine SD waiting output: + Device is being positioned + > Device is being positioned for append + > Device is being positioned to file x + > +- > Hello, +> +> Possibly your /etc/bacula/my_exclude is not +> marked as executable. It MUST be executable. + +the script is executeabe (0777 for testing) + +> Another solution is simply to include the +> find command in the exclude: +> Exclude = { +> "|find / -iname *.avi -o -iname *.mpg -o -iname *.mp3" +> } + +it doesn't work :( + +i tried it on two different systems with bacula 1.29 + +this works: +Exclude { + wants to cycle through a - set of volumes recycling the oldest volume when it is needed. - Fix "access not allowed" for backup of files on WinXP. - Figure out some way to specify a retention period for files that no longer exist on the machine -- so that we maintain @@ -153,7 +190,6 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used. - Complete code in Bacula Resources -- this will permit reading a new config file at any time. - Handle ctl-c in Console -- Implement LabelTemplate (at least first cut). - Implement script driven addition of File daemon to config files. - Think about how to make Bacula work better with File (non-tape) archives. - Write Unix emulator for Windows. @@ -165,8 +201,6 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used. - Put memory utilization in Status output of each daemon if full status requested or if some level of debug on. - Make database type selectable by .conf files i.e. at runtime -- gethostbyname failure in bnet_connect() continues - generating errors -- should stop. - Set flag for uname -a. Add to Volume label. - Implement throttled work queue. - Check for EOT at ENOSPC or EIO or ENXIO (unix Pc) @@ -187,7 +221,6 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used. - Implement Restore FileSet= - Create a protocol.h and protocol.c where all protocol messages are concentrated. -- If SD cannot open a drive, make it periodically retry. - Remove duplicate fields from jcr (e.g. jcr.level and jcr.jr.Level, ...). - Timout a job or terminate if link goes down, or reopen link and query. - Find general solution for sscanf size problems (as well @@ -268,7 +301,7 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used. This could be the output of df; or perhaps some sort of /etc/mtab record. Longer term to do: -- Design at hierarchial storage for Bacula. +- Design at hierarchial storage for Bacula. Migration and Clone. - Implement FSM (File System Modules). - Identify unchanged or "system" files and save them to a special tape thus removing them from the standard @@ -897,3 +930,15 @@ Done: (see kernsdone for more) ERR=Operation not permited loop. - Add code if there is no mtio.h (cannot do -- too many ioctl defines needed) - Produce better error messages in when error/eof writing block. +- Cancelling of a queued job does NOT work!!!!!! +- Get two +rufus-dir: Volume used once. Marking Volume "File0003" as Used. +rufus-sd: Recycled volume File0003 on device /home/kern/bacula/working, all previous data lost. +rufus-dir: Volume used once. Marking Volume "File0003" as Used. +- Ability to backup to a file then later transfer to a tape -- Migration. + Migration based on MaxJobs(MinJobs),MaxVols(MinVols),AgeJobs,MaxBytes(MinBytes) + (i.e. HighwaterMark, LowwaterMark). +- Eugeny Fisher wants to cycle through a + set of volumes recycling the oldest volume when it is needed. +- gethostbyname failure in bnet_connect() continues + generating errors -- should stop. diff --git a/bacula/src/cats/sql_update.c b/bacula/src/cats/sql_update.c index e127ae1467..ff31234a5a 100644 --- a/bacula/src/cats/sql_update.c +++ b/bacula/src/cats/sql_update.c @@ -158,7 +158,7 @@ db_update_pool_record(void *jcr, B_DB *mdb, POOL_DBR *pr) db_lock(mdb); Mmsg(&mdb->cmd, -"UPDATE Pool SET NumVols=%d,MaxVols=%d,UseOnce=%d,UseCatalog=%d," +"UPDATE Pool SET NumVols=%u,MaxVols=%u,UseOnce=%d,UseCatalog=%d," "AcceptAnyVolume=%d,VolRetention='%s',VolUseDuration='%s'," "MaxVolJobs=%u,MaxVolFiles=%u,MaxVolBytes=%s,Recycle=%d," "AutoPrune=%d,LabelFormat='%s' WHERE PoolId=%u", diff --git a/bacula/src/dird/backup.c b/bacula/src/dird/backup.c index 54c97673e2..6960262d64 100644 --- a/bacula/src/dird/backup.c +++ b/bacula/src/dird/backup.c @@ -166,7 +166,7 @@ int do_backup(JCR *jcr) * */ Dmsg0(110, "Open connection with storage daemon\n"); - set_jcr_job_status(jcr, JS_Blocked); + set_jcr_job_status(jcr, JS_WaitSD); /* * Start conversation with Storage daemon */ @@ -187,7 +187,7 @@ int do_backup(JCR *jcr) } Dmsg0(150, "Storage daemon connection OK\n"); - set_jcr_job_status(jcr, JS_Blocked); + set_jcr_job_status(jcr, JS_WaitFD); if (!connect_to_file_daemon(jcr, 10, FDConnectTimeout, 1)) { goto bail_out; } diff --git a/bacula/src/dird/dird_conf.c b/bacula/src/dird/dird_conf.c index 6bb7211b9b..ac4a71447a 100644 --- a/bacula/src/dird/dird_conf.c +++ b/bacula/src/dird/dird_conf.c @@ -118,6 +118,7 @@ static struct res_items cli_items[] = { {"fileretention", store_time, ITEM(res_client.FileRetention), 0, ITEM_DEFAULT, 60*60*24*60}, {"jobretention", store_time, ITEM(res_client.JobRetention), 0, ITEM_DEFAULT, 60*60*24*180}, {"autoprune", store_yesno, ITEM(res_client.AutoPrune), 1, ITEM_DEFAULT, 1}, + {"maximumconcurrentjobs", store_pint, ITEM(res_client.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1}, {NULL, NULL, NULL, 0, 0, 0} }; @@ -135,6 +136,7 @@ static struct res_items store_items[] = { {"device", store_strname, ITEM(res_store.dev_name), 0, ITEM_REQUIRED, 0}, {"mediatype", store_strname, ITEM(res_store.media_type), 0, ITEM_REQUIRED, 0}, {"autochanger", store_yesno, ITEM(res_store.autochanger), 1, ITEM_DEFAULT, 0}, + {"maximumconcurrentjobs", store_pint, ITEM(res_store.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1}, {NULL, NULL, NULL, 0, 0, 0} }; @@ -189,6 +191,7 @@ static struct res_items job_items[] = { {"runafterjob", store_str, ITEM(res_job.RunAfterJob), 0, 0, 0}, {"spoolattributes", store_yesno, ITEM(res_job.SpoolAttributes), 1, ITEM_DEFAULT, 0}, {"writebootstrap", store_dir, ITEM(res_job.WriteBootstrap), 0, 0, 0}, + {"maximumconcurrentjobs", store_pint, ITEM(res_job.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1}, {NULL, NULL, NULL, 0, 0, 0} }; @@ -383,7 +386,7 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ... switch (type) { case R_DIRECTOR: char ed1[30], ed2[30]; - sendit(sock, "Director: name=%s maxjobs=%d FDtimeout=%s SDtimeout=%s\n", + sendit(sock, "Director: name=%s MaxJobs=%d FDtimeout=%s SDtimeout=%s\n", reshdr->name, res->res_dir.MaxConcurrentJobs, edit_uint64(res->res_dir.FDConnectTimeout, ed1), edit_uint64(res->res_dir.SDConnectTimeout, ed2)); @@ -396,8 +399,9 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ... } break; case R_CLIENT: - sendit(sock, "Client: name=%s address=%s FDport=%d\n", - res->res_client.hdr.name, res->res_client.address, res->res_client.FDport); + sendit(sock, "Client: name=%s address=%s FDport=%d MaxJobs=%u\n", + res->res_client.hdr.name, res->res_client.address, res->res_client.FDport, + res->res_client.MaxConcurrentJobs); sendit(sock, " JobRetention=%" lld " FileRetention=%" lld " AutoPrune=%d\n", res->res_client.JobRetention, res->res_client.FileRetention, res->res_client.AutoPrune); @@ -407,9 +411,10 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ... } break; case R_STORAGE: - sendit(sock, "Storage: name=%s address=%s SDport=%d\n\ + sendit(sock, "Storage: name=%s address=%s SDport=%d MaxJobs=%u\n\ DeviceName=%s MediaType=%s\n", res->res_store.hdr.name, res->res_store.address, res->res_store.SDport, + res->res_store.MaxConcurrentJobs, res->res_store.dev_name, res->res_store.media_type); break; case R_CATALOG: @@ -419,8 +424,9 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ... res->res_cat.db_port, res->res_cat.db_name, NPRT(res->res_cat.db_user)); break; case R_JOB: - sendit(sock, "Job: name=%s JobType=%d level=%s\n", res->res_job.hdr.name, - res->res_job.JobType, level_to_str(res->res_job.level)); + sendit(sock, "Job: name=%s JobType=%d level=%s MaxJobs=%u\n", + res->res_job.hdr.name, res->res_job.JobType, + level_to_str(res->res_job.level), res->res_job.MaxConcurrentJobs); if (res->res_job.client) { sendit(sock, " --> "); dump_resource(-R_CLIENT, (RES *)res->res_job.client, sendit, sock); diff --git a/bacula/src/dird/dird_conf.h b/bacula/src/dird/dird_conf.h index 5572399c19..4cd6fba0bf 100644 --- a/bacula/src/dird/dird_conf.h +++ b/bacula/src/dird/dird_conf.h @@ -91,7 +91,7 @@ struct s_res_dir { char *pid_directory; /* PidDirectory */ char *subsys_directory; /* SubsysDirectory */ struct s_res_msgs *messages; /* Daemon message handler */ - int MaxConcurrentJobs; + uint32_t MaxConcurrentJobs; /* Max concurrent jobs for whole director */ utime_t FDConnectTimeout; /* timeout for connect in seconds */ utime_t SDConnectTimeout; /* timeout in seconds */ }; @@ -111,6 +111,8 @@ struct s_res_client { char *address; char *password; struct s_res_cat *catalog; /* Catalog resource */ + uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ + semlock_t sem; /* client semaphore */ }; typedef struct s_res_client CLIENT; @@ -128,6 +130,8 @@ struct s_res_store { char *media_type; char *dev_name; int autochanger; /* set if autochanger */ + uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ + semlock_t sem; /* storage semaphore */ }; typedef struct s_res_store STORE; @@ -170,13 +174,16 @@ struct s_res_job { int PruneFiles; /* Force pruning of Files */ int PruneVolumes; /* Force pruning of Volumes */ int SpoolAttributes; /* Set to spool attributes in SD */ - + uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ + struct s_res_msgs *messages; /* How and where to send messages */ struct s_res_sch *schedule; /* When -- Automatic schedule */ struct s_res_client *client; /* Who to backup */ struct s_res_fs *fileset; /* What to backup -- Fileset */ struct s_res_store *storage; /* Where is device -- Storage daemon */ struct s_res_pool *pool; /* Where is media -- Media Pool */ + + semlock_t sem; /* Job semaphore */ }; typedef struct s_res_job JOB; diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index aff54c3175..b35be631d1 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -30,12 +30,16 @@ #include "dird.h" /* Forward referenced subroutines */ -static void job_thread(void *arg); +static void *job_thread(void *arg); static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg); +static void release_resource_locks(JCR *jcr); +static int acquire_resource_locks(JCR *jcr); +#ifdef USE_SEMAPHORE +static void backoff_resource_locks(JCR *jcr, int count); +#endif /* Exported subroutines */ void run_job(JCR *jcr); -void init_job_server(int max_workers); /* Imported subroutines */ @@ -46,17 +50,34 @@ extern int do_restore(JCR *jcr); extern int do_verify(JCR *jcr); extern void backup_cleanup(void); +#ifdef USE_SEMAPHORE +static semlock_t job_lock; +static pthread_mutex_t mutex; +static pthread_cond_t resource_wait; +#else /* Queue of jobs to be run */ workq_t job_wq; /* our job work queue */ - +#endif void init_job_server(int max_workers) { int stat; +#ifdef USE_SEMAPHORE + if ((stat = sem_init(&job_lock, max_workers)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init job lock: ERR=%s\n"), strerror(stat)); + } + if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init resource mutex: ERR=%s\n"), strerror(stat)); + } + if ((stat = pthread_cond_init(&resource_wait, NULL)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init resource wait: ERR=%s\n"), strerror(stat)); + } +#else if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) { Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat)); } +#endif return; } @@ -68,7 +89,11 @@ void init_job_server(int max_workers) void run_job(JCR *jcr) { int stat, errstat; +#ifdef USE_SEMAPHORE + pthread_t tid; +#else workq_ele_t *work_item; +#endif sm_check(__FILE__, __LINE__, True); init_msg(jcr, jcr->messages); @@ -123,12 +148,17 @@ void run_job(JCR *jcr) jcr->JobId, jcr->Job, jcr->jr.Type, jcr->jr.Level); Dmsg0(200, "Add jrc to work queue\n"); - +#ifdef USE_SEMAPHORE + if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) { + Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat)); + } +#else /* Queue the job to be run */ if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) { Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat)); } jcr->work_item = work_item; +#endif Dmsg0(200, "Done run_job()\n"); } @@ -137,22 +167,28 @@ void run_job(JCR *jcr) * from the work queue. * At this point, we are running in our own thread */ -static void job_thread(void *arg) +static void *job_thread(void *arg) { time_t now; JCR *jcr = (JCR *)arg; + pthread_detach(pthread_self()); time(&now); sm_check(__FILE__, __LINE__, True); - Dmsg0(100, "=====Start Job=========\n"); + if (!acquire_resource_locks(jcr)) { + set_jcr_job_status(jcr, JS_Cancelled); + } + + Dmsg0(200, "=====Start Job=========\n"); jcr->start_time = now; /* set the real start time */ + Dmsg2(200, "jcr->JobStatus=%d %c\n", jcr->JobStatus, (char)jcr->JobStatus); if (job_cancelled(jcr)) { update_job_end_record(jcr); } else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay < (utime_t)(jcr->start_time - jcr->sched_time)) { - Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max delay time exceeded.\n")); - set_jcr_job_status(jcr, JS_ErrorTerminated); + Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max start delay time exceeded.\n")); + set_jcr_job_status(jcr, JS_Cancelled); update_job_end_record(jcr); } else { @@ -204,10 +240,107 @@ static void job_thread(void *arg) free_pool_memory(after); } } + release_resource_locks(jcr); Dmsg0(50, "Before free jcr\n"); free_jcr(jcr); Dmsg0(50, "======== End Job ==========\n"); sm_check(__FILE__, __LINE__, True); + return NULL; +} + +static int acquire_resource_locks(JCR *jcr) +{ +#ifdef USE_SEMAPHORE + int stat; + + if (jcr->store->sem.valid != SEMLOCK_VALID) { + if ((stat = sem_init(&jcr->store->sem, jcr->store->MaxConcurrentJobs)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init Storage semaphore: ERR=%s\n"), strerror(stat)); + } + } + if (jcr->client->sem.valid != SEMLOCK_VALID) { + if ((stat = sem_init(&jcr->client->sem, jcr->client->MaxConcurrentJobs)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init Client semaphore: ERR=%s\n"), strerror(stat)); + } + } + if (jcr->job->sem.valid != SEMLOCK_VALID) { + if ((stat = sem_init(&jcr->job->sem, jcr->job->MaxConcurrentJobs)) != 0) { + Emsg1(M_ABORT, 0, _("Could not init Job semaphore: ERR=%s\n"), strerror(stat)); + } + } + + for ( ;; ) { + /* Acquire semaphore */ + set_jcr_job_status(jcr, JS_WaitJobRes); + if ((stat = sem_lock(&jcr->job->sem)) != 0) { + Emsg1(M_ABORT, 0, _("Could not acquire Job max jobs lock: ERR=%s\n"), strerror(stat)); + } + set_jcr_job_status(jcr, JS_WaitClientRes); + if ((stat = sem_trylock(&jcr->client->sem)) != 0) { + if (stat == EBUSY) { + backoff_resource_locks(jcr, 1); + goto wait; + } else { + Emsg1(M_ABORT, 0, _("Could not acquire Client max jobs lock: ERR=%s\n"), strerror(stat)); + } + } + set_jcr_job_status(jcr, JS_WaitStoreRes); + if ((stat = sem_trylock(&jcr->store->sem)) != 0) { + if (stat == EBUSY) { + backoff_resource_locks(jcr, 2); + goto wait; + } else { + Emsg1(M_ABORT, 0, _("Could not acquire Storage max jobs lock: ERR=%s\n"), strerror(stat)); + } + } + set_jcr_job_status(jcr, JS_WaitMaxJobs); + if ((stat = sem_trylock(&job_lock)) != 0) { + if (stat == EBUSY) { + backoff_resource_locks(jcr, 3); + goto wait; + } else { + Emsg1(M_ABORT, 0, _("Could not acquire max jobs lock: ERR=%s\n"), strerror(stat)); + } + } + break; + +wait: + P(mutex); + /* Wait for some resource to be released */ + pthread_cond_wait(&resource_wait, &mutex); + V(mutex); + /* Try again */ + } +#endif + return 1; +} + +#ifdef USE_SEMAPHORE +static void backoff_resource_locks(JCR *jcr, int count) +{ + switch (count) { + case 3: + sem_unlock(&jcr->store->sem); + case 2: + sem_unlock(&jcr->client->sem); + case 1: + sem_unlock(&jcr->job->sem); + break; + } +} +#endif + +static void release_resource_locks(JCR *jcr) +{ +#ifdef USE_SEMAPHORE + P(mutex); + sem_unlock(&jcr->store->sem); + sem_unlock(&jcr->client->sem); + sem_unlock(&jcr->job->sem); + sem_unlock(&job_lock); + pthread_cond_signal(&resource_wait); + V(mutex); +#endif } /* diff --git a/bacula/src/dird/newvol.c b/bacula/src/dird/newvol.c index f9aead3740..0b66a86a2b 100644 --- a/bacula/src/dird/newvol.c +++ b/bacula/src/dird/newvol.c @@ -44,6 +44,7 @@ int newVolume(JCR *jcr, MEDIA_DBR *mr) { POOL_DBR pr; char name[MAXSTRING]; + char num[20]; memset(&pr, 0, sizeof(pr)); @@ -55,17 +56,18 @@ int newVolume(JCR *jcr, MEDIA_DBR *mr) if (pr.MaxVols == 0 || pr.NumVols < pr.MaxVols) { set_pool_dbr_defaults_in_media_dbr(mr, &pr); mr->LabelDate = time(NULL); - strcpy(mr->MediaType, jcr->store->media_type); - strcpy(name, pr.LabelFormat); + bstrncpy(mr->MediaType, jcr->store->media_type, sizeof(mr->MediaType)); + bstrncpy(name, pr.LabelFormat, sizeof(name)); if (strchr(name, (int)'%') != NULL) { db_unlock(jcr->db); Jmsg(jcr, M_ERROR, 0, _("Illegal character in Label Format\n")); return 0; } - strcat(name, "%04d"); - sprintf(mr->VolumeName, name, ++pr.NumVols); + sprintf(num, "%04d", ++pr.NumVols); + bstrncpy(mr->VolumeName, name, sizeof(mr->VolumeName)); + bstrncat(mr->VolumeName, num, sizeof(mr->VolumeName)); if (db_create_media_record(jcr, jcr->db, mr) && - db_update_pool_record(jcr, jcr->db, &pr) == 1) { + db_update_pool_record(jcr, jcr->db, &pr)) { db_unlock(jcr->db); Dmsg1(90, "Created new Volume=%s\n", mr->VolumeName); return 1; diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index 7163d92dd1..23e77532bb 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -38,7 +38,9 @@ extern int r_first; extern int r_last; extern struct s_res resources[]; extern char my_name[]; +#ifndef USE_SEMAPHORE extern workq_t job_wq; /* work queue */ +#endif extern char *list_pool; @@ -416,7 +418,9 @@ static int cancelcmd(UAContext *ua, char *cmd) set_jcr_job_status(jcr, JS_Cancelled); bsendmsg(ua, _("JobId %d, Job %s marked to be cancelled.\n"), jcr->JobId, jcr->Job); +#ifndef USE_SEMAPHORE workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */ +#endif free_jcr(jcr); return 1; @@ -1337,11 +1341,13 @@ gotVol: bash_spaces(pr.Name); bnet_fsend(sd, _("label %s VolumeName=%s PoolName=%s MediaType=%s Slot=%d"), dev_name, mr.VolumeName, pr.Name, mr.MediaType, mr.Slot); - bsendmsg(ua, "Sending label command ...\n"); + bsendmsg(ua, _("Sending label command ...\n")); while (bget_msg(sd, 0) >= 0) { bsendmsg(ua, "%s", sd->msg); if (strncmp(sd->msg, "3000 OK label.", 14) == 0) { ok = TRUE; + } else { + bsendmsg(ua, _("Label command failed.\n")); } } ua->jcr->store_bsock = NULL; diff --git a/bacula/src/dird/ua_server.c b/bacula/src/dird/ua_server.c index d63b6ec176..40378b8b0f 100644 --- a/bacula/src/dird/ua_server.c +++ b/bacula/src/dird/ua_server.c @@ -50,7 +50,7 @@ int quit_cmd_thread = 0; /* Forward referenced functions */ static void *connect_thread(void *arg); -static void handle_UA_client_request(void *arg); +static void *handle_UA_client_request(void *arg); /* Global variables */ @@ -96,7 +96,7 @@ static void *connect_thread(void *arg) * Handle Director User Agent commands * */ -static void handle_UA_client_request(void *arg) +static void *handle_UA_client_request(void *arg) { int stat; UAContext ua; @@ -173,7 +173,7 @@ getout: if (ua.args) { free_pool_memory(ua.args); } - return; + return NULL; } /* diff --git a/bacula/src/dird/ua_status.c b/bacula/src/dird/ua_status.c index 49f6c85aef..e64b325e49 100644 --- a/bacula/src/dird/ua_status.c +++ b/bacula/src/dird/ua_status.c @@ -251,6 +251,19 @@ static void do_director_status(UAContext *ua, char *cmd) Mmsg(&msg, _("is waiting on Storage %s"), jcr->store->hdr.name); pool_mem = TRUE; break; + case JS_WaitStoreRes: + msg = _("is waiting on max Storage jobs"); + break; + case JS_WaitClientRes: + msg = _("is waiting on max Client jobs"); + break; + case JS_WaitJobRes: + msg = _("is waiting on max Job jobs"); + break; + case JS_WaitMaxJobs: + msg = _("is waiting on max total jobs"); + break; + default: msg = (char *) get_pool_memory(PM_FNAME); Mmsg(&msg, _("is in unknown state %c"), jcr->JobStatus); diff --git a/bacula/src/filed/filed.c b/bacula/src/filed/filed.c index 52bf099170..df71130740 100644 --- a/bacula/src/filed/filed.c +++ b/bacula/src/filed/filed.c @@ -30,7 +30,7 @@ #include "filed.h" /* Imported Functions */ -extern void handle_client_request(void *dir_sock); +extern void *handle_client_request(void *dir_sock); /* Forward referenced functions */ void terminate_filed(int sig); diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 0669b5e63f..c96dabbd1c 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -65,6 +65,10 @@ #define JS_WaitSD 'S' /* waiting on the Storage daemon */ #define JS_WaitMedia 'm' /* waiting for new media */ #define JS_WaitMount 'M' /* waiting for Mount */ +#define JS_WaitStoreRes 's' /* Waiting for storage resource */ +#define JS_WaitJobRes 'j' /* Waiting for job resource */ +#define JS_WaitClientRes 'c' /* Waiting for Client resource */ +#define JS_WaitMaxJobs 'd' /* Waiting for maximum jobs */ #define job_cancelled(jcr) \ (jcr->JobStatus == JS_Cancelled || \ diff --git a/bacula/src/lib/Makefile.in b/bacula/src/lib/Makefile.in index bd866d1be2..1c6f6f15a6 100644 --- a/bacula/src/lib/Makefile.in +++ b/bacula/src/lib/Makefile.in @@ -37,7 +37,8 @@ LIBSRCS = alloc.c base64.c bmisc.c bnet.c bnet_server.c \ hmac.c idcache.c jcr.c lex.c \ md5.c message.c mem_pool.c parse_conf.c \ queue.c rwlock.c serial.c sha1.c \ - signal.c smartall.c tree.c util.c watchdog.c workq.c + semlock.c signal.c smartall.c tree.c \ + util.c watchdog.c workq.c # immortal.c filesys.c @@ -47,7 +48,8 @@ LIBOBJS = alloc.o base64.o bmisc.o bnet.o bnet_server.o \ hmac.o idcache.o jcr.o lex.o \ md5.o message.o mem_pool.o parse_conf.o \ queue.o rwlock.o serial.o sha1.o \ - signal.o smartall.o tree.o util.o watchdog.o workq.o + semlock.o signal.o smartall.o tree.o \ + util.o watchdog.o workq.o # immortal.o filesys.o diff --git a/bacula/src/lib/bnet_server.c b/bacula/src/lib/bnet_server.c index 16b5e6b513..ecf8fa38f2 100644 --- a/bacula/src/lib/bnet_server.c +++ b/bacula/src/lib/bnet_server.c @@ -46,7 +46,7 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* Become Threaded Network Server */ void bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq, - void handle_client_request(void *bsock)) + void *handle_client_request(void *bsock)) { int newsockfd, sockfd, stat; socklen_t clilen; diff --git a/bacula/src/lib/lib.h b/bacula/src/lib/lib.h index b9bd672830..9eeb48fed3 100644 --- a/bacula/src/lib/lib.h +++ b/bacula/src/lib/lib.h @@ -39,6 +39,7 @@ #include "bshm.h" #include "workq.h" #include "rwlock.h" +#include "semlock.h" #include "queue.h" #include "serial.h" #ifndef HAVE_FNMATCH diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index 6f35b8ffbe..4041e8488f 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -117,7 +117,7 @@ int close_spool_file (void *vjcr, BSOCK *bs); /* bnet_server.c */ void bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq, - void handle_client_request(void *bsock)); + void *handle_client_request(void *bsock)); void bnet_server (int port, void handle_client_request(BSOCK *bsock)); int net_connect (int port); BSOCK * bnet_bind (int port); diff --git a/bacula/src/lib/rwlock.c b/bacula/src/lib/rwlock.c index 4ec8b85809..68adf340b7 100644 --- a/bacula/src/lib/rwlock.c +++ b/bacula/src/lib/rwlock.c @@ -13,7 +13,7 @@ * */ /* - Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker + Copyright (C) 2000-2003 Kern Sibbald and John Walker This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -38,12 +38,12 @@ * Initialize a read/write lock * * Returns: 0 on success - * errno on failure + * errno on failure */ int rwl_init(brwlock_t *rwl) { int stat; - + rwl->r_active = rwl->w_active = 0; rwl->r_wait = rwl->w_wait = 0; if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) { @@ -66,7 +66,7 @@ int rwl_init(brwlock_t *rwl) * Destroy a read/write lock * * Returns: 0 on success - * errno on failure + * errno on failure */ int rwl_destroy(brwlock_t *rwl) { @@ -99,7 +99,7 @@ int rwl_destroy(brwlock_t *rwl) if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) { return stat; } - stat = pthread_mutex_destroy(&rwl->mutex); + stat = pthread_mutex_destroy(&rwl->mutex); stat1 = pthread_cond_destroy(&rwl->read); stat2 = pthread_cond_destroy(&rwl->write); return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); @@ -143,19 +143,19 @@ int rwl_readlock(brwlock_t *rwl) return stat; } if (rwl->w_active) { - rwl->r_wait++; /* indicate that we are waiting */ + rwl->r_wait++; /* indicate that we are waiting */ pthread_cleanup_push(rwl_read_release, (void *)rwl); while (rwl->w_active) { - stat = pthread_cond_wait(&rwl->read, &rwl->mutex); - if (stat != 0) { - break; /* error, bail out */ - } + stat = pthread_cond_wait(&rwl->read, &rwl->mutex); + if (stat != 0) { + break; /* error, bail out */ + } } pthread_cleanup_pop(0); - rwl->r_wait--; /* we are no longer waiting */ + rwl->r_wait--; /* we are no longer waiting */ } if (stat == 0) { - rwl->r_active++; /* we are running */ + rwl->r_active++; /* we are running */ } pthread_mutex_unlock(&rwl->mutex); return stat; @@ -177,7 +177,7 @@ int rwl_readtrylock(brwlock_t *rwl) if (rwl->w_active) { stat = EBUSY; } else { - rwl->r_active++; /* we are running */ + rwl->r_active++; /* we are running */ } stat2 = pthread_mutex_unlock(&rwl->mutex); return (stat == 0 ? stat2 : stat); @@ -225,18 +225,18 @@ int rwl_writelock(brwlock_t *rwl) return 0; } if (rwl->w_active || rwl->r_active > 0) { - rwl->w_wait++; /* indicate that we are waiting */ + rwl->w_wait++; /* indicate that we are waiting */ pthread_cleanup_push(rwl_write_release, (void *)rwl); while (rwl->w_active || rwl->r_active > 0) { - if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) { - break; /* error, bail out */ - } + if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) { + break; /* error, bail out */ + } } pthread_cleanup_pop(0); - rwl->w_wait--; /* we are no longer waiting */ + rwl->w_wait--; /* we are no longer waiting */ } if (stat == 0) { - rwl->w_active = 1; /* we are running */ + rwl->w_active = 1; /* we are running */ rwl->writer_id = pthread_self(); /* save writer thread's id */ } pthread_mutex_unlock(&rwl->mutex); @@ -264,7 +264,7 @@ int rwl_writetrylock(brwlock_t *rwl) if (rwl->w_active || rwl->r_active > 0) { stat = EBUSY; } else { - rwl->w_active = 1; /* we are running */ + rwl->w_active = 1; /* we are running */ rwl->writer_id = pthread_self(); /* save writer thread's id */ } stat2 = pthread_mutex_unlock(&rwl->mutex); @@ -290,13 +290,13 @@ int rwl_writeunlock(brwlock_t *rwl) Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n"); } if (rwl->w_active > 0) { - stat = 0; /* writers still active */ + stat = 0; /* writers still active */ } else { /* No more writers, awaken someone */ - if (rwl->r_wait > 0) { /* if readers waiting */ - stat = pthread_cond_broadcast(&rwl->read); + if (rwl->r_wait > 0) { /* if readers waiting */ + stat = pthread_cond_broadcast(&rwl->read); } else if (rwl->w_wait > 0) { - stat = pthread_cond_signal(&rwl->write); + stat = pthread_cond_signal(&rwl->write); } } stat2 = pthread_mutex_unlock(&rwl->mutex); @@ -350,43 +350,43 @@ void *thread_routine(void *arg) * lock). */ if ((iteration % self->interval) == 0) { - status = rwl_writelock(&data[element].lock); - if (status != 0) { + status = rwl_writelock(&data[element].lock); + if (status != 0) { Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status)); - } - data[element].data = self->thread_num; - data[element].writes++; - self->writes++; - status = rwl_writeunlock(&data[element].lock); - if (status != 0) { + } + data[element].data = self->thread_num; + data[element].writes++; + self->writes++; + status = rwl_writeunlock(&data[element].lock); + if (status != 0) { Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status)); - } + } } else { - /* - * Look at the current data element to see whether - * the current thread last updated it. Count the - * times to report later. - */ - status = rwl_readlock(&data[element].lock); - if (status != 0) { + /* + * Look at the current data element to see whether + * the current thread last updated it. Count the + * times to report later. + */ + status = rwl_readlock(&data[element].lock); + if (status != 0) { Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status)); - } - self->reads++; - if (data[element].data == self->thread_num) - repeats++; - status = rwl_readunlock(&data[element].lock); - if (status != 0) { + } + self->reads++; + if (data[element].data == self->thread_num) + repeats++; + status = rwl_readunlock(&data[element].lock); + if (status != 0) { Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status)); - } + } } element++; if (element >= DATASIZE) { - element = 0; + element = 0; } } if (repeats > 0) { Dmsg2(000, "Thread %d found unchanged elements %d times\n", - self->thread_num, repeats); + self->thread_num, repeats); } return NULL; } @@ -413,27 +413,27 @@ int main (int argc, char *argv[]) * Initialize the shared data. */ for (data_count = 0; data_count < DATASIZE; data_count++) { - data[data_count].data = 0; - data[data_count].writes = 0; - status = rwl_init (&data[data_count].lock); - if (status != 0) { + data[data_count].data = 0; + data[data_count].writes = 0; + status = rwl_init (&data[data_count].lock); + if (status != 0) { Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status)); - } + } } /* * Create THREADS threads to access shared data. */ for (count = 0; count < THREADS; count++) { - threads[count].thread_num = count + 1; - threads[count].writes = 0; - threads[count].reads = 0; - threads[count].interval = rand_r (&seed) % 71; - status = pthread_create (&threads[count].thread_id, - NULL, thread_routine, (void*)&threads[count]); - if (status != 0) { + threads[count].thread_num = count + 1; + threads[count].writes = 0; + threads[count].reads = 0; + threads[count].interval = rand_r (&seed) % 71; + status = pthread_create (&threads[count].thread_id, + NULL, thread_routine, (void*)&threads[count]); + if (status != 0) { Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status)); - } + } } /* @@ -441,28 +441,28 @@ int main (int argc, char *argv[]) * statistics. */ for (count = 0; count < THREADS; count++) { - status = pthread_join (threads[count].thread_id, NULL); - if (status != 0) { + status = pthread_join (threads[count].thread_id, NULL); + if (status != 0) { Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status)); - } - thread_writes += threads[count].writes; + } + thread_writes += threads[count].writes; printf ("%02d: interval %d, writes %d, reads %d\n", - count, threads[count].interval, - threads[count].writes, threads[count].reads); + count, threads[count].interval, + threads[count].writes, threads[count].reads); } /* * Collect statistics for the data. */ for (data_count = 0; data_count < DATASIZE; data_count++) { - data_writes += data[data_count].writes; + data_writes += data[data_count].writes; printf ("data %02d: value %d, %d writes\n", - data_count, data[data_count].data, data[data_count].writes); - rwl_destroy (&data[data_count].lock); + data_count, data[data_count].data, data[data_count].writes); + rwl_destroy (&data[data_count].lock); } printf ("Total: %d thread writes, %d data writes\n", - thread_writes, data_writes); + thread_writes, data_writes); return 0; } @@ -482,29 +482,29 @@ int main (int argc, char *argv[]) #include "rwlock.h" #include "errors.h" -#define THREADS 5 -#define ITERATIONS 1000 -#define DATASIZE 15 +#define THREADS 5 +#define ITERATIONS 1000 +#define DATASIZE 15 /* * Keep statistics for each thread. */ typedef struct thread_tag { - int thread_num; - pthread_t thread_id; - int r_collisions; - int w_collisions; - int updates; - int interval; + int thread_num; + pthread_t thread_id; + int r_collisions; + int w_collisions; + int updates; + int interval; } thread_t; /* * Read-write lock and shared data */ typedef struct data_tag { - brwlock_t lock; - int data; - int updates; + brwlock_t lock; + int data; + int updates; } data_t; thread_t threads[THREADS]; @@ -520,38 +520,38 @@ void *thread_routine (void *arg) int element; int status; - element = 0; /* Current data element */ + element = 0; /* Current data element */ for (iteration = 0; iteration < ITERATIONS; iteration++) { - if ((iteration % self->interval) == 0) { - status = rwl_writetrylock (&data[element].lock); - if (status == EBUSY) - self->w_collisions++; - else if (status == 0) { - data[element].data++; - data[element].updates++; - self->updates++; - rwl_writeunlock (&data[element].lock); - } else + if ((iteration % self->interval) == 0) { + status = rwl_writetrylock (&data[element].lock); + if (status == EBUSY) + self->w_collisions++; + else if (status == 0) { + data[element].data++; + data[element].updates++; + self->updates++; + rwl_writeunlock (&data[element].lock); + } else err_abort (status, "Try write lock"); - } else { - status = rwl_readtrylock (&data[element].lock); - if (status == EBUSY) - self->r_collisions++; - else if (status != 0) { + } else { + status = rwl_readtrylock (&data[element].lock); + if (status == EBUSY) + self->r_collisions++; + else if (status != 0) { err_abort (status, "Try read lock"); - } else { - if (data[element].data != data[element].updates) + } else { + if (data[element].data != data[element].updates) printf ("%d: data[%d] %d != %d\n", - self->thread_num, element, - data[element].data, data[element].updates); - rwl_readunlock (&data[element].lock); - } - } - - element++; - if (element >= DATASIZE) - element = 0; + self->thread_num, element, + data[element].data, data[element].updates); + rwl_readunlock (&data[element].lock); + } + } + + element++; + if (element >= DATASIZE) + element = 0; } return NULL; } @@ -577,23 +577,23 @@ int main (int argc, char *argv[]) * Initialize the shared data. */ for (data_count = 0; data_count < DATASIZE; data_count++) { - data[data_count].data = 0; - data[data_count].updates = 0; - rwl_init (&data[data_count].lock); + data[data_count].data = 0; + data[data_count].updates = 0; + rwl_init (&data[data_count].lock); } /* * Create THREADS threads to access shared data. */ for (count = 0; count < THREADS; count++) { - threads[count].thread_num = count; - threads[count].r_collisions = 0; - threads[count].w_collisions = 0; - threads[count].updates = 0; - threads[count].interval = rand_r (&seed) % ITERATIONS; - status = pthread_create (&threads[count].thread_id, - NULL, thread_routine, (void*)&threads[count]); - if (status != 0) + threads[count].thread_num = count; + threads[count].r_collisions = 0; + threads[count].w_collisions = 0; + threads[count].updates = 0; + threads[count].interval = rand_r (&seed) % ITERATIONS; + status = pthread_create (&threads[count].thread_id, + NULL, thread_routine, (void*)&threads[count]); + if (status != 0) err_abort (status, "Create thread"); } @@ -602,25 +602,25 @@ int main (int argc, char *argv[]) * statistics. */ for (count = 0; count < THREADS; count++) { - status = pthread_join (threads[count].thread_id, NULL); - if (status != 0) + status = pthread_join (threads[count].thread_id, NULL); + if (status != 0) err_abort (status, "Join thread"); - thread_updates += threads[count].updates; + thread_updates += threads[count].updates; printf ("%02d: interval %d, updates %d, " "r_collisions %d, w_collisions %d\n", - count, threads[count].interval, - threads[count].updates, - threads[count].r_collisions, threads[count].w_collisions); + count, threads[count].interval, + threads[count].updates, + threads[count].r_collisions, threads[count].w_collisions); } /* * Collect statistics for the data. */ for (data_count = 0; data_count < DATASIZE; data_count++) { - data_updates += data[data_count].updates; + data_updates += data[data_count].updates; printf ("data %02d: value %d, %d updates\n", - data_count, data[data_count].data, data[data_count].updates); - rwl_destroy (&data[data_count].lock); + data_count, data[data_count].data, data[data_count].updates); + rwl_destroy (&data[data_count].lock); } return 0; diff --git a/bacula/src/lib/rwlock.h b/bacula/src/lib/rwlock.h index 4b7ca78852..d310e1b3da 100644 --- a/bacula/src/lib/rwlock.h +++ b/bacula/src/lib/rwlock.h @@ -11,7 +11,7 @@ * */ /* - Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker + Copyright (C) 2000-2003 Kern Sibbald and John Walker This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as diff --git a/bacula/src/lib/semlock.c b/bacula/src/lib/semlock.c new file mode 100644 index 0000000000..7682682056 --- /dev/null +++ b/bacula/src/lib/semlock.c @@ -0,0 +1,522 @@ +/* + * Bacula Semaphore code. This code permits setting up + * a semaphore that lets through a specified number + * of callers simultaneously. Once the number of callers + * exceed the limit, they block. + * + * Kern Sibbald, March MMIII + * + * Derived from rwlock.h which was in turn derived from code in + * "Programming with POSIX Threads" By David R. Butenhof + & + * Version $Id$ + * + */ +/* + Copyright (C) 2000-2003 Kern Sibbald and John Walker + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this program; if not, write to the Free + Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, + MA 02111-1307, USA. + + */ + +#include "bacula.h" + +/* + * Initialize a semaphore + * + * Returns: 0 on success + * errno on failure + */ +int sem_init(semlock_t *sem, int max_active) +{ + int stat; + + sem->active = sem->waiting = 0; + sem->max_active = max_active; + if ((stat = pthread_mutex_init(&sem->mutex, NULL)) != 0) { + return stat; + } + if ((stat = pthread_cond_init(&sem->wait, NULL)) != 0) { + pthread_mutex_destroy(&sem->mutex); + return stat; + } + sem->valid = SEMLOCK_VALID; + return 0; +} + +/* + * Destroy a semaphore + * + * Returns: 0 on success + * errno on failure + */ +int sem_destroy(semlock_t *sem) +{ + int stat, stat1; + + if (sem->valid != SEMLOCK_VALID) { + return EINVAL; + } + if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) { + return stat; + } + + /* + * If any threads are active, report EBUSY + */ + if (sem->active > 0) { + pthread_mutex_unlock(&sem->mutex); + return EBUSY; + } + + /* + * If any threads are waiting, report EBUSY + */ + if (sem->waiting > 0) { + pthread_mutex_unlock(&sem->mutex); + return EBUSY; + } + + sem->valid = 0; + if ((stat = pthread_mutex_unlock(&sem->mutex)) != 0) { + return stat; + } + stat = pthread_mutex_destroy(&sem->mutex); + stat1 = pthread_cond_destroy(&sem->wait); + return (stat != 0 ? stat : stat1); +} + +/* + * Handle cleanup when the wait lock condition variable + * wait is released. + */ +static void sem_release(void *arg) +{ + semlock_t *sem = (semlock_t *)arg; + + sem->waiting--; + pthread_mutex_unlock(&sem->mutex); +} + + +/* + * Lock semaphore, wait until locked (or error). + */ +int sem_lock(semlock_t *sem) +{ + int stat; + + if (sem->valid != SEMLOCK_VALID) { + return EINVAL; + } + if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) { + return stat; + } + if (sem->active >= sem->max_active) { + sem->waiting++; /* indicate that we are waiting */ + pthread_cleanup_push(sem_release, (void *)sem); + while (sem->active >= sem->max_active) { + if ((stat = pthread_cond_wait(&sem->wait, &sem->mutex)) != 0) { + break; /* error, bail out */ + } + } + pthread_cleanup_pop(0); + sem->waiting--; /* we are no longer waiting */ + } + if (stat == 0) { + sem->active++; /* we are running */ + } + pthread_mutex_unlock(&sem->mutex); + return stat; +} + +/* + * Attempt to lock semaphore, don't wait + */ +int sem_trylock(semlock_t *sem) +{ + int stat, stat1; + + if (sem->valid != SEMLOCK_VALID) { + return EINVAL; + } + if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) { + return stat; + } + + if (sem->active >= sem->max_active) { + stat = EBUSY; + } else { + sem->active++; /* we are running */ + } + stat1 = pthread_mutex_unlock(&sem->mutex); + return (stat == 0 ? stat1 : stat); +} + +/* + * Unlock semaphore + * Start any waiting callers + */ +int sem_unlock(semlock_t *sem) +{ + int stat, stat1; + + if (sem->valid != SEMLOCK_VALID) { + return EINVAL; + } + if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) { + return stat; + } + sem->active--; + if (sem->active < 0) { + Emsg0(M_ABORT, 0, "sem_unlock by non-owner.\n"); + } + if (sem->active >= sem->max_active) { + stat = 0; /* caller(s) still active */ + } else { + /* No more active, awaken someone */ + if (sem->waiting > 0) { /* if someone waiting */ + stat = pthread_cond_broadcast(&sem->wait); + } + } + stat1 = pthread_mutex_unlock(&sem->mutex); + return (stat == 0 ? stat1 : stat); +} + +#ifdef TEST_SEMLOCK + +#define THREADS 5 +#define DATASIZE 15 +#define ITERATIONS 10000 + +/* + * Keep statics for each thread. + */ +typedef struct thread_tag { + int thread_num; + pthread_t thread_id; + int writes; + int reads; + int interval; +} thread_t; + +/* + * Semaphore lock and shared data. + */ +typedef struct data_tag { + semlock_t lock; + int data; + int writes; +} data_t; + +thread_t threads[THREADS]; +data_t data[DATASIZE]; + +/* + * Thread start routine that uses semaphores locks. + */ +void *thread_routine(void *arg) +{ + thread_t *self = (thread_t *)arg; + int repeats = 0; + int iteration; + int element = 0; + int status; + + for (iteration=0; iteration < ITERATIONS; iteration++) { + /* + * Each "self->interval" iterations, perform an + * update operation (write lock instead of read + * lock). + */ + if ((iteration % self->interval) == 0) { + status = sem_writelock(&data[element].lock); + if (status != 0) { + Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status)); + } + data[element].data = self->thread_num; + data[element].writes++; + self->writes++; + status = sem_writeunlock(&data[element].lock); + if (status != 0) { + Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status)); + } + } else { + /* + * Look at the current data element to see whether + * the current thread last updated it. Count the + * times to report later. + */ + status = sem_readlock(&data[element].lock); + if (status != 0) { + Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status)); + } + self->reads++; + if (data[element].data == self->thread_num) + repeats++; + status = sem_readunlock(&data[element].lock); + if (status != 0) { + Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status)); + } + } + element++; + if (element >= DATASIZE) { + element = 0; + } + } + if (repeats > 0) { + Dmsg2(000, "Thread %d found unchanged elements %d times\n", + self->thread_num, repeats); + } + return NULL; +} + +int main (int argc, char *argv[]) +{ + int count; + int data_count; + int status; + unsigned int seed = 1; + int thread_writes = 0; + int data_writes = 0; + +#ifdef sun + /* + * On Solaris 2.5, threads are not timesliced. To ensure + * that our threads can run concurrently, we need to + * increase the concurrency level to THREADS. + */ + thr_setconcurrency (THREADS); +#endif + + /* + * Initialize the shared data. + */ + for (data_count = 0; data_count < DATASIZE; data_count++) { + data[data_count].data = 0; + data[data_count].writes = 0; + status = sem_init (&data[data_count].lock); + if (status != 0) { + Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status)); + } + } + + /* + * Create THREADS threads to access shared data. + */ + for (count = 0; count < THREADS; count++) { + threads[count].thread_num = count + 1; + threads[count].writes = 0; + threads[count].reads = 0; + threads[count].interval = rand_r (&seed) % 71; + status = pthread_create(&threads[count].thread_id, + NULL, thread_routine, (void*)&threads[count]); + if (status != 0) { + Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status)); + } + } + + /* + * Wait for all threads to complete, and collect + * statistics. + */ + for (count = 0; count < THREADS; count++) { + status = pthread_join (threads[count].thread_id, NULL); + if (status != 0) { + Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status)); + } + thread_writes += threads[count].writes; + printf ("%02d: interval %d, writes %d, reads %d\n", + count, threads[count].interval, + threads[count].writes, threads[count].reads); + } + + /* + * Collect statistics for the data. + */ + for (data_count = 0; data_count < DATASIZE; data_count++) { + data_writes += data[data_count].writes; + printf ("data %02d: value %d, %d writes\n", + data_count, data[data_count].data, data[data_count].writes); + sem_destroy (&data[data_count].lock); + } + + printf ("Total: %d thread writes, %d data writes\n", + thread_writes, data_writes); + return 0; +} + +#endif + +#ifdef TEST_SEM_TRY_LOCK +/* + * semlock_try_main.c + * + * Demonstrate use of non-blocking read-write locks. + * + * Special notes: On a Solaris system, call thr_setconcurrency() + * to allow interleaved thread execution, since threads are not + * timesliced. + */ +#include +#include "semlock.h" +#include "errors.h" + +#define THREADS 5 +#define ITERATIONS 1000 +#define DATASIZE 15 + +/* + * Keep statistics for each thread. + */ +typedef struct thread_tag { + int thread_num; + pthread_t thread_id; + int r_collisions; + int w_collisions; + int updates; + int interval; +} thread_t; + +/* + * Read-write lock and shared data + */ +typedef struct data_tag { + semlock_t lock; + int data; + int updates; +} data_t; + +thread_t threads[THREADS]; +data_t data[DATASIZE]; + +/* + * Thread start routine that uses read-write locks + */ +void *thread_routine (void *arg) +{ + thread_t *self = (thread_t*)arg; + int iteration; + int element; + int status; + + element = 0; /* Current data element */ + + for (iteration = 0; iteration < ITERATIONS; iteration++) { + if ((iteration % self->interval) == 0) { + status = sem_writetrylock (&data[element].lock); + if (status == EBUSY) + self->w_collisions++; + else if (status == 0) { + data[element].data++; + data[element].updates++; + self->updates++; + sem_writeunlock (&data[element].lock); + } else + err_abort (status, "Try write lock"); + } else { + status = sem_readtrylock (&data[element].lock); + if (status == EBUSY) + self->r_collisions++; + else if (status != 0) { + err_abort (status, "Try read lock"); + } else { + if (data[element].data != data[element].updates) + printf ("%d: data[%d] %d != %d\n", + self->thread_num, element, + data[element].data, data[element].updates); + sem_readunlock (&data[element].lock); + } + } + + element++; + if (element >= DATASIZE) + element = 0; + } + return NULL; +} + +int main (int argc, char *argv[]) +{ + int count, data_count; + unsigned int seed = 1; + int thread_updates = 0, data_updates = 0; + int status; + +#ifdef sun + /* + * On Solaris 2.5, threads are not timesliced. To ensure + * that our threads can run concurrently, we need to + * increase the concurrency level to THREADS. + */ + DPRINTF (("Setting concurrency level to %d\n", THREADS)); + thr_setconcurrency (THREADS); +#endif + + /* + * Initialize the shared data. + */ + for (data_count = 0; data_count < DATASIZE; data_count++) { + data[data_count].data = 0; + data[data_count].updates = 0; + sem_init (&data[data_count].lock); + } + + /* + * Create THREADS threads to access shared data. + */ + for (count = 0; count < THREADS; count++) { + threads[count].thread_num = count; + threads[count].r_collisions = 0; + threads[count].w_collisions = 0; + threads[count].updates = 0; + threads[count].interval = rand_r (&seed) % ITERATIONS; + status = pthread_create (&threads[count].thread_id, + NULL, thread_routine, (void*)&threads[count]); + if (status != 0) + err_abort (status, "Create thread"); + } + + /* + * Wait for all threads to complete, and collect + * statistics. + */ + for (count = 0; count < THREADS; count++) { + status = pthread_join(threads[count].thread_id, NULL); + if (status != 0) + err_abort(status, "Join thread"); + thread_updates += threads[count].updates; + printf ("%02d: interval %d, updates %d, " + "r_collisions %d, w_collisions %d\n", + count, threads[count].interval, + threads[count].updates, + threads[count].r_collisions, threads[count].w_collisions); + } + + /* + * Collect statistics for the data. + */ + for (data_count = 0; data_count < DATASIZE; data_count++) { + data_updates += data[data_count].updates; + printf ("data %02d: value %d, %d updates\n", + data_count, data[data_count].data, data[data_count].updates); + sem_destroy(&data[data_count].lock); + } + + return 0; +} + +#endif diff --git a/bacula/src/lib/semlock.h b/bacula/src/lib/semlock.h new file mode 100644 index 0000000000..83c19c8092 --- /dev/null +++ b/bacula/src/lib/semlock.h @@ -0,0 +1,61 @@ +/* + * Bacula Semaphore code. This code permits setting up + * a semaphore that lets through a specified number + * of callers simultaneously. Once the number of callers + * exceed the limit, they block. + * + * Kern Sibbald, March MMIII + * + * Derived from rwlock.h which was in turn derived from code in + * "Programming with POSIX Threads" By David R. Butenhof + * + * Version $Id$ + */ +/* + Copyright (C) 2000-2003 Kern Sibbald and John Walker + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this program; if not, write to the Free + Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, + MA 02111-1307, USA. + + */ + +#ifndef __SEMLOCK_H +#define __SEMLOCK_H 1 + +typedef struct s_semlock_tag { + pthread_mutex_t mutex; /* main lock */ + pthread_cond_t wait; /* wait for available slot */ + int valid; /* set when valid */ + int waiting; /* number of callers waiting */ + int max_active; /* maximum active callers */ + int active; /* number of active callers */ +} semlock_t; + +#define SEMLOCK_VALID 0xfacade + +#define SEM_INIIALIZER \ + {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \ + PTHREAD_COND_INITIALIZER, SEMLOCK_VALID, 0, 0, 0, 0} + +/* + * semaphore lock prototypes + */ +extern int sem_init(semlock_t *sem, int max_active); +extern int sem_destroy(semlock_t *sem); +extern int sem_lock(semlock_t *sem); +extern int sem_trylock(semlock_t *sem); +extern int sem_unlock(semlock_t *sem); + +#endif /* __SEMLOCK_H */ diff --git a/bacula/src/lib/workq.c b/bacula/src/lib/workq.c index 3ccc663923..dbe0001fff 100755 --- a/bacula/src/lib/workq.c +++ b/bacula/src/lib/workq.c @@ -56,12 +56,12 @@ static void *workq_server(void *arg); * Initialize a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ -int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) +int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg)) { int stat; - + if ((stat = pthread_attr_init(&wq->attr)) != 0) { return stat; } @@ -80,10 +80,10 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) } wq->quit = 0; wq->first = wq->last = NULL; - wq->max_workers = threads; /* max threads to create */ - wq->num_workers = 0; /* no threads yet */ - wq->idle_workers = 0; /* no idle threads */ - wq->engine = engine; /* routine to run */ + wq->max_workers = threads; /* max threads to create */ + wq->num_workers = 0; /* no threads yet */ + wq->idle_workers = 0; /* no idle threads */ + wq->engine = engine; /* routine to run */ wq->valid = WORKQ_VALID; return 0; } @@ -92,7 +92,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) * Destroy a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ int workq_destroy(workq_t *wq) { @@ -104,7 +104,7 @@ int workq_destroy(workq_t *wq) if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { return stat; } - wq->valid = 0; /* prevent any more operations */ + wq->valid = 0; /* prevent any more operations */ /* * If any threads are active, wake them @@ -112,22 +112,22 @@ int workq_destroy(workq_t *wq) if (wq->num_workers > 0) { wq->quit = 1; if (wq->idle_workers) { - if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } while (wq->num_workers > 0) { - if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } } if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { return stat; } - stat = pthread_mutex_destroy(&wq->mutex); + stat = pthread_mutex_destroy(&wq->mutex); stat1 = pthread_cond_destroy(&wq->work); stat2 = pthread_attr_destroy(&wq->attr); return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); @@ -138,10 +138,10 @@ int workq_destroy(workq_t *wq) * Add work to a queue * wq is a queue that was created with workq_init * element is a user unique item that will be passed to the - * processing routine + * processing routine * work_item will get internal work queue item -- if it is not NULL * priority if non-zero will cause the item to be placed on the - * head of the list instead of the tail. + * head of the list instead of the tail. */ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) { @@ -168,18 +168,18 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) if (priority) { /* Add to head of queue */ if (wq->first == NULL) { - wq->first = item; - wq->last = item; + wq->first = item; + wq->last = item; } else { - item->next = wq->first; - wq->first = item; + item->next = wq->first; + wq->first = item; } } else { /* Add to end of queue */ if (wq->first == NULL) { - wq->first = item; + wq->first = item; } else { - wq->last->next = item; + wq->last->next = item; } wq->last = item; } @@ -188,16 +188,16 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) if (wq->idle_workers > 0) { Dmsg0(200, "Signal worker\n"); if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } } else if (wq->num_workers < wq->max_workers) { Dmsg0(200, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } @@ -236,8 +236,8 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) for (prev=item=wq->first; item; item=item->next) { if (item == work_item) { - found = 1; - break; + found = 1; + break; } prev = item; } @@ -249,7 +249,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) if (wq->first != work_item) { prev->next = work_item->next; if (wq->last == work_item) { - wq->last = prev; + wq->last = prev; } work_item->next = wq->first; wq->first = work_item; @@ -259,16 +259,16 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) if (wq->idle_workers > 0) { Dmsg0(200, "Signal worker\n"); if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } } else { Dmsg0(200, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } @@ -306,66 +306,66 @@ static void *workq_server(void *arg) timeout.tv_sec = tv.tv_sec + 2; while (wq->first == NULL && !wq->quit) { - /* - * Wait 2 seconds, then if no more work, exit - */ + /* + * Wait 2 seconds, then if no more work, exit + */ Dmsg0(200, "pthread_cond_timedwait()\n"); #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN - /* CYGWIN dies with a page fault the second - * time that pthread_cond_timedwait() is called - * so fake it out. - */ - pthread_mutex_lock(&wq->mutex); - stat = ETIMEDOUT; + /* CYGWIN dies with a page fault the second + * time that pthread_cond_timedwait() is called + * so fake it out. + */ + pthread_mutex_lock(&wq->mutex); + stat = ETIMEDOUT; #else - stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); + stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); #endif Dmsg1(200, "timedwait=%d\n", stat); - if (stat == ETIMEDOUT) { - timedout = 1; - break; - } else if (stat != 0) { + if (stat == ETIMEDOUT) { + timedout = 1; + break; + } else if (stat != 0) { /* This shouldn't happen */ Dmsg0(200, "This shouldn't happen\n"); - wq->num_workers--; - pthread_mutex_unlock(&wq->mutex); - return NULL; - } + wq->num_workers--; + pthread_mutex_unlock(&wq->mutex); + return NULL; + } } we = wq->first; if (we != NULL) { - wq->first = we->next; - if (wq->last == we) { - wq->last = NULL; - } - if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { - return NULL; - } + wq->first = we->next; + if (wq->last == we) { + wq->last = NULL; + } + if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { + return NULL; + } /* Call user's routine here */ Dmsg0(200, "Calling user engine.\n"); - wq->engine(we->data); + wq->engine(we->data); Dmsg0(200, "Back from user engine.\n"); - free(we); /* release work entry */ + free(we); /* release work entry */ Dmsg0(200, "relock mutex\n"); - if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { - return NULL; - } + if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { + return NULL; + } Dmsg0(200, "Done lock mutex\n"); } /* * If no more work request, and we are asked to quit, then do it */ if (wq->first == NULL && wq->quit) { - wq->num_workers--; - if (wq->num_workers == 0) { + wq->num_workers--; + if (wq->num_workers == 0) { Dmsg0(200, "Wake up destroy routine\n"); - /* Wake up destroy routine if he is waiting */ - pthread_cond_broadcast(&wq->work); - } + /* Wake up destroy routine if he is waiting */ + pthread_cond_broadcast(&wq->work); + } Dmsg0(200, "Unlock mutex\n"); - pthread_mutex_unlock(&wq->mutex); + pthread_mutex_unlock(&wq->mutex); Dmsg0(200, "Return from workq_server\n"); - return NULL; + return NULL; } Dmsg0(200, "Check for work request\n"); /* @@ -375,8 +375,8 @@ static void *workq_server(void *arg) Dmsg1(200, "timedout=%d\n", timedout); if (wq->first == NULL && timedout) { Dmsg0(200, "break big loop\n"); - wq->num_workers--; - break; + wq->num_workers--; + break; } Dmsg0(200, "Loop again\n"); } /* end of big for loop */ diff --git a/bacula/src/lib/workq.h b/bacula/src/lib/workq.h index 64ef4834f3..dc95fccdb6 100644 --- a/bacula/src/lib/workq.h +++ b/bacula/src/lib/workq.h @@ -53,7 +53,7 @@ typedef struct workq_tag { int max_workers; /* max threads */ int num_workers; /* current threads */ int idle_workers; /* idle threads */ - void (*engine)(void *arg); /* user engine */ + void *(*engine)(void *arg); /* user engine */ } workq_t; #define WORKQ_VALID 0xdec1992 @@ -61,7 +61,7 @@ typedef struct workq_tag { extern int workq_init( workq_t *wq, int threads, /* maximum threads */ - void (*engine)(void *) /* engine routine */ + void *(*engine)(void *) /* engine routine */ ); extern int workq_destroy(workq_t *wq); extern int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority); diff --git a/bacula/src/stored/dircmd.c b/bacula/src/stored/dircmd.c index 6d8dbf71b9..b8327afc5d 100644 --- a/bacula/src/stored/dircmd.c +++ b/bacula/src/stored/dircmd.c @@ -106,7 +106,7 @@ static struct s_cmds cmds[] = { * - We execute the command * - We continue or exit depending on the return status */ -void connection_request(void *arg) +void *connection_request(void *arg) { BSOCK *bs = (BSOCK *)arg; JCR *jcr; @@ -116,7 +116,7 @@ void connection_request(void *arg) if (bnet_recv(bs) <= 0) { Emsg0(M_ERROR, 0, "Connection request failed.\n"); - return; + return NULL; } /* @@ -124,7 +124,7 @@ void connection_request(void *arg) */ if (sscanf(bs->msg, "Hello Start Job %127s calling\n", name) == 1) { handle_filed_connection(bs, name); - return; + return NULL; } jcr = new_jcr(sizeof(JCR), stored_free_jcr); /* create Job Control Record */ @@ -138,7 +138,7 @@ void connection_request(void *arg) if (!authenticate_director(jcr)) { Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate Director\n")); free_jcr(jcr); - return; + return NULL; } Dmsg0(90, "Message channel init completed.\n"); @@ -170,7 +170,7 @@ void connection_request(void *arg) bnet_sig(bs, BNET_TERMINATE); } free_jcr(jcr); - return; + return NULL; } /* diff --git a/bacula/src/stored/protos.h b/bacula/src/stored/protos.h index c74f40e7f5..a75a16055f 100644 --- a/bacula/src/stored/protos.h +++ b/bacula/src/stored/protos.h @@ -28,82 +28,82 @@ uint32_t new_VolSessionId(); /* From acquire.c */ -DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int release_device(JCR *jcr, DEVICE *dev); +DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int release_device(JCR *jcr, DEVICE *dev); /* From askdir.c */ -int dir_get_volume_info(JCR *jcr, int writing); -int dir_find_next_appendable_volume(JCR *jcr); -int dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel); -int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev); -int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev); -int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec); -int dir_send_job_status(JCR *jcr); -int dir_create_jobmedia_record(JCR *jcr); +int dir_get_volume_info(JCR *jcr, int writing); +int dir_find_next_appendable_volume(JCR *jcr); +int dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel); +int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev); +int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev); +int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec); +int dir_send_job_status(JCR *jcr); +int dir_create_jobmedia_record(JCR *jcr); /* authenticate.c */ -int authenticate_director(JCR *jcr); -int authenticate_filed(JCR *jcr); +int authenticate_director(JCR *jcr); +int authenticate_filed(JCR *jcr); /* From block.c */ -void dump_block(DEV_BLOCK *b, char *msg); +void dump_block(DEV_BLOCK *b, char *msg); DEV_BLOCK *new_block(DEVICE *dev); -void init_block_write(DEV_BLOCK *block); -void empty_block(DEV_BLOCK *block); -void free_block(DEV_BLOCK *block); -int write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int read_block_from_device(DEVICE *dev, DEV_BLOCK *block); -int read_block_from_dev(DEVICE *dev, DEV_BLOCK *block); +void init_block_write(DEV_BLOCK *block); +void empty_block(DEV_BLOCK *block); +void free_block(DEV_BLOCK *block); +int write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int read_block_from_device(DEVICE *dev, DEV_BLOCK *block); +int read_block_from_dev(DEVICE *dev, DEV_BLOCK *block); /* From butil.c -- utilities for SD tool programs */ -void print_ls_output(char *fname, char *link, int type, struct stat *statp); +void print_ls_output(char *fname, char *link, int type, struct stat *statp); JCR *setup_jcr(char *name, char *device, BSR *bsr, char *VolumeName); DEVICE *setup_to_access_device(JCR *jcr, int read_access); -void display_error_status(DEVICE *dev); +void display_error_status(DEVICE *dev); DEVRES *find_device_res(char *device_name, int read_access); /* From dev.c */ -DEVICE *init_dev(DEVICE *dev, DEVRES *device); -int open_dev(DEVICE *dev, char *VolName, int mode); -void close_dev(DEVICE *dev); -void force_close_dev(DEVICE *dev); -int truncate_dev(DEVICE *dev); -void term_dev(DEVICE *dev); -char * strerror_dev(DEVICE *dev); -void clrerror_dev(DEVICE *dev, int func); -int update_pos_dev(DEVICE *dev); -int rewind_dev(DEVICE *dev); -int load_dev(DEVICE *dev); -int offline_dev(DEVICE *dev); -int flush_dev(DEVICE *dev); -int weof_dev(DEVICE *dev, int num); -int write_block(DEVICE *dev); -int write_dev(DEVICE *dev, char *buf, size_t len); -int read_dev(DEVICE *dev, char *buf, size_t len); -int status_dev(DEVICE *dev, uint32_t *status); -int eod_dev(DEVICE *dev); -int fsf_dev(DEVICE *dev, int num); -int fsr_dev(DEVICE *dev, int num); -int bsf_dev(DEVICE *dev, int num); -int bsr_dev(DEVICE *dev, int num); -void attach_jcr_to_device(DEVICE *dev, JCR *jcr); -void detach_jcr_from_device(DEVICE *dev, JCR *jcr); -JCR *next_attached_jcr(DEVICE *dev, JCR *jcr); +DEVICE *init_dev(DEVICE *dev, DEVRES *device); +int open_dev(DEVICE *dev, char *VolName, int mode); +void close_dev(DEVICE *dev); +void force_close_dev(DEVICE *dev); +int truncate_dev(DEVICE *dev); +void term_dev(DEVICE *dev); +char * strerror_dev(DEVICE *dev); +void clrerror_dev(DEVICE *dev, int func); +int update_pos_dev(DEVICE *dev); +int rewind_dev(DEVICE *dev); +int load_dev(DEVICE *dev); +int offline_dev(DEVICE *dev); +int flush_dev(DEVICE *dev); +int weof_dev(DEVICE *dev, int num); +int write_block(DEVICE *dev); +int write_dev(DEVICE *dev, char *buf, size_t len); +int read_dev(DEVICE *dev, char *buf, size_t len); +int status_dev(DEVICE *dev, uint32_t *status); +int eod_dev(DEVICE *dev); +int fsf_dev(DEVICE *dev, int num); +int fsr_dev(DEVICE *dev, int num); +int bsf_dev(DEVICE *dev, int num); +int bsr_dev(DEVICE *dev, int num); +void attach_jcr_to_device(DEVICE *dev, JCR *jcr); +void detach_jcr_from_device(DEVICE *dev, JCR *jcr); +JCR *next_attached_jcr(DEVICE *dev, JCR *jcr); /* Get info about device */ -char * dev_name(DEVICE *dev); -char * dev_vol_name(DEVICE *dev); +char * dev_name(DEVICE *dev); +char * dev_vol_name(DEVICE *dev); uint32_t dev_block(DEVICE *dev); uint32_t dev_file(DEVICE *dev); -int dev_is_tape(DEVICE *dev); +int dev_is_tape(DEVICE *dev); /* From device.c */ -int open_device(DEVICE *dev); -int fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int open_device(DEVICE *dev); +int fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); void _lock_device(char *file, int line, DEVICE *dev); void _unlock_device(char *file, int line, DEVICE *dev); void _block_device(char *file, int line, DEVICE *dev, int state); @@ -119,40 +119,40 @@ void new_steal_device_lock(DEVICE *dev, brwsteal_t *hold, int state); void new_return_device_lock(DEVICE *dev, brwsteal_t *hold); /* From dircmd.c */ -void connection_request(void *arg); +void *connection_request(void *arg); /* From fd_cmds.c */ -void run_job(JCR *jcr); +void run_job(JCR *jcr); /* From fdmsg.c */ -int bget_msg(BSOCK *sock); +int bget_msg(BSOCK *sock); /* From job.c */ -void stored_free_jcr(JCR *jcr); -void connection_from_filed(void *arg); -void handle_filed_connection(BSOCK *fd, char *job_name); +void stored_free_jcr(JCR *jcr); +void connection_from_filed(void *arg); +void handle_filed_connection(BSOCK *fd, char *job_name); /* From label.c */ -int read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -void create_session_label(JCR *jcr, DEV_RECORD *rec, int label); -void create_volume_label(DEVICE *dev, char *VolName); -int write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName); -int write_session_label(JCR *jcr, DEV_BLOCK *block, int label); -int write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -void dump_volume_label(DEVICE *dev); -void dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose); -int unser_volume_label(DEVICE *dev, DEV_RECORD *rec); -int unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec); +int read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +void create_session_label(JCR *jcr, DEV_RECORD *rec, int label); +void create_volume_label(DEVICE *dev, char *VolName); +int write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName); +int write_session_label(JCR *jcr, DEV_BLOCK *block, int label); +int write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +void dump_volume_label(DEVICE *dev); +void dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose); +int unser_volume_label(DEVICE *dev, DEV_RECORD *rec); +int unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec); /* From match_bsr.c */ int match_bsr(BSR *bsr, DEV_RECORD *rec, VOLUME_LABEL *volrec, - SESSION_LABEL *sesrec); + SESSION_LABEL *sesrec); /* From mount.c */ -int mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release); -int mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir); +int mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release); +int mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir); /* From parse_bsr.c */ @@ -167,11 +167,11 @@ extern void create_vol_list(JCR *jcr); /* From record.c */ char *FI_to_ascii(int fi); char *stream_to_ascii(int stream, int fi); -int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); -int can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); -int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); +int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); +int can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); +int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); DEV_RECORD *new_record(); -void free_record(DEV_RECORD *rec); +void free_record(DEV_RECORD *rec); /* From read_record.c */ int read_records(JCR *jcr, DEVICE *dev, diff --git a/bacula/src/version.h b/bacula/src/version.h index 0647d910d8..909e06a87c 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -1,8 +1,8 @@ /* */ #define VERSION "1.30" #define VSTRING "1" -#define BDATE "14 March 2003" -#define LSMDATE "14Mar03" +#define BDATE "17 March 2003" +#define LSMDATE "17Mar03" /* Debug flags */ #define DEBUG 1 @@ -10,6 +10,9 @@ #define SMCHECK #define TRACE_FILE 1 +/* Turn this on if you want to try the new Job semaphore code */ +/* #define USE_SEMAPHORE */ + /* IF you undefine this, Bacula will run 10X slower */ #define NO_POLL_TEST 1