- ***test GetFileAttributexEx, and remove MessageBox at 335 of winservice.cpp ****
For 1.30 release:
-- Cancelling of a queued job does NOT work!!!!!!
+- Refine SD waiting output:
+ Device is being positioned
+ > Device is being positioned for append
+ > Device is being positioned to file x
+ >
+- > Hello,
+>
+> Possibly your /etc/bacula/my_exclude is not
+> marked as executable. It MUST be executable.
+
+the script is executeabe (0777 for testing)
+
+> Another solution is simply to include the
+> find command in the exclude:
+> Exclude = {
+> "|find / -iname *.avi -o -iname *.mpg -o -iname *.mp3"
+> }
+
+it doesn't work :(
+
+i tried it on two different systems with bacula 1.29
+
+this works:
+Exclude {
+ <file.list
+ }
+
+Exclude {
+ *.avi
+ }
+
+this doesn't:
+Exclude {
+ "|find / -iname *.avi"
+ }
+
+Exclude {
+ "|/etc/bacula/my_exclude
+ }
+
+
+i dont understand it anymore :(
+
+- Issue message to mount a new tape before the rewind.
+- Simplified client job initiation for portables.
+- If SD cannot open a drive, make it periodically retry.
+- Implement LabelTemplate (at least first cut).
- Bevan Anderson suggests having a run queue for each device
so that multiple simultaneous jobs can run but each writing
to a different Volume.
-- Get two
-rufus-dir: Volume used once. Marking Volume "File0003" as Used.
-rufus-sd: Recycled volume File0003 on device /home/kern/bacula/working, all previous data lost.
-rufus-dir: Volume used once. Marking Volume "File0003" as Used.
-- Ability to backup to a file then later transfer to a tape -- Migration.
- Migration based on MaxJobs(MinJobs),MaxVols(MinVols),AgeJobs,MaxBytes(MinBytes)
- (i.e. HighwaterMark, LowwaterMark).
-- Eugeny Fisher <efischer@vip-rus.com> wants to cycle through a
- set of volumes recycling the oldest volume when it is needed.
- Fix "access not allowed" for backup of files on WinXP.
- Figure out some way to specify a retention period for files
that no longer exist on the machine -- so that we maintain
- Complete code in Bacula Resources -- this will permit
reading a new config file at any time.
- Handle ctl-c in Console
-- Implement LabelTemplate (at least first cut).
- Implement script driven addition of File daemon to config files.
- Think about how to make Bacula work better with File (non-tape) archives.
- Write Unix emulator for Windows.
- Put memory utilization in Status output of each daemon
if full status requested or if some level of debug on.
- Make database type selectable by .conf files i.e. at runtime
-- gethostbyname failure in bnet_connect() continues
- generating errors -- should stop.
- Set flag for uname -a. Add to Volume label.
- Implement throttled work queue.
- Check for EOT at ENOSPC or EIO or ENXIO (unix Pc)
- Implement Restore FileSet=
- Create a protocol.h and protocol.c where all protocol messages
are concentrated.
-- If SD cannot open a drive, make it periodically retry.
- Remove duplicate fields from jcr (e.g. jcr.level and jcr.jr.Level, ...).
- Timout a job or terminate if link goes down, or reopen link and query.
- Find general solution for sscanf size problems (as well
This could be the output of df; or perhaps some sort of /etc/mtab record.
Longer term to do:
-- Design at hierarchial storage for Bacula.
+- Design at hierarchial storage for Bacula. Migration and Clone.
- Implement FSM (File System Modules).
- Identify unchanged or "system" files and save them to a
special tape thus removing them from the standard
ERR=Operation not permited loop.
- Add code if there is no mtio.h (cannot do -- too many ioctl defines needed)
- Produce better error messages in when error/eof writing block.
+- Cancelling of a queued job does NOT work!!!!!!
+- Get two
+rufus-dir: Volume used once. Marking Volume "File0003" as Used.
+rufus-sd: Recycled volume File0003 on device /home/kern/bacula/working, all previous data lost.
+rufus-dir: Volume used once. Marking Volume "File0003" as Used.
+- Ability to backup to a file then later transfer to a tape -- Migration.
+ Migration based on MaxJobs(MinJobs),MaxVols(MinVols),AgeJobs,MaxBytes(MinBytes)
+ (i.e. HighwaterMark, LowwaterMark).
+- Eugeny Fisher <efischer@vip-rus.com> wants to cycle through a
+ set of volumes recycling the oldest volume when it is needed.
+- gethostbyname failure in bnet_connect() continues
+ generating errors -- should stop.
db_lock(mdb);
Mmsg(&mdb->cmd,
-"UPDATE Pool SET NumVols=%d,MaxVols=%d,UseOnce=%d,UseCatalog=%d,"
+"UPDATE Pool SET NumVols=%u,MaxVols=%u,UseOnce=%d,UseCatalog=%d,"
"AcceptAnyVolume=%d,VolRetention='%s',VolUseDuration='%s',"
"MaxVolJobs=%u,MaxVolFiles=%u,MaxVolBytes=%s,Recycle=%d,"
"AutoPrune=%d,LabelFormat='%s' WHERE PoolId=%u",
*
*/
Dmsg0(110, "Open connection with storage daemon\n");
- set_jcr_job_status(jcr, JS_Blocked);
+ set_jcr_job_status(jcr, JS_WaitSD);
/*
* Start conversation with Storage daemon
*/
}
Dmsg0(150, "Storage daemon connection OK\n");
- set_jcr_job_status(jcr, JS_Blocked);
+ set_jcr_job_status(jcr, JS_WaitFD);
if (!connect_to_file_daemon(jcr, 10, FDConnectTimeout, 1)) {
goto bail_out;
}
{"fileretention", store_time, ITEM(res_client.FileRetention), 0, ITEM_DEFAULT, 60*60*24*60},
{"jobretention", store_time, ITEM(res_client.JobRetention), 0, ITEM_DEFAULT, 60*60*24*180},
{"autoprune", store_yesno, ITEM(res_client.AutoPrune), 1, ITEM_DEFAULT, 1},
+ {"maximumconcurrentjobs", store_pint, ITEM(res_client.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1},
{NULL, NULL, NULL, 0, 0, 0}
};
{"device", store_strname, ITEM(res_store.dev_name), 0, ITEM_REQUIRED, 0},
{"mediatype", store_strname, ITEM(res_store.media_type), 0, ITEM_REQUIRED, 0},
{"autochanger", store_yesno, ITEM(res_store.autochanger), 1, ITEM_DEFAULT, 0},
+ {"maximumconcurrentjobs", store_pint, ITEM(res_store.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1},
{NULL, NULL, NULL, 0, 0, 0}
};
{"runafterjob", store_str, ITEM(res_job.RunAfterJob), 0, 0, 0},
{"spoolattributes", store_yesno, ITEM(res_job.SpoolAttributes), 1, ITEM_DEFAULT, 0},
{"writebootstrap", store_dir, ITEM(res_job.WriteBootstrap), 0, 0, 0},
+ {"maximumconcurrentjobs", store_pint, ITEM(res_job.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1},
{NULL, NULL, NULL, 0, 0, 0}
};
switch (type) {
case R_DIRECTOR:
char ed1[30], ed2[30];
- sendit(sock, "Director: name=%s maxjobs=%d FDtimeout=%s SDtimeout=%s\n",
+ sendit(sock, "Director: name=%s MaxJobs=%d FDtimeout=%s SDtimeout=%s\n",
reshdr->name, res->res_dir.MaxConcurrentJobs,
edit_uint64(res->res_dir.FDConnectTimeout, ed1),
edit_uint64(res->res_dir.SDConnectTimeout, ed2));
}
break;
case R_CLIENT:
- sendit(sock, "Client: name=%s address=%s FDport=%d\n",
- res->res_client.hdr.name, res->res_client.address, res->res_client.FDport);
+ sendit(sock, "Client: name=%s address=%s FDport=%d MaxJobs=%u\n",
+ res->res_client.hdr.name, res->res_client.address, res->res_client.FDport,
+ res->res_client.MaxConcurrentJobs);
sendit(sock, " JobRetention=%" lld " FileRetention=%" lld " AutoPrune=%d\n",
res->res_client.JobRetention, res->res_client.FileRetention,
res->res_client.AutoPrune);
}
break;
case R_STORAGE:
- sendit(sock, "Storage: name=%s address=%s SDport=%d\n\
+ sendit(sock, "Storage: name=%s address=%s SDport=%d MaxJobs=%u\n\
DeviceName=%s MediaType=%s\n",
res->res_store.hdr.name, res->res_store.address, res->res_store.SDport,
+ res->res_store.MaxConcurrentJobs,
res->res_store.dev_name, res->res_store.media_type);
break;
case R_CATALOG:
res->res_cat.db_port, res->res_cat.db_name, NPRT(res->res_cat.db_user));
break;
case R_JOB:
- sendit(sock, "Job: name=%s JobType=%d level=%s\n", res->res_job.hdr.name,
- res->res_job.JobType, level_to_str(res->res_job.level));
+ sendit(sock, "Job: name=%s JobType=%d level=%s MaxJobs=%u\n",
+ res->res_job.hdr.name, res->res_job.JobType,
+ level_to_str(res->res_job.level), res->res_job.MaxConcurrentJobs);
if (res->res_job.client) {
sendit(sock, " --> ");
dump_resource(-R_CLIENT, (RES *)res->res_job.client, sendit, sock);
char *pid_directory; /* PidDirectory */
char *subsys_directory; /* SubsysDirectory */
struct s_res_msgs *messages; /* Daemon message handler */
- int MaxConcurrentJobs;
+ uint32_t MaxConcurrentJobs; /* Max concurrent jobs for whole director */
utime_t FDConnectTimeout; /* timeout for connect in seconds */
utime_t SDConnectTimeout; /* timeout in seconds */
};
char *address;
char *password;
struct s_res_cat *catalog; /* Catalog resource */
+ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */
+ semlock_t sem; /* client semaphore */
};
typedef struct s_res_client CLIENT;
char *media_type;
char *dev_name;
int autochanger; /* set if autochanger */
+ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */
+ semlock_t sem; /* storage semaphore */
};
typedef struct s_res_store STORE;
int PruneFiles; /* Force pruning of Files */
int PruneVolumes; /* Force pruning of Volumes */
int SpoolAttributes; /* Set to spool attributes in SD */
-
+ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */
+
struct s_res_msgs *messages; /* How and where to send messages */
struct s_res_sch *schedule; /* When -- Automatic schedule */
struct s_res_client *client; /* Who to backup */
struct s_res_fs *fileset; /* What to backup -- Fileset */
struct s_res_store *storage; /* Where is device -- Storage daemon */
struct s_res_pool *pool; /* Where is media -- Media Pool */
+
+ semlock_t sem; /* Job semaphore */
};
typedef struct s_res_job JOB;
#include "dird.h"
/* Forward referenced subroutines */
-static void job_thread(void *arg);
+static void *job_thread(void *arg);
static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg);
+static void release_resource_locks(JCR *jcr);
+static int acquire_resource_locks(JCR *jcr);
+#ifdef USE_SEMAPHORE
+static void backoff_resource_locks(JCR *jcr, int count);
+#endif
/* Exported subroutines */
void run_job(JCR *jcr);
-void init_job_server(int max_workers);
/* Imported subroutines */
extern int do_verify(JCR *jcr);
extern void backup_cleanup(void);
+#ifdef USE_SEMAPHORE
+static semlock_t job_lock;
+static pthread_mutex_t mutex;
+static pthread_cond_t resource_wait;
+#else
/* Queue of jobs to be run */
workq_t job_wq; /* our job work queue */
-
+#endif
void init_job_server(int max_workers)
{
int stat;
+#ifdef USE_SEMAPHORE
+ if ((stat = sem_init(&job_lock, max_workers)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init job lock: ERR=%s\n"), strerror(stat));
+ }
+ if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init resource mutex: ERR=%s\n"), strerror(stat));
+ }
+ if ((stat = pthread_cond_init(&resource_wait, NULL)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init resource wait: ERR=%s\n"), strerror(stat));
+ }
+#else
if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat));
}
+#endif
return;
}
void run_job(JCR *jcr)
{
int stat, errstat;
+#ifdef USE_SEMAPHORE
+ pthread_t tid;
+#else
workq_ele_t *work_item;
+#endif
sm_check(__FILE__, __LINE__, True);
init_msg(jcr, jcr->messages);
jcr->JobId, jcr->Job, jcr->jr.Type, jcr->jr.Level);
Dmsg0(200, "Add jrc to work queue\n");
-
+#ifdef USE_SEMAPHORE
+ if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) {
+ Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat));
+ }
+#else
/* Queue the job to be run */
if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
}
jcr->work_item = work_item;
+#endif
Dmsg0(200, "Done run_job()\n");
}
* from the work queue.
* At this point, we are running in our own thread
*/
-static void job_thread(void *arg)
+static void *job_thread(void *arg)
{
time_t now;
JCR *jcr = (JCR *)arg;
+ pthread_detach(pthread_self());
time(&now);
sm_check(__FILE__, __LINE__, True);
- Dmsg0(100, "=====Start Job=========\n");
+ if (!acquire_resource_locks(jcr)) {
+ set_jcr_job_status(jcr, JS_Cancelled);
+ }
+
+ Dmsg0(200, "=====Start Job=========\n");
jcr->start_time = now; /* set the real start time */
+ Dmsg2(200, "jcr->JobStatus=%d %c\n", jcr->JobStatus, (char)jcr->JobStatus);
if (job_cancelled(jcr)) {
update_job_end_record(jcr);
} else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
(utime_t)(jcr->start_time - jcr->sched_time)) {
- Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max delay time exceeded.\n"));
- set_jcr_job_status(jcr, JS_ErrorTerminated);
+ Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max start delay time exceeded.\n"));
+ set_jcr_job_status(jcr, JS_Cancelled);
update_job_end_record(jcr);
} else {
free_pool_memory(after);
}
}
+ release_resource_locks(jcr);
Dmsg0(50, "Before free jcr\n");
free_jcr(jcr);
Dmsg0(50, "======== End Job ==========\n");
sm_check(__FILE__, __LINE__, True);
+ return NULL;
+}
+
+static int acquire_resource_locks(JCR *jcr)
+{
+#ifdef USE_SEMAPHORE
+ int stat;
+
+ if (jcr->store->sem.valid != SEMLOCK_VALID) {
+ if ((stat = sem_init(&jcr->store->sem, jcr->store->MaxConcurrentJobs)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init Storage semaphore: ERR=%s\n"), strerror(stat));
+ }
+ }
+ if (jcr->client->sem.valid != SEMLOCK_VALID) {
+ if ((stat = sem_init(&jcr->client->sem, jcr->client->MaxConcurrentJobs)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init Client semaphore: ERR=%s\n"), strerror(stat));
+ }
+ }
+ if (jcr->job->sem.valid != SEMLOCK_VALID) {
+ if ((stat = sem_init(&jcr->job->sem, jcr->job->MaxConcurrentJobs)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not init Job semaphore: ERR=%s\n"), strerror(stat));
+ }
+ }
+
+ for ( ;; ) {
+ /* Acquire semaphore */
+ set_jcr_job_status(jcr, JS_WaitJobRes);
+ if ((stat = sem_lock(&jcr->job->sem)) != 0) {
+ Emsg1(M_ABORT, 0, _("Could not acquire Job max jobs lock: ERR=%s\n"), strerror(stat));
+ }
+ set_jcr_job_status(jcr, JS_WaitClientRes);
+ if ((stat = sem_trylock(&jcr->client->sem)) != 0) {
+ if (stat == EBUSY) {
+ backoff_resource_locks(jcr, 1);
+ goto wait;
+ } else {
+ Emsg1(M_ABORT, 0, _("Could not acquire Client max jobs lock: ERR=%s\n"), strerror(stat));
+ }
+ }
+ set_jcr_job_status(jcr, JS_WaitStoreRes);
+ if ((stat = sem_trylock(&jcr->store->sem)) != 0) {
+ if (stat == EBUSY) {
+ backoff_resource_locks(jcr, 2);
+ goto wait;
+ } else {
+ Emsg1(M_ABORT, 0, _("Could not acquire Storage max jobs lock: ERR=%s\n"), strerror(stat));
+ }
+ }
+ set_jcr_job_status(jcr, JS_WaitMaxJobs);
+ if ((stat = sem_trylock(&job_lock)) != 0) {
+ if (stat == EBUSY) {
+ backoff_resource_locks(jcr, 3);
+ goto wait;
+ } else {
+ Emsg1(M_ABORT, 0, _("Could not acquire max jobs lock: ERR=%s\n"), strerror(stat));
+ }
+ }
+ break;
+
+wait:
+ P(mutex);
+ /* Wait for some resource to be released */
+ pthread_cond_wait(&resource_wait, &mutex);
+ V(mutex);
+ /* Try again */
+ }
+#endif
+ return 1;
+}
+
+#ifdef USE_SEMAPHORE
+static void backoff_resource_locks(JCR *jcr, int count)
+{
+ switch (count) {
+ case 3:
+ sem_unlock(&jcr->store->sem);
+ case 2:
+ sem_unlock(&jcr->client->sem);
+ case 1:
+ sem_unlock(&jcr->job->sem);
+ break;
+ }
+}
+#endif
+
+static void release_resource_locks(JCR *jcr)
+{
+#ifdef USE_SEMAPHORE
+ P(mutex);
+ sem_unlock(&jcr->store->sem);
+ sem_unlock(&jcr->client->sem);
+ sem_unlock(&jcr->job->sem);
+ sem_unlock(&job_lock);
+ pthread_cond_signal(&resource_wait);
+ V(mutex);
+#endif
}
/*
{
POOL_DBR pr;
char name[MAXSTRING];
+ char num[20];
memset(&pr, 0, sizeof(pr));
if (pr.MaxVols == 0 || pr.NumVols < pr.MaxVols) {
set_pool_dbr_defaults_in_media_dbr(mr, &pr);
mr->LabelDate = time(NULL);
- strcpy(mr->MediaType, jcr->store->media_type);
- strcpy(name, pr.LabelFormat);
+ bstrncpy(mr->MediaType, jcr->store->media_type, sizeof(mr->MediaType));
+ bstrncpy(name, pr.LabelFormat, sizeof(name));
if (strchr(name, (int)'%') != NULL) {
db_unlock(jcr->db);
Jmsg(jcr, M_ERROR, 0, _("Illegal character in Label Format\n"));
return 0;
}
- strcat(name, "%04d");
- sprintf(mr->VolumeName, name, ++pr.NumVols);
+ sprintf(num, "%04d", ++pr.NumVols);
+ bstrncpy(mr->VolumeName, name, sizeof(mr->VolumeName));
+ bstrncat(mr->VolumeName, num, sizeof(mr->VolumeName));
if (db_create_media_record(jcr, jcr->db, mr) &&
- db_update_pool_record(jcr, jcr->db, &pr) == 1) {
+ db_update_pool_record(jcr, jcr->db, &pr)) {
db_unlock(jcr->db);
Dmsg1(90, "Created new Volume=%s\n", mr->VolumeName);
return 1;
extern int r_last;
extern struct s_res resources[];
extern char my_name[];
+#ifndef USE_SEMAPHORE
extern workq_t job_wq; /* work queue */
+#endif
extern char *list_pool;
set_jcr_job_status(jcr, JS_Cancelled);
bsendmsg(ua, _("JobId %d, Job %s marked to be cancelled.\n"),
jcr->JobId, jcr->Job);
+#ifndef USE_SEMAPHORE
workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */
+#endif
free_jcr(jcr);
return 1;
bash_spaces(pr.Name);
bnet_fsend(sd, _("label %s VolumeName=%s PoolName=%s MediaType=%s Slot=%d"),
dev_name, mr.VolumeName, pr.Name, mr.MediaType, mr.Slot);
- bsendmsg(ua, "Sending label command ...\n");
+ bsendmsg(ua, _("Sending label command ...\n"));
while (bget_msg(sd, 0) >= 0) {
bsendmsg(ua, "%s", sd->msg);
if (strncmp(sd->msg, "3000 OK label.", 14) == 0) {
ok = TRUE;
+ } else {
+ bsendmsg(ua, _("Label command failed.\n"));
}
}
ua->jcr->store_bsock = NULL;
/* Forward referenced functions */
static void *connect_thread(void *arg);
-static void handle_UA_client_request(void *arg);
+static void *handle_UA_client_request(void *arg);
/* Global variables */
* Handle Director User Agent commands
*
*/
-static void handle_UA_client_request(void *arg)
+static void *handle_UA_client_request(void *arg)
{
int stat;
UAContext ua;
if (ua.args) {
free_pool_memory(ua.args);
}
- return;
+ return NULL;
}
/*
Mmsg(&msg, _("is waiting on Storage %s"), jcr->store->hdr.name);
pool_mem = TRUE;
break;
+ case JS_WaitStoreRes:
+ msg = _("is waiting on max Storage jobs");
+ break;
+ case JS_WaitClientRes:
+ msg = _("is waiting on max Client jobs");
+ break;
+ case JS_WaitJobRes:
+ msg = _("is waiting on max Job jobs");
+ break;
+ case JS_WaitMaxJobs:
+ msg = _("is waiting on max total jobs");
+ break;
+
default:
msg = (char *) get_pool_memory(PM_FNAME);
Mmsg(&msg, _("is in unknown state %c"), jcr->JobStatus);
#include "filed.h"
/* Imported Functions */
-extern void handle_client_request(void *dir_sock);
+extern void *handle_client_request(void *dir_sock);
/* Forward referenced functions */
void terminate_filed(int sig);
#define JS_WaitSD 'S' /* waiting on the Storage daemon */
#define JS_WaitMedia 'm' /* waiting for new media */
#define JS_WaitMount 'M' /* waiting for Mount */
+#define JS_WaitStoreRes 's' /* Waiting for storage resource */
+#define JS_WaitJobRes 'j' /* Waiting for job resource */
+#define JS_WaitClientRes 'c' /* Waiting for Client resource */
+#define JS_WaitMaxJobs 'd' /* Waiting for maximum jobs */
#define job_cancelled(jcr) \
(jcr->JobStatus == JS_Cancelled || \
hmac.c idcache.c jcr.c lex.c \
md5.c message.c mem_pool.c parse_conf.c \
queue.c rwlock.c serial.c sha1.c \
- signal.c smartall.c tree.c util.c watchdog.c workq.c
+ semlock.c signal.c smartall.c tree.c \
+ util.c watchdog.c workq.c
# immortal.c filesys.c
hmac.o idcache.o jcr.o lex.o \
md5.o message.o mem_pool.o parse_conf.o \
queue.o rwlock.o serial.o sha1.o \
- signal.o smartall.o tree.o util.o watchdog.o workq.o
+ semlock.o signal.o smartall.o tree.o \
+ util.o watchdog.o workq.o
# immortal.o filesys.o
/* Become Threaded Network Server */
void
bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq,
- void handle_client_request(void *bsock))
+ void *handle_client_request(void *bsock))
{
int newsockfd, sockfd, stat;
socklen_t clilen;
#include "bshm.h"
#include "workq.h"
#include "rwlock.h"
+#include "semlock.h"
#include "queue.h"
#include "serial.h"
#ifndef HAVE_FNMATCH
/* bnet_server.c */
void bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq,
- void handle_client_request(void *bsock));
+ void *handle_client_request(void *bsock));
void bnet_server (int port, void handle_client_request(BSOCK *bsock));
int net_connect (int port);
BSOCK * bnet_bind (int port);
*
*/
/*
- Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
* Initialize a read/write lock
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int rwl_init(brwlock_t *rwl)
{
int stat;
-
+
rwl->r_active = rwl->w_active = 0;
rwl->r_wait = rwl->w_wait = 0;
if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
* Destroy a read/write lock
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int rwl_destroy(brwlock_t *rwl)
{
if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
return stat;
}
- stat = pthread_mutex_destroy(&rwl->mutex);
+ stat = pthread_mutex_destroy(&rwl->mutex);
stat1 = pthread_cond_destroy(&rwl->read);
stat2 = pthread_cond_destroy(&rwl->write);
return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
return stat;
}
if (rwl->w_active) {
- rwl->r_wait++; /* indicate that we are waiting */
+ rwl->r_wait++; /* indicate that we are waiting */
pthread_cleanup_push(rwl_read_release, (void *)rwl);
while (rwl->w_active) {
- stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
- if (stat != 0) {
- break; /* error, bail out */
- }
+ stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
+ if (stat != 0) {
+ break; /* error, bail out */
+ }
}
pthread_cleanup_pop(0);
- rwl->r_wait--; /* we are no longer waiting */
+ rwl->r_wait--; /* we are no longer waiting */
}
if (stat == 0) {
- rwl->r_active++; /* we are running */
+ rwl->r_active++; /* we are running */
}
pthread_mutex_unlock(&rwl->mutex);
return stat;
if (rwl->w_active) {
stat = EBUSY;
} else {
- rwl->r_active++; /* we are running */
+ rwl->r_active++; /* we are running */
}
stat2 = pthread_mutex_unlock(&rwl->mutex);
return (stat == 0 ? stat2 : stat);
return 0;
}
if (rwl->w_active || rwl->r_active > 0) {
- rwl->w_wait++; /* indicate that we are waiting */
+ rwl->w_wait++; /* indicate that we are waiting */
pthread_cleanup_push(rwl_write_release, (void *)rwl);
while (rwl->w_active || rwl->r_active > 0) {
- if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
- break; /* error, bail out */
- }
+ if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
+ break; /* error, bail out */
+ }
}
pthread_cleanup_pop(0);
- rwl->w_wait--; /* we are no longer waiting */
+ rwl->w_wait--; /* we are no longer waiting */
}
if (stat == 0) {
- rwl->w_active = 1; /* we are running */
+ rwl->w_active = 1; /* we are running */
rwl->writer_id = pthread_self(); /* save writer thread's id */
}
pthread_mutex_unlock(&rwl->mutex);
if (rwl->w_active || rwl->r_active > 0) {
stat = EBUSY;
} else {
- rwl->w_active = 1; /* we are running */
+ rwl->w_active = 1; /* we are running */
rwl->writer_id = pthread_self(); /* save writer thread's id */
}
stat2 = pthread_mutex_unlock(&rwl->mutex);
Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n");
}
if (rwl->w_active > 0) {
- stat = 0; /* writers still active */
+ stat = 0; /* writers still active */
} else {
/* No more writers, awaken someone */
- if (rwl->r_wait > 0) { /* if readers waiting */
- stat = pthread_cond_broadcast(&rwl->read);
+ if (rwl->r_wait > 0) { /* if readers waiting */
+ stat = pthread_cond_broadcast(&rwl->read);
} else if (rwl->w_wait > 0) {
- stat = pthread_cond_signal(&rwl->write);
+ stat = pthread_cond_signal(&rwl->write);
}
}
stat2 = pthread_mutex_unlock(&rwl->mutex);
* lock).
*/
if ((iteration % self->interval) == 0) {
- status = rwl_writelock(&data[element].lock);
- if (status != 0) {
+ status = rwl_writelock(&data[element].lock);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
- }
- data[element].data = self->thread_num;
- data[element].writes++;
- self->writes++;
- status = rwl_writeunlock(&data[element].lock);
- if (status != 0) {
+ }
+ data[element].data = self->thread_num;
+ data[element].writes++;
+ self->writes++;
+ status = rwl_writeunlock(&data[element].lock);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
- }
+ }
} else {
- /*
- * Look at the current data element to see whether
- * the current thread last updated it. Count the
- * times to report later.
- */
- status = rwl_readlock(&data[element].lock);
- if (status != 0) {
+ /*
+ * Look at the current data element to see whether
+ * the current thread last updated it. Count the
+ * times to report later.
+ */
+ status = rwl_readlock(&data[element].lock);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
- }
- self->reads++;
- if (data[element].data == self->thread_num)
- repeats++;
- status = rwl_readunlock(&data[element].lock);
- if (status != 0) {
+ }
+ self->reads++;
+ if (data[element].data == self->thread_num)
+ repeats++;
+ status = rwl_readunlock(&data[element].lock);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
- }
+ }
}
element++;
if (element >= DATASIZE) {
- element = 0;
+ element = 0;
}
}
if (repeats > 0) {
Dmsg2(000, "Thread %d found unchanged elements %d times\n",
- self->thread_num, repeats);
+ self->thread_num, repeats);
}
return NULL;
}
* Initialize the shared data.
*/
for (data_count = 0; data_count < DATASIZE; data_count++) {
- data[data_count].data = 0;
- data[data_count].writes = 0;
- status = rwl_init (&data[data_count].lock);
- if (status != 0) {
+ data[data_count].data = 0;
+ data[data_count].writes = 0;
+ status = rwl_init (&data[data_count].lock);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
- }
+ }
}
/*
* Create THREADS threads to access shared data.
*/
for (count = 0; count < THREADS; count++) {
- threads[count].thread_num = count + 1;
- threads[count].writes = 0;
- threads[count].reads = 0;
- threads[count].interval = rand_r (&seed) % 71;
- status = pthread_create (&threads[count].thread_id,
- NULL, thread_routine, (void*)&threads[count]);
- if (status != 0) {
+ threads[count].thread_num = count + 1;
+ threads[count].writes = 0;
+ threads[count].reads = 0;
+ threads[count].interval = rand_r (&seed) % 71;
+ status = pthread_create (&threads[count].thread_id,
+ NULL, thread_routine, (void*)&threads[count]);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
- }
+ }
}
/*
* statistics.
*/
for (count = 0; count < THREADS; count++) {
- status = pthread_join (threads[count].thread_id, NULL);
- if (status != 0) {
+ status = pthread_join (threads[count].thread_id, NULL);
+ if (status != 0) {
Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
- }
- thread_writes += threads[count].writes;
+ }
+ thread_writes += threads[count].writes;
printf ("%02d: interval %d, writes %d, reads %d\n",
- count, threads[count].interval,
- threads[count].writes, threads[count].reads);
+ count, threads[count].interval,
+ threads[count].writes, threads[count].reads);
}
/*
* Collect statistics for the data.
*/
for (data_count = 0; data_count < DATASIZE; data_count++) {
- data_writes += data[data_count].writes;
+ data_writes += data[data_count].writes;
printf ("data %02d: value %d, %d writes\n",
- data_count, data[data_count].data, data[data_count].writes);
- rwl_destroy (&data[data_count].lock);
+ data_count, data[data_count].data, data[data_count].writes);
+ rwl_destroy (&data[data_count].lock);
}
printf ("Total: %d thread writes, %d data writes\n",
- thread_writes, data_writes);
+ thread_writes, data_writes);
return 0;
}
#include "rwlock.h"
#include "errors.h"
-#define THREADS 5
-#define ITERATIONS 1000
-#define DATASIZE 15
+#define THREADS 5
+#define ITERATIONS 1000
+#define DATASIZE 15
/*
* Keep statistics for each thread.
*/
typedef struct thread_tag {
- int thread_num;
- pthread_t thread_id;
- int r_collisions;
- int w_collisions;
- int updates;
- int interval;
+ int thread_num;
+ pthread_t thread_id;
+ int r_collisions;
+ int w_collisions;
+ int updates;
+ int interval;
} thread_t;
/*
* Read-write lock and shared data
*/
typedef struct data_tag {
- brwlock_t lock;
- int data;
- int updates;
+ brwlock_t lock;
+ int data;
+ int updates;
} data_t;
thread_t threads[THREADS];
int element;
int status;
- element = 0; /* Current data element */
+ element = 0; /* Current data element */
for (iteration = 0; iteration < ITERATIONS; iteration++) {
- if ((iteration % self->interval) == 0) {
- status = rwl_writetrylock (&data[element].lock);
- if (status == EBUSY)
- self->w_collisions++;
- else if (status == 0) {
- data[element].data++;
- data[element].updates++;
- self->updates++;
- rwl_writeunlock (&data[element].lock);
- } else
+ if ((iteration % self->interval) == 0) {
+ status = rwl_writetrylock (&data[element].lock);
+ if (status == EBUSY)
+ self->w_collisions++;
+ else if (status == 0) {
+ data[element].data++;
+ data[element].updates++;
+ self->updates++;
+ rwl_writeunlock (&data[element].lock);
+ } else
err_abort (status, "Try write lock");
- } else {
- status = rwl_readtrylock (&data[element].lock);
- if (status == EBUSY)
- self->r_collisions++;
- else if (status != 0) {
+ } else {
+ status = rwl_readtrylock (&data[element].lock);
+ if (status == EBUSY)
+ self->r_collisions++;
+ else if (status != 0) {
err_abort (status, "Try read lock");
- } else {
- if (data[element].data != data[element].updates)
+ } else {
+ if (data[element].data != data[element].updates)
printf ("%d: data[%d] %d != %d\n",
- self->thread_num, element,
- data[element].data, data[element].updates);
- rwl_readunlock (&data[element].lock);
- }
- }
-
- element++;
- if (element >= DATASIZE)
- element = 0;
+ self->thread_num, element,
+ data[element].data, data[element].updates);
+ rwl_readunlock (&data[element].lock);
+ }
+ }
+
+ element++;
+ if (element >= DATASIZE)
+ element = 0;
}
return NULL;
}
* Initialize the shared data.
*/
for (data_count = 0; data_count < DATASIZE; data_count++) {
- data[data_count].data = 0;
- data[data_count].updates = 0;
- rwl_init (&data[data_count].lock);
+ data[data_count].data = 0;
+ data[data_count].updates = 0;
+ rwl_init (&data[data_count].lock);
}
/*
* Create THREADS threads to access shared data.
*/
for (count = 0; count < THREADS; count++) {
- threads[count].thread_num = count;
- threads[count].r_collisions = 0;
- threads[count].w_collisions = 0;
- threads[count].updates = 0;
- threads[count].interval = rand_r (&seed) % ITERATIONS;
- status = pthread_create (&threads[count].thread_id,
- NULL, thread_routine, (void*)&threads[count]);
- if (status != 0)
+ threads[count].thread_num = count;
+ threads[count].r_collisions = 0;
+ threads[count].w_collisions = 0;
+ threads[count].updates = 0;
+ threads[count].interval = rand_r (&seed) % ITERATIONS;
+ status = pthread_create (&threads[count].thread_id,
+ NULL, thread_routine, (void*)&threads[count]);
+ if (status != 0)
err_abort (status, "Create thread");
}
* statistics.
*/
for (count = 0; count < THREADS; count++) {
- status = pthread_join (threads[count].thread_id, NULL);
- if (status != 0)
+ status = pthread_join (threads[count].thread_id, NULL);
+ if (status != 0)
err_abort (status, "Join thread");
- thread_updates += threads[count].updates;
+ thread_updates += threads[count].updates;
printf ("%02d: interval %d, updates %d, "
"r_collisions %d, w_collisions %d\n",
- count, threads[count].interval,
- threads[count].updates,
- threads[count].r_collisions, threads[count].w_collisions);
+ count, threads[count].interval,
+ threads[count].updates,
+ threads[count].r_collisions, threads[count].w_collisions);
}
/*
* Collect statistics for the data.
*/
for (data_count = 0; data_count < DATASIZE; data_count++) {
- data_updates += data[data_count].updates;
+ data_updates += data[data_count].updates;
printf ("data %02d: value %d, %d updates\n",
- data_count, data[data_count].data, data[data_count].updates);
- rwl_destroy (&data[data_count].lock);
+ data_count, data[data_count].data, data[data_count].updates);
+ rwl_destroy (&data[data_count].lock);
}
return 0;
*
*/
/*
- Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
--- /dev/null
+/*
+ * Bacula Semaphore code. This code permits setting up
+ * a semaphore that lets through a specified number
+ * of callers simultaneously. Once the number of callers
+ * exceed the limit, they block.
+ *
+ * Kern Sibbald, March MMIII
+ *
+ * Derived from rwlock.h which was in turn derived from code in
+ * "Programming with POSIX Threads" By David R. Butenhof
+ &
+ * Version $Id$
+ *
+ */
+/*
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public
+ License along with this program; if not, write to the Free
+ Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ MA 02111-1307, USA.
+
+ */
+
+#include "bacula.h"
+
+/*
+ * Initialize a semaphore
+ *
+ * Returns: 0 on success
+ * errno on failure
+ */
+int sem_init(semlock_t *sem, int max_active)
+{
+ int stat;
+
+ sem->active = sem->waiting = 0;
+ sem->max_active = max_active;
+ if ((stat = pthread_mutex_init(&sem->mutex, NULL)) != 0) {
+ return stat;
+ }
+ if ((stat = pthread_cond_init(&sem->wait, NULL)) != 0) {
+ pthread_mutex_destroy(&sem->mutex);
+ return stat;
+ }
+ sem->valid = SEMLOCK_VALID;
+ return 0;
+}
+
+/*
+ * Destroy a semaphore
+ *
+ * Returns: 0 on success
+ * errno on failure
+ */
+int sem_destroy(semlock_t *sem)
+{
+ int stat, stat1;
+
+ if (sem->valid != SEMLOCK_VALID) {
+ return EINVAL;
+ }
+ if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+ return stat;
+ }
+
+ /*
+ * If any threads are active, report EBUSY
+ */
+ if (sem->active > 0) {
+ pthread_mutex_unlock(&sem->mutex);
+ return EBUSY;
+ }
+
+ /*
+ * If any threads are waiting, report EBUSY
+ */
+ if (sem->waiting > 0) {
+ pthread_mutex_unlock(&sem->mutex);
+ return EBUSY;
+ }
+
+ sem->valid = 0;
+ if ((stat = pthread_mutex_unlock(&sem->mutex)) != 0) {
+ return stat;
+ }
+ stat = pthread_mutex_destroy(&sem->mutex);
+ stat1 = pthread_cond_destroy(&sem->wait);
+ return (stat != 0 ? stat : stat1);
+}
+
+/*
+ * Handle cleanup when the wait lock condition variable
+ * wait is released.
+ */
+static void sem_release(void *arg)
+{
+ semlock_t *sem = (semlock_t *)arg;
+
+ sem->waiting--;
+ pthread_mutex_unlock(&sem->mutex);
+}
+
+
+/*
+ * Lock semaphore, wait until locked (or error).
+ */
+int sem_lock(semlock_t *sem)
+{
+ int stat;
+
+ if (sem->valid != SEMLOCK_VALID) {
+ return EINVAL;
+ }
+ if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+ return stat;
+ }
+ if (sem->active >= sem->max_active) {
+ sem->waiting++; /* indicate that we are waiting */
+ pthread_cleanup_push(sem_release, (void *)sem);
+ while (sem->active >= sem->max_active) {
+ if ((stat = pthread_cond_wait(&sem->wait, &sem->mutex)) != 0) {
+ break; /* error, bail out */
+ }
+ }
+ pthread_cleanup_pop(0);
+ sem->waiting--; /* we are no longer waiting */
+ }
+ if (stat == 0) {
+ sem->active++; /* we are running */
+ }
+ pthread_mutex_unlock(&sem->mutex);
+ return stat;
+}
+
+/*
+ * Attempt to lock semaphore, don't wait
+ */
+int sem_trylock(semlock_t *sem)
+{
+ int stat, stat1;
+
+ if (sem->valid != SEMLOCK_VALID) {
+ return EINVAL;
+ }
+ if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+ return stat;
+ }
+
+ if (sem->active >= sem->max_active) {
+ stat = EBUSY;
+ } else {
+ sem->active++; /* we are running */
+ }
+ stat1 = pthread_mutex_unlock(&sem->mutex);
+ return (stat == 0 ? stat1 : stat);
+}
+
+/*
+ * Unlock semaphore
+ * Start any waiting callers
+ */
+int sem_unlock(semlock_t *sem)
+{
+ int stat, stat1;
+
+ if (sem->valid != SEMLOCK_VALID) {
+ return EINVAL;
+ }
+ if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+ return stat;
+ }
+ sem->active--;
+ if (sem->active < 0) {
+ Emsg0(M_ABORT, 0, "sem_unlock by non-owner.\n");
+ }
+ if (sem->active >= sem->max_active) {
+ stat = 0; /* caller(s) still active */
+ } else {
+ /* No more active, awaken someone */
+ if (sem->waiting > 0) { /* if someone waiting */
+ stat = pthread_cond_broadcast(&sem->wait);
+ }
+ }
+ stat1 = pthread_mutex_unlock(&sem->mutex);
+ return (stat == 0 ? stat1 : stat);
+}
+
+#ifdef TEST_SEMLOCK
+
+#define THREADS 5
+#define DATASIZE 15
+#define ITERATIONS 10000
+
+/*
+ * Keep statics for each thread.
+ */
+typedef struct thread_tag {
+ int thread_num;
+ pthread_t thread_id;
+ int writes;
+ int reads;
+ int interval;
+} thread_t;
+
+/*
+ * Semaphore lock and shared data.
+ */
+typedef struct data_tag {
+ semlock_t lock;
+ int data;
+ int writes;
+} data_t;
+
+thread_t threads[THREADS];
+data_t data[DATASIZE];
+
+/*
+ * Thread start routine that uses semaphores locks.
+ */
+void *thread_routine(void *arg)
+{
+ thread_t *self = (thread_t *)arg;
+ int repeats = 0;
+ int iteration;
+ int element = 0;
+ int status;
+
+ for (iteration=0; iteration < ITERATIONS; iteration++) {
+ /*
+ * Each "self->interval" iterations, perform an
+ * update operation (write lock instead of read
+ * lock).
+ */
+ if ((iteration % self->interval) == 0) {
+ status = sem_writelock(&data[element].lock);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
+ }
+ data[element].data = self->thread_num;
+ data[element].writes++;
+ self->writes++;
+ status = sem_writeunlock(&data[element].lock);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
+ }
+ } else {
+ /*
+ * Look at the current data element to see whether
+ * the current thread last updated it. Count the
+ * times to report later.
+ */
+ status = sem_readlock(&data[element].lock);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
+ }
+ self->reads++;
+ if (data[element].data == self->thread_num)
+ repeats++;
+ status = sem_readunlock(&data[element].lock);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
+ }
+ }
+ element++;
+ if (element >= DATASIZE) {
+ element = 0;
+ }
+ }
+ if (repeats > 0) {
+ Dmsg2(000, "Thread %d found unchanged elements %d times\n",
+ self->thread_num, repeats);
+ }
+ return NULL;
+}
+
+int main (int argc, char *argv[])
+{
+ int count;
+ int data_count;
+ int status;
+ unsigned int seed = 1;
+ int thread_writes = 0;
+ int data_writes = 0;
+
+#ifdef sun
+ /*
+ * On Solaris 2.5, threads are not timesliced. To ensure
+ * that our threads can run concurrently, we need to
+ * increase the concurrency level to THREADS.
+ */
+ thr_setconcurrency (THREADS);
+#endif
+
+ /*
+ * Initialize the shared data.
+ */
+ for (data_count = 0; data_count < DATASIZE; data_count++) {
+ data[data_count].data = 0;
+ data[data_count].writes = 0;
+ status = sem_init (&data[data_count].lock);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
+ }
+ }
+
+ /*
+ * Create THREADS threads to access shared data.
+ */
+ for (count = 0; count < THREADS; count++) {
+ threads[count].thread_num = count + 1;
+ threads[count].writes = 0;
+ threads[count].reads = 0;
+ threads[count].interval = rand_r (&seed) % 71;
+ status = pthread_create(&threads[count].thread_id,
+ NULL, thread_routine, (void*)&threads[count]);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
+ }
+ }
+
+ /*
+ * Wait for all threads to complete, and collect
+ * statistics.
+ */
+ for (count = 0; count < THREADS; count++) {
+ status = pthread_join (threads[count].thread_id, NULL);
+ if (status != 0) {
+ Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
+ }
+ thread_writes += threads[count].writes;
+ printf ("%02d: interval %d, writes %d, reads %d\n",
+ count, threads[count].interval,
+ threads[count].writes, threads[count].reads);
+ }
+
+ /*
+ * Collect statistics for the data.
+ */
+ for (data_count = 0; data_count < DATASIZE; data_count++) {
+ data_writes += data[data_count].writes;
+ printf ("data %02d: value %d, %d writes\n",
+ data_count, data[data_count].data, data[data_count].writes);
+ sem_destroy (&data[data_count].lock);
+ }
+
+ printf ("Total: %d thread writes, %d data writes\n",
+ thread_writes, data_writes);
+ return 0;
+}
+
+#endif
+
+#ifdef TEST_SEM_TRY_LOCK
+/*
+ * semlock_try_main.c
+ *
+ * Demonstrate use of non-blocking read-write locks.
+ *
+ * Special notes: On a Solaris system, call thr_setconcurrency()
+ * to allow interleaved thread execution, since threads are not
+ * timesliced.
+ */
+#include <pthread.h>
+#include "semlock.h"
+#include "errors.h"
+
+#define THREADS 5
+#define ITERATIONS 1000
+#define DATASIZE 15
+
+/*
+ * Keep statistics for each thread.
+ */
+typedef struct thread_tag {
+ int thread_num;
+ pthread_t thread_id;
+ int r_collisions;
+ int w_collisions;
+ int updates;
+ int interval;
+} thread_t;
+
+/*
+ * Read-write lock and shared data
+ */
+typedef struct data_tag {
+ semlock_t lock;
+ int data;
+ int updates;
+} data_t;
+
+thread_t threads[THREADS];
+data_t data[DATASIZE];
+
+/*
+ * Thread start routine that uses read-write locks
+ */
+void *thread_routine (void *arg)
+{
+ thread_t *self = (thread_t*)arg;
+ int iteration;
+ int element;
+ int status;
+
+ element = 0; /* Current data element */
+
+ for (iteration = 0; iteration < ITERATIONS; iteration++) {
+ if ((iteration % self->interval) == 0) {
+ status = sem_writetrylock (&data[element].lock);
+ if (status == EBUSY)
+ self->w_collisions++;
+ else if (status == 0) {
+ data[element].data++;
+ data[element].updates++;
+ self->updates++;
+ sem_writeunlock (&data[element].lock);
+ } else
+ err_abort (status, "Try write lock");
+ } else {
+ status = sem_readtrylock (&data[element].lock);
+ if (status == EBUSY)
+ self->r_collisions++;
+ else if (status != 0) {
+ err_abort (status, "Try read lock");
+ } else {
+ if (data[element].data != data[element].updates)
+ printf ("%d: data[%d] %d != %d\n",
+ self->thread_num, element,
+ data[element].data, data[element].updates);
+ sem_readunlock (&data[element].lock);
+ }
+ }
+
+ element++;
+ if (element >= DATASIZE)
+ element = 0;
+ }
+ return NULL;
+}
+
+int main (int argc, char *argv[])
+{
+ int count, data_count;
+ unsigned int seed = 1;
+ int thread_updates = 0, data_updates = 0;
+ int status;
+
+#ifdef sun
+ /*
+ * On Solaris 2.5, threads are not timesliced. To ensure
+ * that our threads can run concurrently, we need to
+ * increase the concurrency level to THREADS.
+ */
+ DPRINTF (("Setting concurrency level to %d\n", THREADS));
+ thr_setconcurrency (THREADS);
+#endif
+
+ /*
+ * Initialize the shared data.
+ */
+ for (data_count = 0; data_count < DATASIZE; data_count++) {
+ data[data_count].data = 0;
+ data[data_count].updates = 0;
+ sem_init (&data[data_count].lock);
+ }
+
+ /*
+ * Create THREADS threads to access shared data.
+ */
+ for (count = 0; count < THREADS; count++) {
+ threads[count].thread_num = count;
+ threads[count].r_collisions = 0;
+ threads[count].w_collisions = 0;
+ threads[count].updates = 0;
+ threads[count].interval = rand_r (&seed) % ITERATIONS;
+ status = pthread_create (&threads[count].thread_id,
+ NULL, thread_routine, (void*)&threads[count]);
+ if (status != 0)
+ err_abort (status, "Create thread");
+ }
+
+ /*
+ * Wait for all threads to complete, and collect
+ * statistics.
+ */
+ for (count = 0; count < THREADS; count++) {
+ status = pthread_join(threads[count].thread_id, NULL);
+ if (status != 0)
+ err_abort(status, "Join thread");
+ thread_updates += threads[count].updates;
+ printf ("%02d: interval %d, updates %d, "
+ "r_collisions %d, w_collisions %d\n",
+ count, threads[count].interval,
+ threads[count].updates,
+ threads[count].r_collisions, threads[count].w_collisions);
+ }
+
+ /*
+ * Collect statistics for the data.
+ */
+ for (data_count = 0; data_count < DATASIZE; data_count++) {
+ data_updates += data[data_count].updates;
+ printf ("data %02d: value %d, %d updates\n",
+ data_count, data[data_count].data, data[data_count].updates);
+ sem_destroy(&data[data_count].lock);
+ }
+
+ return 0;
+}
+
+#endif
--- /dev/null
+/*
+ * Bacula Semaphore code. This code permits setting up
+ * a semaphore that lets through a specified number
+ * of callers simultaneously. Once the number of callers
+ * exceed the limit, they block.
+ *
+ * Kern Sibbald, March MMIII
+ *
+ * Derived from rwlock.h which was in turn derived from code in
+ * "Programming with POSIX Threads" By David R. Butenhof
+ *
+ * Version $Id$
+ */
+/*
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public
+ License along with this program; if not, write to the Free
+ Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ MA 02111-1307, USA.
+
+ */
+
+#ifndef __SEMLOCK_H
+#define __SEMLOCK_H 1
+
+typedef struct s_semlock_tag {
+ pthread_mutex_t mutex; /* main lock */
+ pthread_cond_t wait; /* wait for available slot */
+ int valid; /* set when valid */
+ int waiting; /* number of callers waiting */
+ int max_active; /* maximum active callers */
+ int active; /* number of active callers */
+} semlock_t;
+
+#define SEMLOCK_VALID 0xfacade
+
+#define SEM_INIIALIZER \
+ {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
+ PTHREAD_COND_INITIALIZER, SEMLOCK_VALID, 0, 0, 0, 0}
+
+/*
+ * semaphore lock prototypes
+ */
+extern int sem_init(semlock_t *sem, int max_active);
+extern int sem_destroy(semlock_t *sem);
+extern int sem_lock(semlock_t *sem);
+extern int sem_trylock(semlock_t *sem);
+extern int sem_unlock(semlock_t *sem);
+
+#endif /* __SEMLOCK_H */
* Initialize a work queue
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
-int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
+int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
{
int stat;
-
+
if ((stat = pthread_attr_init(&wq->attr)) != 0) {
return stat;
}
}
wq->quit = 0;
wq->first = wq->last = NULL;
- wq->max_workers = threads; /* max threads to create */
- wq->num_workers = 0; /* no threads yet */
- wq->idle_workers = 0; /* no idle threads */
- wq->engine = engine; /* routine to run */
+ wq->max_workers = threads; /* max threads to create */
+ wq->num_workers = 0; /* no threads yet */
+ wq->idle_workers = 0; /* no idle threads */
+ wq->engine = engine; /* routine to run */
wq->valid = WORKQ_VALID;
return 0;
}
* Destroy a work queue
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int workq_destroy(workq_t *wq)
{
if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
return stat;
}
- wq->valid = 0; /* prevent any more operations */
+ wq->valid = 0; /* prevent any more operations */
/*
* If any threads are active, wake them
if (wq->num_workers > 0) {
wq->quit = 1;
if (wq->idle_workers) {
- if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
- }
+ if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
}
while (wq->num_workers > 0) {
- if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
- }
+ if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
}
}
if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
return stat;
}
- stat = pthread_mutex_destroy(&wq->mutex);
+ stat = pthread_mutex_destroy(&wq->mutex);
stat1 = pthread_cond_destroy(&wq->work);
stat2 = pthread_attr_destroy(&wq->attr);
return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
* Add work to a queue
* wq is a queue that was created with workq_init
* element is a user unique item that will be passed to the
- * processing routine
+ * processing routine
* work_item will get internal work queue item -- if it is not NULL
* priority if non-zero will cause the item to be placed on the
- * head of the list instead of the tail.
+ * head of the list instead of the tail.
*/
int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
{
if (priority) {
/* Add to head of queue */
if (wq->first == NULL) {
- wq->first = item;
- wq->last = item;
+ wq->first = item;
+ wq->last = item;
} else {
- item->next = wq->first;
- wq->first = item;
+ item->next = wq->first;
+ wq->first = item;
}
} else {
/* Add to end of queue */
if (wq->first == NULL) {
- wq->first = item;
+ wq->first = item;
} else {
- wq->last->next = item;
+ wq->last->next = item;
}
wq->last = item;
}
if (wq->idle_workers > 0) {
Dmsg0(200, "Signal worker\n");
if ((stat = pthread_cond_signal(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
} else if (wq->num_workers < wq->max_workers) {
Dmsg0(200, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
wq->num_workers++;
}
for (prev=item=wq->first; item; item=item->next) {
if (item == work_item) {
- found = 1;
- break;
+ found = 1;
+ break;
}
prev = item;
}
if (wq->first != work_item) {
prev->next = work_item->next;
if (wq->last == work_item) {
- wq->last = prev;
+ wq->last = prev;
}
work_item->next = wq->first;
wq->first = work_item;
if (wq->idle_workers > 0) {
Dmsg0(200, "Signal worker\n");
if ((stat = pthread_cond_signal(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
} else {
Dmsg0(200, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
wq->num_workers++;
}
timeout.tv_sec = tv.tv_sec + 2;
while (wq->first == NULL && !wq->quit) {
- /*
- * Wait 2 seconds, then if no more work, exit
- */
+ /*
+ * Wait 2 seconds, then if no more work, exit
+ */
Dmsg0(200, "pthread_cond_timedwait()\n");
#ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
- /* CYGWIN dies with a page fault the second
- * time that pthread_cond_timedwait() is called
- * so fake it out.
- */
- pthread_mutex_lock(&wq->mutex);
- stat = ETIMEDOUT;
+ /* CYGWIN dies with a page fault the second
+ * time that pthread_cond_timedwait() is called
+ * so fake it out.
+ */
+ pthread_mutex_lock(&wq->mutex);
+ stat = ETIMEDOUT;
#else
- stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+ stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
#endif
Dmsg1(200, "timedwait=%d\n", stat);
- if (stat == ETIMEDOUT) {
- timedout = 1;
- break;
- } else if (stat != 0) {
+ if (stat == ETIMEDOUT) {
+ timedout = 1;
+ break;
+ } else if (stat != 0) {
/* This shouldn't happen */
Dmsg0(200, "This shouldn't happen\n");
- wq->num_workers--;
- pthread_mutex_unlock(&wq->mutex);
- return NULL;
- }
+ wq->num_workers--;
+ pthread_mutex_unlock(&wq->mutex);
+ return NULL;
+ }
}
we = wq->first;
if (we != NULL) {
- wq->first = we->next;
- if (wq->last == we) {
- wq->last = NULL;
- }
- if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
- return NULL;
- }
+ wq->first = we->next;
+ if (wq->last == we) {
+ wq->last = NULL;
+ }
+ if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
+ return NULL;
+ }
/* Call user's routine here */
Dmsg0(200, "Calling user engine.\n");
- wq->engine(we->data);
+ wq->engine(we->data);
Dmsg0(200, "Back from user engine.\n");
- free(we); /* release work entry */
+ free(we); /* release work entry */
Dmsg0(200, "relock mutex\n");
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return NULL;
- }
+ if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+ return NULL;
+ }
Dmsg0(200, "Done lock mutex\n");
}
/*
* If no more work request, and we are asked to quit, then do it
*/
if (wq->first == NULL && wq->quit) {
- wq->num_workers--;
- if (wq->num_workers == 0) {
+ wq->num_workers--;
+ if (wq->num_workers == 0) {
Dmsg0(200, "Wake up destroy routine\n");
- /* Wake up destroy routine if he is waiting */
- pthread_cond_broadcast(&wq->work);
- }
+ /* Wake up destroy routine if he is waiting */
+ pthread_cond_broadcast(&wq->work);
+ }
Dmsg0(200, "Unlock mutex\n");
- pthread_mutex_unlock(&wq->mutex);
+ pthread_mutex_unlock(&wq->mutex);
Dmsg0(200, "Return from workq_server\n");
- return NULL;
+ return NULL;
}
Dmsg0(200, "Check for work request\n");
/*
Dmsg1(200, "timedout=%d\n", timedout);
if (wq->first == NULL && timedout) {
Dmsg0(200, "break big loop\n");
- wq->num_workers--;
- break;
+ wq->num_workers--;
+ break;
}
Dmsg0(200, "Loop again\n");
} /* end of big for loop */
int max_workers; /* max threads */
int num_workers; /* current threads */
int idle_workers; /* idle threads */
- void (*engine)(void *arg); /* user engine */
+ void *(*engine)(void *arg); /* user engine */
} workq_t;
#define WORKQ_VALID 0xdec1992
extern int workq_init(
workq_t *wq,
int threads, /* maximum threads */
- void (*engine)(void *) /* engine routine */
+ void *(*engine)(void *) /* engine routine */
);
extern int workq_destroy(workq_t *wq);
extern int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority);
* - We execute the command
* - We continue or exit depending on the return status
*/
-void connection_request(void *arg)
+void *connection_request(void *arg)
{
BSOCK *bs = (BSOCK *)arg;
JCR *jcr;
if (bnet_recv(bs) <= 0) {
Emsg0(M_ERROR, 0, "Connection request failed.\n");
- return;
+ return NULL;
}
/*
*/
if (sscanf(bs->msg, "Hello Start Job %127s calling\n", name) == 1) {
handle_filed_connection(bs, name);
- return;
+ return NULL;
}
jcr = new_jcr(sizeof(JCR), stored_free_jcr); /* create Job Control Record */
if (!authenticate_director(jcr)) {
Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate Director\n"));
free_jcr(jcr);
- return;
+ return NULL;
}
Dmsg0(90, "Message channel init completed.\n");
bnet_sig(bs, BNET_TERMINATE);
}
free_jcr(jcr);
- return;
+ return NULL;
}
/*
uint32_t new_VolSessionId();
/* From acquire.c */
-DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int release_device(JCR *jcr, DEVICE *dev);
+DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int release_device(JCR *jcr, DEVICE *dev);
/* From askdir.c */
-int dir_get_volume_info(JCR *jcr, int writing);
-int dir_find_next_appendable_volume(JCR *jcr);
-int dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel);
-int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev);
-int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev);
-int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec);
-int dir_send_job_status(JCR *jcr);
-int dir_create_jobmedia_record(JCR *jcr);
+int dir_get_volume_info(JCR *jcr, int writing);
+int dir_find_next_appendable_volume(JCR *jcr);
+int dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel);
+int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev);
+int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev);
+int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec);
+int dir_send_job_status(JCR *jcr);
+int dir_create_jobmedia_record(JCR *jcr);
/* authenticate.c */
-int authenticate_director(JCR *jcr);
-int authenticate_filed(JCR *jcr);
+int authenticate_director(JCR *jcr);
+int authenticate_filed(JCR *jcr);
/* From block.c */
-void dump_block(DEV_BLOCK *b, char *msg);
+void dump_block(DEV_BLOCK *b, char *msg);
DEV_BLOCK *new_block(DEVICE *dev);
-void init_block_write(DEV_BLOCK *block);
-void empty_block(DEV_BLOCK *block);
-void free_block(DEV_BLOCK *block);
-int write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int read_block_from_device(DEVICE *dev, DEV_BLOCK *block);
-int read_block_from_dev(DEVICE *dev, DEV_BLOCK *block);
+void init_block_write(DEV_BLOCK *block);
+void empty_block(DEV_BLOCK *block);
+void free_block(DEV_BLOCK *block);
+int write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int read_block_from_device(DEVICE *dev, DEV_BLOCK *block);
+int read_block_from_dev(DEVICE *dev, DEV_BLOCK *block);
/* From butil.c -- utilities for SD tool programs */
-void print_ls_output(char *fname, char *link, int type, struct stat *statp);
+void print_ls_output(char *fname, char *link, int type, struct stat *statp);
JCR *setup_jcr(char *name, char *device, BSR *bsr, char *VolumeName);
DEVICE *setup_to_access_device(JCR *jcr, int read_access);
-void display_error_status(DEVICE *dev);
+void display_error_status(DEVICE *dev);
DEVRES *find_device_res(char *device_name, int read_access);
/* From dev.c */
-DEVICE *init_dev(DEVICE *dev, DEVRES *device);
-int open_dev(DEVICE *dev, char *VolName, int mode);
-void close_dev(DEVICE *dev);
-void force_close_dev(DEVICE *dev);
-int truncate_dev(DEVICE *dev);
-void term_dev(DEVICE *dev);
-char * strerror_dev(DEVICE *dev);
-void clrerror_dev(DEVICE *dev, int func);
-int update_pos_dev(DEVICE *dev);
-int rewind_dev(DEVICE *dev);
-int load_dev(DEVICE *dev);
-int offline_dev(DEVICE *dev);
-int flush_dev(DEVICE *dev);
-int weof_dev(DEVICE *dev, int num);
-int write_block(DEVICE *dev);
-int write_dev(DEVICE *dev, char *buf, size_t len);
-int read_dev(DEVICE *dev, char *buf, size_t len);
-int status_dev(DEVICE *dev, uint32_t *status);
-int eod_dev(DEVICE *dev);
-int fsf_dev(DEVICE *dev, int num);
-int fsr_dev(DEVICE *dev, int num);
-int bsf_dev(DEVICE *dev, int num);
-int bsr_dev(DEVICE *dev, int num);
-void attach_jcr_to_device(DEVICE *dev, JCR *jcr);
-void detach_jcr_from_device(DEVICE *dev, JCR *jcr);
-JCR *next_attached_jcr(DEVICE *dev, JCR *jcr);
+DEVICE *init_dev(DEVICE *dev, DEVRES *device);
+int open_dev(DEVICE *dev, char *VolName, int mode);
+void close_dev(DEVICE *dev);
+void force_close_dev(DEVICE *dev);
+int truncate_dev(DEVICE *dev);
+void term_dev(DEVICE *dev);
+char * strerror_dev(DEVICE *dev);
+void clrerror_dev(DEVICE *dev, int func);
+int update_pos_dev(DEVICE *dev);
+int rewind_dev(DEVICE *dev);
+int load_dev(DEVICE *dev);
+int offline_dev(DEVICE *dev);
+int flush_dev(DEVICE *dev);
+int weof_dev(DEVICE *dev, int num);
+int write_block(DEVICE *dev);
+int write_dev(DEVICE *dev, char *buf, size_t len);
+int read_dev(DEVICE *dev, char *buf, size_t len);
+int status_dev(DEVICE *dev, uint32_t *status);
+int eod_dev(DEVICE *dev);
+int fsf_dev(DEVICE *dev, int num);
+int fsr_dev(DEVICE *dev, int num);
+int bsf_dev(DEVICE *dev, int num);
+int bsr_dev(DEVICE *dev, int num);
+void attach_jcr_to_device(DEVICE *dev, JCR *jcr);
+void detach_jcr_from_device(DEVICE *dev, JCR *jcr);
+JCR *next_attached_jcr(DEVICE *dev, JCR *jcr);
/* Get info about device */
-char * dev_name(DEVICE *dev);
-char * dev_vol_name(DEVICE *dev);
+char * dev_name(DEVICE *dev);
+char * dev_vol_name(DEVICE *dev);
uint32_t dev_block(DEVICE *dev);
uint32_t dev_file(DEVICE *dev);
-int dev_is_tape(DEVICE *dev);
+int dev_is_tape(DEVICE *dev);
/* From device.c */
-int open_device(DEVICE *dev);
-int fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int open_device(DEVICE *dev);
+int fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
void _lock_device(char *file, int line, DEVICE *dev);
void _unlock_device(char *file, int line, DEVICE *dev);
void _block_device(char *file, int line, DEVICE *dev, int state);
void new_return_device_lock(DEVICE *dev, brwsteal_t *hold);
/* From dircmd.c */
-void connection_request(void *arg);
+void *connection_request(void *arg);
/* From fd_cmds.c */
-void run_job(JCR *jcr);
+void run_job(JCR *jcr);
/* From fdmsg.c */
-int bget_msg(BSOCK *sock);
+int bget_msg(BSOCK *sock);
/* From job.c */
-void stored_free_jcr(JCR *jcr);
-void connection_from_filed(void *arg);
-void handle_filed_connection(BSOCK *fd, char *job_name);
+void stored_free_jcr(JCR *jcr);
+void connection_from_filed(void *arg);
+void handle_filed_connection(BSOCK *fd, char *job_name);
/* From label.c */
-int read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-void create_session_label(JCR *jcr, DEV_RECORD *rec, int label);
-void create_volume_label(DEVICE *dev, char *VolName);
-int write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName);
-int write_session_label(JCR *jcr, DEV_BLOCK *block, int label);
-int write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-void dump_volume_label(DEVICE *dev);
-void dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose);
-int unser_volume_label(DEVICE *dev, DEV_RECORD *rec);
-int unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec);
+int read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+void create_session_label(JCR *jcr, DEV_RECORD *rec, int label);
+void create_volume_label(DEVICE *dev, char *VolName);
+int write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName);
+int write_session_label(JCR *jcr, DEV_BLOCK *block, int label);
+int write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+void dump_volume_label(DEVICE *dev);
+void dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose);
+int unser_volume_label(DEVICE *dev, DEV_RECORD *rec);
+int unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec);
/* From match_bsr.c */
int match_bsr(BSR *bsr, DEV_RECORD *rec, VOLUME_LABEL *volrec,
- SESSION_LABEL *sesrec);
+ SESSION_LABEL *sesrec);
/* From mount.c */
-int mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release);
-int mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir);
+int mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release);
+int mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir);
/* From parse_bsr.c */
/* From record.c */
char *FI_to_ascii(int fi);
char *stream_to_ascii(int stream, int fi);
-int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
-int can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
-int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec);
+int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
+int can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
+int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec);
DEV_RECORD *new_record();
-void free_record(DEV_RECORD *rec);
+void free_record(DEV_RECORD *rec);
/* From read_record.c */
int read_records(JCR *jcr, DEVICE *dev,
/* */
#define VERSION "1.30"
#define VSTRING "1"
-#define BDATE "14 March 2003"
-#define LSMDATE "14Mar03"
+#define BDATE "17 March 2003"
+#define LSMDATE "17Mar03"
/* Debug flags */
#define DEBUG 1
#define SMCHECK
#define TRACE_FILE 1
+/* Turn this on if you want to try the new Job semaphore code */
+/* #define USE_SEMAPHORE */
+
/* IF you undefine this, Bacula will run 10X slower */
#define NO_POLL_TEST 1