]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/sql_create.c
Backport from BEE
[bacula/bacula] / bacula / src / cats / sql_create.c
index 09fd90c386c6cfb2897b8370f90cd2749a241840..74d7603e90ad10fca466319562af3cd75b9be276 100644 (file)
@@ -1,34 +1,22 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2000-2012 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
 
-   The main author of Bacula is Kern Sibbald, with contributions from
-   many others, a complete list can be found in the file AUTHORS.
-   This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version three of the GNU Affero General Public
-   License as published by the Free Software Foundation and included
-   in the file LICENSE.
+   The main author of Bacula is Kern Sibbald, with contributions from many
+   others, a complete list can be found in the file AUTHORS.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   General Public License for more details.
-
-   You should have received a copy of the GNU Affero General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA.
+   You may use this file and others of this release according to the
+   license defined in the LICENSE file, which includes the Affero General
+   Public License, v3.0 ("AGPLv3") and some additional permissions and
+   terms pursuant to its AGPLv3 Section 7.
 
    Bacula® is a registered trademark of Kern Sibbald.
-   The licensor of Bacula is the Free Software Foundation Europe
-   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
-   Switzerland, email:ftf@fsfeurope.org.
 */
 /*
  * Bacula Catalog Database Create record interface routines
  *
- *    Kern Sibbald, March 2000
+ *    Written by Kern Sibbald, March 2000
  *
  */
 
@@ -36,7 +24,7 @@
 
 static const int dbglevel = 100;
 
-#if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
+#if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
 
 #include "cats.h"
 #include "bdb_priv.h"
@@ -168,7 +156,7 @@ db_create_jobmedia_record(JCR *jcr, B_DB *mdb, JOBMEDIA_DBR *jm)
 bool
 db_create_pool_record(JCR *jcr, B_DB *mdb, POOL_DBR *pr)
 {
-   bool stat;        
+   bool stat;
    char ed1[30], ed2[30], ed3[50], ed4[50], ed5[50];
    char esc_name[MAX_ESCAPE_NAME_LENGTH];
    char esc_lf[MAX_ESCAPE_NAME_LENGTH];
@@ -350,7 +338,7 @@ bool db_create_storage_record(JCR *jcr, B_DB *mdb, STORAGE_DBR *sr)
 bool
 db_create_mediatype_record(JCR *jcr, B_DB *mdb, MEDIATYPE_DBR *mr)
 {
-   bool stat;        
+   bool stat;
    int num_rows;
    char esc[MAX_ESCAPE_NAME_LENGTH];
 
@@ -455,11 +443,11 @@ db_create_media_record(JCR *jcr, B_DB *mdb, MEDIA_DBR *mr)
           edit_int64(mr->VolWriteTime, ed7),
           mr->VolParts,
           mr->LabelType,
-          edit_int64(mr->StorageId, ed8), 
-          edit_int64(mr->DeviceId, ed9), 
-          edit_int64(mr->LocationId, ed10), 
-          edit_int64(mr->ScratchPoolId, ed11), 
-          edit_int64(mr->RecyclePoolId, ed12), 
+          edit_int64(mr->StorageId, ed8),
+          edit_int64(mr->DeviceId, ed9),
+          edit_int64(mr->LocationId, ed10),
+          edit_int64(mr->ScratchPoolId, ed11),
+          edit_int64(mr->RecyclePoolId, ed12),
           mr->Enabled, mr->ActionOnPurge
           );
 
@@ -597,7 +585,9 @@ int db_create_path_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
             Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
             sql_free_result(mdb);
             ar->PathId = 0;
-            ASSERT(ar->PathId);
+            ASSERT2(ar->PathId,
+                    "Your Path table is broken. "
+                    "Please, use dbcheck to correct it.");
             return 0;
          }
          ar->PathId = str_to_int64(row[0]);
@@ -770,19 +760,27 @@ bool db_create_fileset_record(JCR *jcr, B_DB *mdb, FILESET_DBR *fsr)
  *  };
  */
 
+/* For maintenance, we can put batch mode in hold */
+static bool batch_mode_enabled = true;
+
+void db_disable_batch_insert(bool enabled)
+{
+   batch_mode_enabled = enabled;
+}
+
 /**
  * All sql_batch_* functions are used to do bulk batch insert in File/Filename/Path
  *  tables.
- *  
+ *
  *  To sum up :
  *   - bulk load a temp table
- *   - insert missing filenames into filename with a single query (lock filenames 
+ *   - insert missing filenames into filename with a single query (lock filenames
  *   - table before that to avoid possible duplicate inserts with concurrent update)
  *   - insert missing paths into path with another single query
  *   - then insert the join between the temp, filename and path tables into file.
  */
 
-/* 
+/*
  * Returns true if OK
  *         false if failed
  */
@@ -795,13 +793,25 @@ bool db_write_batch_file_records(JCR *jcr)
       Dmsg0(50,"db_create_file_record : no files\n");
       return true;
    }
+
    if (job_canceled(jcr)) {
       goto bail_out;
    }
 
+   jcr->JobStatus = JS_AttrInserting;
+
+   /* Check if batch mode is on hold */
+   while (!batch_mode_enabled) {
+      Dmsg0(50, "batch mode is on hold\n");
+      bmicrosleep(10, 0);
+
+      if (job_canceled(jcr)) {
+         goto bail_out;
+      }
+   }
+
    Dmsg1(50,"db_create_file_record changes=%u\n",jcr->db_batch->changes);
 
-   jcr->JobStatus = JS_AttrInserting;
    if (!sql_batch_end(jcr, jcr->db_batch, NULL)) {
       Jmsg1(jcr, M_FATAL, 0, "Batch end %s\n", jcr->db_batch->errmsg);
       goto bail_out;
@@ -823,7 +833,7 @@ bool db_write_batch_file_records(JCR *jcr)
       db_sql_query(jcr->db_batch, batch_unlock_tables_query[db_get_type_index(jcr->db_batch)], NULL, NULL);
       goto bail_out;
    }
-   
+
    if (!db_sql_query(jcr->db_batch, batch_unlock_tables_query[db_get_type_index(jcr->db_batch)], NULL, NULL)) {
       Jmsg1(jcr, M_FATAL, 0, "Unlock Path table %s\n", jcr->db_batch->errmsg);
       goto bail_out;
@@ -836,7 +846,7 @@ bool db_write_batch_file_records(JCR *jcr)
       Jmsg1(jcr, M_FATAL, 0, "Lock Filename table %s\n", jcr->db_batch->errmsg);
       goto bail_out;
    }
-   
+
    if (!db_sql_query(jcr->db_batch, batch_fill_filename_query[db_get_type_index(jcr->db_batch)], NULL, NULL)) {
       Jmsg1(jcr,M_FATAL,0,"Fill Filename table %s\n",jcr->db_batch->errmsg);
       db_sql_query(jcr->db_batch, batch_unlock_tables_query[db_get_type_index(jcr->db_batch)], NULL, NULL);
@@ -847,8 +857,8 @@ bool db_write_batch_file_records(JCR *jcr)
       Jmsg1(jcr, M_FATAL, 0, "Unlock Filename table %s\n", jcr->db_batch->errmsg);
       goto bail_out;
    }
-   
-   if (!db_sql_query(jcr->db_batch, 
+
+   if (!db_sql_query(jcr->db_batch,
 "INSERT INTO File (FileIndex, JobId, PathId, FilenameId, LStat, MD5, DeltaSeq) "
     "SELECT batch.FileIndex, batch.JobId, Path.PathId, "
            "Filename.FilenameId,batch.LStat, batch.MD5, batch.DeltaSeq "
@@ -891,7 +901,7 @@ bool db_create_batch_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    Dmsg1(dbglevel, "Fname=%s\n", ar->fname);
    Dmsg0(dbglevel, "put_file_into_catalog\n");
 
-   if (jcr->batch_started && jcr->db_batch->changes > 800000) {
+   if (jcr->batch_started && jcr->db_batch->changes > 500000) {
       db_write_batch_file_records(jcr);
       jcr->db_batch->changes = 0;
    }
@@ -902,7 +912,7 @@ bool db_create_batch_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
          return false;     /* error already printed */
       }
       if (!sql_batch_start(jcr, jcr->db_batch)) {
-         Mmsg1(&mdb->errmsg, 
+         Mmsg1(&mdb->errmsg,
               "Can't start batch mode: ERR=%s", db_strerror(jcr->db_batch));
          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
          return false;
@@ -1005,7 +1015,7 @@ static int db_create_filename_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    mdb->errmsg[0] = 0;
    mdb->esc_name = check_pool_memory_size(mdb->esc_name, 2*mdb->fnl+2);
    db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-   
+
    Mmsg(mdb->cmd, "SELECT FilenameId FROM Filename WHERE Name='%s'", mdb->esc_name);
 
    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
@@ -1042,7 +1052,7 @@ static int db_create_filename_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    return ar->FilenameId > 0;
 }
 
-/** 
+/**
  * Create file attributes record, or base file attributes record
  */
 bool db_create_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
@@ -1057,7 +1067,7 @@ bool db_create_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
          ar->Stream == STREAM_UNIX_ATTRIBUTES_EX)) {
       Mmsg1(&mdb->errmsg, _("Attempt to put non-attributes into catalog. Stream=%d\n"),
          ar->Stream);
-      Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg); 
+      Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
       return false;
    }
 
@@ -1089,15 +1099,15 @@ bool db_create_base_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    Dmsg1(dbglevel, "create_base_file Fname=%s\n", ar->fname);
    Dmsg0(dbglevel, "put_base_file_into_catalog\n");
 
-   db_lock(mdb); 
+   db_lock(mdb);
    split_path_and_file(jcr, mdb, ar->fname);
-   
+
    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
    db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-   
+
    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
    db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
-   
+
    Mmsg(mdb->cmd, "INSERT INTO basefile%lld (Path, Name) VALUES ('%s','%s')",
         (uint64_t)jcr->JobId, mdb->esc_path, mdb->esc_name);
 
@@ -1107,7 +1117,7 @@ bool db_create_base_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    return ret;
 }
 
-/** 
+/**
  * Cleanup the base file temporary tables
  */
 static void db_cleanup_base_file(JCR *jcr, B_DB *mdb)
@@ -1131,16 +1141,23 @@ bool db_commit_base_file_attributes_record(JCR *jcr, B_DB *mdb)
 
    db_lock(mdb);
 
-   Mmsg(mdb->cmd, 
+   Mmsg(mdb->cmd,
   "INSERT INTO BaseFiles (BaseJobId, JobId, FileId, FileIndex) "
    "SELECT B.JobId AS BaseJobId, %s AS JobId, "
           "B.FileId, B.FileIndex "
      "FROM basefile%s AS A, new_basefile%s AS B "
     "WHERE A.Path = B.Path "
       "AND A.Name = B.Name "
-    "ORDER BY B.FileId", 
+    "ORDER BY B.FileId",
         edit_uint64(jcr->JobId, ed1), ed1, ed1);
    ret = db_sql_query(mdb, mdb->cmd, NULL, NULL);
+   /*
+    * Display error now, because the subsequent cleanup destroys the
+    *  error message from the above query.
+    */
+   if (!ret) {
+      Jmsg1(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
+   }
    jcr->nb_base_files_used = sql_affected_rows(mdb);
    db_cleanup_base_file(jcr, mdb);
 
@@ -1150,7 +1167,7 @@ bool db_commit_base_file_attributes_record(JCR *jcr, B_DB *mdb)
 
 /**
  * Find the last "accurate" backup state with Base jobs
- * 1) Get all files with jobid in list (F subquery) 
+ * 1) Get all files with jobid in list (F subquery)
  * 2) Take only the last version of each file (Temp subquery) => accurate list is ok
  * 3) Put the result in a temporary table for the end of job
  *
@@ -1160,7 +1177,7 @@ bool db_create_base_file_list(JCR *jcr, B_DB *mdb, char *jobids)
    POOL_MEM buf;
    bool ret=false;
 
-   db_lock(mdb);   
+   db_lock(mdb);
 
    if (!*jobids) {
       Mmsg(mdb->errmsg, _("ERR=JobIds are empty\n"));
@@ -1198,7 +1215,7 @@ bool db_create_restore_object_record(JCR *jcr, B_DB *mdb, ROBJECT_DBR *ro)
    mdb->fnl = strlen(ro->object_name);
    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
    db_escape_string(jcr, mdb, mdb->esc_name, ro->object_name, mdb->fnl);
-   
+
    db_escape_object(jcr, mdb, ro->object, ro->object_len);
 
    plug_name_len = strlen(ro->plugin_name);
@@ -1211,7 +1228,7 @@ bool db_create_restore_object_record(JCR *jcr, B_DB *mdb, ROBJECT_DBR *ro)
         "ObjectCompression,FileIndex,JobId) "
         "VALUES ('%s','%s','%s',%d,%d,%d,%d,%d,%d,%u)",
         mdb->esc_name, esc_plug_name, mdb->esc_obj,
-        ro->object_len, ro->object_full_len, ro->object_index, 
+        ro->object_len, ro->object_full_len, ro->object_index,
         ro->FileType, ro->object_compression, ro->FileIndex, ro->JobId);
 
    ro->RestoreObjectId = sql_insert_autokey_record(mdb, mdb->cmd, NT_("RestoreObject"));
@@ -1228,4 +1245,4 @@ bool db_create_restore_object_record(JCR *jcr, B_DB *mdb, ROBJECT_DBR *ro)
    return stat;
 }
 
-#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */
+#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */