/*
Bacula® - The Network Backup Solution
- Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
+ Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
The main author of Bacula is Kern Sibbald, with contributions from
many others, a complete list can be found in the file AUTHORS.
This program is Free Software; you can redistribute it and/or
- modify it under the terms of version two of the GNU General Public
+ modify it under the terms of version three of the GNU Affero General Public
License as published by the Free Software Foundation and included
in the file LICENSE.
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
- You should have received a copy of the GNU General Public License
+ You should have received a copy of the GNU Affero General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*/
/*
* Bacula Catalog Database routines specific to Ingres
- * These are Ingres specific routines
+ * These are Ingres specific routines
*
- * Stefan Reddig, June 2009
+ * Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
* based uopn work done
* by Dan Langille, December 2003 and
* by Kern Sibbald, March 2000
*
+ * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
*/
-
-/* The following is necessary so that we do not include
- * the dummy external definition of DB.
- */
-#define __SQL_C /* indicate that this is sql.c */
-
#include "bacula.h"
-#include "cats.h"
#ifdef HAVE_INGRES
+#include "cats.h"
+#include "bdb_priv.h"
#include "myingres.h"
+#include "bdb_ingres.h"
+#include "lib/breg.h"
/* -----------------------------------------------------------------------
*
* -----------------------------------------------------------------------
*/
-/* List of open databases */ /* SRE: needed for ingres? */
-static BQUEUE db_list = {&db_list, &db_list};
+/*
+ * List of open databases.
+ */
+static dlist *db_list = NULL;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+struct B_DB_RWRULE {
+ int pattern_length;
+ char *search_pattern;
+ BREGEXP *rewrite_regexp;
+ bool trigger;
+};
+
/*
- * Retrieve database type
+ * Create a new query filter.
*/
-const char *
-db_get_type(void)
+static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
+ const char *search_pattern, const char *filter)
{
- return "Ingres";
+ B_DB_RWRULE *rewrite_rule;
+
+ rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
+
+ rewrite_rule->pattern_length = pattern_length;
+ rewrite_rule->search_pattern = bstrdup(search_pattern);
+ rewrite_rule->rewrite_regexp = new_bregexp(filter);
+ rewrite_rule->trigger = false;
+
+ if (!rewrite_rule->rewrite_regexp) {
+ Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
+ free(rewrite_rule->search_pattern);
+ free(rewrite_rule);
+ return false;
+ } else {
+ query_filters->append(rewrite_rule);
+ return true;
+ }
}
/*
- * Initialize database data structure. In principal this should
- * never have errors, or it is really fatal.
+ * Create a stack of all filters that should be applied to a SQL query
+ * before submitting it to the database backend.
*/
-B_DB *
-db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
- const char *db_address, int db_port, const char *db_socket,
- int mult_db_connections)
+static inline alist *db_initialize_query_filters(JCR *jcr)
{
- B_DB *mdb;
+ alist *query_filters;
- if (!db_user) {
- Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
+ query_filters = New(alist(10, not_owned_by_alist));
+
+ if (!query_filters) {
+ Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
return NULL;
}
- P(mutex); /* lock DB queue */
- if (!mult_db_connections) {
- /* Look to see if DB already open */
- for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
- if (bstrcmp(mdb->db_name, db_name) &&
- bstrcmp(mdb->db_address, db_address) &&
- mdb->db_port == db_port) {
- Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
- mdb->ref_count++;
- V(mutex);
- return mdb; /* already open */
+
+ db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
+ "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
+ db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
+ "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
+ db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
+ "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
+
+ return query_filters;
+}
+
+/*
+ * Free all query filters.
+ */
+static inline void db_destroy_query_filters(alist *query_filters)
+{
+ B_DB_RWRULE *rewrite_rule;
+
+ foreach_alist(rewrite_rule, query_filters) {
+ free_bregexp(rewrite_rule->rewrite_regexp);
+ free(rewrite_rule->search_pattern);
+ free(rewrite_rule);
+ }
+
+ delete query_filters;
+}
+
+B_DB_INGRES::B_DB_INGRES(JCR *jcr,
+ const char *db_driver,
+ const char *db_name,
+ const char *db_user,
+ const char *db_password,
+ const char *db_address,
+ int db_port,
+ const char *db_socket,
+ bool mult_db_connections,
+ bool disable_batch_insert)
+{
+ B_DB_INGRES *mdb;
+ int next_session_id = 0;
+
+ /*
+ * See what the next available session_id is.
+ * We first see what the highest session_id is used now.
+ */
+ if (db_list) {
+ foreach_dlist(mdb, db_list) {
+ if (mdb->m_session_id > next_session_id) {
+ next_session_id = mdb->m_session_id;
}
}
}
- Dmsg0(100, "db_open first time\n");
- mdb = (B_DB *)malloc(sizeof(B_DB));
- memset(mdb, 0, sizeof(B_DB));
- mdb->db_name = bstrdup(db_name);
- mdb->db_user = bstrdup(db_user);
+
+ /*
+ * Initialize the parent class members.
+ */
+ m_db_interface_type = SQL_INTERFACE_TYPE_INGRES;
+ m_db_type = SQL_TYPE_INGRES;
+ m_db_driver = bstrdup("ingres");
+ m_db_name = bstrdup(db_name);
+ m_db_user = bstrdup(db_user);
if (db_password) {
- mdb->db_password = bstrdup(db_password);
+ m_db_password = bstrdup(db_password);
}
if (db_address) {
- mdb->db_address = bstrdup(db_address);
+ m_db_address = bstrdup(db_address);
}
if (db_socket) {
- mdb->db_socket = bstrdup(db_socket);
- }
- mdb->db_port = db_port;
- mdb->have_insert_id = TRUE;
- mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
- *mdb->errmsg = 0;
- mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
- mdb->cached_path = get_pool_memory(PM_FNAME);
- mdb->cached_path_id = 0;
- mdb->ref_count = 1;
- mdb->fname = get_pool_memory(PM_FNAME);
- mdb->path = get_pool_memory(PM_FNAME);
- mdb->esc_name = get_pool_memory(PM_FNAME);
- mdb->esc_path = get_pool_memory(PM_FNAME);
- mdb->allow_transactions = mult_db_connections;
- qinsert(&db_list, &mdb->bq); /* put db in list */
- V(mutex);
- return mdb;
-}
-
-/* Check that the database correspond to the encoding we want */
-static bool check_database_encoding(JCR *jcr, B_DB *mdb)
-{
-/* SRE: TODO! Needed?
- SQL_ROW row;
- int ret=false;
-
- if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
- Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
- return false;
+ m_db_socket = bstrdup(db_socket);
}
-
- if ((row = sql_fetch_row(mdb)) == NULL) {
- Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
- Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
+ m_db_port = db_port;
+ if (disable_batch_insert) {
+ m_disabled_batch_insert = true;
+ m_have_batch_insert = false;
} else {
- ret = bstrcmp(row[0], "SQL_ASCII");
- if (!ret) {
- Mmsg(mdb->errmsg,
- _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
- mdb->db_name, row[0]);
- Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
- Dmsg1(50, "%s", mdb->errmsg);
- }
- }
- return ret;
-*/
- return true;
+ m_disabled_batch_insert = false;
+#if defined(USE_BATCH_FILE_INSERT)
+ m_have_batch_insert = true;
+#else
+ m_have_batch_insert = false;
+#endif
+ }
+
+ errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
+ *errmsg = 0;
+ cmd = get_pool_memory(PM_EMSG); /* get command buffer */
+ cached_path = get_pool_memory(PM_FNAME);
+ cached_path_id = 0;
+ m_ref_count = 1;
+ fname = get_pool_memory(PM_FNAME);
+ path = get_pool_memory(PM_FNAME);
+ esc_name = get_pool_memory(PM_FNAME);
+ esc_path = get_pool_memory(PM_FNAME);
+ esc_obj = get_pool_memory(PM_FNAME);
+ m_allow_transactions = mult_db_connections;
+
+ /*
+ * Initialize the private members.
+ */
+ m_db_handle = NULL;
+ m_result = NULL;
+ m_explicit_commit = true;
+ m_session_id = ++next_session_id;
+ m_query_filters = db_initialize_query_filters(jcr);
+
+ /*
+ * Put the db in the list.
+ */
+ if (db_list == NULL) {
+ db_list = New(dlist(this, &this->m_link));
+ }
+ db_list->append(this);
}
-/*
- * Check for errors in DBMS work
- */
-static int sql_check(B_DB *mdb)
+B_DB_INGRES::~B_DB_INGRES()
{
- return INGcheck();
}
/*
* Now actually open the database. This can generate errors,
* which are returned in the errmsg
*
- * DO NOT close the database or free(mdb) here !!!!
+ * DO NOT close the database or delete mdb here !!!!
*/
-int
-db_open_database(JCR *jcr, B_DB *mdb)
+bool B_DB_INGRES::db_open_database(JCR *jcr)
{
+ bool retval = false;
int errstat;
- char buf[10], *port;
P(mutex);
- if (mdb->connected) {
- V(mutex);
- return 1;
+ if (m_connected) {
+ retval = true;
+ goto bail_out;
}
- mdb->connected = false;
- if ((errstat=rwl_init(&mdb->lock)) != 0) {
+ if ((errstat=rwl_init(&m_lock)) != 0) {
berrno be;
- Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
+ Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
be.bstrerror(errstat));
- V(mutex);
- return 0;
- }
-
- if (mdb->db_port) {
- bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
- port = buf;
- } else {
- port = NULL;
+ goto bail_out;
}
- mdb->db = INGconnectDB(mdb->db_name, mdb->db_user, mdb->db_password);
+ m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
Dmsg0(50, "Ingres real CONNECT done\n");
- Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
- mdb->db_password==NULL?"(NULL)":mdb->db_password);
+ Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
+ m_db_password == NULL ? "(NULL)" : m_db_password);
- if (sql_check(mdb)) {
- Mmsg2(&mdb->errmsg, _("Unable to connect to Ingres server.\n"
+ if (!m_db_handle) {
+ Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
"Database=%s User=%s\n"
"It is probably not running or your password is incorrect.\n"),
- mdb->db_name, mdb->db_user);
- V(mutex);
- return 0;
+ m_db_name, m_db_user);
+ goto bail_out;
}
- mdb->connected = true;
+ m_connected = true;
+
+ INGsetDefaultLockingMode(m_db_handle);
- if (!check_tables_version(jcr, mdb)) {
- V(mutex);
- return 0;
+ if (!check_tables_version(jcr, this)) {
+ goto bail_out;
}
- //sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
-
- /* check that encoding is SQL_ASCII */
- check_database_encoding(jcr, mdb);
+ retval = true;
+bail_out:
V(mutex);
- return 1;
+ return retval;
}
-void
-db_close_database(JCR *jcr, B_DB *mdb)
+void B_DB_INGRES::db_close_database(JCR *jcr)
{
- if (!mdb) {
- return;
- }
- db_end_transaction(jcr, mdb);
+ db_end_transaction(jcr);
P(mutex);
- sql_free_result(mdb);
- mdb->ref_count--;
- if (mdb->ref_count == 0) {
- qdchain(&mdb->bq);
- if (mdb->connected && mdb->db) {
- sql_close(mdb);
+ m_ref_count--;
+ if (m_ref_count == 0) {
+ sql_free_result();
+ db_list->remove(this);
+ if (m_connected && m_db_handle) {
+ INGdisconnectDB(m_db_handle);
}
- rwl_destroy(&mdb->lock);
- free_pool_memory(mdb->errmsg);
- free_pool_memory(mdb->cmd);
- free_pool_memory(mdb->cached_path);
- free_pool_memory(mdb->fname);
- free_pool_memory(mdb->path);
- free_pool_memory(mdb->esc_name);
- free_pool_memory(mdb->esc_path);
- if (mdb->db_name) {
- free(mdb->db_name);
+ if (m_query_filters) {
+ db_destroy_query_filters(m_query_filters);
}
- if (mdb->db_user) {
- free(mdb->db_user);
+ rwl_destroy(&m_lock);
+ free_pool_memory(errmsg);
+ free_pool_memory(cmd);
+ free_pool_memory(cached_path);
+ free_pool_memory(fname);
+ free_pool_memory(path);
+ free_pool_memory(esc_name);
+ free_pool_memory(esc_path);
+ free_pool_memory(esc_obj);
+ free(m_db_driver);
+ free(m_db_name);
+ free(m_db_user);
+ if (m_db_password) {
+ free(m_db_password);
}
- if (mdb->db_password) {
- free(mdb->db_password);
+ if (m_db_address) {
+ free(m_db_address);
}
- if (mdb->db_address) {
- free(mdb->db_address);
+ if (m_db_socket) {
+ free(m_db_socket);
}
- if (mdb->db_socket) {
- free(mdb->db_socket);
+ delete this;
+ if (db_list->size() == 0) {
+ delete db_list;
+ db_list = NULL;
}
- free(mdb);
}
V(mutex);
}
-void db_thread_cleanup()
-{ }
+void B_DB_INGRES::db_thread_cleanup(void)
+{
+}
/*
- * Return the next unique index (auto-increment) for
- * the given table. Return NULL on error.
+ * Escape strings so that Ingres is happy
*
- * For Ingres, NULL causes the auto-increment value SRE: true?
- * to be updated.
+ * NOTE! len is the length of the old string. Your new
+ * string must be long enough (max 2*old+1) to hold
+ * the escaped output.
*/
-int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
+void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
{
- strcpy(index, "NULL");
- return 1;
+ char *n, *o;
+
+ n = snew;
+ o = old;
+ while (len--) {
+ switch (*o) {
+ case '\'':
+ *n++ = '\'';
+ *n++ = '\'';
+ o++;
+ break;
+ case 0:
+ *n++ = '\\';
+ *n++ = 0;
+ o++;
+ break;
+ default:
+ *n++ = *o++;
+ break;
+ }
+ }
+ *n = 0;
}
+/*
+ * Escape binary so that Ingres is happy
+ *
+ * NOTE! Need to be implemented (escape \0)
+ *
+ */
+char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
+{
+ char *n, *o;
+
+ n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
+ o = old;
+ while (len--) {
+ switch (*o) {
+ case '\'':
+ *n++ = '\'';
+ *n++ = '\'';
+ o++;
+ break;
+ case 0:
+ *n++ = '\\';
+ *n++ = 0;
+ o++;
+ break;
+ default:
+ *n++ = *o++;
+ break;
+ }
+ }
+ *n = 0;
+ return esc_obj;
+}
/*
- * Escape strings so that Ingres is happy
+ * Unescape binary object so that Ingres is happy
*
- * NOTE! len is the length of the old string. Your new
- * string must be long enough (max 2*old+1) to hold
- * the escaped output.
- * SRE: TODO!
+ * TODO: need to be implemented (escape \0)
*/
-void
-db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
+void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
+ POOLMEM **dest, int32_t *dest_len)
{
+ if (!from) {
+ *dest[0] = 0;
+ *dest_len = 0;
+ return;
+ }
+ *dest = check_pool_memory_size(*dest, expected_len+1);
+ *dest_len = expected_len;
+ memcpy(*dest, from, expected_len);
+ (*dest)[expected_len]=0;
+}
+
/*
- int error;
-
- PQescapeStringConn(mdb->db, snew, old, len, &error);
- if (error) {
- Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));*/
- /* error on encoding, probably invalid multibyte encoding in the source string
- see PQescapeStringConn documentation for details. */
-/* Dmsg0(500, "PQescapeStringConn failed\n");
- }*/
+ * Start a transaction. This groups inserts and makes things
+ * much more efficient. Usually started when inserting
+ * file attributes.
+ */
+void B_DB_INGRES::db_start_transaction(JCR *jcr)
+{
+ if (!jcr->attr) {
+ jcr->attr = get_pool_memory(PM_FNAME);
+ }
+ if (!jcr->ar) {
+ jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
+ }
+
+ if (!m_allow_transactions) {
+ return;
+ }
+
+ db_lock(this);
+ /* Allow only 25,000 changes per transaction */
+ if (m_transaction && changes > 25000) {
+ db_end_transaction(jcr);
+ }
+ if (!m_transaction) {
+ sql_query("BEGIN"); /* begin transaction */
+ Dmsg0(400, "Start Ingres transaction\n");
+ m_transaction = true;
+ }
+ db_unlock(this);
+}
+
+void B_DB_INGRES::db_end_transaction(JCR *jcr)
+{
+ if (jcr && jcr->cached_attribute) {
+ Dmsg0(400, "Flush last cached attribute.\n");
+ if (!db_create_attributes_record(jcr, this, jcr->ar)) {
+ Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
+ }
+ jcr->cached_attribute = false;
+ }
+
+ if (!m_allow_transactions) {
+ return;
+ }
+
+ db_lock(this);
+ if (m_transaction) {
+ sql_query("COMMIT"); /* end transaction */
+ m_transaction = false;
+ Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
+ }
+ changes = 0;
+ db_unlock(this);
}
/*
* Submit a general SQL command (cmd), and for each row returned,
- * the sqlite_handler is called with the ctx.
+ * the result_handler is called with the ctx.
*/
-bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
+bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
{
SQL_ROW row;
+ bool retval = true;
- Dmsg0(500, "db_sql_query started\n");
+ Dmsg1(500, "db_sql_query starts with %s\n", query);
- db_lock(mdb);
- if (sql_query(mdb, query) != 0) {
- Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
- db_unlock(mdb);
+ db_lock(this);
+ if (!sql_query(query, QF_STORE_RESULT)) {
+ Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
Dmsg0(500, "db_sql_query failed\n");
- return false;
+ retval = false;
+ goto bail_out;
}
- Dmsg0(500, "db_sql_query succeeded. checking handler\n");
if (result_handler != NULL) {
Dmsg0(500, "db_sql_query invoking handler\n");
- if ((mdb->result = sql_store_result(mdb)) != NULL) {
- int num_fields = sql_num_fields(mdb);
-
- Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
- while ((row = sql_fetch_row(mdb)) != NULL) {
-
- Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
- if (result_handler(ctx, num_fields, row))
- break;
- }
-
- sql_free_result(mdb);
+ while ((row = sql_fetch_row()) != NULL) {
+ Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
+ if (result_handler(ctx, m_num_fields, row))
+ break;
}
+ sql_free_result();
}
- db_unlock(mdb);
Dmsg0(500, "db_sql_query finished\n");
- return true;
+bail_out:
+ db_unlock(this);
+ return retval;
}
/*
- * Close database connection
+ * Note, if this routine returns false (failure), Bacula expects
+ * that no result has been stored.
+ *
+ * Returns: true on success
+ * false on failure
+ *
*/
-void my_ingres_close(B_DB *mdb)
-{
- INGdisconnectDB(mdb->db);
- //SRE: error handling?
-}
-
-INGRES_ROW my_ingres_fetch_row(B_DB *mdb)
+bool B_DB_INGRES::sql_query(const char *query, int flags)
{
- int j;
- INGRES_ROW row = NULL; // by default, return NULL
-
- Dmsg0(500, "my_ingres_fetch_row start\n");
+ int cols;
+ char *cp, *bp;
+ char *dup_query, *new_query;
+ bool retval = true;
+ bool start_of_transaction = false;
+ bool end_of_transaction = false;
+ B_DB_RWRULE *rewrite_rule;
+
+ Dmsg1(500, "query starts with '%s'\n", query);
+ /*
+ * We always make a private copy of the query as we are doing serious
+ * rewrites in this engine. When running the private copy through the
+ * different query filters we loose the orginal private copy so we
+ * first make a extra reference to it so we can free it on exit from the
+ * function.
+ */
+ dup_query = new_query = bstrdup(query);
+
+ /*
+ * Iterate over the query string and perform any needed operations.
+ * We use a sliding window over the query string where bp points to
+ * the previous position in the query and cp to the current position
+ * in the query.
+ */
+ bp = new_query;
+ while (bp != NULL) {
+ if ((cp = strchr(bp, ' ')) != NULL) {
+ *cp++;
+ }
- if (!mdb->row || mdb->row_size < mdb->num_fields) {
- int num_fields = mdb->num_fields;
- Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
+ if (!strncasecmp(bp, "BEGIN", 5)) {
+ /*
+ * This is the start of a transaction.
+ * Inline copy the rest of the query over the BEGIN keyword.
+ */
+ if (cp) {
+ strcpy(bp, cp);
+ } else {
+ *bp = '\0';
+ }
+ start_of_transaction = true;
+ } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
+ /*
+ * This is the end of a transaction. We cannot check for just the COMMIT
+ * keyword as a DECLARE of an tempory table also has the word COMMIT in it
+ * but its followed by the word PRESERVE.
+ * Inline copy the rest of the query over the COMMIT keyword.
+ */
+ if (cp) {
+ strcpy(bp, cp);
+ } else {
+ *bp = '\0';
+ }
+ end_of_transaction = true;
+ }
- if (mdb->row) {
- Dmsg0(500, "my_ingres_fetch_row freeing space\n");
- free(mdb->row);
+ /*
+ * See what query filter might match.
+ */
+ foreach_alist(rewrite_rule, m_query_filters) {
+ if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
+ rewrite_rule->trigger = true;
+ }
}
- num_fields += 20; /* add a bit extra */
- mdb->row = (INGRES_ROW)malloc(sizeof(char *) * num_fields);
- mdb->row_size = num_fields;
- // now reset the row_number now that we have the space allocated
- mdb->row_number = 0;
+ /*
+ * Slide window.
+ */
+ bp = cp;
}
- // if still within the result set
- if (mdb->row_number < mdb->num_rows) {
- Dmsg2(500, "my_ingres_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
- // get each value from this row
- for (j = 0; j < mdb->num_fields; j++) {
- mdb->row[j] = INGgetvalue(mdb->result, mdb->row_number, j);
- Dmsg2(500, "my_ingres_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
+ /*
+ * Run the query through all query filters that apply e.g. have the trigger set in the
+ * previous loop.
+ */
+ foreach_alist(rewrite_rule, m_query_filters) {
+ if (rewrite_rule->trigger) {
+ new_query = rewrite_rule->rewrite_regexp->replace(new_query);
+ rewrite_rule->trigger = false;
}
- // increment the row number for the next call
- mdb->row_number++;
-
- row = mdb->row;
- } else {
- Dmsg2(500, "my_ingres_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
}
- Dmsg1(500, "my_ingres_fetch_row finishes returning %p\n", row);
+ if (start_of_transaction) {
+ Dmsg0(500,"sql_query: Start of transaction\n");
+ m_explicit_commit = false;
+ }
- return row;
-}
+ /*
+ * See if there is any query left after filtering for certain keywords.
+ */
+ bp = new_query;
+ while (bp != NULL && strlen(bp) > 0) {
+ /*
+ * We are starting a new query. reset everything.
+ */
+ m_num_rows = -1;
+ m_row_number = -1;
+ m_field_number = -1;
+
+ if (m_result) {
+ INGclear(m_result); /* hmm, someone forgot to free?? */
+ m_result = NULL;
+ }
+ /*
+ * See if this is a multi-statement query. We split a multi-statement query
+ * on the semi-column and feed the individual queries to the Ingres functions.
+ * We use a sliding window over the query string where bp points to
+ * the previous position in the query and cp to the current position
+ * in the query.
+ */
+ if ((cp = strchr(bp, ';')) != NULL) {
+ *cp++ = '\0';
+ }
-int my_ingres_max_length(B_DB *mdb, int field_num) {
- //
- // for a given column, find the max length
- //
- int max_length;
- int i;
- int this_length;
+ Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
- max_length = 0;
- for (i = 0; i < mdb->num_rows; i++) {
- if (INGgetisnull(mdb->result, i, field_num)) {
- this_length = 4; // "NULL"
+ /*
+ * See if we got a store_result hint which could mean we are running a select.
+ * If flags has QF_STORE_RESULT not set we are sure its not a query that we
+ * need to store anything for.
+ */
+ if (flags & QF_STORE_RESULT) {
+ cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
} else {
- this_length = cstrlen(INGgetvalue(mdb->result, i, field_num));
+ cols = 0;
}
- if (max_length < this_length) {
- max_length = this_length;
+ if (cols <= 0) {
+ if (cols < 0 ) {
+ Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
+ retval = false;
+ goto bail_out;
+ }
+ Dmsg0(500,"sql_query (non SELECT) starting...\n");
+ /*
+ * non SELECT
+ */
+ m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
+ if (m_num_rows == -1) {
+ Dmsg0(500,"sql_query (non SELECT) went wrong\n");
+ retval = false;
+ goto bail_out;
+ } else {
+ Dmsg0(500,"sql_query (non SELECT) seems ok\n");
+ }
+ } else {
+ /*
+ * SELECT
+ */
+ Dmsg0(500,"sql_query (SELECT) starting...\n");
+ m_result = INGquery(m_db_handle, bp, m_explicit_commit);
+ if (m_result != NULL) {
+ Dmsg0(500, "we have a result\n");
+
+ /*
+ * How many fields in the set?
+ */
+ m_num_fields = (int)INGnfields(m_result);
+ Dmsg1(500, "we have %d fields\n", m_num_fields);
+
+ m_num_rows = INGntuples(m_result);
+ Dmsg1(500, "we have %d rows\n", m_num_rows);
+ } else {
+ Dmsg0(500, "No resultset...\n");
+ retval = false;
+ goto bail_out;
+ }
}
+
+ bp = cp;
}
- return max_length;
+bail_out:
+ if (end_of_transaction) {
+ Dmsg0(500,"sql_query: End of transaction, commiting work\n");
+ m_explicit_commit = true;
+ INGcommit(m_db_handle);
+ }
+
+ free(dup_query);
+ Dmsg0(500, "sql_query finishing\n");
+
+ return retval;
+}
+
+void B_DB_INGRES::sql_free_result(void)
+{
+ db_lock(this);
+ if (m_result) {
+ INGclear(m_result);
+ m_result = NULL;
+ }
+ if (m_rows) {
+ free(m_rows);
+ m_rows = NULL;
+ }
+ if (m_fields) {
+ free(m_fields);
+ m_fields = NULL;
+ }
+ m_num_rows = m_num_fields = 0;
+ db_unlock(this);
}
-INGRES_FIELD * my_ingres_fetch_field(B_DB *mdb)
+SQL_ROW B_DB_INGRES::sql_fetch_row(void)
{
- int i;
+ int j;
+ SQL_ROW row = NULL; /* by default, return NULL */
- Dmsg0(500, "my_ingres_fetch_field starts\n");
+ if (!m_result) {
+ return row;
+ }
+ if (m_result->num_rows <= 0) {
+ return row;
+ }
- if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
- if (mdb->fields) {
- free(mdb->fields);
+ Dmsg0(500, "sql_fetch_row start\n");
+
+ if (!m_rows || m_rows_size < m_num_fields) {
+ if (m_rows) {
+ Dmsg0(500, "sql_fetch_row freeing space\n");
+ free(m_rows);
}
- Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
- mdb->fields = (INGRES_FIELD *)malloc(sizeof(INGRES_FIELD) * mdb->num_fields);
- mdb->fields_size = mdb->num_fields;
+ Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
+ m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
+ m_rows_size = m_num_fields;
+
+ /*
+ * Now reset the row_number now that we have the space allocated
+ */
+ m_row_number = 0;
+ }
- for (i = 0; i < mdb->num_fields; i++) {
- Dmsg1(500, "filling field %d\n", i);
- strcpy(mdb->fields[i].name,INGfname(mdb->result, i));
- mdb->fields[i].max_length = my_ingres_max_length(mdb, i);
- mdb->fields[i].type = INGftype(mdb->result, i);
- mdb->fields[i].flags = 0;
+ /*
+ * If still within the result set
+ */
+ if (m_row_number < m_num_rows) {
+ Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
+ /*
+ * Get each value from this row
+ */
+ for (j = 0; j < m_num_fields; j++) {
+ m_rows[j] = INGgetvalue(m_result, m_row_number, j);
+ Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
+ }
+ /*
+ * Increment the row number for the next call
+ */
+ m_row_number++;
- Dmsg4(500, "my_ingres_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
- mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
- mdb->fields[i].flags);
- } // end for
- } // end if
+ row = m_rows;
+ } else {
+ Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
+ }
- // increment field number for the next time around
+ Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
- Dmsg0(500, "my_ingres_fetch_field finishes\n");
- return &mdb->fields[mdb->field_number++];
+ return row;
+}
+
+const char *B_DB_INGRES::sql_strerror(void)
+{
+ return INGerrorMessage(m_db_handle);
}
-void my_ingres_data_seek(B_DB *mdb, int row)
+void B_DB_INGRES::sql_data_seek(int row)
{
- // set the row number to be returned on the next call
- // to my_ingres_fetch_row
- mdb->row_number = row;
+ /*
+ * Set the row number to be returned on the next call to sql_fetch_row
+ */
+ m_row_number = row;
}
-void my_ingres_field_seek(B_DB *mdb, int field)
+int B_DB_INGRES::sql_affected_rows(void)
{
- mdb->field_number = field;
+ return m_num_rows;
}
/*
- * Note, if this routine returns 1 (failure), Bacula expects
- * that no result has been stored.
- * This is where QUERY_DB comes with Ingres. SRE: true?
- *
- * Returns: 0 on success
- * 1 on failure
- *
+ * First execute the insert query and then retrieve the currval.
+ * By setting transaction to true we make it an atomic transaction
+ * and as such we can get the currval after which we commit if
+ * transaction is false. This way things are an atomic operation
+ * for Ingres and things work. We save the current transaction status
+ * and set transaction in the mdb to true and at the end of this
+ * function we restore the actual transaction status.
*/
-int my_ingres_query(B_DB *mdb, const char *query)
+uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
{
- Dmsg0(500, "my_ingres_query started\n");
- // We are starting a new query. reset everything.
- mdb->num_rows = -1;
- mdb->row_number = -1;
- mdb->field_number = -1;
+ char sequence[64];
+ char getkeyval_query[256];
+ char *currval;
+ uint64_t id = 0;
+ bool current_explicit_commit;
+
+ /*
+ * Save the current transaction status and pretend we are in a transaction.
+ */
+ current_explicit_commit = m_explicit_commit;
+ m_explicit_commit = false;
+
+ /*
+ * Execute the INSERT query.
+ */
+ m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
+ if (m_num_rows == -1) {
+ goto bail_out;
+ }
- if (mdb->result) {
- INGclear(mdb->result); /* hmm, someone forgot to free?? */
- mdb->result = NULL;
+ changes++;
+
+ /*
+ * Obtain the current value of the sequence that
+ * provides the serial value for primary key of the table.
+ *
+ * currval is local to our session. It is not affected by
+ * other transactions.
+ *
+ * Determine the name of the sequence.
+ * As we name all sequences as <table>_seq this is easy.
+ */
+ bstrncpy(sequence, table_name, sizeof(sequence));
+ bstrncat(sequence, "_seq", sizeof(sequence));
+
+ bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
+
+ if (m_result) {
+ INGclear(m_result);
+ m_result = NULL;
}
+ m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
- Dmsg1(500, "my_ingres_query starts with '%s'\n", query);
- mdb->result = INGexec(mdb->db, query);
- if (!mdb->result) {
- Dmsg1(50, "Query failed: %s\n", query);
+ if (!m_result) {
+ Dmsg1(50, "Query failed: %s\n", getkeyval_query);
goto bail_out;
}
- mdb->status = INGresultStatus(mdb->result);
- if (mdb->status == ING_COMMAND_OK) {
- Dmsg1(500, "we have a result\n", query);
+ Dmsg0(500, "exec done");
- // how many fields in the set?
- mdb->num_fields = (int)INGnfields(mdb->result);
- Dmsg1(500, "we have %d fields\n", mdb->num_fields);
-
- mdb->num_rows = INGntuples(mdb->result);
- Dmsg1(500, "we have %d rows\n", mdb->num_rows);
-
- mdb->status = 0; /* succeed */
- } else {
- Dmsg1(50, "Result status failed: %s\n", query);
- goto bail_out;
+ currval = INGgetvalue(m_result, 0, 0);
+ if (currval) {
+ id = str_to_uint64(currval);
}
- Dmsg0(500, "my_ingres_query finishing\n");
- return mdb->status;
+ INGclear(m_result);
+ m_result = NULL;
bail_out:
- Dmsg1(500, "we failed\n", query);
- INGclear(mdb->result);
- mdb->result = NULL;
- mdb->status = 1; /* failed */
- return mdb->status;
-}
-
-void my_ingres_free_result(B_DB *mdb)
-{
-
- db_lock(mdb);
- if (mdb->result) {
- INGclear(mdb->result);
- mdb->result = NULL;
+ /*
+ * Restore the actual explicit_commit status.
+ */
+ m_explicit_commit = current_explicit_commit;
+
+ /*
+ * Commit if explicit_commit is not set.
+ */
+ if (m_explicit_commit) {
+ INGcommit(m_db_handle);
}
- if (mdb->row) {
- free(mdb->row);
- mdb->row = NULL;
- }
-
- if (mdb->fields) {
- free(mdb->fields);
- mdb->fields = NULL;
- }
- db_unlock(mdb);
+ return id;
}
-int my_ingres_currval(B_DB *mdb, const char *table_name)
+SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
{
- // TODO!
- return -1;
-}
+ int i, j;
+ int max_length;
+ int this_length;
-#ifdef HAVE_BATCH_FILE_INSERT
+ if (!m_fields || m_fields_size < m_num_fields) {
+ if (m_fields) {
+ free(m_fields);
+ m_fields = NULL;
+ }
+ Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
+ m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
+ m_fields_size = m_num_fields;
-int my_ingres_batch_start(JCR *jcr, B_DB *mdb)
-{
- //TODO!
- return ING_ERROR;
+ for (i = 0; i < m_num_fields; i++) {
+ Dmsg1(500, "filling field %d\n", i);
+ m_fields[i].name = INGfname(m_result, i);
+ m_fields[i].type = INGftype(m_result, i);
+ m_fields[i].flags = 0;
+
+ /*
+ * For a given column, find the max length.
+ */
+ max_length = 0;
+ for (j = 0; j < m_num_rows; j++) {
+ if (INGgetisnull(m_result, j, i)) {
+ this_length = 4; /* "NULL" */
+ } else {
+ this_length = cstrlen(INGgetvalue(m_result, j, i));
+ }
+
+ if (max_length < this_length) {
+ max_length = this_length;
+ }
+ }
+ m_fields[i].max_length = max_length;
+
+ Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
+ m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
+ }
+ }
+
+ /*
+ * Increment field number for the next time around
+ */
+ return &m_fields[m_field_number++];
}
-/* set error to something to abort operation */
-int my_ingres_batch_end(JCR *jcr, B_DB *mdb, const char *error)
+bool B_DB_INGRES::sql_field_is_not_null(int field_type)
{
- //TODO!
- return ING_ERROR;
+ switch (field_type) {
+ case 1:
+ return true;
+ default:
+ return false;
+ }
}
-int my_ingres_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+bool B_DB_INGRES::sql_field_is_numeric(int field_type)
{
- //TODO!
- return ING_ERROR;
+ /*
+ * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
+ */
+ switch (field_type) {
+ case IISQ_DEC_TYPE:
+ case IISQ_INT_TYPE:
+ case IISQ_FLT_TYPE:
+ return true;
+ default:
+ return false;
+ }
}
-#endif /* HAVE_BATCH_FILE_INSERT */
-
/*
* Escape strings so that Ingres is happy on COPY
*
* string must be long enough (max 2*old+1) to hold
* the escaped output.
*/
-char *my_ingres_copy_escape(char *dest, char *src, size_t len)
+static char *ingres_copy_escape(char *dest, char *src, size_t len)
{
/* we have to escape \t, \n, \r, \ */
char c = '\0' ;
return dest;
}
-#ifdef HAVE_BATCH_FILE_INSERT
-const char *my_ingres_batch_lock_path_query =
- "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
+/*
+ * Returns true if OK
+ * false if failed
+ */
+bool B_DB_INGRES::sql_batch_start(JCR *jcr)
+{
+ bool ok;
+
+ db_lock(this);
+ ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
+ "FileIndex INTEGER,"
+ "JobId INTEGER,"
+ "Path VARBYTE(32000),"
+ "Name VARBYTE(32000),"
+ "LStat VARBYTE(255),"
+ "MD5 VARBYTE(255),"
+ "DeltaSeq SMALLINT)"
+ " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
+ db_unlock(this);
+ return ok;
+}
+/*
+ * Returns true if OK
+ * false if failed
+ */
+bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
+{
+ m_status = 0;
+ return true;
+}
+
+/*
+ * Returns true if OK
+ * false if failed
+ */
+bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
+{
+ size_t len;
+ const char *digest;
+ char ed1[50];
+
+ esc_name = check_pool_memory_size(esc_name, fnl*2+1);
+ db_escape_string(jcr, esc_name, fname, fnl);
+
+ esc_path = check_pool_memory_size(esc_path, pnl*2+1);
+ db_escape_string(jcr, esc_path, path, pnl);
+
+ if (ar->Digest == NULL || ar->Digest[0] == 0) {
+ digest = "0";
+ } else {
+ digest = ar->Digest;
+ }
-const char *my_ingres_batch_lock_filename_query =
- "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
+ len = Mmsg(cmd, "INSERT INTO batch VALUES "
+ "(%u,%s,'%s','%s','%s','%s',%u)",
+ ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
+ esc_name, ar->attr, digest, ar->DeltaSeq);
+
+ return sql_query(cmd);
+}
-const char *my_ingres_batch_unlock_tables_query = "COMMIT";
+/*
+ * Initialize database data structure. In principal this should
+ * never have errors, or it is really fatal.
+ */
+B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
+ const char *db_password, const char *db_address, int db_port,
+ const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
+{
+ B_DB_INGRES *mdb = NULL;
-const char *my_ingres_batch_fill_path_query =
- "INSERT INTO Path (Path) "
- "SELECT a.Path FROM "
- "(SELECT DISTINCT Path FROM batch) AS a "
- "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
+ if (!db_user) {
+ Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
+ return NULL;
+ }
+ P(mutex); /* lock DB queue */
+ if (db_list && !mult_db_connections) {
+ /*
+ * Look to see if DB already open
+ */
+ foreach_dlist(mdb, db_list) {
+ if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
+ Dmsg1(100, "DB REopen %s\n", db_name);
+ mdb->increment_refcount();
+ goto bail_out;
+ }
+ }
+ }
-const char *my_ingres_batch_fill_filename_query =
- "INSERT INTO Filename (Name) "
- "SELECT a.Name FROM "
- "(SELECT DISTINCT Name FROM batch) as a "
- "WHERE NOT EXISTS "
- "(SELECT Name FROM Filename WHERE Name = a.Name)";
-#endif /* HAVE_BATCH_FILE_INSERT */
+ Dmsg0(100, "db_init_database first time\n");
+ mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
+ db_port, db_socket, mult_db_connections, disable_batch_insert));
+bail_out:
+ V(mutex);
+ return mdb;
+}
#endif /* HAVE_INGRES */