2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to Ingres
30 * These are Ingres specific routines
32 * Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33 * based upon work done
34 * by Dan Langille, December 2003 and
35 * by Kern Sibbald, March 2000
37 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
47 #include "bdb_ingres.h"
50 /* -----------------------------------------------------------------------
52 * Ingres dependent defines and subroutines
54 * -----------------------------------------------------------------------
58 * List of open databases.
/*
 * File-wide list of B_DB_INGRES objects. The constructor creates it on first
 * use and appends every new instance; db_init_database() searches it for a
 * reusable connection and db_close_database() removes entries from it.
 */
60 static dlist *db_list = NULL;
/*
 * Mutex taken with P(mutex) in db_init_database() before db_list is searched.
 */
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
67 BREGEXP *rewrite_regexp;
72 * Create a new query filter.
/*
 * db_allocate_query_filter: build one rewrite rule and append it to
 * query_filters.
 *
 *  jcr            - job control record, used here only for the Jmsg() report
 *  query_filters  - list that receives the new B_DB_RWRULE
 *  pattern_length - number of characters of search_pattern that sql_query()
 *                   compares with strncasecmp() when deciding whether to
 *                   trigger the rule
 *  search_pattern - keyword that triggers the rule; duplicated with bstrdup()
 *  filter         - rewrite expression handed to new_bregexp()
 *
 * The return statements of this function are not part of this excerpt.
 */
74 static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
75 const char *search_pattern, const char *filter)
77 B_DB_RWRULE *rewrite_rule;
/* NOTE(review): no check of the malloc() result is visible before the stores below - confirm. */
79 rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
81 rewrite_rule->pattern_length = pattern_length;
82 rewrite_rule->search_pattern = bstrdup(search_pattern);
83 rewrite_rule->rewrite_regexp = new_bregexp(filter);
/* A rule starts untriggered; sql_query() sets and clears this flag per query. */
84 rewrite_rule->trigger = false;
/* new_bregexp() returned NULL: report a fatal job message and release the duplicated pattern. */
86 if (!rewrite_rule->rewrite_regexp) {
87 Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
88 free(rewrite_rule->search_pattern);
92 query_filters->append(rewrite_rule);
98 * Create a stack of all filters that should be applied to a SQL query
99 * before submitting it to the database backend.
/*
 * db_initialize_query_filters: create the list of rewrite rules that
 * sql_query() applies to a query before it is sent to Ingres.
 *
 *  jcr - job control record used for error reporting
 *
 * Returns the newly created alist. The results of the three
 * db_allocate_query_filter() calls are not inspected in the visible code.
 */
101 static inline alist *db_initialize_query_filters(JCR *jcr)
103 alist *query_filters;
105 query_filters = New(alist(10, not_owned_by_alist));
107 if (!query_filters) {
108 Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
/* Rule triggered by the OFFSET keyword: LIMIT n OFFSET m becomes OFFSET m FETCH NEXT n ROWS ONLY. */
112 db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
113 "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
/* Rule triggered by the LIMIT keyword: LIMIT n becomes FETCH FIRST n ROWS ONLY. */
114 db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
115 "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
/* Rule triggered by the TEMPORARY keyword: CREATE TEMPORARY TABLE is turned into a DECLARE GLOBAL TEMPORARY TABLE statement. */
116 db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
117 "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
119 return query_filters;
123 * Free all query filters.
/*
 * db_destroy_query_filters: release a list built by
 * db_initialize_query_filters().
 *
 *  query_filters - list of B_DB_RWRULE entries; the caller in this file
 *                  (db_close_database) only passes a non-NULL list
 *
 * For every rule the compiled regexp and the duplicated search pattern are
 * freed, then the list object itself is deleted.
 */
125 static inline void db_destroy_query_filters(alist *query_filters)
127 B_DB_RWRULE *rewrite_rule;
129 foreach_alist(rewrite_rule, query_filters) {
130 free_bregexp(rewrite_rule->rewrite_regexp);
131 free(rewrite_rule->search_pattern);
135 delete query_filters;
/*
 * Constructor: set up a new Ingres catalog connection object. No connection
 * is made here; INGconnectDB() is called from db_open_database().
 *
 * Visible work, in order:
 *  - pick a session id one above the highest m_session_id found in db_list
 *  - copy the connection parameters with bstrdup(); m_db_driver is always set
 *    to the literal ingres and the db_driver argument is not used in the
 *    visible lines
 *  - derive the batch insert flags from disable_batch_insert and
 *    USE_BATCH_FILE_INSERT
 *  - obtain the pool memory buffers that db_close_database() releases
 *  - m_allow_transactions and m_dedicated both follow mult_db_connections
 *  - m_explicit_commit starts as true and the query filters are created
 *  - the object is appended to db_list, which is created first when NULL
 *
 * Some parameter lines of the signature are not part of this excerpt.
 */
138 B_DB_INGRES::B_DB_INGRES(JCR *jcr,
139 const char *db_driver,
142 const char *db_password,
143 const char *db_address,
145 const char *db_socket,
146 bool mult_db_connections,
147 bool disable_batch_insert)
150 int next_session_id = 0;
153 * See what the next available session_id is.
154 * We first see what the highest session_id is used now.
157 foreach_dlist(mdb, db_list) {
158 if (mdb->m_session_id > next_session_id) {
159 next_session_id = mdb->m_session_id;
165 * Initialize the parent class members.
167 m_db_interface_type = SQL_INTERFACE_TYPE_INGRES;
168 m_db_type = SQL_TYPE_INGRES;
169 m_db_driver = bstrdup("ingres");
170 m_db_name = bstrdup(db_name);
171 m_db_user = bstrdup(db_user);
173 m_db_password = bstrdup(db_password);
176 m_db_address = bstrdup(db_address);
179 m_db_socket = bstrdup(db_socket);
182 if (disable_batch_insert) {
183 m_disabled_batch_insert = true;
184 m_have_batch_insert = false;
186 m_disabled_batch_insert = false;
187 #if defined(USE_BATCH_FILE_INSERT)
188 m_have_batch_insert = true;
190 m_have_batch_insert = false;
194 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
196 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
197 cached_path = get_pool_memory(PM_FNAME);
200 fname = get_pool_memory(PM_FNAME);
201 path = get_pool_memory(PM_FNAME);
202 esc_name = get_pool_memory(PM_FNAME);
203 esc_path = get_pool_memory(PM_FNAME);
204 esc_obj = get_pool_memory(PM_FNAME);
205 m_allow_transactions = mult_db_connections;
207 /* At this time, when mult_db_connections == true, this is for
208 * specific console command such as bvfs or batch mode, and we don't
209 * want to share a batch mode or bvfs. In the future, we can change
210 * the creation function to add this parameter.
212 m_dedicated = mult_db_connections;
215 * Initialize the private members.
219 m_explicit_commit = true;
220 m_session_id = ++next_session_id;
221 m_query_filters = db_initialize_query_filters(jcr);
224 * Put the db in the list.
226 if (db_list == NULL) {
227 db_list = New(dlist(this, &this->m_link));
229 db_list->append(this);
232 B_DB_INGRES::~B_DB_INGRES()
237 * Now actually open the database. This can generate errors,
238 * which are returned in the errmsg
240 * DO NOT close the database or delete mdb here !!!!
/*
 * db_open_database: initialise the lock and connect to the Ingres server.
 *
 *  jcr - job control record, passed on to check_tables_version()
 *
 * Visible steps: rwl_init() on m_lock (a nonzero status is reported through
 * errmsg), INGconnectDB() with the stored name, user, password and session
 * id, a connect failure message written to errmsg,
 * INGsetDefaultLockingMode() on the handle, and finally
 * check_tables_version(). The return statements and any locking around
 * these steps are not part of this excerpt.
 */
242 bool B_DB_INGRES::db_open_database(JCR *jcr)
253 if ((errstat=rwl_init(&m_lock)) != 0) {
255 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
256 be.bstrerror(errstat));
260 m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
262 Dmsg0(50, "Ingres real CONNECT done\n");
/* NOTE(review): the next debug message prints the database password in clear text at debug level 50. */
263 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
264 m_db_password == NULL ? "(NULL)" : m_db_password);
267 Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
268 "Database=%s User=%s\n"
269 "It is probably not running or your password is incorrect.\n"),
270 m_db_name, m_db_user);
276 INGsetDefaultLockingMode(m_db_handle);
278 if (!check_tables_version(jcr, this)) {
/*
 * db_close_database: finish any open transaction and, once m_ref_count is
 * zero, tear the connection object down.
 *
 *  jcr - job control record, passed on to db_end_transaction()
 *
 * Teardown visible here: removal from db_list, INGdisconnectDB() when the
 * object is connected and has a handle, destruction of the query filters
 * and of m_lock, and release of the pool memory buffers obtained in the
 * constructor. The reference count update and the action taken when
 * db_list becomes empty are not part of this excerpt.
 */
289 void B_DB_INGRES::db_close_database(JCR *jcr)
291 db_end_transaction(jcr);
294 if (m_ref_count == 0) {
296 db_list->remove(this);
297 if (m_connected && m_db_handle) {
298 INGdisconnectDB(m_db_handle);
300 if (m_query_filters) {
301 db_destroy_query_filters(m_query_filters);
303 rwl_destroy(&m_lock);
304 free_pool_memory(errmsg);
305 free_pool_memory(cmd);
306 free_pool_memory(cached_path);
307 free_pool_memory(fname);
308 free_pool_memory(path);
309 free_pool_memory(esc_name);
310 free_pool_memory(esc_path);
311 free_pool_memory(esc_obj);
325 if (db_list->size() == 0) {
333 void B_DB_INGRES::db_thread_cleanup(void)
338 * Escape strings so that Ingres is happy
340 * NOTE! len is the length of the old string. Your new
341 * string must be long enough (max 2*old+1) to hold
342 * the escaped output.
344 void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
371 * Escape binary so that Ingres is happy
373 * NOTE! Needs to be implemented (escape \0)
/*
 * db_escape_object: produce an escaped copy of a binary object in the
 * esc_obj member buffer.
 *
 *  jcr - job control record (not used in the visible lines)
 *  old - source bytes
 *  len - number of source bytes
 *
 * Only the buffer sizing is visible in this excerpt: esc_obj is grown to
 * hold len*2+1 bytes and n is set to its start. The escaping loop and the
 * return statement are not shown.
 */
376 char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
380 n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
404 * Unescape binary object so that Ingres is happy
406 * TODO: needs to be implemented (escape \0)
/*
 * db_unescape_object: copy a stored object into a pool memory buffer.
 *
 *  jcr          - job control record (not used in the visible lines)
 *  from         - source bytes
 *  expected_len - number of bytes to copy
 *  dest         - in/out pool buffer, grown to expected_len+1 bytes
 *  dest_len     - out, set to expected_len
 *
 * In the visible code no unescaping takes place: the bytes are copied with
 * memcpy() and a terminating zero byte is stored after them. Any argument
 * checks ahead of the copy are not part of this excerpt.
 */
408 void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
409 POOLMEM **dest, int32_t *dest_len)
416 *dest = check_pool_memory_size(*dest, expected_len+1);
417 *dest_len = expected_len;
418 memcpy(*dest, from, expected_len);
419 (*dest)[expected_len]=0;
423 * Start a transaction. This groups inserts and makes things
424 * much more efficient. Usually started when inserting
/*
 * db_start_transaction: prepare the job for attribute inserts and open a
 * transaction when none is active.
 *
 *  jcr - job control record; jcr->attr and jcr->ar are allocated here
 *        (the conditions guarding those allocations are not visible)
 *
 * The m_allow_transactions test leads to a branch whose body is not part of
 * this excerpt. A running transaction with more than 25000 changes is ended
 * first; a new one is then started by sending BEGIN through sql_query().
 */
427 void B_DB_INGRES::db_start_transaction(JCR *jcr)
430 jcr->attr = get_pool_memory(PM_FNAME);
433 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
436 if (!m_allow_transactions) {
441 /* Allow only 25,000 changes per transaction */
442 if (m_transaction && changes > 25000) {
443 db_end_transaction(jcr);
445 if (!m_transaction) {
446 sql_query("BEGIN"); /* begin transaction */
447 Dmsg0(400, "Start Ingres transaction\n");
448 m_transaction = true;
/*
 * db_end_transaction: flush a pending cached attribute record and commit.
 *
 *  jcr - job control record; may be NULL, in which case the cached
 *        attribute flush is skipped
 *
 * A failing db_create_attributes_record() is reported as a fatal job
 * message and the cached flag is cleared afterwards. COMMIT is sent through
 * sql_query() and m_transaction is cleared; the conditions under which that
 * happens (including the m_allow_transactions branch) are only partly
 * visible in this excerpt.
 */
453 void B_DB_INGRES::db_end_transaction(JCR *jcr)
455 if (jcr && jcr->cached_attribute) {
456 Dmsg0(400, "Flush last cached attribute.\n");
457 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
458 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
460 jcr->cached_attribute = false;
463 if (!m_allow_transactions) {
469 sql_query("COMMIT"); /* end transaction */
470 m_transaction = false;
471 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
478 * Submit a general SQL command (cmd), and for each row returned,
479 * the result_handler is called with the ctx.
/*
 * db_sql_query: run a query and feed every returned row to a callback.
 *
 *  query          - SQL text, executed through sql_query() with
 *                   QF_STORE_RESULT
 *  result_handler - optional callback; called as
 *                   result_handler(ctx, m_num_fields, row) for each row
 *                   returned by sql_fetch_row()
 *  ctx            - opaque pointer handed to the callback unchanged
 *
 * On a failed sql_query() the error text is formatted into errmsg. A
 * nonzero return from the callback enters a branch whose body is not part
 * of this excerpt, as are the return statements and any locking.
 */
481 bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
486 Dmsg1(500, "db_sql_query starts with %s\n", query);
489 if (!sql_query(query, QF_STORE_RESULT)) {
490 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
491 Dmsg0(500, "db_sql_query failed\n");
496 if (result_handler != NULL) {
497 Dmsg0(500, "db_sql_query invoking handler\n");
498 while ((row = sql_fetch_row()) != NULL) {
499 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
500 if (result_handler(ctx, m_num_fields, row))
506 Dmsg0(500, "db_sql_query finished\n");
514 * Note, if this routine returns false (failure), Bacula expects
515 * that no result has been stored.
517 * Returns: true on success
/*
 * sql_query: rewrite a query for Ingres and execute it.
 *
 *  query - SQL text; a private copy is made with bstrdup() before rewriting
 *  flags - when QF_STORE_RESULT is set the statement is first probed with
 *          INGgetCols() and a result set may be stored in m_result
 *
 * Phases visible in this excerpt:
 *  1. walk the copy word by word (split on spaces), detecting BEGIN, a
 *     COMMIT that is not followed by PRESERVE, and the trigger keywords of
 *     the rules in m_query_filters;
 *  2. apply the rewrite regexp of every triggered rule and clear its flag;
 *  3. split the remaining text on semicolons and run each statement through
 *     INGexec() or INGquery();
 *  4. when a COMMIT was seen, set m_explicit_commit and call INGcommit().
 *
 * Per the comment above this function, a false return is expected to mean
 * that no result has been stored. Many lines of the body, including the
 * return statements and the release of the copied query, are not part of
 * this excerpt.
 */
521 bool B_DB_INGRES::sql_query(const char *query, int flags)
525 char *dup_query, *new_query;
527 bool start_of_transaction = false;
528 bool end_of_transaction = false;
529 B_DB_RWRULE *rewrite_rule;
531 Dmsg1(500, "query starts with '%s'\n", query);
533 * We always make a private copy of the query as we are doing serious
534 * rewrites in this engine. When running the private copy through the
535 * different query filters we loose the orginal private copy so we
536 * first make a extra reference to it so we can free it on exit from the
539 dup_query = new_query = bstrdup(query);
542 * Iterate over the query string and perform any needed operations.
543 * We use a sliding window over the query string where bp points to
544 * the previous position in the query and cp to the current position
549 if ((cp = strchr(bp, ' ')) != NULL) {
553 if (!strncasecmp(bp, "BEGIN", 5)) {
555 * This is the start of a transaction.
556 * Inline copy the rest of the query over the BEGIN keyword.
563 start_of_transaction = true;
564 } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
566 * This is the end of a transaction. We cannot check for just the COMMIT
567 * keyword as a DECLARE of an tempory table also has the word COMMIT in it
568 * but its followed by the word PRESERVE.
569 * Inline copy the rest of the query over the COMMIT keyword.
576 end_of_transaction = true;
580 * See what query filter might match.
582 foreach_alist(rewrite_rule, m_query_filters) {
583 if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
584 rewrite_rule->trigger = true;
595 * Run the query through all query filters that apply e.g. have the trigger set in the
/* replace() hands back the query text to continue with; the trigger is reset so the rule is clean for the next query. */
598 foreach_alist(rewrite_rule, m_query_filters) {
599 if (rewrite_rule->trigger) {
600 new_query = rewrite_rule->rewrite_regexp->replace(new_query);
601 rewrite_rule->trigger = false;
/* A BEGIN keyword was seen: clear m_explicit_commit for the statements executed below. */
605 if (start_of_transaction) {
606 Dmsg0(500,"sql_query: Start of transaction\n");
607 m_explicit_commit = false;
611 * See if there is any query left after filtering for certain keywords.
614 while (bp != NULL && strlen(bp) > 0) {
616 * We are starting a new query. reset everything.
623 INGclear(m_result); /* hmm, someone forgot to free?? */
628 * See if this is a multi-statement query. We split a multi-statement query
629 * on the semi-column and feed the individual queries to the Ingres functions.
630 * We use a sliding window over the query string where bp points to
631 * the previous position in the query and cp to the current position
634 if ((cp = strchr(bp, ';')) != NULL) {
638 Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
641 * See if we got a store_result hint which could mean we are running a select.
642 * If flags has QF_STORE_RESULT not set we are sure its not a query that we
643 * need to store anything for.
645 if (flags & QF_STORE_RESULT) {
646 cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
653 Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
657 Dmsg0(500,"sql_query (non SELECT) starting...\n");
/* INGexec() returning -1 is treated as failure of the statement. */
661 m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
662 if (m_num_rows == -1) {
663 Dmsg0(500,"sql_query (non SELECT) went wrong\n");
667 Dmsg0(500,"sql_query (non SELECT) seems ok\n");
673 Dmsg0(500,"sql_query (SELECT) starting...\n");
674 m_result = INGquery(m_db_handle, bp, m_explicit_commit);
675 if (m_result != NULL) {
676 Dmsg0(500, "we have a result\n");
679 * How many fields in the set?
681 m_num_fields = (int)INGnfields(m_result);
682 Dmsg1(500, "we have %d fields\n", m_num_fields);
684 m_num_rows = INGntuples(m_result);
685 Dmsg1(500, "we have %d rows\n", m_num_rows);
687 Dmsg0(500, "No resultset...\n");
/* A COMMIT keyword was seen: set m_explicit_commit again and commit the work done so far. */
697 if (end_of_transaction) {
698 Dmsg0(500,"sql_query: End of transaction, commiting work\n");
699 m_explicit_commit = true;
700 INGcommit(m_db_handle);
704 Dmsg0(500, "sql_query finishing\n");
709 void B_DB_INGRES::sql_free_result(void)
724 m_num_rows = m_num_fields = 0;
/*
 * sql_fetch_row: hand out the next row of the stored result set.
 *
 * m_rows is a reusable array of m_num_fields char pointers. It is allocated
 * when missing or smaller than m_num_fields. While m_row_number is below
 * m_num_rows every column pointer is filled from INGgetvalue(); no copy of
 * the values is made in the visible code.
 *
 * The local row starts as NULL. The statements that publish m_rows through
 * it, advance m_row_number and return are not part of this excerpt, nor is
 * any NULL check of m_result ahead of the num_rows test.
 */
728 SQL_ROW B_DB_INGRES::sql_fetch_row(void)
731 SQL_ROW row = NULL; /* by default, return NULL */
736 if (m_result->num_rows <= 0) {
740 Dmsg0(500, "sql_fetch_row start\n");
742 if (!m_rows || m_rows_size < m_num_fields) {
744 Dmsg0(500, "sql_fetch_row freeing space\n");
/* NOTE(review): the argument below is a size_t expression printed with %d - confirm the format on 64 bit builds. */
747 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
748 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
749 m_rows_size = m_num_fields;
752 * Now reset the row_number now that we have the space allocated
758 * If still within the result set
760 if (m_row_number < m_num_rows) {
761 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
763 * Get each value from this row
765 for (j = 0; j < m_num_fields; j++) {
766 m_rows[j] = INGgetvalue(m_result, m_row_number, j);
767 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
770 * Increment the row number for the next call
776 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
779 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
/*
 * sql_strerror: return the error text that INGerrorMessage() reports for
 * this connection handle. The pointer is passed through unchanged.
 */
784 const char *B_DB_INGRES::sql_strerror(void)
786 return INGerrorMessage(m_db_handle);
789 void B_DB_INGRES::sql_data_seek(int row)
792 * Set the row number to be returned on the next call to sql_fetch_row
797 int B_DB_INGRES::sql_affected_rows(void)
803 * First execute the insert query and then retrieve the currval.
804 * By setting transaction to true we make it an atomic transaction
805 * and as such we can get the currval after which we commit if
806 * transaction is false. This way things are an atomic operation
807 * for Ingres and things work. We save the current transaction status
808 * and set transaction in the mdb to true and at the end of this
809 * function we restore the actual transaction status.
/*
 * sql_insert_autokey_record: run an INSERT and return the key value that
 * the table sequence handed out for it.
 *
 *  query      - the INSERT statement, passed to INGexec() as is (it does
 *               not go through the sql_query() rewrite filters here)
 *  table_name - table that was inserted into; the sequence name is built as
 *               table_name followed by _seq
 *
 * m_explicit_commit is saved, forced to false while the INSERT and the
 * currval SELECT run, and restored afterwards. The value is read from row
 * 0, column 0 of the result and converted with str_to_uint64(). The error
 * paths and the return statement are not part of this excerpt.
 */
811 uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
814 char getkeyval_query[256];
817 bool current_explicit_commit;
820 * Save the current transaction status and pretend we are in a transaction.
822 current_explicit_commit = m_explicit_commit;
823 m_explicit_commit = false;
826 * Execute the INSERT query.
828 m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
829 if (m_num_rows == -1) {
836 * Obtain the current value of the sequence that
837 * provides the serial value for primary key of the table.
839 * currval is local to our session. It is not affected by
840 * other transactions.
842 * Determine the name of the sequence.
843 * As we name all sequences as <table>_seq this is easy.
845 bstrncpy(sequence, table_name, sizeof(sequence));
846 bstrncat(sequence, "_seq", sizeof(sequence));
/* The query text is limited to the 256 byte getkeyval_query buffer by bsnprintf(). */
848 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
854 m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
857 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
861 Dmsg0(500, "exec done");
863 currval = INGgetvalue(m_result, 0, 0);
865 id = str_to_uint64(currval);
873 * Restore the actual explicit_commit status.
875 m_explicit_commit = current_explicit_commit;
/* NOTE(review): the existing wording below says commit when the flag is not set, while the condition commits when it is set - confirm which is intended. */
878 * Commit if explicit_commit is not set.
880 if (m_explicit_commit) {
881 INGcommit(m_db_handle);
/*
 * sql_fetch_field: return the description of the next result column.
 *
 * When m_fields is missing or smaller than m_num_fields, the array is
 * allocated for m_num_fields entries and every entry is filled: name and
 * type come from INGfname() and INGftype(), flags is set to 0, and
 * max_length is found by scanning all m_num_rows values of the column
 * (a NULL value counts as 4 characters, other values use cstrlen()).
 *
 * Each call returns the entry at m_field_number and then advances that
 * counter. No bound check of m_field_number is visible in this excerpt.
 */
887 SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
893 if (!m_fields || m_fields_size < m_num_fields) {
898 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
899 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
900 m_fields_size = m_num_fields;
902 for (i = 0; i < m_num_fields; i++) {
903 Dmsg1(500, "filling field %d\n", i);
904 m_fields[i].name = INGfname(m_result, i);
905 m_fields[i].type = INGftype(m_result, i);
906 m_fields[i].flags = 0;
909 * For a given column, find the max length.
912 for (j = 0; j < m_num_rows; j++) {
913 if (INGgetisnull(m_result, j, i)) {
914 this_length = 4; /* "NULL" */
916 this_length = cstrlen(INGgetvalue(m_result, j, i));
919 if (max_length < this_length) {
920 max_length = this_length;
923 m_fields[i].max_length = max_length;
925 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
926 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
931 * Increment field number for the next time around
933 return &m_fields[m_field_number++];
936 bool B_DB_INGRES::sql_field_is_not_null(int field_type)
938 switch (field_type) {
946 bool B_DB_INGRES::sql_field_is_numeric(int field_type)
949 * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
951 switch (field_type) {
962 * Escape strings so that Ingres is happy on COPY
964 * NOTE! len is the length of the old string. Your new
965 * string must be long enough (max 2*old+1) to hold
966 * the escaped output.
/*
 * ingres_copy_escape: escape a string for use with COPY.
 *
 *  dest - output buffer; per the comment above this function it must hold
 *         up to 2*len+1 bytes
 *  src  - input string
 *  len  - length of the input string
 *
 * Only the loop condition is visible in this excerpt: processing stops
 * after len characters or at the first zero byte in src, whichever comes
 * first. The escaping itself and the return value are not shown.
 */
968 static char *ingres_copy_escape(char *dest, char *src, size_t len)
970 /* we have to escape \t, \n, \r, \ */
973 while (len > 0 && *src) {
1009 * Returns true if OK
/*
 * sql_batch_start: create the session temporary table named batch that
 * sql_batch_insert() fills.
 *
 *  jcr - job control record (not used in the visible lines)
 *
 * The statement is sent through sql_query() and its result stored in ok.
 * The table is declared with ON COMMIT PRESERVE ROWS WITH NORECOVERY. Some
 * column lines of the statement, any locking, and the return statement are
 * not part of this excerpt.
 */
1012 bool B_DB_INGRES::sql_batch_start(JCR *jcr)
1017 ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
1018 "FileIndex INTEGER,"
1020 "Path VARBYTE(32000),"
1021 "Name VARBYTE(32000),"
1022 "LStat VARBYTE(255),"
1024 "DeltaSeq SMALLINT)"
1025 " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
1031 * Returns true if OK
1034 bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
1041 * Returns true if OK
/*
 * sql_batch_insert: add one file attribute record to the batch table.
 *
 *  jcr - job control record, passed to db_escape_string()
 *  ar  - attribute record; FileIndex, JobId, attr, Digest and DeltaSeq are
 *        read from it
 *
 * The member buffers fname and path are escaped into esc_name and esc_path
 * (each grown to twice the source length plus one first). The INSERT is
 * formatted into cmd and executed through sql_query(), whose result is
 * returned. The value used when ar->Digest is NULL or empty is not part of
 * this excerpt.
 */
1044 bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1050 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1051 db_escape_string(jcr, esc_name, fname, fnl);
1053 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1054 db_escape_string(jcr, esc_path, path, pnl);
1056 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1059 digest = ar->Digest;
/* NOTE(review): ar->attr and digest are placed between quotes without passing through db_escape_string() in the visible code - confirm they can never contain a quote. */
1062 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1063 "(%u,%s,'%s','%s','%s','%s',%u)",
1064 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1065 esc_name, ar->attr, digest, ar->DeltaSeq);
1067 return sql_query(cmd);
1071 * Initialize database data structure. In principle this should
1072 * never have errors, or it is really fatal.
/*
 * db_init_database: return a B_DB_INGRES object for the given catalog,
 * reusing an existing one where allowed.
 *
 *  jcr                  - job control record used for error reporting
 *  db_driver, db_name,
 *  db_address, db_port  - used to match an already known database through
 *                         db_match_database()
 *  db_user, db_password,
 *  db_socket            - passed on to the B_DB_INGRES constructor
 *  mult_db_connections  - when true the list of known databases is not
 *                         searched and a new object is always created
 *  disable_batch_insert - passed on to the constructor
 *
 * A fatal job message is issued about a missing user name; the test that
 * guards it is not part of this excerpt. The search of db_list runs with
 * the file level mutex held, and a match has its reference count
 * incremented. The unlock and return statements are not shown.
 */
1074 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1075 const char *db_password, const char *db_address, int db_port,
1076 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1078 B_DB_INGRES *mdb = NULL;
1081 Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
1085 P(mutex); /* lock DB queue */
1086 if (db_list && !mult_db_connections) {
1088 * Look to see if DB already open
1090 foreach_dlist(mdb, db_list) {
1091 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1092 Dmsg1(100, "DB REopen %s\n", db_name);
1093 mdb->increment_refcount();
1099 Dmsg0(100, "db_init_database first time\n");
1100 mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
1101 db_port, db_socket, mult_db_connections, disable_batch_insert));
1107 #endif /* HAVE_INGRES */