]> git.sur5r.net Git - bacula/bacula/commitdiff
New semaphore job scheduling code
authorKern Sibbald <kern@sibbald.com>
Tue, 18 Mar 2003 19:51:58 +0000 (19:51 +0000)
committerKern Sibbald <kern@sibbald.com>
Tue, 18 Mar 2003 19:51:58 +0000 (19:51 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@384 91ce42f0-d328-0410-95d8-f526ca767f89

25 files changed:
bacula/kernstodo
bacula/src/cats/sql_update.c
bacula/src/dird/backup.c
bacula/src/dird/dird_conf.c
bacula/src/dird/dird_conf.h
bacula/src/dird/job.c
bacula/src/dird/newvol.c
bacula/src/dird/ua_cmds.c
bacula/src/dird/ua_server.c
bacula/src/dird/ua_status.c
bacula/src/filed/filed.c
bacula/src/jcr.h
bacula/src/lib/Makefile.in
bacula/src/lib/bnet_server.c
bacula/src/lib/lib.h
bacula/src/lib/protos.h
bacula/src/lib/rwlock.c
bacula/src/lib/rwlock.h
bacula/src/lib/semlock.c [new file with mode: 0644]
bacula/src/lib/semlock.h [new file with mode: 0644]
bacula/src/lib/workq.c
bacula/src/lib/workq.h
bacula/src/stored/dircmd.c
bacula/src/stored/protos.h
bacula/src/version.h

index 0a4d5ab4f74016f8111997fbe6d0b3373fe23b0a..eb15b2f1ea509468ce47e7ec4835bfe58b297d20 100644 (file)
@@ -21,19 +21,56 @@ Testing to do: (painful)
 - ***test GetFileAttributexEx, and remove MessageBox at 335 of winservice.cpp ****
 
 For 1.30 release:
-- Cancelling of a queued job does NOT work!!!!!!
+- Refine SD waiting output:
+    Device is being positioned
+    >     Device is being positioned for append
+    >     Device is being positioned to file x
+    > 
+- > Hello,
+>
+> Possibly your /etc/bacula/my_exclude is not
+> marked as executable. It MUST be executable.
+
+the script is executeabe (0777 for testing) 
+
+> Another solution is simply to include the
+> find command in the exclude:
+>   Exclude = {
+>     "|find / -iname *.avi -o -iname *.mpg -o -iname *.mp3"
+>   }
+
+it doesn't work :(
+
+i tried it on two different systems with bacula 1.29
+
+this works:
+Exclude {
+  <file.list
+  }
+
+Exclude {
+  *.avi
+  }
+
+this doesn't:
+Exclude {
+ "|find / -iname *.avi"
+ }
+
+Exclude {
+ "|/etc/bacula/my_exclude
+ } 
+
+
+i dont understand it anymore :(
+
+- Issue message to mount a new tape before the rewind.
+- Simplified client job initiation for portables.
+- If SD cannot open a drive, make it periodically retry.
+- Implement LabelTemplate (at least first cut).
 - Bevan Anderson suggests having a run queue for each device
   so that multiple simultaneous jobs can run but each writing
   to a different Volume.
-- Get two 
-rufus-dir: Volume used once. Marking Volume "File0003" as Used.
-rufus-sd: Recycled volume File0003 on device /home/kern/bacula/working, all previous data lost.
-rufus-dir: Volume used once. Marking Volume "File0003" as Used.
-- Ability to backup to a file then later transfer to a tape -- Migration.
-  Migration based on MaxJobs(MinJobs),MaxVols(MinVols),AgeJobs,MaxBytes(MinBytes)
-  (i.e. HighwaterMark, LowwaterMark).
-- Eugeny Fisher <efischer@vip-rus.com> wants to cycle through a
-  set of volumes recycling the oldest volume when it is needed.
 - Fix "access not allowed" for backup of files on WinXP.
 - Figure out some way to specify a retention period for files
   that no longer exist on the machine -- so that we maintain
@@ -153,7 +190,6 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used.
 - Complete code in Bacula Resources -- this will permit
   reading a new config file at any time.
 - Handle ctl-c in Console
-- Implement LabelTemplate (at least first cut).
 - Implement script driven addition of File daemon to config files.
 - Think about how to make Bacula work better with File (non-tape) archives.
 - Write Unix emulator for Windows.
@@ -165,8 +201,6 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used.
 - Put memory utilization in Status output of each daemon
   if full status requested or if some level of debug on.
 - Make database type selectable by .conf files i.e. at runtime
-- gethostbyname failure in bnet_connect() continues
-  generating errors -- should stop.
 - Set flag for uname -a.  Add to Volume label.
 - Implement throttled work queue.
 - Check for EOT at ENOSPC or EIO or ENXIO (unix Pc)
@@ -187,7 +221,6 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used.
 - Implement Restore FileSet=
 - Create a protocol.h and protocol.c where all protocol messages
   are concentrated.
-- If SD cannot open a drive, make it periodically retry.
 - Remove duplicate fields from jcr (e.g. jcr.level and jcr.jr.Level, ...).
 - Timout a job or terminate if link goes down, or reopen link and query.
 - Find general solution for sscanf size problems (as well
@@ -268,7 +301,7 @@ rufus-dir: Volume used once. Marking Volume "File0003" as Used.
   This could be the output of df; or perhaps some sort of /etc/mtab record.
 
 Longer term to do:
-- Design at hierarchial storage for Bacula.
+- Design at hierarchial storage for Bacula. Migration and Clone. 
 - Implement FSM (File System Modules).
 - Identify unchanged or "system" files and save them to a
   special tape thus removing them from the standard 
@@ -897,3 +930,15 @@ Done: (see kernsdone for more)
      ERR=Operation not permited loop.
 - Add code if there is no mtio.h (cannot do -- too many ioctl defines needed)
 - Produce better error messages in when error/eof writing block.
+- Cancelling of a queued job does NOT work!!!!!!
+- Get two 
+rufus-dir: Volume used once. Marking Volume "File0003" as Used.
+rufus-sd: Recycled volume File0003 on device /home/kern/bacula/working, all previous data lost.
+rufus-dir: Volume used once. Marking Volume "File0003" as Used.
+- Ability to backup to a file then later transfer to a tape -- Migration.
+  Migration based on MaxJobs(MinJobs),MaxVols(MinVols),AgeJobs,MaxBytes(MinBytes)
+  (i.e. HighwaterMark, LowwaterMark).
+- Eugeny Fisher <efischer@vip-rus.com> wants to cycle through a
+  set of volumes recycling the oldest volume when it is needed.
+- gethostbyname failure in bnet_connect() continues
+  generating errors -- should stop.
index e127ae146741599fe628014af9291b04648a51fc..ff31234a5aed45c3cbf3d08f4ad385c00ef0cd64 100644 (file)
@@ -158,7 +158,7 @@ db_update_pool_record(void *jcr, B_DB *mdb, POOL_DBR *pr)
 
    db_lock(mdb);
    Mmsg(&mdb->cmd,
-"UPDATE Pool SET NumVols=%d,MaxVols=%d,UseOnce=%d,UseCatalog=%d," 
+"UPDATE Pool SET NumVols=%u,MaxVols=%u,UseOnce=%d,UseCatalog=%d," 
 "AcceptAnyVolume=%d,VolRetention='%s',VolUseDuration='%s',"
 "MaxVolJobs=%u,MaxVolFiles=%u,MaxVolBytes=%s,Recycle=%d,"
 "AutoPrune=%d,LabelFormat='%s' WHERE PoolId=%u",
index 54c97673e211de876c2310457ecd0ba917b113a3..6960262d64377fcd936c94b6bdf828ae74cf289b 100644 (file)
@@ -166,7 +166,7 @@ int do_backup(JCR *jcr)
     *
     */
    Dmsg0(110, "Open connection with storage daemon\n");
-   set_jcr_job_status(jcr, JS_Blocked);
+   set_jcr_job_status(jcr, JS_WaitSD);
    /*
     * Start conversation with Storage daemon  
     */
@@ -187,7 +187,7 @@ int do_backup(JCR *jcr)
    }
    Dmsg0(150, "Storage daemon connection OK\n");
 
-   set_jcr_job_status(jcr, JS_Blocked);
+   set_jcr_job_status(jcr, JS_WaitFD);
    if (!connect_to_file_daemon(jcr, 10, FDConnectTimeout, 1)) {
       goto bail_out;
    }
index 6bb7211b9bc6647e2f4162087936587b4787a409..ac4a71447ac203280b117cb1e50d2a44ada55616 100644 (file)
@@ -118,6 +118,7 @@ static struct res_items cli_items[] = {
    {"fileretention", store_time,  ITEM(res_client.FileRetention), 0, ITEM_DEFAULT, 60*60*24*60},
    {"jobretention",  store_time,  ITEM(res_client.JobRetention),  0, ITEM_DEFAULT, 60*60*24*180},
    {"autoprune", store_yesno,     ITEM(res_client.AutoPrune), 1, ITEM_DEFAULT, 1},
+   {"maximumconcurrentjobs", store_pint, ITEM(res_client.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1},
    {NULL, NULL, NULL, 0, 0, 0} 
 };
 
@@ -135,6 +136,7 @@ static struct res_items store_items[] = {
    {"device",    store_strname,  ITEM(res_store.dev_name),   0, ITEM_REQUIRED, 0},
    {"mediatype", store_strname,  ITEM(res_store.media_type), 0, ITEM_REQUIRED, 0},
    {"autochanger", store_yesno,  ITEM(res_store.autochanger), 1, ITEM_DEFAULT, 0},
+   {"maximumconcurrentjobs", store_pint, ITEM(res_store.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1},
    {NULL, NULL, NULL, 0, 0, 0} 
 };
 
@@ -189,6 +191,7 @@ static struct res_items job_items[] = {
    {"runafterjob",  store_str,  ITEM(res_job.RunAfterJob),  0, 0, 0},
    {"spoolattributes", store_yesno, ITEM(res_job.SpoolAttributes), 1, ITEM_DEFAULT, 0},
    {"writebootstrap", store_dir, ITEM(res_job.WriteBootstrap), 0, 0, 0},
+   {"maximumconcurrentjobs", store_pint, ITEM(res_job.MaxConcurrentJobs), 0, ITEM_DEFAULT, 1},
    {NULL, NULL, NULL, 0, 0, 0} 
 };
 
@@ -383,7 +386,7 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ...
    switch (type) {
       case R_DIRECTOR:
         char ed1[30], ed2[30];
-         sendit(sock, "Director: name=%s maxjobs=%d FDtimeout=%s SDtimeout=%s\n", 
+         sendit(sock, "Director: name=%s MaxJobs=%d FDtimeout=%s SDtimeout=%s\n", 
            reshdr->name, res->res_dir.MaxConcurrentJobs, 
            edit_uint64(res->res_dir.FDConnectTimeout, ed1),
            edit_uint64(res->res_dir.SDConnectTimeout, ed2));
@@ -396,8 +399,9 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ...
         }
         break;
       case R_CLIENT:
-         sendit(sock, "Client: name=%s address=%s FDport=%d\n",
-           res->res_client.hdr.name, res->res_client.address, res->res_client.FDport);
+         sendit(sock, "Client: name=%s address=%s FDport=%d MaxJobs=%u\n",
+           res->res_client.hdr.name, res->res_client.address, res->res_client.FDport,
+           res->res_client.MaxConcurrentJobs);
          sendit(sock, "      JobRetention=%" lld " FileRetention=%" lld " AutoPrune=%d\n",
            res->res_client.JobRetention, res->res_client.FileRetention,
            res->res_client.AutoPrune);
@@ -407,9 +411,10 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ...
         }
         break;
       case R_STORAGE:
-         sendit(sock, "Storage: name=%s address=%s SDport=%d\n\
+         sendit(sock, "Storage: name=%s address=%s SDport=%d MaxJobs=%u\n\
          DeviceName=%s MediaType=%s\n",
            res->res_store.hdr.name, res->res_store.address, res->res_store.SDport,
+           res->res_store.MaxConcurrentJobs,
            res->res_store.dev_name, res->res_store.media_type);
         break;
       case R_CATALOG:
@@ -419,8 +424,9 @@ void dump_resource(int type, RES *reshdr, void sendit(void *sock, char *fmt, ...
            res->res_cat.db_port, res->res_cat.db_name, NPRT(res->res_cat.db_user));
         break;
       case R_JOB:
-         sendit(sock, "Job: name=%s JobType=%d level=%s\n", res->res_job.hdr.name, 
-           res->res_job.JobType, level_to_str(res->res_job.level));
+         sendit(sock, "Job: name=%s JobType=%d level=%s MaxJobs=%u\n", 
+           res->res_job.hdr.name, res->res_job.JobType, 
+           level_to_str(res->res_job.level), res->res_job.MaxConcurrentJobs);
         if (res->res_job.client) {
             sendit(sock, "  --> ");
            dump_resource(-R_CLIENT, (RES *)res->res_job.client, sendit, sock);
index 5572399c19fe06e357f8c7e38f273f5b02c522ce..4cd6fba0bf2fbc4b60a1a26c5f8ac4c8c928bef7 100644 (file)
@@ -91,7 +91,7 @@ struct s_res_dir {
    char *pid_directory;               /* PidDirectory */
    char *subsys_directory;            /* SubsysDirectory */
    struct s_res_msgs *messages;       /* Daemon message handler */
-   int   MaxConcurrentJobs;
+   uint32_t MaxConcurrentJobs;        /* Max concurrent jobs for whole director */
    utime_t FDConnectTimeout;          /* timeout for connect in seconds */
    utime_t SDConnectTimeout;          /* timeout in seconds */
 };
@@ -111,6 +111,8 @@ struct s_res_client {
    char *address;
    char *password;
    struct s_res_cat    *catalog;       /* Catalog resource */
+   uint32_t MaxConcurrentJobs;        /* Maximume concurrent jobs */
+   semlock_t sem;                      /* client semaphore */
 };
 typedef struct s_res_client CLIENT;
 
@@ -128,6 +130,8 @@ struct s_res_store {
    char *media_type;
    char *dev_name;   
    int  autochanger;                  /* set if autochanger */
+   uint32_t MaxConcurrentJobs;        /* Maximume concurrent jobs */
+   semlock_t sem;                     /* storage semaphore */
 };
 typedef struct s_res_store STORE;
 
@@ -170,13 +174,16 @@ struct s_res_job {
    int PruneFiles;                    /* Force pruning of Files */
    int PruneVolumes;                  /* Force pruning of Volumes */
    int SpoolAttributes;               /* Set to spool attributes in SD */
-
+   uint32_t MaxConcurrentJobs;        /* Maximume concurrent jobs */
+  
    struct s_res_msgs   *messages;     /* How and where to send messages */
    struct s_res_sch    *schedule;     /* When -- Automatic schedule */
    struct s_res_client *client;       /* Who to backup */
    struct s_res_fs     *fileset;      /* What to backup -- Fileset */
    struct s_res_store  *storage;      /* Where is device -- Storage daemon */
    struct s_res_pool   *pool;         /* Where is media -- Media Pool */
+
+   semlock_t sem;                     /* Job semaphore */
 };
 typedef struct s_res_job JOB;
 
index aff54c31751b5938f94cb3f4d9ae453f25338fa1..b35be631d1f82cd4601cd02aa3f9d26002692be1 100644 (file)
 #include "dird.h"
 
 /* Forward referenced subroutines */
-static void job_thread(void *arg);
+static void *job_thread(void *arg);
 static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg);
+static void release_resource_locks(JCR *jcr);
+static int acquire_resource_locks(JCR *jcr);
+#ifdef USE_SEMAPHORE
+static void backoff_resource_locks(JCR *jcr, int count);
+#endif
 
 /* Exported subroutines */
 void run_job(JCR *jcr);
-void init_job_server(int max_workers);
 
 
 /* Imported subroutines */
@@ -46,17 +50,34 @@ extern int do_restore(JCR *jcr);
 extern int do_verify(JCR *jcr);
 extern void backup_cleanup(void);
 
+#ifdef USE_SEMAPHORE
+static semlock_t job_lock;
+static pthread_mutex_t mutex;
+static pthread_cond_t  resource_wait;
+#else
 /* Queue of jobs to be run */
 workq_t job_wq;                  /* our job work queue */
-
+#endif
 
 void init_job_server(int max_workers)
 {
    int stat;
+#ifdef USE_SEMAPHORE
+   if ((stat = sem_init(&job_lock, max_workers)) != 0) {
+      Emsg1(M_ABORT, 0, _("Could not init job lock: ERR=%s\n"), strerror(stat));
+   }
+   if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
+      Emsg1(M_ABORT, 0, _("Could not init resource mutex: ERR=%s\n"), strerror(stat));
+   }
+   if ((stat = pthread_cond_init(&resource_wait, NULL)) != 0) {
+      Emsg1(M_ABORT, 0, _("Could not init resource wait: ERR=%s\n"), strerror(stat));
+   }
 
+#else
    if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
       Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat));
    }
+#endif
    return;
 }
 
@@ -68,7 +89,11 @@ void init_job_server(int max_workers)
 void run_job(JCR *jcr)
 {
    int stat, errstat;
+#ifdef USE_SEMAPHORE
+   pthread_t tid;
+#else
    workq_ele_t *work_item;
+#endif
 
    sm_check(__FILE__, __LINE__, True);
    init_msg(jcr, jcr->messages);
@@ -123,12 +148,17 @@ void run_job(JCR *jcr)
        jcr->JobId, jcr->Job, jcr->jr.Type, jcr->jr.Level);
    Dmsg0(200, "Add jrc to work queue\n");
 
-
+#ifdef USE_SEMAPHORE
+  if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) {
+      Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat));
+   }
+#else
    /* Queue the job to be run */
    if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
       Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
    }
    jcr->work_item = work_item;
+#endif
    Dmsg0(200, "Done run_job()\n");
 }
 
@@ -137,22 +167,28 @@ void run_job(JCR *jcr)
  *  from the work queue.
  *  At this point, we are running in our own thread 
  */
-static void job_thread(void *arg)
+static void *job_thread(void *arg)
 {
    time_t now;
    JCR *jcr = (JCR *)arg;
 
+   pthread_detach(pthread_self());
    time(&now);
    sm_check(__FILE__, __LINE__, True);
 
-   Dmsg0(100, "=====Start Job=========\n");
+   if (!acquire_resource_locks(jcr)) {
+      set_jcr_job_status(jcr, JS_Cancelled);
+   }
+
+   Dmsg0(200, "=====Start Job=========\n");
    jcr->start_time = now;            /* set the real start time */
+   Dmsg2(200, "jcr->JobStatus=%d %c\n", jcr->JobStatus, (char)jcr->JobStatus);
    if (job_cancelled(jcr)) {
       update_job_end_record(jcr);
    } else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
        (utime_t)(jcr->start_time - jcr->sched_time)) {
-      Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max delay time exceeded.\n"));
-      set_jcr_job_status(jcr, JS_ErrorTerminated);
+      Jmsg(jcr, M_FATAL, 0, _("Job cancelled because max start delay time exceeded.\n"));
+      set_jcr_job_status(jcr, JS_Cancelled);
       update_job_end_record(jcr);
    } else {
 
@@ -204,10 +240,107 @@ static void job_thread(void *arg)
         free_pool_memory(after);
       }
    }
+   release_resource_locks(jcr);
    Dmsg0(50, "Before free jcr\n");
    free_jcr(jcr);
    Dmsg0(50, "======== End Job ==========\n");
    sm_check(__FILE__, __LINE__, True);
+   return NULL;
+}
+
+static int acquire_resource_locks(JCR *jcr)
+{
+#ifdef USE_SEMAPHORE
+   int stat;
+
+   if (jcr->store->sem.valid != SEMLOCK_VALID) {
+      if ((stat = sem_init(&jcr->store->sem, jcr->store->MaxConcurrentJobs)) != 0) {
+         Emsg1(M_ABORT, 0, _("Could not init Storage semaphore: ERR=%s\n"), strerror(stat));
+      }
+   }
+   if (jcr->client->sem.valid != SEMLOCK_VALID) {
+      if ((stat = sem_init(&jcr->client->sem, jcr->client->MaxConcurrentJobs)) != 0) {
+         Emsg1(M_ABORT, 0, _("Could not init Client semaphore: ERR=%s\n"), strerror(stat));
+      }
+   }
+   if (jcr->job->sem.valid != SEMLOCK_VALID) {
+      if ((stat = sem_init(&jcr->job->sem, jcr->job->MaxConcurrentJobs)) != 0) {
+         Emsg1(M_ABORT, 0, _("Could not init Job semaphore: ERR=%s\n"), strerror(stat));
+      }
+   }
+
+   for ( ;; ) {
+      /* Acquire semaphore */
+      set_jcr_job_status(jcr, JS_WaitJobRes);
+      if ((stat = sem_lock(&jcr->job->sem)) != 0) {
+         Emsg1(M_ABORT, 0, _("Could not acquire Job max jobs lock: ERR=%s\n"), strerror(stat));
+      }
+      set_jcr_job_status(jcr, JS_WaitClientRes);
+      if ((stat = sem_trylock(&jcr->client->sem)) != 0) {
+        if (stat == EBUSY) {
+           backoff_resource_locks(jcr, 1);
+           goto wait;
+        } else {
+            Emsg1(M_ABORT, 0, _("Could not acquire Client max jobs lock: ERR=%s\n"), strerror(stat));
+        }
+      }
+      set_jcr_job_status(jcr, JS_WaitStoreRes);
+      if ((stat = sem_trylock(&jcr->store->sem)) != 0) {
+        if (stat == EBUSY) {
+           backoff_resource_locks(jcr, 2);
+           goto wait;
+        } else {
+            Emsg1(M_ABORT, 0, _("Could not acquire Storage max jobs lock: ERR=%s\n"), strerror(stat));
+        }
+      }
+      set_jcr_job_status(jcr, JS_WaitMaxJobs);
+      if ((stat = sem_trylock(&job_lock)) != 0) {
+        if (stat == EBUSY) {
+           backoff_resource_locks(jcr, 3);
+           goto wait;
+        } else {
+            Emsg1(M_ABORT, 0, _("Could not acquire max jobs lock: ERR=%s\n"), strerror(stat));
+        }
+      }
+      break;
+
+wait:
+      P(mutex);
+      /* Wait for some resource to be released */
+      pthread_cond_wait(&resource_wait, &mutex);
+      V(mutex);
+      /* Try again */
+   }
+#endif
+   return 1;
+}
+
+#ifdef USE_SEMAPHORE
+static void backoff_resource_locks(JCR *jcr, int count)
+{
+   switch (count) {
+   case 3:
+      sem_unlock(&jcr->store->sem);
+   case 2:
+      sem_unlock(&jcr->client->sem);
+   case 1:
+      sem_unlock(&jcr->job->sem);
+      break;
+   }
+}
+#endif
+
+static void release_resource_locks(JCR *jcr)
+{
+#ifdef USE_SEMAPHORE
+   P(mutex);
+   sem_unlock(&jcr->store->sem);
+   sem_unlock(&jcr->client->sem);
+   sem_unlock(&jcr->job->sem);
+   sem_unlock(&job_lock);
+   pthread_cond_signal(&resource_wait);
+   V(mutex);
+#endif
 }
 
 /*
index f9aead3740d58e918bcfce41274befcc300d3220..0b66a86a2bd2a1acd7efb162c23b769b71a5070f 100644 (file)
@@ -44,6 +44,7 @@ int newVolume(JCR *jcr, MEDIA_DBR *mr)
 {
    POOL_DBR pr;
    char name[MAXSTRING];
+   char num[20];
 
    memset(&pr, 0, sizeof(pr));
 
@@ -55,17 +56,18 @@ int newVolume(JCR *jcr, MEDIA_DBR *mr)
       if (pr.MaxVols == 0 || pr.NumVols < pr.MaxVols) {
         set_pool_dbr_defaults_in_media_dbr(mr, &pr);
         mr->LabelDate = time(NULL);
-        strcpy(mr->MediaType, jcr->store->media_type);
-        strcpy(name, pr.LabelFormat);   
+        bstrncpy(mr->MediaType, jcr->store->media_type, sizeof(mr->MediaType));
+        bstrncpy(name, pr.LabelFormat, sizeof(name));
          if (strchr(name, (int)'%') != NULL) {
            db_unlock(jcr->db);
             Jmsg(jcr, M_ERROR, 0, _("Illegal character in Label Format\n"));
            return 0;
         }
-         strcat(name, "%04d");
-        sprintf(mr->VolumeName, name, ++pr.NumVols);
+         sprintf(num, "%04d", ++pr.NumVols);
+        bstrncpy(mr->VolumeName, name, sizeof(mr->VolumeName));
+        bstrncat(mr->VolumeName, num, sizeof(mr->VolumeName));
         if (db_create_media_record(jcr, jcr->db, mr) &&
-           db_update_pool_record(jcr, jcr->db, &pr) == 1) {
+           db_update_pool_record(jcr, jcr->db, &pr)) {
            db_unlock(jcr->db);
             Dmsg1(90, "Created new Volume=%s\n", mr->VolumeName);
            return 1;
index 7163d92dd1a83105c7037d9e7a226c37311f48fa..23e77532bb8775625f2cff5d0fa0bd013fff259e 100644 (file)
@@ -38,7 +38,9 @@ extern int r_first;
 extern int r_last;
 extern struct s_res resources[];
 extern char my_name[];
+#ifndef USE_SEMAPHORE 
 extern workq_t job_wq;               /* work queue */
+#endif
 
 extern char *list_pool;
 
@@ -416,7 +418,9 @@ static int cancelcmd(UAContext *ua, char *cmd)
       set_jcr_job_status(jcr, JS_Cancelled);
       bsendmsg(ua, _("JobId %d, Job %s marked to be cancelled.\n"),
              jcr->JobId, jcr->Job);
+#ifndef USE_SEMAPHORE
       workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */
+#endif
       free_jcr(jcr);
       return 1;
         
@@ -1337,11 +1341,13 @@ gotVol:
    bash_spaces(pr.Name);
    bnet_fsend(sd, _("label %s VolumeName=%s PoolName=%s MediaType=%s Slot=%d"), 
       dev_name, mr.VolumeName, pr.Name, mr.MediaType, mr.Slot);
-   bsendmsg(ua, "Sending label command ...\n");
+   bsendmsg(ua, _("Sending label command ...\n"));
    while (bget_msg(sd, 0) >= 0) {
       bsendmsg(ua, "%s", sd->msg);
       if (strncmp(sd->msg, "3000 OK label.", 14) == 0) {
         ok = TRUE;
+      } else {
+         bsendmsg(ua, _("Label command failed.\n"));
       }
    }
    ua->jcr->store_bsock = NULL;
index d63b6ec1765c63b8f18634af197f4290de2847c7..40378b8b0f126ad5f3e4ffb7aaea9476902e9343 100644 (file)
@@ -50,7 +50,7 @@ int quit_cmd_thread = 0;
 /* Forward referenced functions */
 
 static void *connect_thread(void *arg);
-static void handle_UA_client_request(void *arg);
+static void *handle_UA_client_request(void *arg);
 
 
 /* Global variables */
@@ -96,7 +96,7 @@ static void *connect_thread(void *arg)
  * Handle Director User Agent commands  
  *
  */
-static void handle_UA_client_request(void *arg)
+static void *handle_UA_client_request(void *arg)
 {
    int stat;
    UAContext ua;
@@ -173,7 +173,7 @@ getout:
    if (ua.args) {
       free_pool_memory(ua.args);
    }
-   return;
+   return NULL;
 }
 
 /*
index 49f6c85aefcc60fb3fa840423a770efd4ae6bd6d..e64b325e49e038f54b4b46269fa5280e829b1729 100644 (file)
@@ -251,6 +251,19 @@ static void do_director_status(UAContext *ua, char *cmd)
             Mmsg(&msg, _("is waiting on Storage %s"), jcr->store->hdr.name);
            pool_mem = TRUE;
            break;
+        case JS_WaitStoreRes:
+            msg = _("is waiting on max Storage jobs");
+           break;
+        case JS_WaitClientRes:
+            msg = _("is waiting on max Client jobs");
+           break;
+        case JS_WaitJobRes:
+            msg = _("is waiting on max Job jobs");
+           break;
+        case JS_WaitMaxJobs:
+            msg = _("is waiting on max total jobs");
+           break;
+
         default:
            msg = (char *) get_pool_memory(PM_FNAME);
             Mmsg(&msg, _("is in unknown state %c"), jcr->JobStatus);
index 52bf099170dae87c66a015e21f5827f24fa7cd0d..df711307405cb078a3f760d7e6c2c2b82ef307fb 100644 (file)
@@ -30,7 +30,7 @@
 #include "filed.h"
 
 /* Imported Functions */
-extern void handle_client_request(void *dir_sock);
+extern void *handle_client_request(void *dir_sock);
 
 /* Forward referenced functions */
 void terminate_filed(int sig);
index 0669b5e63f37138ddfacb3f33e60490a971404bb..c96dabbd1cabe7eb65b945a041850c050bb923e8 100644 (file)
 #define JS_WaitSD                'S'  /* waiting on the Storage daemon */
 #define JS_WaitMedia             'm'  /* waiting for new media */
 #define JS_WaitMount             'M'  /* waiting for Mount */
+#define JS_WaitStoreRes          's'  /* Waiting for storage resource */
+#define JS_WaitJobRes            'j'  /* Waiting for job resource */
+#define JS_WaitClientRes         'c'  /* Waiting for Client resource */
+#define JS_WaitMaxJobs           'd'  /* Waiting for maximum jobs */
 
 #define job_cancelled(jcr) \
   (jcr->JobStatus == JS_Cancelled || \
index bd866d1be2744c94befda127a0e5b1105328a880..1c6f6f15a64eb00be2655fa841349cfe0e760a96 100644 (file)
@@ -37,7 +37,8 @@ LIBSRCS = alloc.c base64.c bmisc.c bnet.c bnet_server.c \
          hmac.c idcache.c jcr.c lex.c  \
          md5.c message.c mem_pool.c parse_conf.c \
          queue.c rwlock.c serial.c sha1.c \
-         signal.c smartall.c tree.c util.c watchdog.c workq.c  
+         semlock.c signal.c smartall.c tree.c \
+         util.c watchdog.c workq.c  
 
 #        immortal.c filesys.c
 
@@ -47,7 +48,8 @@ LIBOBJS = alloc.o base64.o bmisc.o bnet.o bnet_server.o \
          hmac.o idcache.o jcr.o lex.o  \
          md5.o message.o mem_pool.o parse_conf.o \
          queue.o rwlock.o serial.o sha1.o \
-         signal.o smartall.o tree.o util.o watchdog.o workq.o
+         semlock.o signal.o smartall.o tree.o \
+         util.o watchdog.o workq.o
 
 #        immortal.o filesys.o
 
index 16b5e6b513a4f5fd9c3bac4f525c1c72a9ba3dba..ecf8fa38f2d9fc39b364a9fb2185e710d864928e 100644 (file)
@@ -46,7 +46,7 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 /* Become Threaded Network Server */
 void
 bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq, 
-                  void handle_client_request(void *bsock))
+                  void *handle_client_request(void *bsock))
 {
    int newsockfd, sockfd, stat;
    socklen_t clilen;
index b9bd67283051acb5006e3ccf7a6fdcdf1a0defc8..9eeb48fed3a2da12e374b4d7acef714644b49b1b 100644 (file)
@@ -39,6 +39,7 @@
 #include "bshm.h"
 #include "workq.h"
 #include "rwlock.h"
+#include "semlock.h"
 #include "queue.h"
 #include "serial.h"
 #ifndef HAVE_FNMATCH
index 6f35b8ffbef7937bb80b49e526cbc0bfcb247d53..4041e8488fbd25d0e5f390cb508204c602f06e6f 100644 (file)
@@ -117,7 +117,7 @@ int    close_spool_file      (void *vjcr, BSOCK *bs);
 
 /* bnet_server.c */
 void      bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq, 
-                  void handle_client_request(void *bsock));
+                  void *handle_client_request(void *bsock));
 void            bnet_server             (int port, void handle_client_request(BSOCK *bsock));
 int             net_connect             (int port);
 BSOCK *         bnet_bind               (int port);
index 4ec8b858092496695a410c4bbea77aa9688e6526..68adf340b73e453bab412e700b161484a0a2cf92 100644 (file)
@@ -13,7 +13,7 @@
  *
  */
 /*
-   Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
+   Copyright (C) 2000-2003 Kern Sibbald and John Walker
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
  * Initialize a read/write lock
  *
  *  Returns: 0 on success
- *           errno on failure
+ *          errno on failure
  */
 int rwl_init(brwlock_t *rwl)
 {
    int stat;
-                        
+                       
    rwl->r_active = rwl->w_active = 0;
    rwl->r_wait = rwl->w_wait = 0;
    if ((stat = pthread_mutex_init(&rwl->mutex, NULL)) != 0) {
@@ -66,7 +66,7 @@ int rwl_init(brwlock_t *rwl)
  * Destroy a read/write lock
  *
  * Returns: 0 on success
- *          errno on failure
+ *         errno on failure
  */
 int rwl_destroy(brwlock_t *rwl)
 {
@@ -99,7 +99,7 @@ int rwl_destroy(brwlock_t *rwl)
   if ((stat = pthread_mutex_unlock(&rwl->mutex)) != 0) {
      return stat;
   }
-  stat  = pthread_mutex_destroy(&rwl->mutex);
+  stat = pthread_mutex_destroy(&rwl->mutex);
   stat1 = pthread_cond_destroy(&rwl->read);
   stat2 = pthread_cond_destroy(&rwl->write);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
@@ -143,19 +143,19 @@ int rwl_readlock(brwlock_t *rwl)
       return stat;
    }
    if (rwl->w_active) {
-      rwl->r_wait++;                  /* indicate that we are waiting */
+      rwl->r_wait++;                 /* indicate that we are waiting */
       pthread_cleanup_push(rwl_read_release, (void *)rwl);
       while (rwl->w_active) {
-         stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
-         if (stat != 0) {
-            break;                    /* error, bail out */
-         }
+        stat = pthread_cond_wait(&rwl->read, &rwl->mutex);
+        if (stat != 0) {
+           break;                    /* error, bail out */
+        }
       }
       pthread_cleanup_pop(0);
-      rwl->r_wait--;                  /* we are no longer waiting */
+      rwl->r_wait--;                 /* we are no longer waiting */
    }
    if (stat == 0) {
-      rwl->r_active++;                /* we are running */
+      rwl->r_active++;               /* we are running */
    }
    pthread_mutex_unlock(&rwl->mutex);
    return stat;
@@ -177,7 +177,7 @@ int rwl_readtrylock(brwlock_t *rwl)
    if (rwl->w_active) {
       stat = EBUSY;
    } else {
-      rwl->r_active++;                /* we are running */
+      rwl->r_active++;               /* we are running */
    }
    stat2 = pthread_mutex_unlock(&rwl->mutex);
    return (stat == 0 ? stat2 : stat);
@@ -225,18 +225,18 @@ int rwl_writelock(brwlock_t *rwl)
       return 0;
    }
    if (rwl->w_active || rwl->r_active > 0) {
-      rwl->w_wait++;                  /* indicate that we are waiting */
+      rwl->w_wait++;                 /* indicate that we are waiting */
       pthread_cleanup_push(rwl_write_release, (void *)rwl);
       while (rwl->w_active || rwl->r_active > 0) {
-         if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
-            break;                    /* error, bail out */
-         }
+        if ((stat = pthread_cond_wait(&rwl->write, &rwl->mutex)) != 0) {
+           break;                    /* error, bail out */
+        }
       }
       pthread_cleanup_pop(0);
-      rwl->w_wait--;                  /* we are no longer waiting */
+      rwl->w_wait--;                 /* we are no longer waiting */
    }
    if (stat == 0) {
-      rwl->w_active = 1;              /* we are running */
+      rwl->w_active = 1;             /* we are running */
       rwl->writer_id = pthread_self(); /* save writer thread's id */
    }
    pthread_mutex_unlock(&rwl->mutex);
@@ -264,7 +264,7 @@ int rwl_writetrylock(brwlock_t *rwl)
    if (rwl->w_active || rwl->r_active > 0) {
       stat = EBUSY;
    } else {
-      rwl->w_active = 1;              /* we are running */
+      rwl->w_active = 1;             /* we are running */
       rwl->writer_id = pthread_self(); /* save writer thread's id */
    }
    stat2 = pthread_mutex_unlock(&rwl->mutex);
@@ -290,13 +290,13 @@ int rwl_writeunlock(brwlock_t *rwl)
       Emsg0(M_ABORT, 0, "rwl_writeunlock by non-owner.\n");
    }
    if (rwl->w_active > 0) {
-      stat = 0;                       /* writers still active */
+      stat = 0;                      /* writers still active */
    } else {
       /* No more writers, awaken someone */
-      if (rwl->r_wait > 0) {         /* if readers waiting */
-         stat = pthread_cond_broadcast(&rwl->read);
+      if (rwl->r_wait > 0) {        /* if readers waiting */
+        stat = pthread_cond_broadcast(&rwl->read);
       } else if (rwl->w_wait > 0) {
-         stat = pthread_cond_signal(&rwl->write);
+        stat = pthread_cond_signal(&rwl->write);
       }
    }
    stat2 = pthread_mutex_unlock(&rwl->mutex);
@@ -350,43 +350,43 @@ void *thread_routine(void *arg)
        * lock).
        */
       if ((iteration % self->interval) == 0) {
-         status = rwl_writelock(&data[element].lock);
-         if (status != 0) {
+        status = rwl_writelock(&data[element].lock);
+        if (status != 0) {
             Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
-         }
-         data[element].data = self->thread_num;
-         data[element].writes++;
-         self->writes++;
-         status = rwl_writeunlock(&data[element].lock);
-         if (status != 0) {
+        }
+        data[element].data = self->thread_num;
+        data[element].writes++;
+        self->writes++;
+        status = rwl_writeunlock(&data[element].lock);
+        if (status != 0) {
             Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
-         }
+        }
       } else {
-         /*
-          * Look at the current data element to see whether
-          * the current thread last updated it. Count the
-          * times to report later.
-          */
-          status = rwl_readlock(&data[element].lock);
-          if (status != 0) {
+        /*
+         * Look at the current data element to see whether
+         * the current thread last updated it. Count the
+         * times to report later.
+         */
+         status = rwl_readlock(&data[element].lock);
+         if (status != 0) {
              Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
-          }
-          self->reads++;
-          if (data[element].data == self->thread_num)
-             repeats++;
-          status = rwl_readunlock(&data[element].lock);
-          if (status != 0) {
+         }
+         self->reads++;
+         if (data[element].data == self->thread_num)
+            repeats++;
+         status = rwl_readunlock(&data[element].lock);
+         if (status != 0) {
              Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
-          }
+         }
       }
       element++;
       if (element >= DATASIZE) {
-         element = 0;
+        element = 0;
       }
    }
    if (repeats > 0) {
       Dmsg2(000, "Thread %d found unchanged elements %d times\n",
-         self->thread_num, repeats);
+        self->thread_num, repeats);
    }
    return NULL;
 }
@@ -413,27 +413,27 @@ int main (int argc, char *argv[])
      * Initialize the shared data.
      */
     for (data_count = 0; data_count < DATASIZE; data_count++) {
-        data[data_count].data = 0;
-        data[data_count].writes = 0;
-        status = rwl_init (&data[data_count].lock);
-        if (status != 0) {
+       data[data_count].data = 0;
+       data[data_count].writes = 0;
+       status = rwl_init (&data[data_count].lock);
+       if (status != 0) {
            Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
-        }
+       }
     }
 
     /*
      * Create THREADS threads to access shared data.
      */
     for (count = 0; count < THREADS; count++) {
-        threads[count].thread_num = count + 1;
-        threads[count].writes = 0;
-        threads[count].reads = 0;
-        threads[count].interval = rand_r (&seed) % 71;
-        status = pthread_create (&threads[count].thread_id,
-            NULL, thread_routine, (void*)&threads[count]);
-        if (status != 0) {
+       threads[count].thread_num = count + 1;
+       threads[count].writes = 0;
+       threads[count].reads = 0;
+       threads[count].interval = rand_r (&seed) % 71;
+       status = pthread_create (&threads[count].thread_id,
+           NULL, thread_routine, (void*)&threads[count]);
+       if (status != 0) {
            Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
-        }
+       }
     }
 
     /*
@@ -441,28 +441,28 @@ int main (int argc, char *argv[])
      * statistics.
      */
     for (count = 0; count < THREADS; count++) {
-        status = pthread_join (threads[count].thread_id, NULL);
-        if (status != 0) {
+       status = pthread_join (threads[count].thread_id, NULL);
+       if (status != 0) {
            Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
-        }
-        thread_writes += threads[count].writes;
+       }
+       thread_writes += threads[count].writes;
         printf ("%02d: interval %d, writes %d, reads %d\n",
-            count, threads[count].interval,
-            threads[count].writes, threads[count].reads);
+           count, threads[count].interval,
+           threads[count].writes, threads[count].reads);
     }
 
     /*
      * Collect statistics for the data.
      */
     for (data_count = 0; data_count < DATASIZE; data_count++) {
-        data_writes += data[data_count].writes;
+       data_writes += data[data_count].writes;
         printf ("data %02d: value %d, %d writes\n",
-            data_count, data[data_count].data, data[data_count].writes);
-        rwl_destroy (&data[data_count].lock);
+           data_count, data[data_count].data, data[data_count].writes);
+       rwl_destroy (&data[data_count].lock);
     }
 
     printf ("Total: %d thread writes, %d data writes\n",
-        thread_writes, data_writes);
+       thread_writes, data_writes);
     return 0;
 }
 
@@ -482,29 +482,29 @@ int main (int argc, char *argv[])
 #include "rwlock.h"
 #include "errors.h"
 
-#define THREADS         5
-#define ITERATIONS      1000
-#define DATASIZE        15
+#define THREADS        5
+#define ITERATIONS     1000
+#define DATASIZE       15
 
 /*
  * Keep statistics for each thread.
  */
 typedef struct thread_tag {
-    int         thread_num;
-    pthread_t   thread_id;
-    int         r_collisions;
-    int         w_collisions;
-    int         updates;
-    int         interval;
+    int        thread_num;
+    pthread_t  thread_id;
+    int        r_collisions;
+    int        w_collisions;
+    int        updates;
+    int        interval;
 } thread_t;
 
 /*
  * Read-write lock and shared data
  */
 typedef struct data_tag {
-    brwlock_t    lock;
-    int         data;
-    int         updates;
+    brwlock_t   lock;
+    int        data;
+    int        updates;
 } data_t;
 
 thread_t threads[THREADS];
@@ -520,38 +520,38 @@ void *thread_routine (void *arg)
     int element;
     int status;
 
-    element = 0;                        /* Current data element */
+    element = 0;                       /* Current data element */
 
     for (iteration = 0; iteration < ITERATIONS; iteration++) {
-        if ((iteration % self->interval) == 0) {
-            status = rwl_writetrylock (&data[element].lock);
-            if (status == EBUSY)
-                self->w_collisions++;
-            else if (status == 0) {
-                data[element].data++;
-                data[element].updates++;
-                self->updates++;
-                rwl_writeunlock (&data[element].lock);
-            } else
+       if ((iteration % self->interval) == 0) {
+           status = rwl_writetrylock (&data[element].lock);
+           if (status == EBUSY)
+               self->w_collisions++;
+           else if (status == 0) {
+               data[element].data++;
+               data[element].updates++;
+               self->updates++;
+               rwl_writeunlock (&data[element].lock);
+           } else
                 err_abort (status, "Try write lock");
-        } else {
-            status = rwl_readtrylock (&data[element].lock);
-            if (status == EBUSY)
-                self->r_collisions++;
-            else if (status != 0) {
+       } else {
+           status = rwl_readtrylock (&data[element].lock);
+           if (status == EBUSY)
+               self->r_collisions++;
+           else if (status != 0) {
                 err_abort (status, "Try read lock");
-            } else {
-                if (data[element].data != data[element].updates)
+           } else {
+               if (data[element].data != data[element].updates)
                     printf ("%d: data[%d] %d != %d\n",
-                        self->thread_num, element,
-                        data[element].data, data[element].updates);
-                rwl_readunlock (&data[element].lock);
-            }
-        }
-
-        element++;
-        if (element >= DATASIZE)
-            element = 0;
+                       self->thread_num, element,
+                       data[element].data, data[element].updates);
+               rwl_readunlock (&data[element].lock);
+           }
+       }
+
+       element++;
+       if (element >= DATASIZE)
+           element = 0;
     }
     return NULL;
 }
@@ -577,23 +577,23 @@ int main (int argc, char *argv[])
      * Initialize the shared data.
      */
     for (data_count = 0; data_count < DATASIZE; data_count++) {
-        data[data_count].data = 0;
-        data[data_count].updates = 0;
-        rwl_init (&data[data_count].lock);
+       data[data_count].data = 0;
+       data[data_count].updates = 0;
+       rwl_init (&data[data_count].lock);
     }
 
     /*
      * Create THREADS threads to access shared data.
      */
     for (count = 0; count < THREADS; count++) {
-        threads[count].thread_num = count;
-        threads[count].r_collisions = 0;
-        threads[count].w_collisions = 0;
-        threads[count].updates = 0;
-        threads[count].interval = rand_r (&seed) % ITERATIONS;
-        status = pthread_create (&threads[count].thread_id,
-            NULL, thread_routine, (void*)&threads[count]);
-        if (status != 0)
+       threads[count].thread_num = count;
+       threads[count].r_collisions = 0;
+       threads[count].w_collisions = 0;
+       threads[count].updates = 0;
+       threads[count].interval = rand_r (&seed) % ITERATIONS;
+       status = pthread_create (&threads[count].thread_id,
+           NULL, thread_routine, (void*)&threads[count]);
+       if (status != 0)
             err_abort (status, "Create thread");
     }
 
@@ -602,25 +602,25 @@ int main (int argc, char *argv[])
      * statistics.
      */
     for (count = 0; count < THREADS; count++) {
-        status = pthread_join (threads[count].thread_id, NULL);
-        if (status != 0)
+       status = pthread_join (threads[count].thread_id, NULL);
+       if (status != 0)
             err_abort (status, "Join thread");
-        thread_updates += threads[count].updates;
+       thread_updates += threads[count].updates;
         printf ("%02d: interval %d, updates %d, "
                 "r_collisions %d, w_collisions %d\n",
-            count, threads[count].interval,
-            threads[count].updates,
-            threads[count].r_collisions, threads[count].w_collisions);
+           count, threads[count].interval,
+           threads[count].updates,
+           threads[count].r_collisions, threads[count].w_collisions);
     }
 
     /*
      * Collect statistics for the data.
      */
     for (data_count = 0; data_count < DATASIZE; data_count++) {
-        data_updates += data[data_count].updates;
+       data_updates += data[data_count].updates;
         printf ("data %02d: value %d, %d updates\n",
-            data_count, data[data_count].data, data[data_count].updates);
-        rwl_destroy (&data[data_count].lock);
+           data_count, data[data_count].data, data[data_count].updates);
+       rwl_destroy (&data[data_count].lock);
     }
 
     return 0;
index 4b7ca78852c03f7f7aa0196d0b38833514ca3f05..d310e1b3daec9f081ed34f5f5e911f43882e5be9 100644 (file)
@@ -11,7 +11,7 @@
  *
  */
 /*
-   Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
+   Copyright (C) 2000-2003 Kern Sibbald and John Walker
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
diff --git a/bacula/src/lib/semlock.c b/bacula/src/lib/semlock.c
new file mode 100644 (file)
index 0000000..7682682
--- /dev/null
@@ -0,0 +1,522 @@
+/*
+ * Bacula Semaphore code. This code permits setting up
+ *  a semaphore that lets through a specified number
+ *  of callers simultaneously. Once the number of callers
+ *  exceed the limit, they block.      
+ *
+ *  Kern Sibbald, March MMIII
+ *
+ *   Derived from rwlock.h which was in turn derived from code in
+ *     "Programming with POSIX Threads" By David R. Butenhof
+ &
+ *   Version $Id$
+ *
+ */
+/*
+   Copyright (C) 2000-2003 Kern Sibbald and John Walker
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of
+   the License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public
+   License along with this program; if not, write to the Free
+   Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+   MA 02111-1307, USA.
+
+ */
+
+#include "bacula.h"
+
+/*   
+ * Initialize a semaphore
+ *
+ *  Returns: 0 on success
+ *          errno on failure
+ */
+int sem_init(semlock_t *sem, int max_active)
+{
+   int stat;
+                       
+   sem->active = sem->waiting = 0;
+   sem->max_active = max_active;
+   if ((stat = pthread_mutex_init(&sem->mutex, NULL)) != 0) {
+      return stat;
+   }
+   if ((stat = pthread_cond_init(&sem->wait, NULL)) != 0) {
+      pthread_mutex_destroy(&sem->mutex);
+      return stat;
+   }
+   sem->valid = SEMLOCK_VALID;
+   return 0;
+}
+
+/*
+ * Destroy a semaphore
+ *
+ * Returns: 0 on success
+ *         errno on failure
+ */
+int sem_destroy(semlock_t *sem)
+{
+   int stat, stat1;      
+
+  if (sem->valid != SEMLOCK_VALID) {
+     return EINVAL;
+  }
+  if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+     return stat;
+  }
+
+  /* 
+   * If any threads are active, report EBUSY
+   */
+  if (sem->active > 0) {
+     pthread_mutex_unlock(&sem->mutex);
+     return EBUSY;
+  }
+
+  /*
+   * If any threads are waiting, report EBUSY
+   */
+  if (sem->waiting > 0) {
+     pthread_mutex_unlock(&sem->mutex);
+     return EBUSY;
+  }
+
+  sem->valid = 0;
+  if ((stat = pthread_mutex_unlock(&sem->mutex)) != 0) {
+     return stat;
+  }
+  stat = pthread_mutex_destroy(&sem->mutex);
+  stat1 = pthread_cond_destroy(&sem->wait);
+  return (stat != 0 ? stat : stat1);
+}
+
+/*
+ * Handle cleanup when the wait lock condition variable
+ *    wait is released.
+ */
+static void sem_release(void *arg)
+{
+   semlock_t *sem = (semlock_t *)arg;
+
+   sem->waiting--;
+   pthread_mutex_unlock(&sem->mutex);
+}
+
+
+/*
+ * Lock semaphore, wait until locked (or error).
+ */
+int sem_lock(semlock_t *sem)
+{
+   int stat;
+    
+   if (sem->valid != SEMLOCK_VALID) {
+      return EINVAL;
+   }
+   if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+      return stat;
+   }
+   if (sem->active >= sem->max_active) {
+      sem->waiting++;                /* indicate that we are waiting */
+      pthread_cleanup_push(sem_release, (void *)sem);
+      while (sem->active >= sem->max_active) {
+        if ((stat = pthread_cond_wait(&sem->wait, &sem->mutex)) != 0) {
+           break;                    /* error, bail out */
+        }
+      }
+      pthread_cleanup_pop(0);
+      sem->waiting--;                /* we are no longer waiting */
+   }
+   if (stat == 0) {
+      sem->active++;                 /* we are running */
+   }
+   pthread_mutex_unlock(&sem->mutex);
+   return stat;
+}
+
+/* 
+ * Attempt to lock semaphore, don't wait
+ */
+int sem_trylock(semlock_t *sem)
+{
+   int stat, stat1;
+    
+   if (sem->valid != SEMLOCK_VALID) {
+      return EINVAL;
+   }
+   if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+      return stat;
+   }
+
+   if (sem->active >= sem->max_active) {
+      stat = EBUSY;
+   } else {
+      sem->active++;                /* we are running */
+   }
+   stat1 = pthread_mutex_unlock(&sem->mutex);
+   return (stat == 0 ? stat1 : stat);
+}
+   
+/* 
+ * Unlock semaphore
+ *  Start any waiting callers
+ */
+int sem_unlock(semlock_t *sem)
+{
+   int stat, stat1;
+    
+   if (sem->valid != SEMLOCK_VALID) {
+      return EINVAL;
+   }
+   if ((stat = pthread_mutex_lock(&sem->mutex)) != 0) {
+      return stat;
+   }
+   sem->active--;
+   if (sem->active < 0) {
+      Emsg0(M_ABORT, 0, "sem_unlock by non-owner.\n");
+   }
+   if (sem->active >= sem->max_active) {
+      stat = 0;                      /* caller(s) still active */
+   } else {
+      /* No more active, awaken someone */
+      if (sem->waiting > 0) {        /* if someone waiting */
+        stat = pthread_cond_broadcast(&sem->wait);
+      }
+   }
+   stat1 = pthread_mutex_unlock(&sem->mutex);
+   return (stat == 0 ? stat1 : stat);
+}
+
+#ifdef TEST_SEMLOCK
+
+#define THREADS     5
+#define DATASIZE   15
+#define ITERATIONS 10000
+
+/*
+ * Keep statics for each thread.
+ */
+typedef struct thread_tag {
+   int thread_num;
+   pthread_t thread_id;
+   int writes;
+   int reads;
+   int interval;
+} thread_t;
+
+/* 
+ * Semaphore lock and shared data.
+ */
+typedef struct data_tag {
+   semlock_t lock;
+   int data;
+   int writes;
+} data_t;
+
+thread_t threads[THREADS];
+data_t data[DATASIZE];
+
+/* 
+ * Thread start routine that uses semaphores locks.
+ */
+void *thread_routine(void *arg)
+{
+   thread_t *self = (thread_t *)arg;
+   int repeats = 0;
+   int iteration;
+   int element = 0;
+   int status;
+
+   for (iteration=0; iteration < ITERATIONS; iteration++) {
+      /*
+       * Each "self->interval" iterations, perform an
+       * update operation (write lock instead of read
+       * lock).
+       */
+      if ((iteration % self->interval) == 0) {
+        status = sem_writelock(&data[element].lock);
+        if (status != 0) {
+            Emsg1(M_ABORT, 0, "Write lock failed. ERR=%s\n", strerror(status));
+        }
+        data[element].data = self->thread_num;
+        data[element].writes++;
+        self->writes++;
+        status = sem_writeunlock(&data[element].lock);
+        if (status != 0) {
+            Emsg1(M_ABORT, 0, "Write unlock failed. ERR=%s\n", strerror(status));
+        }
+      } else {
+        /*
+         * Look at the current data element to see whether
+         * the current thread last updated it. Count the
+         * times to report later.
+         */
+         status = sem_readlock(&data[element].lock);
+         if (status != 0) {
+             Emsg1(M_ABORT, 0, "Read lock failed. ERR=%s\n", strerror(status));
+         }
+         self->reads++;
+         if (data[element].data == self->thread_num)
+            repeats++;
+         status = sem_readunlock(&data[element].lock);
+         if (status != 0) {
+             Emsg1(M_ABORT, 0, "Read unlock failed. ERR=%s\n", strerror(status));
+         }
+      }
+      element++;
+      if (element >= DATASIZE) {
+        element = 0;
+      }
+   }
+   if (repeats > 0) {
+      Dmsg2(000, "Thread %d found unchanged elements %d times\n",
+        self->thread_num, repeats);
+   }
+   return NULL;
+}
+
+int main (int argc, char *argv[])
+{
+    int count;
+    int data_count;
+    int status;
+    unsigned int seed = 1;
+    int thread_writes = 0;
+    int data_writes = 0;
+
+#ifdef sun
+    /*
+     * On Solaris 2.5, threads are not timesliced. To ensure
+     * that our threads can run concurrently, we need to
+     * increase the concurrency level to THREADS.
+     */
+    thr_setconcurrency (THREADS);
+#endif
+
+    /*
+     * Initialize the shared data.
+     */
+    for (data_count = 0; data_count < DATASIZE; data_count++) {
+       data[data_count].data = 0;
+       data[data_count].writes = 0;
+       status = sem_init (&data[data_count].lock);
+       if (status != 0) {
+           Emsg1(M_ABORT, 0, "Init rwlock failed. ERR=%s\n", strerror(status));
+       }
+    }
+
+    /*
+     * Create THREADS threads to access shared data.
+     */
+    for (count = 0; count < THREADS; count++) {
+       threads[count].thread_num = count + 1;
+       threads[count].writes = 0;
+       threads[count].reads = 0;
+       threads[count].interval = rand_r (&seed) % 71;
+       status = pthread_create(&threads[count].thread_id,
+           NULL, thread_routine, (void*)&threads[count]);
+       if (status != 0) {
+           Emsg1(M_ABORT, 0, "Create thread failed. ERR=%s\n", strerror(status));
+       }
+    }
+
+    /*
+     * Wait for all threads to complete, and collect
+     * statistics.
+     */
+    for (count = 0; count < THREADS; count++) {
+       status = pthread_join (threads[count].thread_id, NULL);
+       if (status != 0) {
+           Emsg1(M_ABORT, 0, "Join thread failed. ERR=%s\n", strerror(status));
+       }
+       thread_writes += threads[count].writes;
+        printf ("%02d: interval %d, writes %d, reads %d\n",
+           count, threads[count].interval,
+           threads[count].writes, threads[count].reads);
+    }
+
+    /*
+     * Collect statistics for the data.
+     */
+    for (data_count = 0; data_count < DATASIZE; data_count++) {
+       data_writes += data[data_count].writes;
+        printf ("data %02d: value %d, %d writes\n",
+           data_count, data[data_count].data, data[data_count].writes);
+       sem_destroy (&data[data_count].lock);
+    }
+
+    printf ("Total: %d thread writes, %d data writes\n",
+       thread_writes, data_writes);
+    return 0;
+}
+
+#endif
+
+#ifdef TEST_SEM_TRY_LOCK
+/*
+ * semlock_try_main.c
+ *
+ * Demonstrate use of non-blocking read-write locks.
+ *
+ * Special notes: On a Solaris system, call thr_setconcurrency()
+ * to allow interleaved thread execution, since threads are not
+ * timesliced.
+ */
+#include <pthread.h>
+#include "semlock.h"
+#include "errors.h"
+
+#define THREADS        5
+#define ITERATIONS     1000
+#define DATASIZE       15
+
+/*
+ * Keep statistics for each thread.
+ */
+typedef struct thread_tag {
+    int        thread_num;
+    pthread_t  thread_id;
+    int        r_collisions;
+    int        w_collisions;
+    int        updates;
+    int        interval;
+} thread_t;
+
+/*
+ * Read-write lock and shared data
+ */
+typedef struct data_tag {
+    semlock_t   lock;
+    int        data;
+    int        updates;
+} data_t;
+
+thread_t threads[THREADS];
+data_t data[DATASIZE];
+
+/*
+ * Thread start routine that uses read-write locks
+ */
+void *thread_routine (void *arg)
+{
+    thread_t *self = (thread_t*)arg;
+    int iteration;
+    int element;
+    int status;
+
+    element = 0;                       /* Current data element */
+
+    for (iteration = 0; iteration < ITERATIONS; iteration++) {
+       if ((iteration % self->interval) == 0) {
+           status = sem_writetrylock (&data[element].lock);
+           if (status == EBUSY)
+               self->w_collisions++;
+           else if (status == 0) {
+               data[element].data++;
+               data[element].updates++;
+               self->updates++;
+               sem_writeunlock (&data[element].lock);
+           } else
+                err_abort (status, "Try write lock");
+       } else {
+           status = sem_readtrylock (&data[element].lock);
+           if (status == EBUSY)
+               self->r_collisions++;
+           else if (status != 0) {
+                err_abort (status, "Try read lock");
+           } else {
+               if (data[element].data != data[element].updates)
+                    printf ("%d: data[%d] %d != %d\n",
+                       self->thread_num, element,
+                       data[element].data, data[element].updates);
+               sem_readunlock (&data[element].lock);
+           }
+       }
+
+       element++;
+       if (element >= DATASIZE)
+           element = 0;
+    }
+    return NULL;
+}
+
+int main (int argc, char *argv[])
+{
+    int count, data_count;
+    unsigned int seed = 1;
+    int thread_updates = 0, data_updates = 0;
+    int status;
+
+#ifdef sun
+    /*
+     * On Solaris 2.5, threads are not timesliced. To ensure
+     * that our threads can run concurrently, we need to
+     * increase the concurrency level to THREADS.
+     */
+    DPRINTF (("Setting concurrency level to %d\n", THREADS));
+    thr_setconcurrency (THREADS);
+#endif
+
+    /*
+     * Initialize the shared data.
+     */
+    for (data_count = 0; data_count < DATASIZE; data_count++) {
+       data[data_count].data = 0;
+       data[data_count].updates = 0;
+       sem_init (&data[data_count].lock);
+    }
+
+    /*
+     * Create THREADS threads to access shared data.
+     */
+    for (count = 0; count < THREADS; count++) {
+       threads[count].thread_num = count;
+       threads[count].r_collisions = 0;
+       threads[count].w_collisions = 0;
+       threads[count].updates = 0;
+       threads[count].interval = rand_r (&seed) % ITERATIONS;
+       status = pthread_create (&threads[count].thread_id,
+           NULL, thread_routine, (void*)&threads[count]);
+       if (status != 0)
+            err_abort (status, "Create thread");
+    }
+
+    /*
+     * Wait for all threads to complete, and collect
+     * statistics.
+     */
+    for (count = 0; count < THREADS; count++) {
+       status = pthread_join(threads[count].thread_id, NULL);
+       if (status != 0)
+            err_abort(status, "Join thread");
+       thread_updates += threads[count].updates;
+        printf ("%02d: interval %d, updates %d, "
+                "r_collisions %d, w_collisions %d\n",
+           count, threads[count].interval,
+           threads[count].updates,
+           threads[count].r_collisions, threads[count].w_collisions);
+    }
+
+    /*
+     * Collect statistics for the data.
+     */
+    for (data_count = 0; data_count < DATASIZE; data_count++) {
+       data_updates += data[data_count].updates;
+        printf ("data %02d: value %d, %d updates\n",
+           data_count, data[data_count].data, data[data_count].updates);
+       sem_destroy(&data[data_count].lock);
+    }
+
+    return 0;
+}
+
+#endif
diff --git a/bacula/src/lib/semlock.h b/bacula/src/lib/semlock.h
new file mode 100644 (file)
index 0000000..83c19c8
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Bacula Semaphore code. This code permits setting up
+ *  a semaphore that lets through a specified number
+ *  of callers simultaneously. Once the number of callers
+ *  exceed the limit, they block.      
+ *
+ *  Kern Sibbald, March MMIII
+ *
+ *   Derived from rwlock.h which was in turn derived from code in
+ *     "Programming with POSIX Threads" By David R. Butenhof
+ *
+ *   Version $Id$
+ */
+/*
+   Copyright (C) 2000-2003 Kern Sibbald and John Walker
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of
+   the License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public
+   License along with this program; if not, write to the Free
+   Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+   MA 02111-1307, USA.
+
+ */
+
+#ifndef __SEMLOCK_H 
+#define __SEMLOCK_H 1
+
+typedef struct s_semlock_tag {
+   pthread_mutex_t   mutex;           /* main lock */
+   pthread_cond_t    wait;            /* wait for available slot */
+   int               valid;           /* set when valid */
+   int               waiting;         /* number of callers waiting */
+   int               max_active;      /* maximum active callers */
+   int               active;          /* number of active callers */
+} semlock_t;
+
+#define SEMLOCK_VALID  0xfacade
+
+#define SEM_INIIALIZER \
+   {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
+    PTHREAD_COND_INITIALIZER, SEMLOCK_VALID, 0, 0, 0, 0}
+
+/* 
+ * semaphore lock prototypes
+ */
+extern int sem_init(semlock_t *sem, int max_active);
+extern int sem_destroy(semlock_t *sem);
+extern int sem_lock(semlock_t *sem);
+extern int sem_trylock(semlock_t *sem);
+extern int sem_unlock(semlock_t *sem);
+
+#endif /* __SEMLOCK_H */
index 3ccc663923a54d565280251d839472df2dffb25e..dbe0001ffff78551386943c16b80cbad422a56da 100755 (executable)
@@ -56,12 +56,12 @@ static void *workq_server(void *arg);
  * Initialize a work queue
  *
  *  Returns: 0 on success
- *           errno on failure
+ *          errno on failure
  */
-int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
+int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
 {
    int stat;
-                        
+                       
    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
       return stat;
    }
@@ -80,10 +80,10 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
    }
    wq->quit = 0;
    wq->first = wq->last = NULL;
-   wq->max_workers = threads;         /* max threads to create */
-   wq->num_workers = 0;               /* no threads yet */
-   wq->idle_workers = 0;              /* no idle threads */
-   wq->engine = engine;               /* routine to run */
+   wq->max_workers = threads;        /* max threads to create */
+   wq->num_workers = 0;              /* no threads yet */
+   wq->idle_workers = 0;             /* no idle threads */
+   wq->engine = engine;              /* routine to run */
    wq->valid = WORKQ_VALID; 
    return 0;
 }
@@ -92,7 +92,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
  * Destroy a work queue
  *
  * Returns: 0 on success
- *          errno on failure
+ *         errno on failure
  */
 int workq_destroy(workq_t *wq)
 {
@@ -104,7 +104,7 @@ int workq_destroy(workq_t *wq)
   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
      return stat;
   }
-  wq->valid = 0;                      /* prevent any more operations */
+  wq->valid = 0;                     /* prevent any more operations */
 
   /* 
    * If any threads are active, wake them 
@@ -112,22 +112,22 @@ int workq_destroy(workq_t *wq)
   if (wq->num_workers > 0) {
      wq->quit = 1;
      if (wq->idle_workers) {
-        if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-           pthread_mutex_unlock(&wq->mutex);
-           return stat;
-        }
+       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+          pthread_mutex_unlock(&wq->mutex);
+          return stat;
+       }
      }
      while (wq->num_workers > 0) {
-        if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
-           pthread_mutex_unlock(&wq->mutex);
-           return stat;
-        }
+       if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+          pthread_mutex_unlock(&wq->mutex);
+          return stat;
+       }
      }
   }
   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
      return stat;
   }
-  stat  = pthread_mutex_destroy(&wq->mutex);
+  stat = pthread_mutex_destroy(&wq->mutex);
   stat1 = pthread_cond_destroy(&wq->work);
   stat2 = pthread_attr_destroy(&wq->attr);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
@@ -138,10 +138,10 @@ int workq_destroy(workq_t *wq)
  *  Add work to a queue
  *    wq is a queue that was created with workq_init
  *    element is a user unique item that will be passed to the 
- *        processing routine
+ *       processing routine
  *    work_item will get internal work queue item -- if it is not NULL
  *    priority if non-zero will cause the item to be placed on the
- *        head of the list instead of the tail.
+ *       head of the list instead of the tail.
  */
 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
 {
@@ -168,18 +168,18 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
    if (priority) {
       /* Add to head of queue */
       if (wq->first == NULL) {
-         wq->first = item;
-         wq->last = item;
+        wq->first = item;
+        wq->last = item;
       } else {
-         item->next = wq->first;
-         wq->first = item;
+        item->next = wq->first;
+        wq->first = item;
       }
    } else {
       /* Add to end of queue */
       if (wq->first == NULL) {
-         wq->first = item;
+        wq->first = item;
       } else {
-         wq->last->next = item;
+        wq->last->next = item;
       }
       wq->last = item;
    }
@@ -188,16 +188,16 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
    if (wq->idle_workers > 0) {
       Dmsg0(200, "Signal worker\n");
       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
    } else if (wq->num_workers < wq->max_workers) {
       Dmsg0(200, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
       wq->num_workers++;
    }
@@ -236,8 +236,8 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
 
    for (prev=item=wq->first; item; item=item->next) {
       if (item == work_item) {
-         found = 1;
-         break;
+        found = 1;
+        break;
       }
       prev = item;
    }
@@ -249,7 +249,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
    if (wq->first != work_item) {
       prev->next = work_item->next;   
       if (wq->last == work_item) {
-         wq->last = prev;
+        wq->last = prev;
       }
       work_item->next = wq->first;
       wq->first = work_item;
@@ -259,16 +259,16 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
    if (wq->idle_workers > 0) {
       Dmsg0(200, "Signal worker\n");
       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
    } else {
       Dmsg0(200, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
       wq->num_workers++;
    }
@@ -306,66 +306,66 @@ static void *workq_server(void *arg)
       timeout.tv_sec = tv.tv_sec + 2;
 
       while (wq->first == NULL && !wq->quit) {
-         /*
-          * Wait 2 seconds, then if no more work, exit
-          */
+        /*
+         * Wait 2 seconds, then if no more work, exit
+         */
          Dmsg0(200, "pthread_cond_timedwait()\n");
 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
-         /* CYGWIN dies with a page fault the second
-          * time that pthread_cond_timedwait() is called
-          * so fake it out.
-          */
-         pthread_mutex_lock(&wq->mutex);
-         stat = ETIMEDOUT;
+        /* CYGWIN dies with a page fault the second
+         * time that pthread_cond_timedwait() is called
+         * so fake it out.
+         */
+        pthread_mutex_lock(&wq->mutex);
+        stat = ETIMEDOUT;
 #else
-         stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+        stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
 #endif
          Dmsg1(200, "timedwait=%d\n", stat);
-         if (stat == ETIMEDOUT) {
-            timedout = 1;
-            break;
-         } else if (stat != 0) {
+        if (stat == ETIMEDOUT) {
+           timedout = 1;
+           break;
+        } else if (stat != 0) {
             /* This shouldn't happen */
             Dmsg0(200, "This shouldn't happen\n");
-            wq->num_workers--;
-            pthread_mutex_unlock(&wq->mutex);
-            return NULL;
-         }
+           wq->num_workers--;
+           pthread_mutex_unlock(&wq->mutex);
+           return NULL;
+        }
       } 
       we = wq->first;
       if (we != NULL) {
-         wq->first = we->next;
-         if (wq->last == we) {
-            wq->last = NULL;
-         }
-         if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-            return NULL;
-         }
+        wq->first = we->next;
+        if (wq->last == we) {
+           wq->last = NULL;
+        }
+        if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
+           return NULL;
+        }
          /* Call user's routine here */
          Dmsg0(200, "Calling user engine.\n");
-         wq->engine(we->data);
+        wq->engine(we->data);
          Dmsg0(200, "Back from user engine.\n");
-         free(we);                    /* release work entry */
+        free(we);                    /* release work entry */
          Dmsg0(200, "relock mutex\n"); 
-         if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-            return NULL;
-         }
+        if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+           return NULL;
+        }
          Dmsg0(200, "Done lock mutex\n");
       }
       /*
        * If no more work request, and we are asked to quit, then do it
        */
       if (wq->first == NULL && wq->quit) {
-         wq->num_workers--;
-         if (wq->num_workers == 0) {
+        wq->num_workers--;
+        if (wq->num_workers == 0) {
             Dmsg0(200, "Wake up destroy routine\n");
-            /* Wake up destroy routine if he is waiting */
-            pthread_cond_broadcast(&wq->work);
-         }
+           /* Wake up destroy routine if he is waiting */
+           pthread_cond_broadcast(&wq->work);
+        }
          Dmsg0(200, "Unlock mutex\n");
-         pthread_mutex_unlock(&wq->mutex);
+        pthread_mutex_unlock(&wq->mutex);
          Dmsg0(200, "Return from workq_server\n");
-         return NULL;
+        return NULL;
       }
       Dmsg0(200, "Check for work request\n");
       /* 
@@ -375,8 +375,8 @@ static void *workq_server(void *arg)
       Dmsg1(200, "timedout=%d\n", timedout);
       if (wq->first == NULL && timedout) {
          Dmsg0(200, "break big loop\n");
-         wq->num_workers--;
-         break;
+        wq->num_workers--;
+        break;
       }
       Dmsg0(200, "Loop again\n");
    } /* end of big for loop */
index 64ef4834f316403c7828ba9babfa82a7c32c33f1..dc95fccdb607de53380c930a911b8c976ffc5f4a 100644 (file)
@@ -53,7 +53,7 @@ typedef struct workq_tag {
    int               max_workers;     /* max threads */
    int               num_workers;     /* current threads */
    int               idle_workers;    /* idle threads */
-   void              (*engine)(void *arg); /* user engine */
+   void             *(*engine)(void *arg); /* user engine */
 } workq_t;
 
 #define WORKQ_VALID  0xdec1992
@@ -61,7 +61,7 @@ typedef struct workq_tag {
 extern int workq_init(
               workq_t *wq,
               int     threads,        /* maximum threads */
-              void    (*engine)(void *)   /* engine routine */
+              void   *(*engine)(void *)   /* engine routine */
                     );
 extern int workq_destroy(workq_t *wq);
 extern int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority);
index 6d8dbf71b92535da6e62b64c08c5a49de92236db..b8327afc5d0463c172116c483fee29adb98fe4a7 100644 (file)
@@ -106,7 +106,7 @@ static struct s_cmds cmds[] = {
  *  - We execute the command
  *  - We continue or exit depending on the return status
  */
-void connection_request(void *arg)
+void *connection_request(void *arg)
 {
    BSOCK *bs = (BSOCK *)arg;
    JCR *jcr;
@@ -116,7 +116,7 @@ void connection_request(void *arg)
 
    if (bnet_recv(bs) <= 0) {
       Emsg0(M_ERROR, 0, "Connection request failed.\n");
-      return;
+      return NULL;
    }
 
    /* 
@@ -124,7 +124,7 @@ void connection_request(void *arg)
     */
    if (sscanf(bs->msg, "Hello Start Job %127s calling\n", name) == 1) {
       handle_filed_connection(bs, name);
-      return;
+      return NULL;
    }
    
    jcr = new_jcr(sizeof(JCR), stored_free_jcr);     /* create Job Control Record */
@@ -138,7 +138,7 @@ void connection_request(void *arg)
    if (!authenticate_director(jcr)) {
       Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate Director\n"));
       free_jcr(jcr);
-      return;
+      return NULL;
    }
    Dmsg0(90, "Message channel init completed.\n");
 
@@ -170,7 +170,7 @@ void connection_request(void *arg)
       bnet_sig(bs, BNET_TERMINATE);
    }
    free_jcr(jcr);
-   return;
+   return NULL;
 }
 
 /*
index c74f40e7f5fbed18597b2f7ff5fc4a524e59ed73..a75a16055fd2fdcf027a0beba061c07968d071f6 100644 (file)
 uint32_t new_VolSessionId();
 
 /* From acquire.c */
-DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int     acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int     release_device(JCR *jcr, DEVICE *dev);
+DEVICE  *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int      acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int      release_device(JCR *jcr, DEVICE *dev);
 
 /* From askdir.c */
-int    dir_get_volume_info(JCR *jcr, int writing);
-int    dir_find_next_appendable_volume(JCR *jcr);
-int    dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel);
-int    dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev);
-int    dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev);
-int    dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec);
-int    dir_send_job_status(JCR *jcr);
-int    dir_create_jobmedia_record(JCR *jcr);
+int     dir_get_volume_info(JCR *jcr, int writing);
+int     dir_find_next_appendable_volume(JCR *jcr);
+int     dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel);
+int     dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev);
+int     dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev);
+int     dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec);
+int     dir_send_job_status(JCR *jcr);
+int     dir_create_jobmedia_record(JCR *jcr);
 
 /* authenticate.c */
-int    authenticate_director(JCR *jcr);
-int    authenticate_filed(JCR *jcr);
+int     authenticate_director(JCR *jcr);
+int     authenticate_filed(JCR *jcr);
 
 /* From block.c */
-void   dump_block(DEV_BLOCK *b, char *msg);
+void    dump_block(DEV_BLOCK *b, char *msg);
 DEV_BLOCK *new_block(DEVICE *dev);
-void   init_block_write(DEV_BLOCK *block);
-void   empty_block(DEV_BLOCK *block);
-void   free_block(DEV_BLOCK *block);
-int    write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int    write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int    read_block_from_device(DEVICE *dev, DEV_BLOCK *block);
-int    read_block_from_dev(DEVICE *dev, DEV_BLOCK *block);
+void    init_block_write(DEV_BLOCK *block);
+void    empty_block(DEV_BLOCK *block);
+void    free_block(DEV_BLOCK *block);
+int     write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int     write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int     read_block_from_device(DEVICE *dev, DEV_BLOCK *block);
+int     read_block_from_dev(DEVICE *dev, DEV_BLOCK *block);
 
 /* From butil.c -- utilities for SD tool programs */
-void   print_ls_output(char *fname, char *link, int type, struct stat *statp);
+void    print_ls_output(char *fname, char *link, int type, struct stat *statp);
 JCR    *setup_jcr(char *name, char *device, BSR *bsr, char *VolumeName);
 DEVICE *setup_to_access_device(JCR *jcr, int read_access);
-void   display_error_status(DEVICE *dev);
+void    display_error_status(DEVICE *dev);
 DEVRES *find_device_res(char *device_name, int read_access);
 
 
 /* From dev.c */
-DEVICE *init_dev(DEVICE *dev, DEVRES *device);
-int     open_dev(DEVICE *dev, char *VolName, int mode);
-void    close_dev(DEVICE *dev);
-void    force_close_dev(DEVICE *dev);
-int     truncate_dev(DEVICE *dev);
-void    term_dev(DEVICE *dev);
-char *  strerror_dev(DEVICE *dev);
-void    clrerror_dev(DEVICE *dev, int func);
-int     update_pos_dev(DEVICE *dev);
-int     rewind_dev(DEVICE *dev);
-int     load_dev(DEVICE *dev);
-int     offline_dev(DEVICE *dev);
-int     flush_dev(DEVICE *dev);
-int     weof_dev(DEVICE *dev, int num);
-int     write_block(DEVICE *dev);
-int     write_dev(DEVICE *dev, char *buf, size_t len);
-int     read_dev(DEVICE *dev, char *buf, size_t len);
-int     status_dev(DEVICE *dev, uint32_t *status);
-int     eod_dev(DEVICE *dev);
-int     fsf_dev(DEVICE *dev, int num);
-int     fsr_dev(DEVICE *dev, int num);
-int     bsf_dev(DEVICE *dev, int num);
-int     bsr_dev(DEVICE *dev, int num);
-void    attach_jcr_to_device(DEVICE *dev, JCR *jcr);
-void    detach_jcr_from_device(DEVICE *dev, JCR *jcr);
-JCR    *next_attached_jcr(DEVICE *dev, JCR *jcr);
+DEVICE  *init_dev(DEVICE *dev, DEVRES *device);
+int      open_dev(DEVICE *dev, char *VolName, int mode);
+void     close_dev(DEVICE *dev);
+void     force_close_dev(DEVICE *dev);
+int      truncate_dev(DEVICE *dev);
+void     term_dev(DEVICE *dev);
+char *   strerror_dev(DEVICE *dev);
+void     clrerror_dev(DEVICE *dev, int func);
+int      update_pos_dev(DEVICE *dev);
+int      rewind_dev(DEVICE *dev);
+int      load_dev(DEVICE *dev);
+int      offline_dev(DEVICE *dev);
+int      flush_dev(DEVICE *dev);
+int      weof_dev(DEVICE *dev, int num);
+int      write_block(DEVICE *dev);
+int      write_dev(DEVICE *dev, char *buf, size_t len);
+int      read_dev(DEVICE *dev, char *buf, size_t len);
+int      status_dev(DEVICE *dev, uint32_t *status);
+int      eod_dev(DEVICE *dev);
+int      fsf_dev(DEVICE *dev, int num);
+int      fsr_dev(DEVICE *dev, int num);
+int      bsf_dev(DEVICE *dev, int num);
+int      bsr_dev(DEVICE *dev, int num);
+void     attach_jcr_to_device(DEVICE *dev, JCR *jcr);
+void     detach_jcr_from_device(DEVICE *dev, JCR *jcr);
+JCR     *next_attached_jcr(DEVICE *dev, JCR *jcr);
 
 
 /* Get info about device */
-char *  dev_name(DEVICE *dev);
-char *  dev_vol_name(DEVICE *dev);
+char *   dev_name(DEVICE *dev);
+char *   dev_vol_name(DEVICE *dev);
 uint32_t dev_block(DEVICE *dev);
 uint32_t dev_file(DEVICE *dev);
-int     dev_is_tape(DEVICE *dev);
+int      dev_is_tape(DEVICE *dev);
 
 /* From device.c */
-int     open_device(DEVICE *dev);
-int     fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int      open_device(DEVICE *dev);
+int      fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
 void _lock_device(char *file, int line, DEVICE *dev);
 void _unlock_device(char *file, int line, DEVICE *dev);
 void _block_device(char *file, int line, DEVICE *dev, int state);
@@ -119,40 +119,40 @@ void  new_steal_device_lock(DEVICE *dev, brwsteal_t *hold, int state);
 void  new_return_device_lock(DEVICE *dev, brwsteal_t *hold);
 
 /* From dircmd.c */
-void    connection_request(void *arg); 
+void     *connection_request(void *arg); 
 
 
 /* From fd_cmds.c */
-void    run_job(JCR *jcr);
+void     run_job(JCR *jcr);
 
 /* From fdmsg.c */
-int     bget_msg(BSOCK *sock);
+int      bget_msg(BSOCK *sock);
 
 /* From job.c */
-void    stored_free_jcr(JCR *jcr);
-void    connection_from_filed(void *arg);     
-void    handle_filed_connection(BSOCK *fd, char *job_name);
+void     stored_free_jcr(JCR *jcr);
+void     connection_from_filed(void *arg);     
+void     handle_filed_connection(BSOCK *fd, char *job_name);
 
 /* From label.c */
-int     read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-void    create_session_label(JCR *jcr, DEV_RECORD *rec, int label);
-void    create_volume_label(DEVICE *dev, char *VolName);
-int     write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName);
-int     write_session_label(JCR *jcr, DEV_BLOCK *block, int label);
-int     write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-void    dump_volume_label(DEVICE *dev);
-void    dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose);
-int     unser_volume_label(DEVICE *dev, DEV_RECORD *rec);
-int     unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec);
+int      read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+void     create_session_label(JCR *jcr, DEV_RECORD *rec, int label);
+void     create_volume_label(DEVICE *dev, char *VolName);
+int      write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName);
+int      write_session_label(JCR *jcr, DEV_BLOCK *block, int label);
+int      write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+void     dump_volume_label(DEVICE *dev);
+void     dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose);
+int      unser_volume_label(DEVICE *dev, DEV_RECORD *rec);
+int      unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec);
 
 /* From match_bsr.c */
 int match_bsr(BSR *bsr, DEV_RECORD *rec, VOLUME_LABEL *volrec, 
-             SESSION_LABEL *sesrec);
+              SESSION_LABEL *sesrec);
 
 /* From mount.c */
-int     mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release);
-int     mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
-int     autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir);
+int      mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release);
+int      mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block);
+int      autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir);
 
 
 /* From parse_bsr.c */
@@ -167,11 +167,11 @@ extern void create_vol_list(JCR *jcr);
 /* From record.c */
 char   *FI_to_ascii(int fi);
 char   *stream_to_ascii(int stream, int fi);
-int    write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
-int    can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
-int    read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); 
+int     write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
+int     can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec);
+int     read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); 
 DEV_RECORD *new_record();
-void   free_record(DEV_RECORD *rec);
+void    free_record(DEV_RECORD *rec);
 
 /* From read_record.c */
 int read_records(JCR *jcr,  DEVICE *dev, 
index 0647d910d8a91fc505cccc66be51136fb186eada..909e06a87c2084b955e8373118ce50ac04ff95f6 100644 (file)
@@ -1,8 +1,8 @@
 /* */
 #define VERSION "1.30"
 #define VSTRING "1"
-#define BDATE   "14 March 2003"
-#define LSMDATE "14Mar03"
+#define BDATE   "17 March 2003"
+#define LSMDATE "17Mar03"
 
 /* Debug flags */
 #define DEBUG 1
@@ -10,6 +10,9 @@
 #define SMCHECK     
 #define TRACE_FILE 1  
 
+/* Turn this on if you want to try the new Job semaphore code */
+/* #define USE_SEMAPHORE */
+
 /* IF you undefine this, Bacula will run 10X slower */
 #define NO_POLL_TEST 1