2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to Ingres
30 * These are Ingres specific routines
32 * Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33 * based uopn work done
34 * by Dan Langille, December 2003 and
35 * by Kern Sibbald, March 2000
37 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
47 #include "bdb_ingres.h"
50 /* -----------------------------------------------------------------------
52 * Ingres dependent defines and subroutines
54 * -----------------------------------------------------------------------
58 * List of open databases.
60 static dlist *db_list = NULL;
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
67 BREGEXP *rewrite_regexp;
72 * Create a new query filter.
74 static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
75 const char *search_pattern, const char *filter)
77 B_DB_RWRULE *rewrite_rule;
79 rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
81 rewrite_rule->pattern_length = pattern_length;
82 rewrite_rule->search_pattern = bstrdup(search_pattern);
83 rewrite_rule->rewrite_regexp = new_bregexp(filter);
84 rewrite_rule->trigger = false;
86 if (!rewrite_rule->rewrite_regexp) {
87 Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
88 free(rewrite_rule->search_pattern);
92 query_filters->append(rewrite_rule);
98 * Create a stack of all filters that should be applied to a SQL query
99 * before submitting it to the database backend.
101 static inline alist *db_initialize_query_filters(JCR *jcr)
103 alist *query_filters;
105 query_filters = New(alist(10, not_owned_by_alist));
107 if (!query_filters) {
108 Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
112 db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
113 "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
114 db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
115 "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
116 db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
117 "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
119 return query_filters;
123 * Free all query filters.
125 static inline void db_destroy_query_filters(alist *query_filters)
127 B_DB_RWRULE *rewrite_rule;
129 foreach_alist(rewrite_rule, query_filters) {
130 free_bregexp(rewrite_rule->rewrite_regexp);
131 free(rewrite_rule->search_pattern);
135 delete query_filters;
138 B_DB_INGRES::B_DB_INGRES(JCR *jcr,
139 const char *db_driver,
142 const char *db_password,
143 const char *db_address,
145 const char *db_socket,
146 bool mult_db_connections,
147 bool disable_batch_insert)
150 int next_session_id = 0;
153 * See what the next available session_id is.
154 * We first see what the highest session_id is used now.
157 foreach_dlist(mdb, db_list) {
158 if (mdb->m_session_id > next_session_id) {
159 next_session_id = mdb->m_session_id;
165 * Initialize the parent class members.
167 m_db_interface_type = SQL_INTERFACE_TYPE_INGRES;
168 m_db_type = SQL_TYPE_INGRES;
169 m_db_driver = bstrdup("ingres");
170 m_db_name = bstrdup(db_name);
171 m_db_user = bstrdup(db_user);
173 m_db_password = bstrdup(db_password);
176 m_db_address = bstrdup(db_address);
179 m_db_socket = bstrdup(db_socket);
182 if (disable_batch_insert) {
183 m_disabled_batch_insert = true;
184 m_have_batch_insert = false;
186 m_disabled_batch_insert = false;
187 #if defined(USE_BATCH_FILE_INSERT)
188 m_have_batch_insert = true;
190 m_have_batch_insert = false;
194 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
196 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
197 cached_path = get_pool_memory(PM_FNAME);
200 fname = get_pool_memory(PM_FNAME);
201 path = get_pool_memory(PM_FNAME);
202 esc_name = get_pool_memory(PM_FNAME);
203 esc_path = get_pool_memory(PM_FNAME);
204 esc_obj = get_pool_memory(PM_FNAME);
205 m_allow_transactions = mult_db_connections;
208 * Initialize the private members.
212 m_explicit_commit = true;
213 m_session_id = ++next_session_id;
214 m_query_filters = db_initialize_query_filters(jcr);
217 * Put the db in the list.
219 if (db_list == NULL) {
220 db_list = New(dlist(this, &this->m_link));
222 db_list->append(this);
225 B_DB_INGRES::~B_DB_INGRES()
230 * Now actually open the database. This can generate errors,
231 * which are returned in the errmsg
233 * DO NOT close the database or delete mdb here !!!!
235 bool B_DB_INGRES::db_open_database(JCR *jcr)
246 if ((errstat=rwl_init(&m_lock)) != 0) {
248 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
249 be.bstrerror(errstat));
253 m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
255 Dmsg0(50, "Ingres real CONNECT done\n");
256 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
257 m_db_password == NULL ? "(NULL)" : m_db_password);
260 Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
261 "Database=%s User=%s\n"
262 "It is probably not running or your password is incorrect.\n"),
263 m_db_name, m_db_user);
269 INGsetDefaultLockingMode(m_db_handle);
271 if (!check_tables_version(jcr, this)) {
282 void B_DB_INGRES::db_close_database(JCR *jcr)
284 db_end_transaction(jcr);
287 if (m_ref_count == 0) {
289 db_list->remove(this);
290 if (m_connected && m_db_handle) {
291 INGdisconnectDB(m_db_handle);
293 if (m_query_filters) {
294 db_destroy_query_filters(m_query_filters);
296 rwl_destroy(&m_lock);
297 free_pool_memory(errmsg);
298 free_pool_memory(cmd);
299 free_pool_memory(cached_path);
300 free_pool_memory(fname);
301 free_pool_memory(path);
302 free_pool_memory(esc_name);
303 free_pool_memory(esc_path);
304 free_pool_memory(esc_obj);
318 if (db_list->size() == 0) {
326 void B_DB_INGRES::db_thread_cleanup(void)
331 * Escape strings so that Ingres is happy
333 * NOTE! len is the length of the old string. Your new
334 * string must be long enough (max 2*old+1) to hold
335 * the escaped output.
337 void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
364 * Escape binary so that Ingres is happy
366 * NOTE! Need to be implemented (escape \0)
369 char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
373 n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
397 * Unescape binary object so that Ingres is happy
399 * TODO: need to be implemented (escape \0)
401 void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
402 POOLMEM **dest, int32_t *dest_len)
409 *dest = check_pool_memory_size(*dest, expected_len+1);
410 *dest_len = expected_len;
411 memcpy(*dest, from, expected_len);
412 (*dest)[expected_len]=0;
416 * Start a transaction. This groups inserts and makes things
417 * much more efficient. Usually started when inserting
420 void B_DB_INGRES::db_start_transaction(JCR *jcr)
423 jcr->attr = get_pool_memory(PM_FNAME);
426 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
429 if (!m_allow_transactions) {
434 /* Allow only 25,000 changes per transaction */
435 if (m_transaction && changes > 25000) {
436 db_end_transaction(jcr);
438 if (!m_transaction) {
439 sql_query("BEGIN"); /* begin transaction */
440 Dmsg0(400, "Start Ingres transaction\n");
441 m_transaction = true;
446 void B_DB_INGRES::db_end_transaction(JCR *jcr)
448 if (jcr && jcr->cached_attribute) {
449 Dmsg0(400, "Flush last cached attribute.\n");
450 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
451 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
453 jcr->cached_attribute = false;
456 if (!m_allow_transactions) {
462 sql_query("COMMIT"); /* end transaction */
463 m_transaction = false;
464 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
471 * Submit a general SQL command (cmd), and for each row returned,
472 * the result_handler is called with the ctx.
474 bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
479 Dmsg1(500, "db_sql_query starts with %s\n", query);
482 if (!sql_query(query, QF_STORE_RESULT)) {
483 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
484 Dmsg0(500, "db_sql_query failed\n");
489 if (result_handler != NULL) {
490 Dmsg0(500, "db_sql_query invoking handler\n");
491 while ((row = sql_fetch_row()) != NULL) {
492 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
493 if (result_handler(ctx, m_num_fields, row))
499 Dmsg0(500, "db_sql_query finished\n");
507 * Note, if this routine returns false (failure), Bacula expects
508 * that no result has been stored.
510 * Returns: true on success
514 bool B_DB_INGRES::sql_query(const char *query, int flags)
518 char *dup_query, *new_query;
520 bool start_of_transaction = false;
521 bool end_of_transaction = false;
522 B_DB_RWRULE *rewrite_rule;
524 Dmsg1(500, "query starts with '%s'\n", query);
526 * We always make a private copy of the query as we are doing serious
527 * rewrites in this engine. When running the private copy through the
528 * different query filters we loose the orginal private copy so we
529 * first make a extra reference to it so we can free it on exit from the
532 dup_query = new_query = bstrdup(query);
535 * Iterate over the query string and perform any needed operations.
536 * We use a sliding window over the query string where bp points to
537 * the previous position in the query and cp to the current position
542 if ((cp = strchr(bp, ' ')) != NULL) {
546 if (!strncasecmp(bp, "BEGIN", 5)) {
548 * This is the start of a transaction.
549 * Inline copy the rest of the query over the BEGIN keyword.
556 start_of_transaction = true;
557 } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
559 * This is the end of a transaction. We cannot check for just the COMMIT
560 * keyword as a DECLARE of an tempory table also has the word COMMIT in it
561 * but its followed by the word PRESERVE.
562 * Inline copy the rest of the query over the COMMIT keyword.
569 end_of_transaction = true;
573 * See what query filter might match.
575 foreach_alist(rewrite_rule, m_query_filters) {
576 if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
577 rewrite_rule->trigger = true;
588 * Run the query through all query filters that apply e.g. have the trigger set in the
591 foreach_alist(rewrite_rule, m_query_filters) {
592 if (rewrite_rule->trigger) {
593 new_query = rewrite_rule->rewrite_regexp->replace(new_query);
594 rewrite_rule->trigger = false;
598 if (start_of_transaction) {
599 Dmsg0(500,"sql_query: Start of transaction\n");
600 m_explicit_commit = false;
604 * See if there is any query left after filtering for certain keywords.
607 while (bp != NULL && strlen(bp) > 0) {
609 * We are starting a new query. reset everything.
616 INGclear(m_result); /* hmm, someone forgot to free?? */
621 * See if this is a multi-statement query. We split a multi-statement query
622 * on the semi-column and feed the individual queries to the Ingres functions.
623 * We use a sliding window over the query string where bp points to
624 * the previous position in the query and cp to the current position
627 if ((cp = strchr(bp, ';')) != NULL) {
631 Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
634 * See if we got a store_result hint which could mean we are running a select.
635 * If flags has QF_STORE_RESULT not set we are sure its not a query that we
636 * need to store anything for.
638 if (flags & QF_STORE_RESULT) {
639 cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
646 Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
650 Dmsg0(500,"sql_query (non SELECT) starting...\n");
654 m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
655 if (m_num_rows == -1) {
656 Dmsg0(500,"sql_query (non SELECT) went wrong\n");
660 Dmsg0(500,"sql_query (non SELECT) seems ok\n");
666 Dmsg0(500,"sql_query (SELECT) starting...\n");
667 m_result = INGquery(m_db_handle, bp, m_explicit_commit);
668 if (m_result != NULL) {
669 Dmsg0(500, "we have a result\n");
672 * How many fields in the set?
674 m_num_fields = (int)INGnfields(m_result);
675 Dmsg1(500, "we have %d fields\n", m_num_fields);
677 m_num_rows = INGntuples(m_result);
678 Dmsg1(500, "we have %d rows\n", m_num_rows);
680 Dmsg0(500, "No resultset...\n");
690 if (end_of_transaction) {
691 Dmsg0(500,"sql_query: End of transaction, commiting work\n");
692 m_explicit_commit = true;
693 INGcommit(m_db_handle);
697 Dmsg0(500, "sql_query finishing\n");
702 void B_DB_INGRES::sql_free_result(void)
717 m_num_rows = m_num_fields = 0;
721 SQL_ROW B_DB_INGRES::sql_fetch_row(void)
724 SQL_ROW row = NULL; /* by default, return NULL */
729 if (m_result->num_rows <= 0) {
733 Dmsg0(500, "sql_fetch_row start\n");
735 if (!m_rows || m_rows_size < m_num_fields) {
737 Dmsg0(500, "sql_fetch_row freeing space\n");
740 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
741 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
742 m_rows_size = m_num_fields;
745 * Now reset the row_number now that we have the space allocated
751 * If still within the result set
753 if (m_row_number < m_num_rows) {
754 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
756 * Get each value from this row
758 for (j = 0; j < m_num_fields; j++) {
759 m_rows[j] = INGgetvalue(m_result, m_row_number, j);
760 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
763 * Increment the row number for the next call
769 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
772 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
777 const char *B_DB_INGRES::sql_strerror(void)
779 return INGerrorMessage(m_db_handle);
782 void B_DB_INGRES::sql_data_seek(int row)
785 * Set the row number to be returned on the next call to sql_fetch_row
790 int B_DB_INGRES::sql_affected_rows(void)
796 * First execute the insert query and then retrieve the currval.
797 * By setting transaction to true we make it an atomic transaction
798 * and as such we can get the currval after which we commit if
799 * transaction is false. This way things are an atomic operation
800 * for Ingres and things work. We save the current transaction status
801 * and set transaction in the mdb to true and at the end of this
802 * function we restore the actual transaction status.
804 uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
807 char getkeyval_query[256];
810 bool current_explicit_commit;
813 * Save the current transaction status and pretend we are in a transaction.
815 current_explicit_commit = m_explicit_commit;
816 m_explicit_commit = false;
819 * Execute the INSERT query.
821 m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
822 if (m_num_rows == -1) {
829 * Obtain the current value of the sequence that
830 * provides the serial value for primary key of the table.
832 * currval is local to our session. It is not affected by
833 * other transactions.
835 * Determine the name of the sequence.
836 * As we name all sequences as <table>_seq this is easy.
838 bstrncpy(sequence, table_name, sizeof(sequence));
839 bstrncat(sequence, "_seq", sizeof(sequence));
841 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
847 m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
850 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
854 Dmsg0(500, "exec done");
856 currval = INGgetvalue(m_result, 0, 0);
858 id = str_to_uint64(currval);
866 * Restore the actual explicit_commit status.
868 m_explicit_commit = current_explicit_commit;
871 * Commit if explicit_commit is not set.
873 if (m_explicit_commit) {
874 INGcommit(m_db_handle);
880 SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
886 if (!m_fields || m_fields_size < m_num_fields) {
891 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
892 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
893 m_fields_size = m_num_fields;
895 for (i = 0; i < m_num_fields; i++) {
896 Dmsg1(500, "filling field %d\n", i);
897 m_fields[i].name = INGfname(m_result, i);
898 m_fields[i].type = INGftype(m_result, i);
899 m_fields[i].flags = 0;
902 * For a given column, find the max length.
905 for (j = 0; j < m_num_rows; j++) {
906 if (INGgetisnull(m_result, j, i)) {
907 this_length = 4; /* "NULL" */
909 this_length = cstrlen(INGgetvalue(m_result, j, i));
912 if (max_length < this_length) {
913 max_length = this_length;
916 m_fields[i].max_length = max_length;
918 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
919 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
924 * Increment field number for the next time around
926 return &m_fields[m_field_number++];
929 bool B_DB_INGRES::sql_field_is_not_null(int field_type)
931 switch (field_type) {
939 bool B_DB_INGRES::sql_field_is_numeric(int field_type)
942 * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
944 switch (field_type) {
955 * Escape strings so that Ingres is happy on COPY
957 * NOTE! len is the length of the old string. Your new
958 * string must be long enough (max 2*old+1) to hold
959 * the escaped output.
961 static char *ingres_copy_escape(char *dest, char *src, size_t len)
963 /* we have to escape \t, \n, \r, \ */
966 while (len > 0 && *src) {
1002 * Returns true if OK
1005 bool B_DB_INGRES::sql_batch_start(JCR *jcr)
1010 ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
1011 "FileIndex INTEGER,"
1013 "Path VARBYTE(32000),"
1014 "Name VARBYTE(32000),"
1015 "LStat VARBYTE(255),"
1017 "DeltaSeq SMALLINT)"
1018 " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
1024 * Returns true if OK
1027 bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
1034 * Returns true if OK
1037 bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1043 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1044 db_escape_string(jcr, esc_name, fname, fnl);
1046 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1047 db_escape_string(jcr, esc_path, path, pnl);
1049 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1052 digest = ar->Digest;
1055 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1056 "(%u,%s,'%s','%s','%s','%s',%u)",
1057 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1058 esc_name, ar->attr, digest, ar->DeltaSeq);
1060 return sql_query(cmd);
1064 * Initialize database data structure. In principal this should
1065 * never have errors, or it is really fatal.
1067 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1068 const char *db_password, const char *db_address, int db_port,
1069 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1071 B_DB_INGRES *mdb = NULL;
1074 Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
1078 P(mutex); /* lock DB queue */
1079 if (db_list && !mult_db_connections) {
1081 * Look to see if DB already open
1083 foreach_dlist(mdb, db_list) {
1084 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1085 Dmsg1(100, "DB REopen %s\n", db_name);
1086 mdb->increment_refcount();
1092 Dmsg0(100, "db_init_database first time\n");
1093 mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
1094 db_port, db_socket, mult_db_connections, disable_batch_insert));
1100 #endif /* HAVE_INGRES */