2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to Ingres
30 * These are Ingres specific routines
32 * Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33 * based uopn work done
34 * by Dan Langille, December 2003 and
35 * by Kern Sibbald, March 2000
37 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
47 #include "bdb_ingres.h"
50 /* -----------------------------------------------------------------------
52 * Ingres dependent defines and subroutines
54 * -----------------------------------------------------------------------
58 * List of open databases.
60 static dlist *db_list = NULL;
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
67 BREGEXP *rewrite_regexp;
72 * Create a new query filter.
74 static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
75 const char *search_pattern, const char *filter)
77 B_DB_RWRULE *rewrite_rule;
79 rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
81 rewrite_rule->pattern_length = pattern_length;
82 rewrite_rule->search_pattern = bstrdup(search_pattern);
83 rewrite_rule->rewrite_regexp = new_bregexp(filter);
84 rewrite_rule->trigger = false;
86 if (!rewrite_rule->rewrite_regexp) {
87 Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
88 free(rewrite_rule->search_pattern);
92 query_filters->append(rewrite_rule);
98 * Create a stack of all filters that should be applied to a SQL query
99 * before submitting it to the database backend.
101 static inline alist *db_initialize_query_filters(JCR *jcr)
103 alist *query_filters;
105 query_filters = New(alist(10, not_owned_by_alist));
107 if (!query_filters) {
108 Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
112 db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
113 "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
114 db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
115 "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
116 db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
117 "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
119 return query_filters;
123 * Free all query filters.
125 static inline void db_destroy_query_filters(alist *query_filters)
127 B_DB_RWRULE *rewrite_rule;
129 foreach_alist(rewrite_rule, query_filters) {
130 free_bregexp(rewrite_rule->rewrite_regexp);
131 free(rewrite_rule->search_pattern);
135 delete query_filters;
138 B_DB_INGRES::B_DB_INGRES(JCR *jcr,
139 const char *db_driver,
142 const char *db_password,
143 const char *db_address,
145 const char *db_socket,
146 bool mult_db_connections,
147 bool disable_batch_insert)
150 int next_session_id = 0;
153 * See what the next available session_id is.
154 * We first see what the highest session_id is used now.
157 foreach_dlist(mdb, db_list) {
158 if (mdb->m_session_id > next_session_id) {
159 next_session_id = mdb->m_session_id;
165 * Initialize the parent class members.
167 m_db_interface_type = SQL_INTERFACE_TYPE_INGRES;
168 m_db_type = SQL_TYPE_INGRES;
169 m_db_driver = bstrdup("ingres");
170 m_db_name = bstrdup(db_name);
171 m_db_user = bstrdup(db_user);
173 m_db_password = bstrdup(db_password);
176 m_db_address = bstrdup(db_address);
179 m_db_socket = bstrdup(db_socket);
182 if (disable_batch_insert) {
183 m_disabled_batch_insert = true;
184 m_have_batch_insert = false;
186 m_disabled_batch_insert = false;
187 #if defined(USE_BATCH_FILE_INSERT)
188 m_have_batch_insert = true;
190 m_have_batch_insert = false;
194 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
196 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
197 cached_path = get_pool_memory(PM_FNAME);
200 fname = get_pool_memory(PM_FNAME);
201 path = get_pool_memory(PM_FNAME);
202 esc_name = get_pool_memory(PM_FNAME);
203 esc_path = get_pool_memory(PM_FNAME);
204 esc_obj = get_pool_memory(PM_FNAME);
205 m_allow_transactions = mult_db_connections;
207 /* At this time, when mult_db_connections == true, this is for
208 * specific console command such as bvfs or batch mode, and we don't
209 * want to share a batch mode or bvfs. In the future, we can change
210 * the creation function to add this parameter.
212 m_dedicated = mult_db_connections;
215 * Initialize the private members.
219 m_explicit_commit = true;
220 m_session_id = ++next_session_id;
221 m_query_filters = db_initialize_query_filters(jcr);
224 * Put the db in the list.
226 if (db_list == NULL) {
227 db_list = New(dlist(this, &this->m_link));
229 db_list->append(this);
232 B_DB_INGRES::~B_DB_INGRES()
237 * Now actually open the database. This can generate errors,
238 * which are returned in the errmsg
240 * DO NOT close the database or delete mdb here !!!!
242 bool B_DB_INGRES::db_open_database(JCR *jcr)
253 if ((errstat=rwl_init(&m_lock)) != 0) {
255 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
256 be.bstrerror(errstat));
260 m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
262 Dmsg0(50, "Ingres real CONNECT done\n");
263 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
264 m_db_password == NULL ? "(NULL)" : m_db_password);
267 Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
268 "Database=%s User=%s\n"
269 "It is probably not running or your password is incorrect.\n"),
270 m_db_name, m_db_user);
276 INGsetDefaultLockingMode(m_db_handle);
278 if (!check_tables_version(jcr, this)) {
289 void B_DB_INGRES::db_close_database(JCR *jcr)
292 db_end_transaction(jcr);
296 if (m_ref_count == 0) {
300 db_list->remove(this);
301 if (m_connected && m_db_handle) {
302 INGdisconnectDB(m_db_handle);
304 if (m_query_filters) {
305 db_destroy_query_filters(m_query_filters);
307 if (rwl_is_init(&m_lock)) {
308 rwl_destroy(&m_lock);
310 free_pool_memory(errmsg);
311 free_pool_memory(cmd);
312 free_pool_memory(cached_path);
313 free_pool_memory(fname);
314 free_pool_memory(path);
315 free_pool_memory(esc_name);
316 free_pool_memory(esc_path);
317 free_pool_memory(esc_obj);
331 if (db_list->size() == 0) {
339 void B_DB_INGRES::db_thread_cleanup(void)
344 * Escape strings so that Ingres is happy
346 * NOTE! len is the length of the old string. Your new
347 * string must be long enough (max 2*old+1) to hold
348 * the escaped output.
350 void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
377 * Escape binary so that Ingres is happy
379 * NOTE! Need to be implemented (escape \0)
382 char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
386 n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
410 * Unescape binary object so that Ingres is happy
412 * TODO: need to be implemented (escape \0)
414 void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
415 POOLMEM **dest, int32_t *dest_len)
422 *dest = check_pool_memory_size(*dest, expected_len+1);
423 *dest_len = expected_len;
424 memcpy(*dest, from, expected_len);
425 (*dest)[expected_len]=0;
429 * Start a transaction. This groups inserts and makes things
430 * much more efficient. Usually started when inserting
433 void B_DB_INGRES::db_start_transaction(JCR *jcr)
436 jcr->attr = get_pool_memory(PM_FNAME);
439 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
442 if (!m_allow_transactions) {
447 /* Allow only 25,000 changes per transaction */
448 if (m_transaction && changes > 25000) {
449 db_end_transaction(jcr);
451 if (!m_transaction) {
452 sql_query("BEGIN"); /* begin transaction */
453 Dmsg0(400, "Start Ingres transaction\n");
454 m_transaction = true;
459 void B_DB_INGRES::db_end_transaction(JCR *jcr)
461 if (jcr && jcr->cached_attribute) {
462 Dmsg0(400, "Flush last cached attribute.\n");
463 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
464 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
466 jcr->cached_attribute = false;
469 if (!m_allow_transactions) {
475 sql_query("COMMIT"); /* end transaction */
476 m_transaction = false;
477 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
484 * Submit a general SQL command (cmd), and for each row returned,
485 * the result_handler is called with the ctx.
487 bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
492 Dmsg1(500, "db_sql_query starts with %s\n", query);
495 if (!sql_query(query, QF_STORE_RESULT)) {
496 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
497 Dmsg0(500, "db_sql_query failed\n");
502 if (result_handler != NULL) {
503 Dmsg0(500, "db_sql_query invoking handler\n");
504 while ((row = sql_fetch_row()) != NULL) {
505 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
506 if (result_handler(ctx, m_num_fields, row))
512 Dmsg0(500, "db_sql_query finished\n");
520 * Note, if this routine returns false (failure), Bacula expects
521 * that no result has been stored.
523 * Returns: true on success
527 bool B_DB_INGRES::sql_query(const char *query, int flags)
531 char *dup_query, *new_query;
533 bool start_of_transaction = false;
534 bool end_of_transaction = false;
535 B_DB_RWRULE *rewrite_rule;
537 Dmsg1(500, "query starts with '%s'\n", query);
539 * We always make a private copy of the query as we are doing serious
540 * rewrites in this engine. When running the private copy through the
541 * different query filters we loose the orginal private copy so we
542 * first make a extra reference to it so we can free it on exit from the
545 dup_query = new_query = bstrdup(query);
548 * Iterate over the query string and perform any needed operations.
549 * We use a sliding window over the query string where bp points to
550 * the previous position in the query and cp to the current position
555 if ((cp = strchr(bp, ' ')) != NULL) {
559 if (!strncasecmp(bp, "BEGIN", 5)) {
561 * This is the start of a transaction.
562 * Inline copy the rest of the query over the BEGIN keyword.
569 start_of_transaction = true;
570 } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
572 * This is the end of a transaction. We cannot check for just the COMMIT
573 * keyword as a DECLARE of an tempory table also has the word COMMIT in it
574 * but its followed by the word PRESERVE.
575 * Inline copy the rest of the query over the COMMIT keyword.
582 end_of_transaction = true;
586 * See what query filter might match.
588 foreach_alist(rewrite_rule, m_query_filters) {
589 if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
590 rewrite_rule->trigger = true;
601 * Run the query through all query filters that apply e.g. have the trigger set in the
604 foreach_alist(rewrite_rule, m_query_filters) {
605 if (rewrite_rule->trigger) {
606 new_query = rewrite_rule->rewrite_regexp->replace(new_query);
607 rewrite_rule->trigger = false;
611 if (start_of_transaction) {
612 Dmsg0(500,"sql_query: Start of transaction\n");
613 m_explicit_commit = false;
617 * See if there is any query left after filtering for certain keywords.
620 while (bp != NULL && strlen(bp) > 0) {
622 * We are starting a new query. reset everything.
629 INGclear(m_result); /* hmm, someone forgot to free?? */
634 * See if this is a multi-statement query. We split a multi-statement query
635 * on the semi-column and feed the individual queries to the Ingres functions.
636 * We use a sliding window over the query string where bp points to
637 * the previous position in the query and cp to the current position
640 if ((cp = strchr(bp, ';')) != NULL) {
644 Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
647 * See if we got a store_result hint which could mean we are running a select.
648 * If flags has QF_STORE_RESULT not set we are sure its not a query that we
649 * need to store anything for.
651 if (flags & QF_STORE_RESULT) {
652 cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
659 Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
663 Dmsg0(500,"sql_query (non SELECT) starting...\n");
667 m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
668 if (m_num_rows == -1) {
669 Dmsg0(500,"sql_query (non SELECT) went wrong\n");
673 Dmsg0(500,"sql_query (non SELECT) seems ok\n");
679 Dmsg0(500,"sql_query (SELECT) starting...\n");
680 m_result = INGquery(m_db_handle, bp, m_explicit_commit);
681 if (m_result != NULL) {
682 Dmsg0(500, "we have a result\n");
685 * How many fields in the set?
687 m_num_fields = (int)INGnfields(m_result);
688 Dmsg1(500, "we have %d fields\n", m_num_fields);
690 m_num_rows = INGntuples(m_result);
691 Dmsg1(500, "we have %d rows\n", m_num_rows);
693 Dmsg0(500, "No resultset...\n");
703 if (end_of_transaction) {
704 Dmsg0(500,"sql_query: End of transaction, commiting work\n");
705 m_explicit_commit = true;
706 INGcommit(m_db_handle);
710 Dmsg0(500, "sql_query finishing\n");
715 void B_DB_INGRES::sql_free_result(void)
730 m_num_rows = m_num_fields = 0;
734 SQL_ROW B_DB_INGRES::sql_fetch_row(void)
737 SQL_ROW row = NULL; /* by default, return NULL */
742 if (m_result->num_rows <= 0) {
746 Dmsg0(500, "sql_fetch_row start\n");
748 if (!m_rows || m_rows_size < m_num_fields) {
750 Dmsg0(500, "sql_fetch_row freeing space\n");
753 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
754 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
755 m_rows_size = m_num_fields;
758 * Now reset the row_number now that we have the space allocated
764 * If still within the result set
766 if (m_row_number < m_num_rows) {
767 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
769 * Get each value from this row
771 for (j = 0; j < m_num_fields; j++) {
772 m_rows[j] = INGgetvalue(m_result, m_row_number, j);
773 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
776 * Increment the row number for the next call
782 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
785 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
790 const char *B_DB_INGRES::sql_strerror(void)
792 return INGerrorMessage(m_db_handle);
795 void B_DB_INGRES::sql_data_seek(int row)
798 * Set the row number to be returned on the next call to sql_fetch_row
803 int B_DB_INGRES::sql_affected_rows(void)
809 * First execute the insert query and then retrieve the currval.
810 * By setting transaction to true we make it an atomic transaction
811 * and as such we can get the currval after which we commit if
812 * transaction is false. This way things are an atomic operation
813 * for Ingres and things work. We save the current transaction status
814 * and set transaction in the mdb to true and at the end of this
815 * function we restore the actual transaction status.
817 uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
820 char getkeyval_query[256];
823 bool current_explicit_commit;
826 * Save the current transaction status and pretend we are in a transaction.
828 current_explicit_commit = m_explicit_commit;
829 m_explicit_commit = false;
832 * Execute the INSERT query.
834 m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
835 if (m_num_rows == -1) {
842 * Obtain the current value of the sequence that
843 * provides the serial value for primary key of the table.
845 * currval is local to our session. It is not affected by
846 * other transactions.
848 * Determine the name of the sequence.
849 * As we name all sequences as <table>_seq this is easy.
851 bstrncpy(sequence, table_name, sizeof(sequence));
852 bstrncat(sequence, "_seq", sizeof(sequence));
854 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
860 m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
863 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
867 Dmsg0(500, "exec done");
869 currval = INGgetvalue(m_result, 0, 0);
871 id = str_to_uint64(currval);
879 * Restore the actual explicit_commit status.
881 m_explicit_commit = current_explicit_commit;
884 * Commit if explicit_commit is not set.
886 if (m_explicit_commit) {
887 INGcommit(m_db_handle);
893 SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
899 if (!m_fields || m_fields_size < m_num_fields) {
904 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
905 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
906 m_fields_size = m_num_fields;
908 for (i = 0; i < m_num_fields; i++) {
909 Dmsg1(500, "filling field %d\n", i);
910 m_fields[i].name = INGfname(m_result, i);
911 m_fields[i].type = INGftype(m_result, i);
912 m_fields[i].flags = 0;
915 * For a given column, find the max length.
918 for (j = 0; j < m_num_rows; j++) {
919 if (INGgetisnull(m_result, j, i)) {
920 this_length = 4; /* "NULL" */
922 this_length = cstrlen(INGgetvalue(m_result, j, i));
925 if (max_length < this_length) {
926 max_length = this_length;
929 m_fields[i].max_length = max_length;
931 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
932 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
937 * Increment field number for the next time around
939 return &m_fields[m_field_number++];
942 bool B_DB_INGRES::sql_field_is_not_null(int field_type)
944 switch (field_type) {
952 bool B_DB_INGRES::sql_field_is_numeric(int field_type)
955 * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
957 switch (field_type) {
968 * Escape strings so that Ingres is happy on COPY
970 * NOTE! len is the length of the old string. Your new
971 * string must be long enough (max 2*old+1) to hold
972 * the escaped output.
974 static char *ingres_copy_escape(char *dest, char *src, size_t len)
976 /* we have to escape \t, \n, \r, \ */
979 while (len > 0 && *src) {
1015 * Returns true if OK
1018 bool B_DB_INGRES::sql_batch_start(JCR *jcr)
1023 ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
1024 "FileIndex INTEGER,"
1026 "Path VARBYTE(32000),"
1027 "Name VARBYTE(32000),"
1028 "LStat VARBYTE(255),"
1030 "DeltaSeq SMALLINT)"
1031 " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
1037 * Returns true if OK
1040 bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
1047 * Returns true if OK
1050 bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1056 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1057 db_escape_string(jcr, esc_name, fname, fnl);
1059 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1060 db_escape_string(jcr, esc_path, path, pnl);
1062 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1065 digest = ar->Digest;
1068 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1069 "(%u,%s,'%s','%s','%s','%s',%u)",
1070 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1071 esc_name, ar->attr, digest, ar->DeltaSeq);
1073 return sql_query(cmd);
1077 * Initialize database data structure. In principal this should
1078 * never have errors, or it is really fatal.
1080 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1081 const char *db_password, const char *db_address, int db_port,
1082 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1084 B_DB_INGRES *mdb = NULL;
1087 Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
1091 P(mutex); /* lock DB queue */
1092 if (db_list && !mult_db_connections) {
1094 * Look to see if DB already open
1096 foreach_dlist(mdb, db_list) {
1097 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1098 Dmsg1(100, "DB REopen %s\n", db_name);
1099 mdb->increment_refcount();
1105 Dmsg0(100, "db_init_database first time\n");
1106 mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
1107 db_port, db_socket, mult_db_connections, disable_batch_insert));
1113 #endif /* HAVE_INGRES */