]> git.sur5r.net Git - bacula/bacula/commitdiff
Nic's watchdog code + cleanup jcr locking/use_count
authorKern Sibbald <kern@sibbald.com>
Tue, 23 Dec 2003 20:23:15 +0000 (20:23 +0000)
committerKern Sibbald <kern@sibbald.com>
Tue, 23 Dec 2003 20:23:15 +0000 (20:23 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@944 91ce42f0-d328-0410-95d8-f526ca767f89

22 files changed:
bacula/kernstodo
bacula/src/dird/dird.c
bacula/src/dird/dird_conf.c
bacula/src/dird/dird_conf.h
bacula/src/dird/job.c
bacula/src/dird/jobq.c
bacula/src/dird/protos.h
bacula/src/dird/run_conf.c
bacula/src/dird/ua_cmds.c
bacula/src/dird/ua_run.c
bacula/src/dird/ua_server.c
bacula/src/dird/ua_status.c
bacula/src/filed/filed.c
bacula/src/jcr.h
bacula/src/lib/Makefile.in
bacula/src/lib/jcr.c
bacula/src/lib/lib.h
bacula/src/lib/protos.h
bacula/src/lib/watchdog.c
bacula/src/lib/watchdog.h
bacula/src/stored/stored.c
bacula/src/version.h

index fe847d8baecdee00bd3b61d4525a8e60f17b4f1e..8bfbd778e2bbd80a019045beb61e607bdbf13435 100644 (file)
@@ -55,6 +55,8 @@ For 1.33 Testing/Documentation:
 - Add subsections to the Disaster Recovery index section.
                 
 For 1.33
+- If a tape is recycled while it is mounted, Stanislav Tvrudy must do an
+  additional mount to deblock the job.
 - Notes for integrating Nic's code:
   - Most likely jcr crash is the free_jcr(). The ref count was not incremented
     in the watchdog call to cancel_job() as it is in the UA -- see
index b4130421222ccb25418d89b9b262928c47c2d9b4..f839831a5d6a5e516295503ed35ddf2a216b5a16 100644 (file)
@@ -214,12 +214,15 @@ int main (int argc, char *argv[])
 
    start_watchdog();                 /* start network watchdog thread */
 
+   init_jcr_subsystem();             /* start JCR watchdogs etc. */
+
    init_job_server(director->MaxConcurrentJobs);
   
    Dmsg0(200, "wait for next job\n");
    /* Main loop -- call scheduler to get next job to run */
    while ((jcr = wait_for_next_job(runjob))) {
       run_job(jcr);                  /* run job */
+      free_jcr(jcr);                 /* release jcr */
       if (runjob) {                  /* command line, run a single job? */
         break;                       /* yes, terminate */
       }
@@ -239,7 +242,6 @@ static void terminate_dird(int sig)
    already_here = TRUE;
    delete_pid_file(director->pid_directory, "bacula-dir",  
                   director->DIRport);
-   stop_watchdog();
 // signal(SIGCHLD, SIG_IGN);          /* don't worry about children now */
    term_scheduler();
    if (runjob) {
@@ -254,6 +256,7 @@ static void terminate_dird(int sig)
    free_config_resources();
    term_ua_server();
    term_msg();                       /* terminate message handler */
+   stop_watchdog();
    close_memory_pool();              /* release free memory in pool */
    sm_dump(False);
    exit(sig != 0);
index 2b4bc137867bfc1037fa3c2cf4115ffc5bc1ade7..e1fa607ab3825560254d4f77c0eb137907fd662a 100644 (file)
@@ -209,6 +209,7 @@ static struct res_items job_items[] = {
    {"replace",  store_replace, ITEM(res_job.replace), 0, ITEM_DEFAULT, REPLACE_ALWAYS},
    {"bootstrap",store_dir,     ITEM(res_job.RestoreBootstrap), 0, 0, 0},
    {"maxruntime", store_time,  ITEM(res_job.MaxRunTime), 0, 0, 0},
+   {"maxwaittime", store_time,  ITEM(res_job.MaxWaitTime), 0, 0, 0},
    {"maxstartdelay", store_time,ITEM(res_job.MaxStartDelay), 0, 0, 0},
    {"prefixlinks", store_yesno, ITEM(res_job.PrefixLinks), 1, ITEM_DEFAULT, 0},
    {"prunejobs",   store_yesno, ITEM(res_job.PruneJobs), 1, ITEM_DEFAULT, 0},
index bec790248cc1c57ab16355c269f9cb80f04ce73e..ef375599ccfc924f62b8bb134823a3baba6bc308 100644 (file)
@@ -191,6 +191,7 @@ struct JOB {
    char *WriteBootstrap;              /* Where to write bootstrap Job updates */
    int   replace;                     /* How (overwrite, ..) */
    utime_t MaxRunTime;                /* max run time in seconds */
+   utime_t MaxWaitTime;               /* max blocking time in seconds */
    utime_t MaxStartDelay;             /* max start delay in seconds */
    int PrefixLinks;                   /* prefix soft links with Where path */
    int PruneJobs;                     /* Force pruning of Jobs */
index e84a6422ccb1ee96e5cdbe9c52ed962f7ac3f6a4..1d3fd12096251f74e4319a9930eca6ebddcf410b 100644 (file)
 
 /* Forward referenced subroutines */
 static void *job_thread(void *arg);
+static void job_monitor_watchdog(watchdog_t *self);
+static void job_monitor_destructor(watchdog_t *self);
+static bool job_check_maxwaittime(JCR *control_jcr, JCR *jcr);
+static bool job_check_maxruntime(JCR *control_jcr, JCR *jcr);
 
 /* Exported subroutines */
 
-
 /* Imported subroutines */
 extern void term_scheduler();
 extern void term_ua_server();
@@ -43,17 +46,185 @@ extern int do_admin(JCR *jcr);
 extern int do_restore(JCR *jcr);
 extern int do_verify(JCR *jcr);
 
+/* Imported variables */
+extern time_t watchdog_time;
+
 jobq_t job_queue;
 
 void init_job_server(int max_workers)
 {
    int stat;
+   watchdog_t *wd;
+   
    if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) {
       Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat));
    }
+   if ((wd = watchdog_new()) == NULL) {
+      Emsg0(M_ABORT, 0, _("Could not init job monitor watchdogs\n"));
+   }
+   wd->callback = job_monitor_watchdog;
+   wd->destructor = job_monitor_destructor;
+   wd->one_shot = false;
+   wd->interval = 60;
+   wd->data = create_control_jcr("*JobMonitor*", JT_SYSTEM);
+   register_watchdog(wd);
+   
    return;
 }
 
+static void job_monitor_destructor(watchdog_t *self)
+{
+   JCR *control_jcr = (JCR *) self->data;
+
+   free_jcr(control_jcr);
+}
+
+static void job_monitor_watchdog(watchdog_t *self)
+{
+   JCR *control_jcr, *jcr;
+
+   control_jcr = (JCR *) self->data;
+
+   Dmsg1(200, "job_monitor_watchdog %p called\n", self);
+
+   lock_jcr_chain();
+
+   for (jcr = NULL; (jcr = get_next_jcr(jcr)); /* nothing */) {
+      bool cancel;
+
+      if (jcr->JobId == 0) {
+         Dmsg2(200, "Skipping JCR %p (%s) with JobId 0\n",
+              jcr, jcr->Job);
+        /* Keep reference counts correct */
+        free_locked_jcr(jcr);
+        continue;
+      }
+
+      /* check MaxWaitTime */
+      cancel = job_check_maxwaittime(control_jcr, jcr);
+
+      /* check MaxRunTime */
+      cancel |= job_check_maxruntime(control_jcr, jcr);
+
+      if (cancel) {
+         Dmsg3(200, "Cancelling JCR %p jobid %d (%s)\n",
+              jcr, jcr->JobId, jcr->Job);
+
+        UAContext *ua = new_ua_context(jcr);
+        ua->jcr = control_jcr;
+        cancel_job(ua, jcr);
+        free_ua_context(ua);
+
+         Dmsg1(200, "Have cancelled JCR %p\n", jcr);
+      }
+
+      /* Keep reference counts correct */
+      free_locked_jcr(jcr);
+   }
+   unlock_jcr_chain();
+}
+
+static bool job_check_maxwaittime(JCR *control_jcr, JCR *jcr)
+{
+   bool cancel = false;
+
+   if (jcr->job->MaxWaitTime == 0) {
+      return false;
+   }
+   if ((watchdog_time - jcr->start_time) < jcr->job->MaxWaitTime) {
+      Dmsg3(200, "Job %p (%s) with MaxWaitTime %d not expired\n",
+           jcr, jcr->Job, jcr->job->MaxWaitTime);
+      return false;
+   }
+   Dmsg3(200, "Job %d (%s): MaxWaitTime of %d seconds exceeded, "
+         "checking status\n",
+        jcr->JobId, jcr->Job, jcr->job->MaxWaitTime);
+   switch (jcr->JobStatus) {
+      case JS_Created:
+      case JS_Blocked:
+      case JS_WaitFD:
+      case JS_WaitSD:
+      case JS_WaitStoreRes:
+      case JS_WaitClientRes:
+      case JS_WaitJobRes:
+      case JS_WaitPriority:
+      case JS_WaitMaxJobs:
+      case JS_WaitStartTime:
+        cancel = true;
+         Dmsg0(200, "JCR blocked in #1\n");
+        break;
+      case JS_Running:
+         Dmsg0(200, "JCR running, checking SD status\n");
+        switch (jcr->SDJobStatus) {
+           case JS_WaitMount:
+           case JS_WaitMedia:
+           case JS_WaitFD:
+              cancel = true;
+               Dmsg0(200, "JCR blocked in #2\n");
+              break;
+           default:
+               Dmsg0(200, "JCR not blocked in #2\n");
+              break;
+        }
+        break;
+      case JS_Terminated:
+      case JS_ErrorTerminated:
+      case JS_Canceled:
+         Dmsg0(200, "JCR already dead in #3\n");
+        break;
+      default:
+         Emsg1(M_ABORT, 0, _("Unhandled job status code %d\n"),
+              jcr->JobStatus);
+   }
+   Dmsg3(200, "MaxWaitTime result: %scancel JCR %p (%s)\n",
+         cancel ? "" : "do not ", jcr, jcr->job);
+
+   return cancel;
+}
+
+static bool job_check_maxruntime(JCR *control_jcr, JCR *jcr)
+{
+   bool cancel = false;
+
+   if (jcr->job->MaxRunTime == 0) {
+      return false;
+   }
+   if ((watchdog_time - jcr->start_time) < jcr->job->MaxRunTime) {
+      Dmsg3(200, "Job %p (%s) with MaxRunTime %d not expired\n",
+           jcr, jcr->Job, jcr->job->MaxRunTime);
+      return false;
+   }
+
+   switch (jcr->JobStatus) {
+      case JS_Created:
+      case JS_Blocked:
+      case JS_WaitFD:
+      case JS_WaitSD:
+      case JS_WaitStoreRes:
+      case JS_WaitClientRes:
+      case JS_WaitJobRes:
+      case JS_WaitPriority:
+      case JS_WaitMaxJobs:
+      case JS_WaitStartTime:
+      case JS_Running:
+        cancel = true;
+        break;
+      case JS_Terminated:
+      case JS_ErrorTerminated:
+      case JS_Canceled:
+        cancel = false;
+        break;
+      default:
+         Emsg1(M_ABORT, 0, _("Unhandled job status code %d\n"),
+              jcr->JobStatus);
+   }
+
+   Dmsg3(200, "MaxRunTime result: %scancel JCR %p (%s)\n",
+         cancel ? "" : "do not ", jcr, jcr->job);
+
+   return cancel;
+}
+
 /*
  * Run a job -- typically called by the scheduler, but may also
  *             be called by the UA (Console program).
@@ -63,6 +234,7 @@ void run_job(JCR *jcr)
 {
    int stat, errstat;
 
+   P(jcr->mutex);
    sm_check(__FILE__, __LINE__, True);
    init_msg(jcr, jcr->messages);
    create_unique_job_name(jcr, jcr->job->hdr.name);
@@ -79,9 +251,7 @@ void run_job(JCR *jcr)
    /* Initialize termination condition variable */
    if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
       Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), strerror(errstat));
-      set_jcr_job_status(jcr, JS_ErrorTerminated);
-      free_jcr(jcr);
-      return;
+      goto bail_out;
    }
 
    /*
@@ -97,9 +267,7 @@ void run_job(JCR *jcr)
       if (jcr->db) {
          Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
       }
-      set_jcr_job_status(jcr, JS_ErrorTerminated);
-      free_jcr(jcr);
-      return;
+      goto bail_out;
    }
    Dmsg0(50, "DB opened\n");
 
@@ -109,9 +277,7 @@ void run_job(JCR *jcr)
    jcr->jr.JobStatus = jcr->JobStatus;
    if (!db_create_job_record(jcr, jcr->db, &jcr->jr)) {
       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
-      set_jcr_job_status(jcr, JS_ErrorTerminated);
-      free_jcr(jcr);
-      return;
+      goto bail_out;
    }
    jcr->JobId = jcr->jr.JobId;
    ASSERT(jcr->jr.JobId > 0);
@@ -125,10 +291,87 @@ void run_job(JCR *jcr)
       Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat));
    }
    Dmsg0(100, "Done run_job()\n");
+
+   V(jcr->mutex);
+   return;
+
+bail_out:
+   set_jcr_job_status(jcr, JS_ErrorTerminated);
+   V(jcr->mutex);
+   return;
+
+}
+
+/*
+ * Cancel a job -- typically called by the UA (Console program), but may also
+ *             be called by the job watchdog.
+ * 
+ *  Returns: 1 if cancel appears to be successful
+ *          0 on failure. Message sent to ua->jcr.
+ */
+int cancel_job(UAContext *ua, JCR *jcr)
+{
+   BSOCK *sd, *fd;
+
+   switch (jcr->JobStatus) {
+   case JS_Created:
+   case JS_WaitJobRes:
+   case JS_WaitClientRes:
+   case JS_WaitStoreRes:
+   case JS_WaitPriority:
+   case JS_WaitMaxJobs:
+   case JS_WaitStartTime:
+      set_jcr_job_status(jcr, JS_Canceled);
+      bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"),
+             jcr->JobId, jcr->Job);
+      jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */
+      return 1;
+        
+   default:
+      set_jcr_job_status(jcr, JS_Canceled);
+
+      /* Cancel File daemon */
+      if (jcr->file_bsock) {
+        ua->jcr->client = jcr->client;
+        if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) {
+            bsendmsg(ua, _("Failed to connect to File daemon.\n"));
+           return 0;
+        }
+         Dmsg0(200, "Connected to file daemon\n");
+        fd = ua->jcr->file_bsock;
+         bnet_fsend(fd, "cancel Job=%s\n", jcr->Job);
+        while (bnet_recv(fd) >= 0) {
+            bsendmsg(ua, "%s", fd->msg);
+        }
+        bnet_sig(fd, BNET_TERMINATE);
+        bnet_close(fd);
+        ua->jcr->file_bsock = NULL;
+      }
+
+      /* Cancel Storage daemon */
+      if (jcr->store_bsock) {
+        ua->jcr->store = jcr->store;
+        if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
+            bsendmsg(ua, _("Failed to connect to Storage daemon.\n"));
+           return 0;
+        }
+         Dmsg0(200, "Connected to storage daemon\n");
+        sd = ua->jcr->store_bsock;
+         bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
+        while (bnet_recv(sd) >= 0) {
+            bsendmsg(ua, "%s", sd->msg);
+        }
+        bnet_sig(sd, BNET_TERMINATE);
+        bnet_close(sd);
+        ua->jcr->store_bsock = NULL;
+      }
+   }
+
+   return 1;
 }
 
 /* 
- * This is the engine called by job_add() when we were pulled               
+ * This is the engine called by jobq.c:jobq_add() when we were pulled               
  *  from the work queue.
  *  At this point, we are running in our own thread and all
  *    necessary resources are allocated -- see jobq.c
index eb4640efcd7e9c9ff55a19cc01c8316ad4fb13e6..b5026918aaf4c6cccadaa8fdfe98eace6be2c961 100755 (executable)
@@ -171,7 +171,10 @@ static void *sched_wait(void *arg)
       }
       wtime = jcr->sched_time - time(NULL);
    }
+   P(jcr->mutex);                    /* lock jcr */
    jobq_add(jq, jcr);
+   V(jcr->mutex);
+   free_jcr(jcr);                    /* we are done with jcr */
    Dmsg0(100, "Exit sched_wait\n");
    return NULL;
 }
@@ -180,6 +183,8 @@ static void *sched_wait(void *arg)
 /*
  *  Add a job to the queue
  *    jq is a queue that was created with jobq_init
+ * 
+ *  On entry jcr->mutex must be locked.
  *   
  */
 int jobq_add(jobq_t *jq, JCR *jcr)
@@ -197,20 +202,27 @@ int jobq_add(jobq_t *jq, JCR *jcr)
       return EINVAL;
    }
 
+   jcr->use_count++;                 /* mark jcr in use by us */
+
    if (!job_canceled(jcr) && wtime > 0) {
       set_thread_concurrency(jq->max_workers + 2);
       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
       sched_pkt->jcr = jcr;
       sched_pkt->jq = jq;
       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);       
+      if (!stat) {                   /* thread not created */
+        jcr->use_count--;            /* release jcr */
+      }
       return stat;
    }
 
    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+      jcr->use_count--;              /* release jcr */
       return stat;
    }
 
    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
+      jcr->use_count--;              /* release jcr */
       return ENOMEM;
    }
    item->jcr = jcr;
@@ -248,13 +260,13 @@ int jobq_add(jobq_t *jq, JCR *jcr)
 }
 
 /*
- *  Remove a job from the job queue. Used only by cancel Console command.
+ *  Remove a job from the job queue. Used only by cancel_job().
  *    jq is a queue that was created with jobq_init
  *    work_item is an element of work
  *
- *   Note, it is "removed" by immediately calling a processing routine.
- *    if you want to cancel it, you need to provide some external means
- *    of doing so.
+ *   Note, it is "removed" from the job queue.
+ *    If you want to cancel it, you need to provide some external means
+ *    of doing so (e.g. pthread_kill()).
  */
 int jobq_remove(jobq_t *jq, JCR *jcr)
 {
@@ -454,6 +466,7 @@ static void *jobq_server(void *arg)
             Dmsg0(100, "Call to run new job\n");
            V(jq->mutex);
             run_job(njcr);            /* This creates a "new" job */
+            free_jcr(njcr);           /* release "new" jcr */
            P(jq->mutex);
             Dmsg0(100, "Back from running new job.\n");
         }
index f05b8c0cd01a7cafa6621f72696167748db4a9a1..d9a99255254ba908c2a24d57556241e8bfed224e 100644 (file)
@@ -87,6 +87,7 @@ extern void create_unique_job_name(JCR *jcr, char *base_name);
 extern void update_job_end_record(JCR *jcr);
 extern int get_or_create_client_record(JCR *jcr);
 extern void run_job(JCR *jcr);
+extern int cancel_job(UAContext *ua, JCR *jcr);
 
 /* mountreq.c */
 extern void mount_request(JCR *jcr, BSOCK *bs, char *buf);
@@ -137,6 +138,7 @@ RUN *find_next_run(RUN *run, JOB *job, time_t &runtime);
 /* ua_server.c */
 void bsendmsg(void *sock, char *fmt, ...);
 UAContext *new_ua_context(JCR *jcr);
+JCR *create_control_jcr(char *base_name, int job_type);
 void free_ua_context(UAContext *ua);
 
 /* ua_select.c */
index 66fafec301403240ae131b9a79fc454203cf3395..faabf7c9fa63e58522ed723b990b9d0edef4b86e 100644 (file)
@@ -412,6 +412,7 @@ void store_run(LEX *lc, struct res_items *item, int index, int pass)
             scan_err0(lc, _("Bad time specification."));
            /* NOT REACHED */
         }
+        /****FIXME**** convert to UTC */
         set_bit(code, lrun.hour);
         lrun.minute = code2;
         have_hour = true;
index 45f62fd5ec9f216a5f4f5407e3e0b7fca0bbcac9..37ba7ab57efc4b23f6466ce27bbe366136f03897 100644 (file)
@@ -348,9 +348,8 @@ int automount_cmd(UAContext *ua, char *cmd)
  */
 static int cancel_cmd(UAContext *ua, char *cmd)
 {
-   int i;
+   int i, ret;
    int njobs = 0;
-   BSOCK *sd, *fd;
    JCR *jcr = NULL;
    char JobName[MAX_NAME_LENGTH];
 
@@ -419,72 +418,18 @@ static int cancel_cmd(UAContext *ua, char *cmd)
            return 1;
         }
       }
+      /* NOTE! This increments the ref_count */
       jcr = get_jcr_by_full_name(JobName);
       if (!jcr) {
          bsendmsg(ua, _("Job %s not found.\n"), JobName);
         return 1;
       }
    }
-     
-   switch (jcr->JobStatus) {
-   case JS_Created:
-   case JS_WaitJobRes:
-   case JS_WaitClientRes:
-   case JS_WaitStoreRes:
-   case JS_WaitPriority:
-   case JS_WaitMaxJobs:
-   case JS_WaitStartTime:
-      set_jcr_job_status(jcr, JS_Canceled);
-      bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"),
-             jcr->JobId, jcr->Job);
-      jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */
-      free_jcr(jcr);                 /* this decrements the use count only */
-      return 1;
-        
-   default:
-      set_jcr_job_status(jcr, JS_Canceled);
-
-      /* Cancel File daemon */
-      if (jcr->file_bsock) {
-        ua->jcr->client = jcr->client;
-        if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) {
-            bsendmsg(ua, _("Failed to connect to File daemon.\n"));
-           free_jcr(jcr);
-           return 1;
-        }
-         Dmsg0(200, "Connected to file daemon\n");
-        fd = ua->jcr->file_bsock;
-         bnet_fsend(fd, "cancel Job=%s\n", jcr->Job);
-        while (bnet_recv(fd) >= 0) {
-            bsendmsg(ua, "%s", fd->msg);
-        }
-        bnet_sig(fd, BNET_TERMINATE);
-        bnet_close(fd);
-        ua->jcr->file_bsock = NULL;
-      }
 
-      /* Cancel Storage daemon */
-      if (jcr->store_bsock) {
-        ua->jcr->store = jcr->store;
-        if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
-            bsendmsg(ua, _("Failed to connect to Storage daemon.\n"));
-           free_jcr(jcr);
-           return 1;
-        }
-         Dmsg0(200, "Connected to storage daemon\n");
-        sd = ua->jcr->store_bsock;
-         bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
-        while (bnet_recv(sd) >= 0) {
-            bsendmsg(ua, "%s", sd->msg);
-        }
-        bnet_sig(sd, BNET_TERMINATE);
-        bnet_close(sd);
-        ua->jcr->store_bsock = NULL;
-      }
-   }
+   ret = cancel_job(ua, jcr);
    free_jcr(jcr);
 
-   return 1; 
+   return ret;
 }
 
 /*
index e18777a7e20566b800fdaba8d277ce149fb0c181..d8c96b821ec97d1968d16b63720d71490305ac23 100644 (file)
@@ -325,7 +325,10 @@ int run_cmd(UAContext *ua, char *cmd)
       verify_job = job->verify_job;
    }
 
-   /* Create JCR to run job */
+   /*
+    * Create JCR to run job.  NOTE!!! after this point, free_jcr()
+    *  before returning.
+    */
    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
    set_jcr_defaults(jcr, job);
 
@@ -364,7 +367,7 @@ int run_cmd(UAContext *ua, char *cmd)
       }
       if (!jcr->replace) {
          bsendmsg(ua, _("Invalid replace option: %s\n"), replace);
-        return 0;
+        goto bail_out;
       }
    } else if (job->replace) {
       jcr->replace = job->replace;
@@ -387,7 +390,7 @@ try_again:
    switch (jcr->JobType) {
       char ec1[30];
       char dt[MAX_TIME_LENGTH];
-      case JT_ADMIN:
+   case JT_ADMIN:
          bsendmsg(ua, _("Run %s job\n\
 JobName:  %s\n\
 FileSet:  %s\n\
@@ -402,29 +405,28 @@ Priority: %d\n"),
                 NPRT(jcr->store->hdr.name), 
                 bstrutime(dt, sizeof(dt), jcr->sched_time), 
                 jcr->JobPriority);
-        jcr->JobLevel = L_FULL;
-        break;
-      case JT_BACKUP:
-      case JT_VERIFY:
-        if (level_name) {
-           /* Look up level name and pull code */
-           found = 0;
-           for (i=0; joblevels[i].level_name; i++) {
-              if (strcasecmp(level_name, _(joblevels[i].level_name)) == 0) {
-                 jcr->JobLevel = joblevels[i].level;
-                 found = 1;
-                 break;
-              }
-           }
-           if (!found) { 
-               bsendmsg(ua, _("Level %s not valid.\n"), level_name);
-              free_jcr(jcr);
-              return 1;
+      jcr->JobLevel = L_FULL;
+      break;
+   case JT_BACKUP:
+   case JT_VERIFY:
+      if (level_name) {
+        /* Look up level name and pull code */
+        found = 0;
+        for (i=0; joblevels[i].level_name; i++) {
+           if (strcasecmp(level_name, _(joblevels[i].level_name)) == 0) {
+              jcr->JobLevel = joblevels[i].level;
+              found = 1;
+              break;
            }
         }
-        level_name = NULL;
-        if (jcr->JobType == JT_BACKUP) {
-            bsendmsg(ua, _("Run %s job\n\
+        if (!found) { 
+            bsendmsg(ua, _("Level %s not valid.\n"), level_name);
+           goto bail_out;
+        }
+      }
+      level_name = NULL;
+      if (jcr->JobType == JT_BACKUP) {
+         bsendmsg(ua, _("Run %s job\n\
 JobName:  %s\n\
 FileSet:  %s\n\
 Level:    %s\n\
@@ -442,14 +444,14 @@ Priority: %d\n"),
                 NPRT(jcr->pool->hdr.name), 
                 bstrutime(dt, sizeof(dt), jcr->sched_time),
                 jcr->JobPriority);
-        } else {  /* JT_VERIFY */
-           char *Name;
-           if (jcr->job->verify_job) {
-              Name = jcr->job->verify_job->hdr.name;
-           } else {
-               Name = "";
-           }
-            bsendmsg(ua, _("Run %s job\n\
+      } else { /* JT_VERIFY */
+        char *Name;
+        if (jcr->job->verify_job) {
+           Name = jcr->job->verify_job->hdr.name;
+        } else {
+            Name = "";
+        }
+         bsendmsg(ua, _("Run %s job\n\
 JobName:     %s\n\
 FileSet:     %s\n\
 Level:       %s\n\
@@ -459,34 +461,33 @@ Pool:        %s\n\
 Verify Job:  %s\n\
 When:        %s\n\
 Priority:    %d\n"),
-                 _("Verify"),
-                job->hdr.name,
-                jcr->fileset->hdr.name,
-                level_to_str(jcr->JobLevel),
-                jcr->client->hdr.name,
-                jcr->store->hdr.name,
-                NPRT(jcr->pool->hdr.name), 
-                Name,            
-                bstrutime(dt, sizeof(dt), jcr->sched_time),
-                jcr->JobPriority);
-        }
-        break;
-      case JT_RESTORE:
-        if (jcr->RestoreJobId == 0 && !jcr->RestoreBootstrap) {
-           if (jid) {
-              jcr->RestoreJobId = atoi(jid);
-           } else {
-               if (!get_pint(ua, _("Please enter a JobId for restore: "))) {
-                 free_jcr(jcr);
-                 return 1;
-              }  
-              jcr->RestoreJobId = ua->pint32_val;
-           }
+              _("Verify"),
+             job->hdr.name,
+             jcr->fileset->hdr.name,
+             level_to_str(jcr->JobLevel),
+             jcr->client->hdr.name,
+             jcr->store->hdr.name,
+             NPRT(jcr->pool->hdr.name), 
+             Name,            
+             bstrutime(dt, sizeof(dt), jcr->sched_time),
+             jcr->JobPriority);
+      }
+      break;
+   case JT_RESTORE:
+      if (jcr->RestoreJobId == 0 && !jcr->RestoreBootstrap) {
+        if (jid) {
+           jcr->RestoreJobId = atoi(jid);
+        } else {
+            if (!get_pint(ua, _("Please enter a JobId for restore: "))) {
+              goto bail_out;
+           }  
+           jcr->RestoreJobId = ua->pint32_val;
         }
-        jcr->JobLevel = L_FULL;      /* default level */
-         Dmsg1(20, "JobId to restore=%d\n", jcr->RestoreJobId);
-        if (jcr->RestoreJobId == 0) {
-            bsendmsg(ua, _("Run Restore job\n\
+      }
+      jcr->JobLevel = L_FULL;     /* default level */
+      Dmsg1(20, "JobId to restore=%d\n", jcr->RestoreJobId);
+      if (jcr->RestoreJobId == 0) {
+         bsendmsg(ua, _("Run Restore job\n\
 JobName:    %s\n\
 Bootstrap:  %s\n\
 Where:      %s\n\
@@ -496,17 +497,17 @@ Client:     %s\n\
 Storage:    %s\n\
 When:       %s\n\
 Priority:   %d\n"),
-                job->hdr.name,
-                NPRT(jcr->RestoreBootstrap),
-                jcr->where?jcr->where:NPRT(job->RestoreWhere),
-                replace,
-                jcr->fileset->hdr.name,
-                jcr->client->hdr.name,
-                jcr->store->hdr.name, 
-                bstrutime(dt, sizeof(dt), jcr->sched_time),
-                jcr->JobPriority);
-        } else {
-            bsendmsg(ua, _("Run Restore job\n\
+             job->hdr.name,
+             NPRT(jcr->RestoreBootstrap),
+             jcr->where?jcr->where:NPRT(job->RestoreWhere),
+             replace,
+             jcr->fileset->hdr.name,
+             jcr->client->hdr.name,
+             jcr->store->hdr.name, 
+             bstrutime(dt, sizeof(dt), jcr->sched_time),
+             jcr->JobPriority);
+      } else {
+         bsendmsg(ua, _("Run Restore job\n\
 JobName:    %s\n\
 Bootstrap:  %s\n\
 Where:      %s\n\
@@ -517,43 +518,40 @@ Storage:    %s\n\
 JobId:      %s\n\
 When:       %s\n\
 Priority:   %d\n"),
-                job->hdr.name,
-                NPRT(jcr->RestoreBootstrap),
-                jcr->where?jcr->where:NPRT(job->RestoreWhere),
-                replace,
-                jcr->fileset->hdr.name,
-                jcr->client->hdr.name,
-                jcr->store->hdr.name, 
-                 jcr->RestoreJobId==0?"*None*":edit_uint64(jcr->RestoreJobId, ec1), 
-                bstrutime(dt, sizeof(dt), jcr->sched_time),
-                jcr->JobPriority);
-        }
-        break;
-      default:
-         bsendmsg(ua, _("Unknown Job Type=%d\n"), jcr->JobType);
-        free_jcr(jcr);
-        return 0;
+             job->hdr.name,
+             NPRT(jcr->RestoreBootstrap),
+             jcr->where?jcr->where:NPRT(job->RestoreWhere),
+             replace,
+             jcr->fileset->hdr.name,
+             jcr->client->hdr.name,
+             jcr->store->hdr.name, 
+              jcr->RestoreJobId==0?"*None*":edit_uint64(jcr->RestoreJobId, ec1), 
+             bstrutime(dt, sizeof(dt), jcr->sched_time),
+             jcr->JobPriority);
+      }
+      break;
+   default:
+      bsendmsg(ua, _("Unknown Job Type=%d\n"), jcr->JobType);
+      goto bail_out;
    }
 
    /* Run without prompting? */
    if (find_arg(ua, _("yes")) > 0) {
       Dmsg1(200, "Calling run_job job=%x\n", jcr->job);
       run_job(jcr);
+      free_jcr(jcr);                 /* release jcr */
       bsendmsg(ua, _("Run command submitted.\n"));
       return 1;
    }
 
    if (!get_cmd(ua, _("OK to run? (yes/mod/no): "))) {
-      free_jcr(jcr);
-      return 0;                      /* do not run */
+      goto bail_out;
    }
    /*
     * At user request modify parameters of job to be run.
     */
    if (ua->cmd[0] == 0) {
-      bsendmsg(ua, _("Job not run.\n"));
-      free_jcr(jcr);
-      return 0;                      /* do not run */
+      goto bail_out;
    }
    if (strncasecmp(ua->cmd, _("mod"), strlen(ua->cmd)) == 0) {
       FILE *fd;
@@ -777,20 +775,19 @@ Priority:   %d\n"),
       default: 
         goto try_again;
       }
-      bsendmsg(ua, _("Job not run.\n"));
-      free_jcr(jcr);
-      return 0;                      /* error do no run Job */
+      goto bail_out;
    }
 
    if (strncasecmp(ua->cmd, _("yes"), strlen(ua->cmd)) == 0) {
       Dmsg1(200, "Calling run_job job=%x\n", jcr->job);
       run_job(jcr);
+      free_jcr(jcr);                 /* release jcr */
       bsendmsg(ua, _("Run command submitted.\n"));
       return 1;
    }
 
+bail_out:
    bsendmsg(ua, _("Job not run.\n"));
    free_jcr(jcr);
    return 0;                      /* do not run */
-
 }
index 75c08fc9d6a275383e1e6dce72bcc599c1600696..cd6a94ab70a1bd861ac14189bd6de1171f6ae6c7 100644 (file)
@@ -92,20 +92,20 @@ static void *connect_thread(void *arg)
 }
 
 /*
- * Create a Job Control Record for a console "job" 
+ * Create a Job Control Record for a control "job",
  *   filling in all the appropriate fields.
  */
-static JCR *create_console_jcr()
+JCR *create_control_jcr(char *base_name, int job_type)
 {
    JCR *jcr;
    jcr = new_jcr(sizeof(JCR), dird_free_jcr);
    jcr->sd_auth_key = bstrdup("dummy"); /* dummy Storage daemon key */
-   create_unique_job_name(jcr, "*Console*");
+   create_unique_job_name(jcr, base_name);
    jcr->sched_time = jcr->start_time;
-   jcr->JobType = JT_CONSOLE;
+   jcr->JobType = job_type;
    jcr->JobLevel = L_FULL;
    jcr->JobStatus = JS_Running;
-   /* None of these are really defined for the Console, so we
+   /* None of these are really defined for control JCRs, so we
     * simply take the first of each one. This ensures that there
     * will be no null pointer references.
     */
@@ -134,7 +134,7 @@ static void *handle_UA_client_request(void *arg)
 
    pthread_detach(pthread_self());
 
-   jcr = create_console_jcr();
+   jcr = create_control_jcr("*Console*", JT_CONSOLE);
 
    ua = new_ua_context(jcr);
    ua->UA_sock = UA_sock;
index 819a2302b34495086af1e380fd9ee8e7c6de6644..b71b8161a1726812a35cdf1698ab9c02ae3aa865 100644 (file)
@@ -386,9 +386,14 @@ static void list_running_jobs(UAContext *ua)
    lock_jcr_chain();
    for (jcr=NULL; (jcr=get_next_jcr(jcr)); njobs++) {
       if (jcr->JobId == 0) {     /* this is us */
-        bstrftime(dt, sizeof(dt), jcr->start_time);
-        strcpy(dt+7, dt+9);     /* cut century */
-         bsendmsg(ua, _("Console connected at %s\n"), dt);
+        /* this is a console or other control job. We only show console
+         * jobs in the status output.
+         */
+        if (jcr->JobType == JT_CONSOLE) {
+           bstrftime(dt, sizeof(dt), jcr->start_time);
+           strcpy(dt+7, dt+9);  /* cut century */
+           bsendmsg(ua, _("Console connected at %s\n"), dt);
+        }
         njobs--;
       }
       free_locked_jcr(jcr);
index aa8dbe07bef51fda265f104efa9936502e179e22..61c6ea2f3d2d4c011e323908e179e1d526db0709 100644 (file)
@@ -224,6 +224,8 @@ Without that I don't know who I am :-(\n"), configfile);
 
    start_watchdog();                 /* start watchdog thread */
 
+   init_jcr_subsystem();             /* start JCR watchdogs etc. */
+
    if (inetd_request) {
       /* Socket is on fd 0 */         
       BSOCK *bs = init_bsock(NULL, 0, "client", "unknown client", me->FDport);
@@ -241,8 +243,6 @@ Without that I don't know who I am :-(\n"), configfile);
 
 void terminate_filed(int sig)
 {
-   stop_watchdog();
-
    if (configfile != NULL) {
       free(configfile);
    }
@@ -252,6 +252,7 @@ void terminate_filed(int sig)
    delete_pid_file(me->pid_directory, "bacula-fd", me->FDport);
    free_config_resources();
    term_msg();
+   stop_watchdog();
    close_memory_pool();              /* release free memory in pool */
    sm_dump(False);                   /* dump orphaned buffers */
    exit(1);
index 8a47e2e6c36acbe29f2800004c8a2c1901a5c40b..6f8994fe518caf6b1c5ab075d0f7d19ad49fa605 100644 (file)
@@ -51,6 +51,7 @@
 #define JT_VERIFY                'V'  /* Verify Job */
 #define JT_RESTORE               'R'  /* Restore Job */
 #define JT_CONSOLE               'C'  /* console program */
+#define JT_SYSTEM                'S'  /* internal system "job" */
 #define JT_ADMIN                 'D'  /* admin job */
 #define JT_ARCHIVE               'A'
 
@@ -273,6 +274,7 @@ extern dlist *last_jobs;
 
 
 /* The following routines are found in lib/jcr.c */
+extern bool init_jcr_subsystem(void);
 extern JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr);
 extern void free_locked_jcr(JCR *jcr);
 extern JCR *get_jcr_by_id(uint32_t JobId);
index dcdb32ab334e6d86108e62393a00f1c828ebe255..28cab1eed2ee19ac01187750b8692c380e99f678 100644 (file)
@@ -39,7 +39,7 @@ LIBSRCS = alloc.c attr.c base64.c bsys.c bget_msg.c \
          md5.c message.c mem_pool.c parse_conf.c \
          queue.c rwlock.c scan.c serial.c sha1.c \
          semlock.c signal.c smartall.c tree.c \
-         util.c var.c watchdog.c workq.c  
+         util.c var.c watchdog.c workq.c timers.c
 
 
 LIBOBJS = alloc.o attr.o base64.o bsys.o bget_msg.o \
@@ -50,7 +50,7 @@ LIBOBJS = alloc.o attr.o base64.o bsys.o bget_msg.o \
          md5.o message.o mem_pool.o parse_conf.o \
          queue.o rwlock.o scan.o serial.o sha1.o \
          semlock.o signal.o smartall.o tree.o \
-         util.o var.o watchdog.o workq.o
+         util.o var.o watchdog.o workq.o timers.o
 
 
 EXTRAOBJS = @OBJLIST@
index 6936b63854bcb2a0e727367be5041f360ba38e1c..017c0f37ba71b10ff8e05fc1f3df39a031e4e6df 100755 (executable)
 #include "bacula.h"
 #include "jcr.h"
 
-extern void timeout_handler(int sig);
+/* External variables we reference */
+extern time_t watchdog_time;
+
+/* Forward referenced functions */
+static void timeout_handler(int sig);
+static void jcr_timeout_check(watchdog_t *self);
 
 struct s_last_job last_job;    /* last job run by this daemon */
 dlist *last_jobs;
@@ -40,7 +45,7 @@ dlist *last_jobs;
 static JCR *jobs = NULL;             /* pointer to JCR chain */
 
 /* Mutex for locking various jcr chains while updating */
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t jcr_chain_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 void init_last_jobs_list()
 {
@@ -59,12 +64,14 @@ void term_last_jobs_list()
 
 void lock_last_jobs_list() 
 {
-   P(mutex);
+   /* Use jcr chain mutex */
+   P(jcr_chain_mutex);
 }
 
 void unlock_last_jobs_list() 
 {
-   V(mutex);
+   /* Use jcr chain mutex */
+   V(jcr_chain_mutex);
 }
 
 /*
@@ -98,14 +105,14 @@ JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
    sigfillset(&sigtimer.sa_mask);
    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
 
-   P(mutex);
+   P(jcr_chain_mutex);
    jcr->prev = NULL;
    jcr->next = jobs;
    if (jobs) {
       jobs->prev = jcr;
    }
    jobs = jcr;
-   V(mutex);
+   V(jcr_chain_mutex);
    return jcr;
 }
 
@@ -219,11 +226,11 @@ void free_jcr(JCR *jcr)
 #endif
    struct s_last_job *je;
 
-   P(mutex);
+   P(jcr_chain_mutex);
    jcr->use_count--;                 /* decrement use count */
    Dmsg3(200, "Dec jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
    if (jcr->use_count > 0) {         /* if in use */
-      V(mutex);
+      V(jcr_chain_mutex);
       Dmsg2(200, "jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
       return;
    }
@@ -247,7 +254,7 @@ void free_jcr(JCR *jcr)
       last_job.JobId = 0;            /* zap last job */
    }
    close_msg(NULL);                  /* flush any daemon messages */
-   V(mutex);
+   V(jcr_chain_mutex);
    Dmsg0(200, "Exit free_jcr\n");
 }
 
@@ -280,15 +287,17 @@ JCR *get_jcr_by_id(uint32_t JobId)
 {
    JCR *jcr;      
 
-   P(mutex);
+   P(jcr_chain_mutex);                 /* lock chain */
    for (jcr = jobs; jcr; jcr=jcr->next) {
       if (jcr->JobId == JobId) {
+        P(jcr->mutex);
         jcr->use_count++;
+        V(jcr->mutex);
          Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
         break;
       }
    }
-   V(mutex);
+   V(jcr_chain_mutex);
    return jcr; 
 }
 
@@ -301,16 +310,18 @@ JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
 {
    JCR *jcr;      
 
-   P(mutex);
+   P(jcr_chain_mutex);
    for (jcr = jobs; jcr; jcr=jcr->next) {
       if (jcr->VolSessionId == SessionId && 
          jcr->VolSessionTime == SessionTime) {
+        P(jcr->mutex);
         jcr->use_count++;
+        V(jcr->mutex);
          Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
         break;
       }
    }
-   V(mutex);
+   V(jcr_chain_mutex);
    return jcr; 
 }
 
@@ -330,16 +341,18 @@ JCR *get_jcr_by_partial_name(char *Job)
    if (!Job) {
       return NULL;
    }
-   P(mutex);
+   P(jcr_chain_mutex);
    len = strlen(Job);
    for (jcr = jobs; jcr; jcr=jcr->next) {
       if (strncmp(Job, jcr->Job, len) == 0) {
+        P(jcr->mutex);
         jcr->use_count++;
+        V(jcr->mutex);
          Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
         break;
       }
    }
-   V(mutex);
+   V(jcr_chain_mutex);
    return jcr; 
 }
 
@@ -357,15 +370,17 @@ JCR *get_jcr_by_full_name(char *Job)
    if (!Job) {
       return NULL;
    }
-   P(mutex);
+   P(jcr_chain_mutex);
    for (jcr = jobs; jcr; jcr=jcr->next) {
       if (strcmp(jcr->Job, Job) == 0) {
+        P(jcr->mutex);
         jcr->use_count++;
+        V(jcr->mutex);
          Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
         break;
       }
    }
-   V(mutex);
+   V(jcr_chain_mutex);
    return jcr; 
 }
 
@@ -392,7 +407,7 @@ void set_jcr_job_status(JCR *jcr, int JobStatus)
  */
 void lock_jcr_chain()
 {
-   P(mutex);
+   P(jcr_chain_mutex);
 }
 
 /*
@@ -400,7 +415,7 @@ void lock_jcr_chain()
  */
 void unlock_jcr_chain()
 {
-   V(mutex);
+   V(jcr_chain_mutex);
 }
 
 
@@ -414,8 +429,92 @@ JCR *get_next_jcr(JCR *jcr)
       rjcr = jcr->next;
    }
    if (rjcr) {
+      P(rjcr->mutex);
       rjcr->use_count++;
+      V(rjcr->mutex);
       Dmsg1(200, "Inc jcr use_count=%d\n", rjcr->use_count);
    }
    return rjcr;
 }
+
+bool init_jcr_subsystem(void)
+{
+   watchdog_t *wd = watchdog_new();
+
+   wd->one_shot = false;
+   wd->interval = 30;  /* FIXME: should be configurable somewhere, even
+                        if only with a #define */
+   wd->callback = jcr_timeout_check;
+
+   register_watchdog(wd);
+
+   return true;
+}
+
+static void jcr_timeout_check(watchdog_t *self)
+{
+   JCR *jcr;
+   BSOCK *fd;
+   time_t timer_start, now;
+
+   Dmsg0(200, "Start JCR timeout checks\n");
+
+   /* Walk through all JCRs checking if any one is 
+    * blocked for more than specified max time.
+    */
+   lock_jcr_chain();
+   for (jcr=NULL; (jcr=get_next_jcr(jcr)); ) {
+      free_locked_jcr(jcr);          /* OK to free now cuz chain is locked */
+      if (jcr->JobId == 0) {
+        continue;
+      }
+      fd = jcr->store_bsock;
+      if (fd) {
+        timer_start = fd->timer_start;
+        if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+           fd->timer_start = 0;      /* turn off timer */
+           fd->timed_out = TRUE;
+           Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
+                watchdog_time - timer_start);
+           pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+        }
+      }
+      fd = jcr->file_bsock;
+      if (fd) {
+        timer_start = fd->timer_start;
+        if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+           fd->timer_start = 0;      /* turn off timer */
+           fd->timed_out = TRUE;
+           Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
+                watchdog_time - timer_start);
+           pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+        }
+      }
+      fd = jcr->dir_bsock;
+      if (fd) {
+        timer_start = fd->timer_start;
+        if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+           fd->timer_start = 0;      /* turn off timer */
+           fd->timed_out = TRUE;
+           Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
+                watchdog_time - timer_start);
+           pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+        }
+      }
+
+   }
+   unlock_jcr_chain();
+
+   Dmsg0(200, "Finished JCR timeout checks\n");
+}
+
+/*
+ * Timeout signal comes here
+ */
+void timeout_handler(int sig)
+{
+   return;                           /* thus interrupting the function */
+}
index dd6f07c2656c2bec960953e19c2438c3616a7a39..00934170bf1c9b3a7af850bfb4f700fe88a30823 100644 (file)
@@ -51,6 +51,7 @@
 #include "sha1.h"
 #include "tree.h"
 #include "watchdog.h"
+#include "timers.h"
 #include "bpipe.h"
 #include "attr.h"
 #include "var.h"
index c04472e5b1ce3309194e213f8bfe1110446a3c38..b9c029667700b1472763b4708f46f5ec78f029b1 100644 (file)
@@ -199,7 +199,13 @@ void             set_working_directory(char *wd);
 /* watchdog.c */
 int start_watchdog(void);
 int stop_watchdog(void);
-btimer_t *start_child_timer(pid_t pid, uint32_t wait);
+watchdog_t *watchdog_new(void);
+bool register_watchdog(watchdog_t *wd);
+bool unregister_watchdog(watchdog_t *wd);
+bool unregister_watchdog_unlocked(watchdog_t *wd);
+
+/* timers.c */
+btimer_id start_child_timer(pid_t pid, uint32_t wait);
 void stop_child_timer(btimer_id wid);
 btimer_id start_thread_timer(pthread_t tid, uint32_t wait);
 void stop_thread_timer(btimer_id wid);
index 14fda2273b566fd3b420e4bbe555b994509f5568..3d2413685c1c937f516e4e5b303c78a80ad7add5 100755 (executable)
 #include "bacula.h"
 #include "jcr.h"
 
+/* This breaks Kern's #include rules, but I don't want to put it into bacula.h
+ * until it has been discussed with him */
+#include "bsd_queue.h"
+
 /* Exported globals */
 time_t watchdog_time;                /* this has granularity of SLEEP_TIME */
 
-#define SLEEP_TIME 30                /* examine things every 30 seconds */
+#define SLEEP_TIME 1                 /* examine things every second */
 
 /* Forward referenced functions */
-static void *btimer_thread(void *arg);
-static void stop_btimer(btimer_id wid);
-static btimer_id btimer_start_common(uint32_t wait);
+static void *watchdog_thread(void *arg);
 
 /* Static globals */
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  timer = PTHREAD_COND_INITIALIZER;
 static int quit;
-static btimer_t *timer_chain = NULL;
+static bool wd_is_init = false;
 
+/* Forward referenced callback functions */
+static void callback_child_timer(watchdog_t *self);
+static void callback_thread_timer(watchdog_t *self);
+static pthread_t wd_tid;
 
-/*
- * Timeout signal comes here
- */
-void timeout_handler(int sig)
-{
-   return;                           /* thus interrupting the function */
-}
-
+/* Static globals */
+static TAILQ_HEAD(/* no struct */, s_watchdog_t) wd_queue =
+       TAILQ_HEAD_INITIALIZER(wd_queue);
+static TAILQ_HEAD(/* no struct */, s_watchdog_t) wd_inactive =
+       TAILQ_HEAD_INITIALIZER(wd_inactive);
 
 /*   
  * Start watchdog thread
@@ -65,18 +68,14 @@ void timeout_handler(int sig)
 int start_watchdog(void)
 {
    int stat;
-   pthread_t wdid;
-   struct sigaction sigtimer;
-                       
-   sigtimer.sa_flags = 0;
-   sigtimer.sa_handler = timeout_handler;
-   sigfillset(&sigtimer.sa_mask);
-   sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
+
+   Dmsg0(200, "Initialising NicB-hacked watchdog thread\n");
    watchdog_time = time(NULL);
    quit = FALSE;
-   if ((stat = pthread_create(&wdid, NULL, btimer_thread, (void *)NULL)) != 0) {
+   if ((stat = pthread_create(&wd_tid, NULL, watchdog_thread, NULL)) != 0) {
       return stat;
    }
+   wd_is_init = true;
    return 0;
 }
 
@@ -89,223 +88,161 @@ int start_watchdog(void)
 int stop_watchdog(void)
 {
    int stat;
+   watchdog_t *p, *n;
 
-   quit = TRUE;
-   P(mutex);
-   if ((stat = pthread_cond_signal(&timer)) != 0) {
-      V(mutex);
-      return stat;
+   if (!wd_is_init) {
+      Emsg0(M_ABORT, 0, "BUG! stop_watchdog called before start_watchdog\n");
    }
-   V(mutex);
-   return 0;
-}
-
-
-/* 
- * This is the actual watchdog thread.
- */
-static void *btimer_thread(void *arg)
-{
-   JCR *jcr;
-   BSOCK *fd;
-   btimer_t *wid;
-
-   Dmsg0(200, "Start watchdog thread\n");
-   pthread_detach(pthread_self());
 
-   for ( ;!quit; ) {
-      time_t timer_start, now;
+   Dmsg0(200, "Sending stop signal to NicB-hacked watchdog thread\n");
+   P(mutex);
+   quit = true;
+   stat = pthread_cond_signal(&timer);
+   V(mutex);
 
-      Dmsg0(200, "Top of watchdog loop\n");
+   wd_is_init = false;
 
-      watchdog_time = time(NULL);     /* update timer */
-
-      /* Walk through all JCRs checking if any one is 
-       * blocked for more than specified max time.
-       */
-      lock_jcr_chain();
-      for (jcr=NULL; (jcr=get_next_jcr(jcr)); ) {
-        free_locked_jcr(jcr);
-        if (jcr->JobId == 0) {
-           continue;
-        }
-        fd = jcr->store_bsock;
-        if (fd) {
-           timer_start = fd->timer_start;
-           if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
-              fd->timer_start = 0;   /* turn off timer */
-              fd->timed_out = TRUE;
-              Jmsg(jcr, M_ERROR, 0, _(
-"Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
-                   watchdog_time - timer_start);
-              pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
-           }
-        }
-        fd = jcr->file_bsock;
-        if (fd) {
-           timer_start = fd->timer_start;
-           if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
-              fd->timer_start = 0;   /* turn off timer */
-              fd->timed_out = TRUE;
-              Jmsg(jcr, M_ERROR, 0, _(
-"Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
-                   watchdog_time - timer_start);
-              pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
-           }
-        }
-        fd = jcr->dir_bsock;
-        if (fd) {
-           timer_start = fd->timer_start;
-           if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
-              fd->timer_start = 0;   /* turn off timer */
-              fd->timed_out = TRUE;
-              Jmsg(jcr, M_ERROR, 0, _(
-"Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
-                   watchdog_time - timer_start);
-              pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
-           }
-        }
+   stat = pthread_join(wd_tid, NULL);
 
+   TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) {
+      TAILQ_REMOVE(&wd_queue, p, qe);
+      if (p->destructor != NULL) {
+        p->destructor(p);
       }
-      unlock_jcr_chain();
-
-      Dmsg0(200, "Watchdog sleep.\n");
-      bmicrosleep(SLEEP_TIME, 0);
-      now = time(NULL);
+      free(p);
+   }
 
-      /* 
-       * Now handle child and thread timers set by the code.
-       */
-      /* Walk child chain killing off any process overdue */
-      P(mutex);
-      for (wid = timer_chain; wid; wid=wid->next) {
-        int killed = FALSE;
-        /* First ask him politely to go away */
-        if (!wid->killed && now > (wid->start_time + wid->wait)) {
-//          Dmsg1(000, "Watchdog sigterm pid=%d\n", wid->pid);
-           if (wid->type == TYPE_CHILD) {
-              kill(wid->pid, SIGTERM);
-              killed = TRUE;
-           } else {
-               Dmsg1(200, "watchdog kill thread %d\n", wid->tid);
-              pthread_kill(wid->tid, TIMEOUT_SIGNAL);
-              wid->killed = TRUE;
-           }
-        }
-        /* If we asked a child to die, wait 3 seconds and slam him */
-        if (killed) {
-           btimer_t *wid1;
-           bmicrosleep(3, 0);
-           for (wid1 = timer_chain; wid1; wid1=wid1->next) {
-              if (wid->type == TYPE_CHILD &&
-                  !wid1->killed && now > (wid1->start_time + wid1->wait)) {
-                 kill(wid1->pid, SIGKILL);
-//                Dmsg1(000, "Watchdog killed pid=%d\n", wid->pid);
-                 wid1->killed = TRUE;
-              }
-           }
-        }
+   TAILQ_FOREACH_SAFE(p, &wd_inactive, qe, n) {
+      TAILQ_REMOVE(&wd_inactive, p, qe);
+      if (p->destructor != NULL) {
+        p->destructor(p);
       }
-      V(mutex);
-   } /* end of big for loop */
+      free(p);
+   }
 
-   Dmsg0(200, "End watchdog\n");
-   return NULL;
+   return stat;
 }
 
-/* 
- * Start a timer on a child process of pid, kill it after wait seconds.
- *   NOTE!  Granularity is SLEEP_TIME (i.e. 30 seconds)
- *
- *  Returns: btimer_id (pointer to btimer_t struct) on success
- *          NULL on failure
- */
-btimer_id start_child_timer(pid_t pid, uint32_t wait)
+watchdog_t *watchdog_new(void)
 {
-   btimer_t *wid;
-   wid = btimer_start_common(wait);
-   wid->pid = pid;
-   wid->type = TYPE_CHILD;
-   Dmsg2(200, "Start child timer 0x%x for %d secs.\n", wid, wait);
-   return wid;
-}
+   watchdog_t *wd = (watchdog_t *) malloc(sizeof(watchdog_t));
 
-/* 
- * Start a timer on a thread. kill it after wait seconds.
- *   NOTE!  Granularity is SLEEP_TIME (i.e. 30 seconds)
- *
- *  Returns: btimer_id (pointer to btimer_t struct) on success
- *          NULL on failure
- */
-btimer_id start_thread_timer(pthread_t tid, uint32_t wait)
-{
-   btimer_t *wid;
-   wid = btimer_start_common(wait);
-   wid->tid = tid;
-   wid->type = TYPE_PTHREAD;
-   Dmsg2(200, "Start thread timer 0x%x for %d secs.\n", wid, wait);
-   return wid;
+   if (!wd_is_init) {
+      Emsg0(M_ABORT, 0, "BUG! watchdog_new called before start_watchdog\n");
+   }
+
+   if (wd == NULL) {
+      return NULL;
+   }
+   wd->one_shot = true;
+   wd->interval = 0;
+   wd->callback = NULL;
+   wd->destructor = NULL;
+   wd->data = NULL;
+
+   return wd;
 }
 
-static btimer_id btimer_start_common(uint32_t wait)
+bool register_watchdog(watchdog_t *wd)
 {
-   btimer_id wid = (btimer_id)malloc(sizeof(btimer_t));
+   if (!wd_is_init) {
+      Emsg0(M_ABORT, 0, "BUG! register_watchdog called before start_watchdog\n");
+   }
+   if (wd->callback == NULL) {
+      Emsg1(M_ABORT, 0, "BUG! Watchdog %p has NULL callback\n", wd);
+   }
+   if (wd->interval == 0) {
+      Emsg1(M_ABORT, 0, "BUG! Watchdog %p has zero interval\n", wd);
+   }
 
    P(mutex);
-   /* Chain it into timer_chain as the first item */
-   wid->prev = NULL;
-   wid->next = timer_chain;
-   if (timer_chain) {
-      timer_chain->prev = wid;
-   }
-   timer_chain = wid;
-   wid->start_time = time(NULL);
-   wid->wait = wait;
-   wid->killed = FALSE;
+   wd->next_fire = watchdog_time + wd->interval;
+   TAILQ_INSERT_TAIL(&wd_queue, wd, qe);
+   Dmsg3(200, "Registered watchdog %p, interval %d%s\n",
+        wd, wd->interval, wd->one_shot ? " one shot" : "");
    V(mutex);
-   return wid;
-}
 
-/*
- * Stop child timer
- */
-void stop_child_timer(btimer_id wid)
-{
-   Dmsg2(200, "Stop child timer 0x%x for %d secs.\n", wid, wid->wait);
-   stop_btimer(wid);        
+   return false;
 }
 
-/*
- * Stop thread timer
- */
-void stop_thread_timer(btimer_id wid)
+bool unregister_watchdog_unlocked(watchdog_t *wd)
 {
-   if (!wid) {
-      return;
+   watchdog_t *p, *n;
+
+   if (!wd_is_init) {
+      Emsg0(M_ABORT, 0, "BUG! unregister_watchdog_unlocked called before start_watchdog\n");
    }
-   Dmsg2(200, "Stop thread timer 0x%x for %d secs.\n", wid, wid->wait);
-   stop_btimer(wid);        
-}
 
+   TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) {
+      if (wd == p) {
+        TAILQ_REMOVE(&wd_queue, wd, qe);
+        Dmsg1(200, "Unregistered watchdog %p\n", wd);
+        return true;
+      }
+   }
 
-/*
- * Stop btimer
- */
-static void stop_btimer(btimer_id wid)
+   TAILQ_FOREACH_SAFE(p, &wd_inactive, qe, n) {
+      if (wd == p) {
+        TAILQ_REMOVE(&wd_inactive, wd, qe);
+        Dmsg1(200, "Unregistered inactive watchdog %p\n", wd);
+        return true;
+      }
+   }
+
+   Dmsg1(200, "Failed to unregister watchdog %p\n", wd);
+
+   return false;
+}
+
+bool unregister_watchdog(watchdog_t *wd)
 {
-   if (wid == NULL) {
-      Emsg0(M_ABORT, 0, _("NULL btimer_id.\n"));
+   bool ret;
+
+   if (!wd_is_init) {
+      Emsg0(M_ABORT, 0, "BUG! unregister_watchdog called before start_watchdog\n");
    }
+
    P(mutex);
-   /* Remove wid from timer_chain */
-   if (!wid->prev) {                 /* if no prev */
-      timer_chain = wid->next;       /* set new head */
-   } else {
-      wid->prev->next = wid->next;    /* update prev */
-   }
-   if (wid->next) {
-      wid->next->prev = wid->prev;    /* unlink it */
-   }
+   ret = unregister_watchdog_unlocked(wd);
    V(mutex);
-   free(wid);
+
+   return ret;
+}
+
+static void *watchdog_thread(void *arg)
+{
+   Dmsg0(200, "NicB-reworked watchdog thread entered\n");
+
+   while (true) {
+      watchdog_t *p, *n;
+
+      P(mutex);
+      if (quit) {
+        V(mutex);
+        break;
+      }
+
+      watchdog_time = time(NULL);
+
+      TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) {
+        if (p->next_fire < watchdog_time) {
+           /* Run the callback */
+           p->callback(p);
+
+           /* Reschedule (or move to inactive list if it's a one-shot timer) */
+           if (p->one_shot) {
+              TAILQ_REMOVE(&wd_queue, p, qe);
+              TAILQ_INSERT_TAIL(&wd_inactive, p, qe);
+           } else {
+              p->next_fire = watchdog_time + p->interval;
+           }
+        }
+      }
+      V(mutex);
+      bmicrosleep(SLEEP_TIME, 0);
+   }
+
+   Dmsg0(200, "NicB-reworked watchdog thread exited\n");
+
+   return NULL;
 }
index 8d39d38bda8d1254e3559f48169012987cb36086..aaae0c74cc6a935af7f002e827be388509fab3a3 100644 (file)
 
 #define TIMEOUT_SIGNAL SIGUSR2
 
-typedef struct s_btimer_t {
-   struct s_btimer_t *next;
-   struct s_btimer_t *prev;
-   time_t start_time;
-   int32_t  wait;
-   pid_t pid;                        /* process id if TYPE_CHILD */
-   int killed;
-   int type;
-   pthread_t tid;                    /* thread id if TYPE_PTHREAD */
-} btimer_t;
-
-#define btimer_id btimer_t *
+/* This breaks Kern's #include rules, but I don't want to put it into bacula.h
+ * until it has been discussed with him */
+#include "bsd_queue.h"
+
+struct s_watchdog_t {
+       bool one_shot;
+       time_t interval;
+       void (*callback)(struct s_watchdog_t *wd);
+       void (*destructor)(struct s_watchdog_t *wd);
+       void *data;
+       /* Private data below - don't touch outside of watchdog.c */
+       TAILQ_ENTRY(s_watchdog_t) qe;
+       time_t next_fire;
+};
+typedef struct s_watchdog_t watchdog_t;
index a83e70bd015f9ba42108b5ee220cbedf7438f847..c3690464dcf198a8436e3d91d41d51ed3acded04 100644 (file)
@@ -209,6 +209,8 @@ int main (int argc, char *argv[])
 
    start_watchdog();                 /* start watchdog thread */
 
+   init_jcr_subsystem();             /* start JCR watchdogs etc. */
+
    /* 
     * Sleep a bit to give device thread a chance to lock the resource
     * chain before we start the server.
@@ -372,7 +374,6 @@ void terminate_stored(int sig)
    }
 
    delete_pid_file(me->pid_directory, "bacula-sd", me->SDport);
-   stop_watchdog();
 
    Dmsg1(200, "In terminate_stored() sig=%d\n", sig);
 
@@ -392,6 +393,7 @@ void terminate_stored(int sig)
       print_memory_pool_stats();
    }
    term_msg();
+   stop_watchdog();
    close_memory_pool();
 
    sm_dump(False);                   /* dump orphaned buffers */
index b821f338d5d0b1a776f64310f0839ca7b7cbebe6..d3295c2b86089508c69ee40b636f90cb5643c63d 100644 (file)
@@ -2,8 +2,8 @@
 #undef  VERSION
 #define VERSION "1.33"
 #define VSTRING "1"
-#define BDATE   "20 Dec 2003"
-#define LSMDATE "20Dec03"
+#define BDATE   "22 Dec 2003"
+#define LSMDATE "22Dec03"
 
 /* Debug flags */
 #undef  DEBUG