]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/job.c
Improve mem pool garbage collection
[bacula/bacula] / bacula / src / dird / job.c
index 985469926783f7943d9b1046032adea634cfeb85..60c70f18ad758a0886ae39327befbeec96362b86 100644 (file)
@@ -1,17 +1,20 @@
 /*
-   Bacula® - The Network Backup Solution
+   Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2017 Kern Sibbald
 
-   The main author of Bacula is Kern Sibbald, with contributions from many
-   others, a complete list can be found in the file AUTHORS.
+   The original author of Bacula is Kern Sibbald, with contributions
+   from many others, a complete list can be found in the file AUTHORS.
 
    You may use this file and others of this release according to the
    license defined in the LICENSE file, which includes the Affero General
    Public License, v3.0 ("AGPLv3") and some additional permissions and
    terms pursuant to its AGPLv3 Section 7.
 
-   Bacula® is a registered trademark of Kern Sibbald.
+   This notice must be preserved when any source code is 
+   conveyed and/or propagated.
+
+   Bacula(R) is a registered trademark of Kern Sibbald.
 */
 /*
  *
@@ -113,19 +116,24 @@ bool setup_job(JCR *jcr)
     */
    Dmsg0(100, "Open database\n");
    jcr->db = db_init_database(jcr, jcr->catalog->db_driver, jcr->catalog->db_name,
-                              jcr->catalog->db_user, jcr->catalog->db_password,
-                              jcr->catalog->db_address, jcr->catalog->db_port,
-                              jcr->catalog->db_socket, jcr->catalog->mult_db_connections,
-                              jcr->catalog->disable_batch_insert);
+                jcr->catalog->db_user, jcr->catalog->db_password,
+                jcr->catalog->db_address, jcr->catalog->db_port,
+                jcr->catalog->db_socket, jcr->catalog->db_ssl_key,
+                jcr->catalog->db_ssl_cert, jcr->catalog->db_ssl_ca,
+                jcr->catalog->db_ssl_capath, jcr->catalog->db_ssl_cipher,
+                jcr->catalog->mult_db_connections,
+                jcr->catalog->disable_batch_insert);
    if (!jcr->db || !db_open_database(jcr, jcr->db)) {
       Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"),
                  jcr->catalog->db_name);
       if (jcr->db) {
          Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
          db_close_database(jcr, jcr->db);
+         jcr->db = NULL;
       }
       goto bail_out;
    }
+
    Dmsg0(150, "DB opened\n");
    if (!jcr->fname) {
       jcr->fname = get_pool_memory(PM_FNAME);
@@ -232,6 +240,147 @@ bail_out:
    return false;
 }
 
+/*
+ * Setup a job for a resume command
+ */
+static bool setup_resume_job(JCR *jcr, JOB_DBR *jr)
+{
+   int errstat;
+   jcr->lock();
+   Dsm_check(100);
+   init_msg(jcr, jcr->messages);
+
+   /* Initialize termination condition variable */
+   if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
+      berrno be;
+      Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(errstat));
+      jcr->unlock();
+      goto bail_out;
+   }
+   jcr->term_wait_inited = true;
+
+   jcr->setJobStatus(JS_Created);
+   jcr->unlock();
+
+   /*
+    * Open database
+    */
+   Dmsg0(100, "Open database\n");
+   jcr->db = db_init_database(jcr, jcr->catalog->db_driver, jcr->catalog->db_name,
+                              jcr->catalog->db_user, jcr->catalog->db_password,
+                              jcr->catalog->db_address, jcr->catalog->db_port,
+                              jcr->catalog->db_socket, jcr->catalog->db_ssl_key,
+                              jcr->catalog->db_ssl_cert, jcr->catalog->db_ssl_ca,
+                              jcr->catalog->db_ssl_capath, jcr->catalog->db_ssl_cipher,
+                              jcr->catalog->mult_db_connections,
+                              jcr->catalog->disable_batch_insert);
+   if (!jcr->db || !db_open_database(jcr, jcr->db)) {
+      Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"),
+                 jcr->catalog->db_name);
+      if (jcr->db) {
+         Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
+         db_close_database(jcr, jcr->db);
+         jcr->db = NULL;
+      }
+      goto bail_out;
+   }
+   Dmsg0(100, "DB opened\n");
+   if (!jcr->fname) {
+      jcr->fname = get_pool_memory(PM_FNAME);
+   }
+   if (!jcr->pool_source) {
+      jcr->pool_source = get_pool_memory(PM_MESSAGE);
+      pm_strcpy(jcr->pool_source, _("unknown source"));
+   }
+   if (!jcr->next_pool_source) {
+      jcr->next_pool_source = get_pool_memory(PM_MESSAGE);
+      pm_strcpy(jcr->next_pool_source, _("unknown source"));
+   }
+
+
+   /*
+    * Setup Job record.  Make sure original job is Incomplete.
+    */
+   memcpy(&jcr->jr, jr, sizeof(JOB_DBR));
+   jcr->sched_time = jcr->jr.SchedTime;
+   jcr->start_time = jcr->jr.StartTime;
+   jcr->jr.EndTime = 0;               /* perhaps rescheduled, clear it */
+   jcr->setJobType(jcr->jr.JobType);
+   jcr->setJobLevel(jcr->jr.JobLevel);
+   jcr->JobId = jcr->jr.JobId;
+   if (!get_or_create_client_record(jcr)) {
+      Dmsg0(100, "Could not create client record.\n");
+      goto bail_out;
+   }
+
+   Dmsg6(100, "Got job record JobId=%d Job=%s Name=%s Type=%c Level=%c Status=%c\n",
+       jcr->jr.JobId, jcr->jr.Job, jcr->jr.Name, jcr->jr.JobType, jcr->jr.JobLevel,
+       jcr->jr.JobStatus);
+   if (jcr->jr.JobStatus != JS_Incomplete) {
+      /* ***FIXME*** add error message */
+      Dmsg1(100, "Job is not an Incomplete: status=%c\n", jcr->jr.JobStatus);
+      goto bail_out;
+   }
+   bstrncpy(jcr->Job, jcr->jr.Job, sizeof(jcr->Job));
+   jcr->setJobType(jcr->jr.JobType);
+   jcr->setJobLevel(jcr->jr.JobLevel);
+
+   generate_daemon_event(jcr, "JobStart");
+   new_plugins(jcr);                  /* instantiate plugins for this jcr */
+   generate_plugin_event(jcr, bDirEventJobStart);
+
+   if (job_canceled(jcr)) {
+      Dmsg0(100, "Oops. Job canceled\n");
+      goto bail_out;
+   }
+
+   /* Re-run the old job */
+   jcr->rerunning = true;
+
+   /*
+    * Now, do pre-run stuff, like setting job level (Inc/diff, ...)
+    *  this allows us to setup a proper job start record for restarting
+    *  in case of later errors.
+    */
+   switch (jcr->getJobType()) {
+   case JT_BACKUP:
+      if (!do_backup_init(jcr)) {
+         backup_cleanup(jcr, JS_ErrorTerminated);
+         goto bail_out;
+      }
+      break;
+   default:
+      Pmsg1(0, _("Unimplemented job type: %d\n"), jcr->getJobType());
+      jcr->setJobStatus(JS_ErrorTerminated);
+      goto bail_out;
+   }
+
+   generate_plugin_event(jcr, bDirEventJobInit);
+   Dsm_check(100);
+   return true;
+
+bail_out:
+   return false;
+}
+
+JobId_t resume_job(JCR *jcr, JOB_DBR *jr)
+{
+   int stat;
+   if (setup_resume_job(jcr, jr)) {
+      Dmsg0(200, "Add jrc to work queue\n");
+      /* Queue the job to be run */
+      if ((stat = jobq_add(&job_queue, jcr)) != 0) {
+         berrno be;
+         Jmsg(jcr, M_FATAL, 0, _("Could not add job queue: ERR=%s\n"), be.bstrerror(stat));
+         return 0;
+      }
+      return jcr->JobId;
+   }
+   return 0;
+}
+
+
+
 void update_job_end(JCR *jcr, int TermCode)
 {
    dequeue_messages(jcr);             /* display any queued messages */
@@ -367,18 +516,23 @@ void sd_msg_thread_send_signal(JCR *jcr, int sig)
    jcr->unlock();
 }
 
-static int cancel_file_daemon_job(UAContext *ua, const char *cmd, JCR *jcr)
+static bool cancel_file_daemon_job(UAContext *ua, const char *cmd, JCR *jcr)
 {
+   CLIENT *old_client;
+
    if (!jcr->client) {
       Dmsg0(100, "No client to cancel\n");
-      return 0;
+      return false;
    }
+   old_client = ua->jcr->client;
    ua->jcr->client = jcr->client;
    if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) {
       ua->error_msg(_("Failed to connect to File daemon.\n"));
-      return 0;
+      ua->jcr->client = old_client;
+      return false;
    }
-   Dmsg0(100, "Connected to file daemon\n");
+   Dmsg3(10, "Connected to file daemon %s for cancel ua.jcr=%p jcr=%p\n",
+         ua->jcr->client->name(), ua->jcr, jcr);
    BSOCK *fd = ua->jcr->file_bsock;
    fd->fsend("%s Job=%s\n", cmd, jcr->Job);
    while (fd->recv() >= 0) {
@@ -386,8 +540,8 @@ static int cancel_file_daemon_job(UAContext *ua, const char *cmd, JCR *jcr)
    }
    fd->signal(BNET_TERMINATE);
    free_bsock(ua->jcr->file_bsock);
-   ua->jcr->client = NULL;
-   return 1;
+   ua->jcr->client = old_client;
+   return true;
 }
 
 static bool cancel_sd_job(UAContext *ua, const char *cmd, JCR *jcr)
@@ -417,7 +571,10 @@ static bool cancel_sd_job(UAContext *ua, const char *cmd, JCR *jcr)
       ua->error_msg(_("Failed to connect to Storage daemon.\n"));
       return false;
    }
-   Dmsg0(200, "Connected to storage daemon\n");
+
+   Dmsg3(10, "Connected to storage daemon %s for cancel ua.jcr=%p jcr=%p\n",
+         ua->jcr->wstore->name(), ua->jcr, jcr);
+
    BSOCK *sd = ua->jcr->store_bsock;
    sd->fsend("%s Job=%s\n", cmd, jcr->Job);
    while (sd->recv() >= 0) {
@@ -437,6 +594,9 @@ static int cancel_inactive_job(UAContext *ua, JCR *jcr)
    JOB_DBR    jr;
    int        i;
    USTORE     store;
+   CLIENT     *client;
+
+   Dmsg2(10, "cancel_inactive_job ua.jcr=%p jcr=%p\n", ua->jcr, jcr);
 
    if (!jcr->client) {
       memset(&cr, 0, sizeof(cr));
@@ -444,7 +604,6 @@ static int cancel_inactive_job(UAContext *ua, JCR *jcr)
       /* User is kind enough to provide the client name */
       if ((i = find_arg_with_value(ua, "client")) > 0) {
          bstrncpy(cr.Name, ua->argv[i], sizeof(cr.Name));
-
       } else {
          memset(&jr, 0, sizeof(jr));
          bstrncpy(jr.Job, jcr->Job, sizeof(jr.Job));
@@ -462,7 +621,15 @@ static int cancel_inactive_job(UAContext *ua, JCR *jcr)
       }
 
       if (acl_access_ok(ua, Client_ACL, cr.Name)) {
-         jcr->client = (CLIENT *)GetResWithName(R_CLIENT, cr.Name);
+         client = (CLIENT *)GetResWithName(R_CLIENT, cr.Name);
+         if (client) {
+            jcr->client = client;
+         } else {
+            Jmsg1(jcr, M_FATAL, 0, _("Client resource \"%s\" does not exist.\n"), cr.Name);
+            goto bail_out;
+         }
+      } else {
+         goto bail_out;
       }
    }
 
@@ -481,6 +648,8 @@ static int cancel_inactive_job(UAContext *ua, JCR *jcr)
    cancel_sd_job(ua, "cancel", jcr);
 
 bail_out:
+   jcr->JobId = 0;
+   free_jcr(jcr);
    return 1;
 }
 
@@ -491,15 +660,16 @@ bail_out:
  *  Returns: true  if cancel appears to be successful
  *           false on failure. Message sent to ua->jcr.
  */
-bool cancel_job(UAContext *ua, JCR *jcr, bool cancel)
+bool
+cancel_job(UAContext *ua, JCR *jcr, bool cancel)
 {
    char ed1[50];
    int32_t old_status = jcr->JobStatus;
    int status;
    const char *reason, *cmd;
    bool force = find_arg(ua, "inactive") > 0;
-   JCR *wjcr = jcr->wjcr;
 
+   Dmsg3(10, "cancel_job jcr=%p jobid=%d use_count\n", jcr, jcr->JobId, jcr->use_count());
 
    /* If the user explicitely ask, we can send the cancel command to
     * the FD.
@@ -508,9 +678,16 @@ bool cancel_job(UAContext *ua, JCR *jcr, bool cancel)
       return cancel_inactive_job(ua, jcr);
    }
 
-   status = JS_Canceled;
-   reason = _("canceled");
-   cmd = NT_("cancel");
+   if (cancel) {
+      status = JS_Canceled;
+      reason = _("canceled");
+      cmd = NT_("cancel");
+   } else {
+      status = JS_Incomplete;
+      reason = _("stopped");
+      cmd = NT_("stop");
+      jcr->RescheduleIncompleteJobs = false; /* do not restart */
+   }
 
    jcr->setJobStatus(status);
 
@@ -562,10 +739,14 @@ bool cancel_job(UAContext *ua, JCR *jcr, bool cancel)
       }
 
       /* Cancel Copy/Migration Storage daemon */
-      if (wjcr  && wjcr->store_bsock) {
-         /* do not return now, we want to try to cancel the sd socket */
-         cancel_sd_job(ua, cmd, wjcr);
+      if (jcr->wjcr) {
+         /* The wjcr is valid until we call free_jcr(jcr) */
+         JCR *wjcr = jcr->wjcr;
 
+         if (wjcr->store_bsock) {
+            /* do not return now, we want to try to cancel the sd socket */
+            cancel_sd_job(ua, cmd, wjcr);
+         }
          /* We test file_bsock because the previous operation can take
           * several minutes
           */
@@ -790,7 +971,7 @@ DBId_t get_or_create_pool_record(JCR *jcr, char *pool_name)
    while (!db_get_pool_record(jcr, jcr->db, &pr)) { /* get by Name */
       /* Try to create the pool */
       if (create_pool(jcr, jcr->db, jcr->pool, POOL_OP_CREATE) < 0) {
-         Jmsg(jcr, M_FATAL, 0, _("Pool \"%s\" not in database. ERR=%s"), pr.Name,
+         Jmsg(jcr, M_FATAL, 0, _("Cannot create pool \"%s\" in database. ERR=%s"), pr.Name,
             db_strerror(jcr->db));
          return 0;
       } else {
@@ -809,19 +990,13 @@ bool allow_duplicate_job(JCR *jcr)
 {
    JOB *job = jcr->job;
    JCR *djcr;                /* possible duplicate job */
-   bool cancel_dup = false;
-   bool cancel_me = false;
 
-   /*
-    * See if AllowDuplicateJobs is set or
-    * if duplicate checking is disabled for this job.
-    */
+   /* Is AllowDuplicateJobs is set or is duplicate checking 
+    *  disabled for this job? */
    if (job->AllowDuplicateJobs || jcr->IgnoreDuplicateJobChecking) {
       return true;
    }
-
    Dmsg0(800, "Enter allow_duplicate_job\n");
-
    /*
     * After this point, we do not want to allow any duplicate
     * job to run.
@@ -831,16 +1006,16 @@ bool allow_duplicate_job(JCR *jcr)
       if (jcr == djcr || djcr->JobId == 0) {
          continue;                   /* do not cancel this job or consoles */
       }
-
-      /*
-       * See if this Job has the IgnoreDuplicateJobChecking flag set, ignore it
-       * for any checking against other jobs.
-       */
+      /* Does Job has the IgnoreDuplicateJobChecking flag set,
+       * if so do not check it against other jobs */
       if (djcr->IgnoreDuplicateJobChecking) {
-         continue;
-      }
-
-      if (strcmp(job->name(), djcr->job->name()) == 0) {
+         continue; 
+      } 
+      if ((strcmp(job->name(), djcr->job->name()) == 0) &&
+          djcr->getJobType() == jcr->getJobType()) /* A duplicate is about the same name and the same type */
+      {
+         bool cancel_dup = false;
+         bool cancel_me = false;
          if (job->DuplicateJobProximity > 0) {
             utime_t now = (utime_t)time(NULL);
             if ((now - djcr->start_time) > job->DuplicateJobProximity) {
@@ -880,12 +1055,9 @@ bool allow_duplicate_job(JCR *jcr)
                  djcr->JobId);
               break;     /* get out of foreach_jcr */
             }
-         }
-
-         /*
-          * Cancel one of the two jobs (me or dup)
-          * If CancelQueuedDuplicates is set do so only if job is queued.
-          */
+         } 
+         /* Cancel one of the two jobs (me or dup) */
+         /* If CancelQueuedDuplicates is set do so only if job is queued */
          if (job->CancelQueuedDuplicates) {
              switch (djcr->JobStatus) {
              case JS_Created:
@@ -902,11 +1074,8 @@ bool allow_duplicate_job(JCR *jcr)
                 break;
              }
          }
-
          if (cancel_dup || job->CancelRunningDuplicates) {
-            /*
-             * Zap the duplicated job djcr
-             */
+            /* Zap the duplicated job djcr */
             UAContext *ua = new_ua_context(jcr);
             Jmsg(jcr, M_INFO, 0, _("Cancelling duplicate JobId=%d.\n"), djcr->JobId);
             cancel_job(ua, djcr);
@@ -916,9 +1085,7 @@ bool allow_duplicate_job(JCR *jcr)
             free_ua_context(ua);
             Dmsg2(800, "Cancel dup %p JobId=%d\n", djcr, djcr->JobId);
          } else {
-            /*
-             * Zap current job
-             */
+             /* Zap current job */
             jcr->setJobStatus(JS_Canceled);
             Jmsg(jcr, M_FATAL, 0, _("JobId %d already running. Duplicate job not allowed.\n"),
                djcr->JobId);
@@ -942,7 +1109,10 @@ bool apply_wstorage_overrides(JCR *jcr, POOL *opool)
    const char *source;
 
    Dmsg1(100, "Original pool=%s\n", opool->name());
-   if (jcr->run_next_pool_override) {
+   if (jcr->cmdline_next_pool_override) {
+      /* Can be Command line or User input */
+      source = NPRT(jcr->next_pool_source);
+   } else if (jcr->run_next_pool_override) {
       pm_strcpy(jcr->next_pool_source, _("Run NextPool override"));
       pm_strcpy(jcr->pool_source, _("Run NextPool override"));
       source = _("Run NextPool override");
@@ -1009,6 +1179,17 @@ void apply_pool_overrides(JCR *jcr)
          }
       }
       break;
+   case L_VIRTUAL_FULL:
+      if (jcr->vfull_pool) {
+         jcr->pool = jcr->vfull_pool;
+         pool_override = true;
+         if (jcr->run_vfull_pool_override) {
+            pm_strcpy(jcr->pool_source, _("Run VFullPool override"));
+         } else {
+            pm_strcpy(jcr->pool_source, _("Job VFullPool override"));
+         }
+      }
+      break;
    case L_INCREMENTAL:
       if (jcr->inc_pool) {
          jcr->pool = jcr->inc_pool;
@@ -1047,6 +1228,10 @@ bool get_or_create_client_record(JCR *jcr)
 {
    CLIENT_DBR cr;
 
+   if (!jcr->client) {
+      Jmsg(jcr, M_FATAL, 0, _("No Client specified.\n"));
+      return false;
+   }
    memset(&cr, 0, sizeof(cr));
    bstrncpy(cr.Name, jcr->client->hdr.name, sizeof(cr.Name));
    cr.AutoPrune = jcr->client->AutoPrune;
@@ -1169,6 +1354,7 @@ void create_unique_job_name(JCR *jcr, const char *base_name)
    char name[MAX_NAME_LENGTH];
    char *p;
    int len;
+   int local_seq;
 
    /* Guarantee unique start time -- maximum one per second, and
     * thus unique Job Name
@@ -1183,6 +1369,7 @@ void create_unique_job_name(JCR *jcr, const char *base_name)
       }
    }
    last_start_time = now;
+   local_seq = seq;
    V(mutex);                          /* allow creation of jobs */
    jcr->start_time = now;
    /* Form Unique JobName */
@@ -1192,7 +1379,7 @@ void create_unique_job_name(JCR *jcr, const char *base_name)
    len = strlen(dt) + 5;   /* dt + .%02d EOS */
    bstrncpy(name, base_name, sizeof(name));
    name[sizeof(name)-len] = 0;          /* truncate if too long */
-   bsnprintf(jcr->Job, sizeof(jcr->Job), "%s.%s_%02d", name, dt, seq); /* add date & time */
+   bsnprintf(jcr->Job, sizeof(jcr->Job), "%s.%s_%02d", name, dt, local_seq); /* add date & time */
    /* Convert spaces into underscores */
    for (p=jcr->Job; *p; p++) {
       if (*p == ' ') {
@@ -1238,6 +1425,10 @@ void dird_free_jcr(JCR *jcr)
    Dmsg0(200, "Start dird free_jcr\n");
 
    dird_free_jcr_pointers(jcr);
+   if (jcr->wjcr) {
+      free_jcr(jcr->wjcr);
+      jcr->wjcr = NULL;
+   }
    /* Free bsock packets */
    free_bsock(jcr->file_bsock);
    free_bsock(jcr->store_bsock);
@@ -1263,6 +1454,7 @@ void dird_free_jcr(JCR *jcr)
    free_and_null_pool_memory(jcr->rpool_source);
    free_and_null_pool_memory(jcr->wstore_source);
    free_and_null_pool_memory(jcr->rstore_source);
+   free_and_null_pool_memory(jcr->next_vol_list);
 
    /* Delete lists setup to hold storage pointers */
    free_rwstorage(jcr);
@@ -1275,6 +1467,8 @@ void dird_free_jcr(JCR *jcr)
 
    free_plugins(jcr);                 /* release instantiated plugins */
 
+   garbage_collect_memory_pool();
+
    Dmsg0(200, "End dird free_jcr\n");
 }
 
@@ -1325,7 +1519,9 @@ void set_jcr_defaults(JCR *jcr, JOB *job)
       jcr->setJobLevel(job->JobLevel);
       break;
    }
-
+   if (!jcr->next_vol_list) {
+      jcr->next_vol_list = get_pool_memory(PM_FNAME);
+   }
    if (!jcr->fname) {
       jcr->fname = get_pool_memory(PM_FNAME);
    }
@@ -1347,6 +1543,7 @@ void set_jcr_defaults(JCR *jcr, JOB *job)
       copy_rwstorage(jcr, job->pool->storage, _("Pool resource"));
    }
    jcr->client = job->client;
+   ASSERT2(jcr->client, "jcr->client==NULL!!!");
    if (!jcr->client_name) {
       jcr->client_name = get_pool_memory(PM_NAME);
    }
@@ -1363,6 +1560,7 @@ void set_jcr_defaults(JCR *jcr, JOB *job)
       pm_strcpy(jcr->next_pool_source, _("Job Pool's NextPool resource"));
    }
    jcr->full_pool = job->full_pool;
+   jcr->vfull_pool = job->vfull_pool;
    jcr->inc_pool = job->inc_pool;
    jcr->diff_pool = job->diff_pool;
    if (job->pool->catalog) {