]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/sql.c
Fix #1648 about make_catalog_backup.pl with multiple catalog
[bacula/bacula] / bacula / src / cats / sql.c
index 56b9ccaf0421b7395bf77d3ef953f4d22e671403..1f79a814b806461cf32f1bb76b16ba62cd983bd9 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
 
-   Bacula® is a registered trademark of John Walker.
+   Bacula® is a registered trademark of Kern Sibbald.
    The licensor of Bacula is the Free Software Foundation Europe
    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
    Switzerland, email:ftf@fsfeurope.org.
@@ -35,7 +35,7 @@
  *
  *    Kern Sibbald, March 2000
  *
- *    Version $Id$
+ *    Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
  */
 
 /* The following is necessary so that we do not include
 #include "bacula.h"
 #include "cats.h"
 
-#if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
+#if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
 
 uint32_t bacula_db_version = 0;
 
-int db_type = -1;
+int db_type = -1;        /* SQL engine type index */
 
 /* Forward referenced subroutines */
 void print_dashes(B_DB *mdb);
 void print_result(B_DB *mdb);
 
-B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
-              const char *db_password, const char *db_address, int db_port, 
+B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
+              const char *db_password, const char *db_address, int db_port,
               const char *db_socket, int mult_db_connections)
-{              
+{
 #ifdef HAVE_DBI
    char *p;
    if (!db_driver) {
@@ -68,13 +68,17 @@ B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *
    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
    }
-   p = (char *)(db_driver + 4);      
+   p = (char *)(db_driver + 4);
    if (strcasecmp(p, "mysql") == 0) {
       db_type = SQL_TYPE_MYSQL;
    } else if (strcasecmp(p, "postgresql") == 0) {
       db_type = SQL_TYPE_POSTGRESQL;
    } else if (strcasecmp(p, "sqlite") == 0) {
       db_type = SQL_TYPE_SQLITE;
+   } else if (strcasecmp(p, "sqlite3") == 0) {
+      db_type = SQL_TYPE_SQLITE3;
+   } else if (strcasecmp(p, "ingres") == 0) {
+      db_type = SQL_TYPE_INGRES;
    } else {
       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
    }
@@ -82,17 +86,19 @@ B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *
    db_type = SQL_TYPE_MYSQL;
 #elif HAVE_POSTGRESQL
    db_type = SQL_TYPE_POSTGRESQL;
+#elif HAVE_INGRES
+   db_type = SQL_TYPE_INGRES;
 #elif HAVE_SQLITE
    db_type = SQL_TYPE_SQLITE;
 #elif HAVE_SQLITE3
-   db_type = SQL_TYPE_SQLITE;
+   db_type = SQL_TYPE_SQLITE3;
 #endif
 
    return db_init_database(jcr, db_name, db_user, db_password, db_address,
              db_port, db_socket, mult_db_connections);
 }
 
-dbid_list::dbid_list() 
+dbid_list::dbid_list()
 {
    memset(this, 0, sizeof(dbid_list));
    max_ids = 1000;
@@ -101,16 +107,15 @@ dbid_list::dbid_list()
    PurgedFiles = NULL;
 }
 
-dbid_list::~dbid_list() 
-{ 
+dbid_list::~dbid_list()
+{
    free(DBId);
 }
 
-
 /*
  * Called here to retrieve an integer from the database
  */
-static int int_handler(void *ctx, int num_fields, char **row)
+int db_int_handler(void *ctx, int num_fields, char **row)
 {
    uint32_t *val = (uint32_t *)ctx;
 
@@ -142,7 +147,68 @@ int db_int64_handler(void *ctx, int num_fields, char **row)
    return 0;
 }
 
+/*
+ * Use to build a comma separated list of values from a query. "10,20,30"
+ */
+int db_list_handler(void *ctx, int num_fields, char **row)
+{
+   db_list_ctx *lctx = (db_list_ctx *)ctx;
+   if (num_fields == 1 && row[0]) {
+      if (lctx->list[0]) {
+         pm_strcat(lctx->list, ",");
+      }
+      pm_strcat(lctx->list, row[0]);
+      lctx->count++;
+   }
+   return 0;
+}
+
+
+/*
+ * Called here to retrieve an integer from the database
+ */
+static int db_max_connections_handler(void *ctx, int num_fields, char **row)
+{
+   uint32_t *val = (uint32_t *)ctx;
+   uint32_t index = sql_get_max_connections_index[db_type];
+   if (row[index]) {
+      *val = str_to_int64(row[index]);
+   } else {
+      Dmsg0(800, "int_handler finds zero\n");
+      *val = 0;
+   }
+   return 0;
+}
+
+/* 
+ * Check catalog max_connections setting
+ */
+bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
+{
+#ifdef HAVE_BATCH_FILE_INSERT
+
+   uint32_t max_conn = 0;
+
+   /* With Batch insert, verify max_connections */
+   if (!db_sql_query(mdb, sql_get_max_connections[db_type], 
+                     db_max_connections_handler, &max_conn)) {
+      Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
+      return false;
+   }
+   if (max_conn && max_concurrent_jobs > max_conn) {
+      Mmsg(mdb->errmsg, 
+           _("Potential performance problem:\n"
+             "max_connections=%d set for %s database \"%s\" should be larger than Director's "
+             "MaxConcurrentJobs=%d\n"),
+           max_conn, db_get_type(), mdb->db_name, max_concurrent_jobs);
+      Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
+      return false;
+   }
 
+#endif
+
+   return true;
+}
 
 /* NOTE!!! The following routines expect that the
  *  calling subroutine sets and clears the mutex
@@ -154,7 +220,7 @@ bool check_tables_version(JCR *jcr, B_DB *mdb)
    const char *query = "SELECT VersionId FROM Version";
 
    bacula_db_version = 0;
-   if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
+   if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
       return false;
    }
@@ -204,11 +270,7 @@ InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
       }
       return 0;
    }
-   if (mdb->have_insert_id) {
-      mdb->num_rows = sql_affected_rows(mdb);
-   } else {
-      mdb->num_rows = 1;
-   }
+   mdb->num_rows = sql_affected_rows(mdb);
    if (mdb->num_rows != 1) {
       char ed1[30];
       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
@@ -317,7 +379,7 @@ char *db_strerror(B_DB *mdb)
 void _db_lock(const char *file, int line, B_DB *mdb)
 {
    int errstat;
-   if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
+   if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
       berrno be;
       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
            errstat, be.bstrerror(errstat));
@@ -390,6 +452,58 @@ void db_start_transaction(JCR *jcr, B_DB *mdb)
    }
    db_unlock(mdb);
 #endif
+
+#ifdef HAVE_INGRES
+   if (!mdb->allow_transactions) {
+      return;
+   }
+   db_lock(mdb);
+   /* Allow only 25,000 changes per transaction */
+   if (mdb->transaction && mdb->changes > 25000) {
+      db_end_transaction(jcr, mdb);
+   }
+   if (!mdb->transaction) {
+      db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
+      Dmsg0(400, "Start Ingres transaction\n");
+      mdb->transaction = 1;
+   }
+   db_unlock(mdb);
+#endif
+
+#ifdef HAVE_DBI
+   if (db_type == SQL_TYPE_SQLITE) {
+      if (!mdb->allow_transactions) {
+         return;
+      }
+      db_lock(mdb);
+      /* Allow only 10,000 changes per transaction */
+      if (mdb->transaction && mdb->changes > 10000) {
+         db_end_transaction(jcr, mdb);
+      }
+      if (!mdb->transaction) {
+         //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
+         db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
+         Dmsg0(400, "Start SQLite transaction\n");
+         mdb->transaction = 1;
+      }
+      db_unlock(mdb);
+   } else if (db_type == SQL_TYPE_POSTGRESQL) {
+      if (!mdb->allow_transactions) {
+         return;
+      }
+      db_lock(mdb);
+      /* Allow only 25,000 changes per transaction */
+      if (mdb->transaction && mdb->changes > 25000) {
+         db_end_transaction(jcr, mdb);
+      }
+      if (!mdb->transaction) {
+         db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
+         Dmsg0(400, "Start PosgreSQL transaction\n");
+         mdb->transaction = 1;
+      }
+      db_unlock(mdb);
+   }
+#endif
 }
 
 void db_end_transaction(JCR *jcr, B_DB *mdb)
@@ -404,7 +518,7 @@ void db_end_transaction(JCR *jcr, B_DB *mdb)
 
    if (jcr && jcr->cached_attribute) {
       Dmsg0(400, "Flush last cached attribute.\n");
-      if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
+      if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
       }
       jcr->cached_attribute = false;
@@ -424,6 +538,23 @@ void db_end_transaction(JCR *jcr, B_DB *mdb)
    db_unlock(mdb);
 #endif
 
+
+
+#ifdef HAVE_INGRES
+   if (!mdb->allow_transactions) {
+      return;
+   }
+   db_lock(mdb);
+   if (mdb->transaction) {
+      db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
+      mdb->transaction = 0;
+      Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
+   }
+   mdb->changes = 0;
+   db_unlock(mdb);
+#endif
+
+
 #ifdef HAVE_POSTGRESQL
    if (!mdb->allow_transactions) {
       return;
@@ -437,6 +568,35 @@ void db_end_transaction(JCR *jcr, B_DB *mdb)
    mdb->changes = 0;
    db_unlock(mdb);
 #endif
+
+#ifdef HAVE_DBI
+   if (db_type == SQL_TYPE_SQLITE) {
+      if (!mdb->allow_transactions) {
+         return;
+      }
+      db_lock(mdb);
+      if (mdb->transaction) {
+         //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
+         db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
+         mdb->transaction = 0;
+         Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
+      }
+      mdb->changes = 0;
+      db_unlock(mdb);
+   } else if (db_type == SQL_TYPE_POSTGRESQL) {
+      if (!mdb->allow_transactions) {
+         return;
+      }
+      db_lock(mdb);
+      if (mdb->transaction) {
+         db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
+         mdb->transaction = 0;
+         Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
+      }
+      mdb->changes = 0;
+      db_unlock(mdb);
+   }
+#endif
 }
 
 /*
@@ -495,6 +655,21 @@ void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
 }
 
+/*
+ * Set maximum field length to something reasonable
+ */
+static int max_length(int max_length)
+{
+   int max_len = max_length;
+   /* Sanity check */
+   if (max_len < 0) {
+      max_len = 2;
+   } else if (max_len > 100) {
+      max_len = 100;
+   }
+   return max_len;
+}
+
 /*
  * List dashes as part of header for listing SQL results in a table
  */
@@ -503,12 +678,17 @@ list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
 {
    SQL_FIELD  *field;
    int i, j;
+   int len;
 
    sql_field_seek(mdb, 0);
    send(ctx, "+");
    for (i = 0; i < sql_num_fields(mdb); i++) {
       field = sql_fetch_field(mdb);
-      for (j = 0; j < (int)field->max_length + 2; j++) {
+      if (!field) {
+         break;
+      }
+      len = max_length(field->max_length + 2);
+      for (j = 0; j < len; j++) {
          send(ctx, "-");
       }
       send(ctx, "+");
@@ -540,6 +720,9 @@ list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type t
    for (i = 0; i < sql_num_fields(mdb); i++) {
       Dmsg1(800, "list_result processing field %d\n", i);
       field = sql_fetch_field(mdb);
+      if (!field) {
+         break;
+      }
       col_len = cstrlen(field->name);
       if (type == VERT_LIST) {
          if (col_len > max_len) {
@@ -571,7 +754,11 @@ list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type t
    for (i = 0; i < sql_num_fields(mdb); i++) {
       Dmsg1(800, "list_result looking at field %d\n", i);
       field = sql_fetch_field(mdb);
-      bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
+      if (!field) {
+         break;
+      }
+      max_len = max_length(field->max_length);
+      bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
       send(ctx, buf);
    }
    send(ctx, "\n");
@@ -583,13 +770,17 @@ list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type t
       send(ctx, "|");
       for (i = 0; i < sql_num_fields(mdb); i++) {
          field = sql_fetch_field(mdb);
+         if (!field) {
+            break;
+         }
+         max_len = max_length(field->max_length);
          if (row[i] == NULL) {
-            bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
+            bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
-            bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
+            bsnprintf(buf, sizeof(buf), " %*s |", max_len,
                       add_commas(row[i], ewc));
          } else {
-            bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
+            bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
          }
          send(ctx, buf);
       }
@@ -605,6 +796,9 @@ vertical_list:
       sql_field_seek(mdb, 0);
       for (i = 0; i < sql_num_fields(mdb); i++) {
          field = sql_fetch_field(mdb);
+         if (!field) {
+            break;
+         }
          if (row[i] == NULL) {
             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
@@ -620,5 +814,65 @@ vertical_list:
    return;
 }
 
+/* 
+ * Open a new connexion to mdb catalog. This function is used
+ * by batch and accurate mode.
+ */
+bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
+{
+#ifdef HAVE_BATCH_FILE_INSERT
+   const int multi_db = true;   /* we force a new connection only if batch insert is enabled */
+#else
+   const int multi_db = false;
+#endif
+
+   if (!jcr->db_batch) {
+      jcr->db_batch = db_init_database(jcr, 
+                                      mdb->db_name, 
+                                      mdb->db_user,
+                                      mdb->db_password, 
+                                      mdb->db_address,
+                                      mdb->db_port,
+                                      mdb->db_socket,
+                                      multi_db /* multi_db = true when using batch mode */);
+      if (!jcr->db_batch) {
+         Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
+         Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+         return false;
+      }
+
+      if (!db_open_database(jcr, jcr->db_batch)) {
+         Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
+              jcr->db_batch->db_name, db_strerror(jcr->db_batch));
+         Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+         return false;
+      }      
+      Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
+            jcr->db_batch->connected, jcr->db_batch->db);
+
+   }
+   return true;
+}
+
+/*
+ * !!! WARNING !!! Use this function only when bacula is stopped.
+ * ie, after a fatal signal and before exiting the program
+ * Print information about a B_DB object.
+ */
+void db_debug_print(JCR *jcr, FILE *fp)
+{
+   B_DB *mdb = jcr->db;
+
+   if (!mdb) {
+      return;
+   }
+
+   fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
+           mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
+   fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
+   if (mdb->lock.valid == RWLOCK_VALID) { 
+      fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
+   }
+}
 
-#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/
+#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES*/