2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
36 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
39 * This code only compiles against a recent version of libdbi. The current
40 * release found on the libdbi website (0.8.3) won't work for this code.
42 * You find the libdbi library on http://sourceforge.net/projects/libdbi
44 * A fairly recent version of libdbi from CVS works, so either make sure
45 * your distribution has a fairly recent version of libdbi installed or
46 * clone the CVS repositories from sourceforge and compile that code and
50 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
61 #include <dbi/dbi-dev.h>
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
72 * List of open databases
74 static dlist *db_list = NULL;
77 * Control allocated fields by dbi_getvalue
79 static dlist *dbi_getvalue_list = NULL;
// Mutex for the shared list of open databases: db_init_database locks it
// with P(mutex) before it scans db_list.
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Function-pointer types used to cast the driver-specific entry points that
// dbi_driver_specific_function() returns:
//   custom_function_insert_t - cast applied to "PQputCopyData" in sql_batch_insert
//   custom_function_error_t  - cast applied to "PQerrorMessage" in sql_batch_insert
//   custom_function_end_t    - cast applied to "PQputCopyEnd" in sql_batch_end
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
// Construct a DBI catalog connection object: select the backend from
// db_driver, copy the connection parameters into the members, allocate the
// pool buffers and register the object in the list of open databases.
//
// db_driver is read starting at offset 4, i.e. the text that follows a
// four-character prefix (db_init_database requires the "dbi:" form).
// NOTE(review): no length check on db_driver is visible in this constructor -
// confirm that every caller validates it first.
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88 const char *db_driver,
91 const char *db_password,
92 const char *db_address,
94 const char *db_socket,
95 bool mult_db_connections,
96 bool disable_batch_insert)
99 char new_db_driver[10];
100 char db_driverdir[256];
101 DBI_FIELD_GET *field;
// Skip the prefix, then map the remaining name (compared without regard to
// case) to a backend type and to the libdbi driver name kept in new_db_driver.
103 p = (char *)(db_driver + 4);
104 if (strcasecmp(p, "mysql") == 0) {
105 m_db_type = SQL_TYPE_MYSQL;
106 bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107 } else if (strcasecmp(p, "postgresql") == 0) {
108 m_db_type = SQL_TYPE_POSTGRESQL;
109 bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110 } else if (strcasecmp(p, "sqlite3") == 0) {
111 m_db_type = SQL_TYPE_SQLITE3;
112 bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113 } else if (strcasecmp(p, "ingres") == 0) {
114 m_db_type = SQL_TYPE_INGRES;
115 bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
// No known backend name matched: reported through Jmsg with M_ABORT.
117 Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
122 * Set db_driverdir whereis is the libdbi drivers
// NOTE(review): the limit passed here is 255 although the buffer is declared
// with 256 bytes; sizeof(db_driverdir) would match the other bstrncpy calls.
124 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
127 * Initialize the parent class members.
129 m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130 m_db_name = bstrdup(db_name);
131 m_db_user = bstrdup(db_user);
133 m_db_password = bstrdup(db_password);
136 m_db_address = bstrdup(db_address);
139 m_db_socket = bstrdup(db_socket);
142 m_db_driverdir = bstrdup(db_driverdir);
144 m_db_driver = bstrdup(new_db_driver);
// Batch insert is reported as available only when the caller did not disable
// it and both USE_BATCH_FILE_INSERT and HAVE_DBI_BATCH_FILE_INSERT are
// defined (the #else lines of this preprocessor ladder are not visible here).
146 if (disable_batch_insert) {
147 m_disabled_batch_insert = true;
148 m_have_batch_insert = false;
150 m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153 m_have_batch_insert = true;
155 m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
158 m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
161 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
163 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164 cached_path = get_pool_memory(PM_FNAME);
167 fname = get_pool_memory(PM_FNAME);
168 path = get_pool_memory(PM_FNAME);
169 esc_name = get_pool_memory(PM_FNAME);
170 esc_path = get_pool_memory(PM_FNAME);
// Transactions are allowed exactly when multiple DB connections were requested.
171 m_allow_transactions = mult_db_connections;
174 * Initialize the private members.
181 * Put the db in the list.
// The first object constructed also creates both shared lists.
// NOTE(review): field has no visible initialisation before it is handed to
// dlist; presumably only the offset of its link member matters - confirm.
183 if (db_list == NULL) {
184 db_list = New(dlist(this, &this->m_link));
185 dbi_getvalue_list = New(dlist(field, &field->link));
187 db_list->append(this);
190 B_DB_DBI::~B_DB_DBI()
195 * Now actually open the database. This can generate errors,
196 * which are returned in the errmsg
198 * DO NOT close the database or delete mdb here !!!!
// Open the catalog connection through libdbi.
// Visible steps: initialise the read/write lock, format m_db_port into buf,
// load the libdbi drivers from m_db_driverdir, create a connection handle for
// m_db_driver, set the per-backend connection options, try to connect up to
// six times, check the catalog table version and finally apply per-backend
// session settings. Errors are reported through errmsg.
//
// Changes compared with the previous revision:
//  - the driver-lookup error used Mmsg2 with a format that has a single
//    conversion; it now uses Mmsg1 and passes only the directory name
//    (the translatable text itself is unchanged);
//  - the debug trace no longer prints the database password in clear text.
200 bool B_DB_DBI::db_open_database(JCR *jcr)
206 const char *dbi_errmsg;
209 char *new_db_name = NULL;
210 char *new_db_dir = NULL;
218 if ((errstat=rwl_init(&m_lock)) != 0) {
220 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
221 be.bstrerror(errstat));
226 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
// A negative count from dbi_initialize_r is treated as a driver lookup failure.
232 numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
233 if (numdrivers < 0) {
234 Mmsg1(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
235 "db_driverdir=%s. It is probaly not found any drivers\n"),
236 m_db_driverdir);
239 m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
241 * Can be many types of databases
245 dbi_conn_set_option(m_db_handle, "host", m_db_address); /* default = localhost */
246 dbi_conn_set_option(m_db_handle, "port", port); /* default port */
247 dbi_conn_set_option(m_db_handle, "username", m_db_user); /* login name */
248 dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
249 dbi_conn_set_option(m_db_handle, "dbname", m_db_name); /* database name */
251 case SQL_TYPE_POSTGRESQL:
252 dbi_conn_set_option(m_db_handle, "host", m_db_address);
253 dbi_conn_set_option(m_db_handle, "port", port);
254 dbi_conn_set_option(m_db_handle, "username", m_db_user);
255 dbi_conn_set_option(m_db_handle, "password", m_db_password);
256 dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
258 case SQL_TYPE_SQLITE3:
// SQLite3 is file based: the directory is working_directory plus "/" and the
// file name is m_db_name plus ".db"; both buffers reserve 5 spare bytes.
259 len = strlen(working_directory) + 5;
260 new_db_dir = (char *)malloc(len);
261 strcpy(new_db_dir, working_directory);
262 strcat(new_db_dir, "/");
263 len = strlen(m_db_name) + 5;
264 new_db_name = (char *)malloc(len);
265 strcpy(new_db_name, m_db_name);
266 strcat(new_db_name, ".db");
267 dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
268 dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
269 Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
276 * If connection fails, try at 5 sec intervals for 30 seconds.
278 for (int retry=0; retry < 6; retry++) {
279 dbstat = dbi_conn_connect(m_db_handle);
284 dbi_conn_error(m_db_handle, &dbi_errmsg);
285 Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
291 Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
292 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
293 m_db_driver, m_db_name, m_db_user);
297 Dmsg0(50, "dbi_real_connect done\n");
// Credentials must not reach the trace output: only report whether a
// password is configured.
298 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
299 m_db_user, m_db_name,
300 (m_db_password == NULL) ? "(NULL)" : "(hidden)");
304 if (!check_tables_version(jcr, this)) {
311 * Set connection timeout to 8 days, especially for batch mode
313 sql_query("SET wait_timeout=691200");
314 sql_query("SET interactive_timeout=691200");
316 case SQL_TYPE_POSTGRESQL:
318 * Tell PostgreSQL we are using standard conforming strings
319 * and avoid warnings such as:
320 * WARNING: nonstandard use of \\ in a string literal
322 sql_query("SET datestyle TO 'ISO, YMD'");
323 sql_query("SET standard_conforming_strings=on");
// Release this catalog connection.
// Any open transaction is ended first. When m_ref_count is 0 the object is
// removed from db_list, libdbi is shut down for m_instance (only if
// m_connected and m_db_handle are set), and the lock, the pool buffers and
// m_db_driverdir are released.
// NOTE(review): the statement that changes m_ref_count and the action taken
// when db_list becomes empty are not visible in this view.
334 void B_DB_DBI::db_close_database(JCR *jcr)
336 db_end_transaction(jcr);
340 if (m_ref_count == 0) {
341 db_list->remove(this);
342 if (m_connected && m_db_handle) {
343 dbi_shutdown_r(m_instance);
347 rwl_destroy(&m_lock);
// Return the buffers obtained with get_pool_memory in the constructor.
348 free_pool_memory(errmsg);
349 free_pool_memory(cmd);
350 free_pool_memory(cached_path);
351 free_pool_memory(fname);
352 free_pool_memory(path);
353 free_pool_memory(esc_name);
354 free_pool_memory(esc_path);
373 if (m_db_driverdir) {
374 free(m_db_driverdir);
377 if (db_list->size() == 0) {
385 void B_DB_DBI::db_thread_cleanup(void)
390 * Escape strings so that DBI is happy
392 * NOTE! len is the length of the old string. Your new
393 * string must be long enough (max 2*old+1) to hold
394 * the escaped output.
396 * dbi_conn_escape_string_copy receives a pointer to a pointer.
397 * We need to copy the value of that pointer to snew because libdbi changes the
// Escape the first len characters of old for use in an SQL statement and
// store the result in snew.
//   snew - destination; at most 2 * len + 1 bytes are written by the final copy
//   old  - source text
//   len  - number of characters of old to escape
// A temporary copy of old, bounded to len characters, is escaped by
// dbi_conn_escape_string_copy, which hands back its own buffer in pnew.
// NOTE(review): the result of malloc is not checked in the visible lines, and
// the release of inew and pnew is not visible here - confirm.
400 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
409 * Correct the size of old basead in len and copy new string to inew
411 inew = (char *)malloc(sizeof(char) * len + 1);
412 bstrncpy(inew,old,len + 1);
414 * Escape the correct size of old
416 dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
419 * Copy the escaped string to snew
421 bstrncpy(snew, pnew, 2 * len + 1);
424 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
428 * Escape binary object so that DBI is happy
429 * Memory is stored in B_DB struct, no need to free it
// Escape an object for storage and leave the result in the member buffer
// esc_obj, which is grown to new_len + 1 bytes before the copy.
// NOTE(review): the visible call escapes esc_obj itself; neither old nor len
// is referenced in any visible line, so the caller data may never be
// escaped - confirm against the full function.
// NOTE(review): memcpy copies new_len bytes without a terminating NUL, and no
// release of pnew is visible - confirm.
431 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
439 new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
440 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
441 memcpy(esc_obj, pnew, new_len);
448 * Unescape binary object so that DBI is happy
// Copy an object read from the catalog into a pool buffer.
//   from         - source bytes
//   expected_len - number of bytes to copy
//   dest         - pool buffer, grown to expected_len + 1 bytes
//   dest_len     - receives expected_len
// In the visible lines the data is copied unchanged (no decoding step) and a
// NUL byte is stored after the last copied byte.
450 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
451 POOLMEM **dest, int32_t *dest_len)
458 *dest = check_pool_memory_size(*dest, expected_len+1);
459 *dest_len = expected_len;
460 memcpy(*dest, from, expected_len);
461 (*dest)[expected_len]=0;
465 * Start a transaction. This groups inserts and makes things
466 * much more efficient. Usually started when inserting
// Begin a transaction on backends that have a case here (SQLite3,
// PostgreSQL, Ingres) so that many inserts are grouped together.
// jcr->attr and jcr->ar are allocated first (the conditions guarding those
// two allocations are not visible in this view).
// Each case follows the same pattern:
//   - test m_allow_transactions (the action taken when it is false is not
//     visible here);
//   - if a transaction is open and changes exceeds the limit (10000 for
//     SQLite3, 25000 for PostgreSQL and Ingres), end it;
//   - if no transaction is open, send BEGIN and set m_transaction.
469 void B_DB_DBI::db_start_transaction(JCR *jcr)
472 jcr->attr = get_pool_memory(PM_FNAME);
475 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
479 case SQL_TYPE_SQLITE3:
480 if (!m_allow_transactions) {
486 * Allow only 10,000 changes per transaction
488 if (m_transaction && changes > 10000) {
489 db_end_transaction(jcr);
491 if (!m_transaction) {
492 sql_query("BEGIN"); /* begin transaction */
493 Dmsg0(400, "Start SQLite transaction\n");
494 m_transaction = true;
498 case SQL_TYPE_POSTGRESQL:
500 * This is turned off because transactions break
501 * if multiple simultaneous jobs are run.
503 if (!m_allow_transactions) {
509 * Allow only 25,000 changes per transaction
511 if (m_transaction && changes > 25000) {
512 db_end_transaction(jcr);
514 if (!m_transaction) {
515 sql_query("BEGIN"); /* begin transaction */
516 Dmsg0(400, "Start PosgreSQL transaction\n");
517 m_transaction = true;
521 case SQL_TYPE_INGRES:
522 if (!m_allow_transactions) {
528 * Allow only 25,000 changes per transaction
530 if (m_transaction && changes > 25000) {
531 db_end_transaction(jcr);
533 if (!m_transaction) {
534 sql_query("BEGIN"); /* begin transaction */
535 Dmsg0(400, "Start Ingres transaction\n");
536 m_transaction = true;
// Commit the current transaction on backends that have a case here
// (SQLite3, PostgreSQL, Ingres).
// If the job still holds a cached attribute it is written first with
// db_create_attributes_record; a failure is reported as M_FATAL and the
// cached flag is cleared.
// Each case tests m_allow_transactions (action when false not visible here),
// then sends COMMIT and clears m_transaction. jcr may be NULL: it is tested
// before use in the first condition.
545 void B_DB_DBI::db_end_transaction(JCR *jcr)
547 if (jcr && jcr->cached_attribute) {
548 Dmsg0(400, "Flush last cached attribute.\n");
549 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
550 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
552 jcr->cached_attribute = false;
556 case SQL_TYPE_SQLITE3:
557 if (!m_allow_transactions) {
563 sql_query("COMMIT"); /* end transaction */
564 m_transaction = false;
565 Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
570 case SQL_TYPE_POSTGRESQL:
571 if (!m_allow_transactions) {
577 sql_query("COMMIT"); /* end transaction */
578 m_transaction = false;
579 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
584 case SQL_TYPE_INGRES:
585 if (!m_allow_transactions) {
591 sql_query("COMMIT"); /* end transaction */
592 m_transaction = false;
593 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
604 * Submit a general SQL command (cmd), and for each row returned,
605 * the result_handler is called with the ctx.
// Run query and, when result_handler is not NULL, call it once per fetched
// row with (ctx, m_num_fields, row).
// The query is executed through sql_query with QF_STORE_RESULT; on failure
// errmsg is filled with the query text and sql_strerror().
// The handler return value is tested after every row; the statement executed
// when it is non-zero is not visible in this view.
607 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
612 Dmsg1(500, "db_sql_query starts with %s\n", query);
615 if (!sql_query(query, QF_STORE_RESULT)) {
616 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
617 Dmsg0(500, "db_sql_query failed\n");
622 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
624 if (result_handler != NULL) {
625 Dmsg0(500, "db_sql_query invoking handler\n");
626 while ((row = sql_fetch_row()) != NULL) {
627 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
628 if (result_handler(ctx, m_num_fields, row))
634 Dmsg0(500, "db_sql_query finished\n");
642 * Note, if this routine returns false (failure), Bacula expects
643 * that no result has been stored.
645 * Returns: true on success
// Execute query on the DBI connection and record the result state.
// Any result left over from a previous query is freed first. On success
// m_num_fields and m_num_rows are read from the new result and m_status is
// set to 0; on failure the libdbi error is traced, the result is freed and
// m_status is set to 1.
//
// Changes compared with the previous revision (trace output only):
//  - "we have a result" was issued through Dmsg1 with an argument but no
//    conversion in the format; it now uses Dmsg0;
//  - the failure trace printed the pointer m_result a second time with a
//    signed-integer conversion; that mismatched conversion is removed and
//    the call uses Dmsg3.
648 bool B_DB_DBI::sql_query(const char *query, int flags)
651 const char *dbi_errmsg;
653 Dmsg1(500, "sql_query starts with %s\n", query);
656 * We are starting a new query. reset everything.
663 dbi_result_free(m_result); /* hmm, someone forgot to free?? */
667 m_result = (void **)dbi_conn_query(m_db_handle, query);
670 Dmsg2(50, "Query failed: %s %p\n", query, m_result);
674 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
675 if (m_status == DBI_ERROR_NONE) {
676 Dmsg0(500, "we have a result\n");
679 * How many fields in the set?
680 * num_fields starting at 1
682 m_num_fields = dbi_result_get_numfields(m_result);
683 Dmsg1(500, "we have %d fields\n", m_num_fields);
685 * If no result num_rows is 0
687 m_num_rows = dbi_result_get_numrows(m_result);
688 Dmsg1(500, "we have %d rows\n", m_num_rows);
690 m_status = (dbi_error_flag) 0; /* succeed */
692 Dmsg1(50, "Result status failed: %s\n", query);
696 Dmsg0(500, "sql_query finishing\n");
701 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
702 //dbi_conn_error(m_db_handle, &dbi_errmsg);
703 Dmsg3(500, "sql_query we failed dbi error: "
704 "'%s' '%p' flag '%d''\n", dbi_errmsg, m_result, m_status);
705 dbi_result_free(m_result);
707 m_status = (dbi_error_flag) 1; /* failed */
// Release the current result set and reset the row and field counters.
// The libdbi result is freed with dbi_result_free, then every node queued in
// dbi_getvalue_list (filled by sql_fetch_row) is visited; the per-node
// statements are not visible in this view. Finally m_num_rows and
// m_num_fields are set to 0.
713 void B_DB_DBI::sql_free_result(void)
719 dbi_result_free(m_result);
727 * Now is time to free all value return by dbi_get_value
728 * this is necessary because libdbi don't free memory return by yours results
729 * and Bacula has some routine wich call more than once time sql_fetch_row
731 * Using a queue to store all pointer allocate is a good way to free all things
734 foreach_dlist(f, dbi_getvalue_list) {
742 m_num_rows = m_num_fields = 0;
748 * char *PQgetvalue(const PGresult *res,
750 * int column_number);
752 * use dbi_result_seek_row to search in result set
753 * use example to return only strings
// Fetch one value of a libdbi result as a newly allocated C string.
//   result        - libdbi result set
//   row_number    - row to read; the value 0 is special-cased (the adjustment
//                   itself is not visible in this view)
//   column_number - column index handed to the libdbi _idx accessors
// After dbi_result_seek_row succeeds, the field name, length and type are
// read and a buffer is allocated with malloc. The value is then converted by
// type:
//   DBI_TYPE_INTEGER  - dbi_result_get_longlong formatted with edit_int64
//   DBI_TYPE_STRING   - dbi_result_get_string copied with bsnprintf
//   DBI_TYPE_BINARY   - dbi_result_get_binary copied with bsnprintf
//   DBI_TYPE_DATETIME - dbi_result_get_datetime formatted as
//                       YYYY-MM-DD HH:MM:SS through localtime_r, or the
//                       all-zero date string in the other branch
// When the seek fails the libdbi error text is traced.
// NOTE(review): the return statement is not visible; callers in this file
// (sql_fetch_row, sql_fetch_field) use the returned pointer as the value
// string, and sql_fetch_row queues it in dbi_getvalue_list.
// NOTE(review): the 50-element allocation multiplies by the size of a pointer
// rather than of a char; harmless over-allocation, but confirm the intent.
755 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
758 const char *dbi_errmsg;
759 const char *field_name;
760 unsigned short dbitype;
764 /* correct the index for dbi interface
766 * I prefer do not change others functions
768 Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
769 result, row_number, column_number);
773 if(row_number == 0) {
777 Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
778 result, row_number, column_number);
780 if(dbi_result_seek_row(result, row_number)) {
782 field_name = dbi_result_get_field_name(result, column_number);
783 field_length = dbi_result_get_field_length(result, field_name);
784 dbitype = dbi_result_get_field_type_idx(result,column_number);
786 Dmsg3(500, "dbi_getvalue start: type: '%d' "
787 "field_length bytes: '%d' fieldname: '%s'\n",
788 dbitype, field_length, field_name);
791 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
792 buf = (char *)malloc(field_length + 1);
797 buf = (char *)malloc(sizeof(char *) * 50);
801 case DBI_TYPE_INTEGER:
802 num = dbi_result_get_longlong(result, field_name);
803 edit_int64(num, buf);
804 field_length = strlen(buf);
806 case DBI_TYPE_STRING:
808 field_length = bsnprintf(buf, field_length + 1, "%s",
809 dbi_result_get_string(result, field_name));
814 case DBI_TYPE_BINARY:
816 * dbi_result_get_binary return a NULL pointer if value is empty
817 * following, change this to what Bacula espected
820 field_length = bsnprintf(buf, field_length + 1, "%s",
821 dbi_result_get_binary(result, field_name));
826 case DBI_TYPE_DATETIME:
830 last = dbi_result_get_datetime(result, field_name);
833 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
835 (void)localtime_r(&last, &tm);
836 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
837 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
838 tm.tm_hour, tm.tm_min, tm.tm_sec);
844 dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
845 Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
848 Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
849 buf, field_length, buf);
852 * Don't worry about this buf
// Fetch the next row of the current result as an array of C strings.
// row starts as NULL; the assignment that makes it point at m_rows and the
// return statement are not visible in this view.
// The m_rows array is (re)allocated when it is missing or smaller than
// m_num_fields and the result has rows. While m_row_number is within
// m_num_rows, every column value is obtained from dbi_getvalue and its
// pointer is also stored in a DBI_FIELD_GET node appended to
// dbi_getvalue_list, the list that sql_free_result walks.
857 SQL_ROW B_DB_DBI::sql_fetch_row(void)
860 SQL_ROW row = NULL; /* by default, return NULL */
862 Dmsg0(500, "sql_fetch_row start\n");
863 if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
865 Dmsg0(500, "sql_fetch_row freeing space\n");
866 Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
867 if (m_num_rows != 0) {
868 for (j = 0; j < m_num_fields; j++) {
869 Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
// NOTE(review): the argument below is a size_t printed with a signed-integer
// conversion - confirm this is acceptable on all supported platforms.
877 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
878 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
879 m_rows_size = m_num_fields;
882 * Now reset the row_number now that we have the space allocated
888 * If still within the result set
// NOTE(review): m_row_number is also compared with DBI_ERROR_BADPTR, an error
// code rather than a row index - confirm the intent.
890 if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
891 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
893 * Get each value from this row
895 for (j = 0; j < m_num_fields; j++) {
896 m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
898 * Allocate space to queue row
900 m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
902 * Store the pointer in queue
904 m_field_get->value = m_rows[j];
905 Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
906 j, m_rows[j], m_field_get->value, m_rows[j]);
908 * Insert in queue to future free
910 dbi_getvalue_list->append(m_field_get);
913 * Increment the row number for the next call
919 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
922 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
// Ask libdbi for the text of the last error on this connection.
// dbi_conn_error stores a pointer to the message in dbi_errmsg; the return
// statement is not visible in this view.
927 const char *B_DB_DBI::sql_strerror(void)
929 const char *dbi_errmsg;
931 dbi_conn_error(m_db_handle, &dbi_errmsg);
936 void B_DB_DBI::sql_data_seek(int row)
939 * Set the row number to be returned on the next call to sql_fetch_row
// Return the number of rows affected by the last statement, as reported by
// dbi_result_get_numrows_affected.
// NOTE(review): the declaration of the local named result is not visible
// here; presumably it refers to the current m_result - confirm.
944 int B_DB_DBI::sql_affected_rows(void)
947 return dbi_result_get_numrows_affected(result);
// Execute an INSERT statement and obtain the key generated for the new row.
//   query      - the INSERT statement
//   table_name - table that received the row
// The statement must affect exactly one row (m_num_rows is compared with 1;
// the failure paths are not visible in this view).
// For PostgreSQL the sequence name is built as <table>_<table>id_seq, except
// for the table basefiles (compared without regard to case), which uses
// basefiles_baseid_seq; dbi_conn_sequence_last is then called with that
// name. In the other visible branch it is called with the table name itself.
// NOTE(review): the return statement is not visible; presumably id is returned.
953 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
959 * First execute the insert query and then retrieve the currval.
961 if (!sql_query(query)) {
965 m_num_rows = sql_affected_rows();
966 if (m_num_rows != 1) {
973 * Obtain the current value of the sequence that
974 * provides the serial value for primary key of the table.
976 * currval is local to our session. It is not affected by
977 * other transactions.
979 * Determine the name of the sequence.
980 * PostgreSQL automatically creates a sequence using
981 * <table>_<column>_seq.
982 * At the time of writing, all tables used this format for
983 * for their primary key: <table>id
984 * Except for basefiles which has a primary key on baseid.
985 * Therefore, we need to special case that one table.
987 * everything else can use the PostgreSQL formula.
989 if (m_db_type == SQL_TYPE_POSTGRESQL) {
990 if (strcasecmp(table_name, "basefiles") == 0) {
991 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
993 bstrncpy(sequence, table_name, sizeof(sequence));
994 bstrncat(sequence, "_", sizeof(sequence));
995 bstrncat(sequence, table_name, sizeof(sequence));
996 bstrncat(sequence, "id", sizeof(sequence));
// The "_seq" suffix is appended for both forms of the sequence name.
999 bstrncat(sequence, "_seq", sizeof(sequence));
1000 id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1002 id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1010 * int PQgetisnull(const PGresult *res,
1012 * int column_number);
1014 * use dbi_result_seek_row to search in result set
// Report whether a field of a libdbi result is NULL.
// row_number 0 is special-cased (the adjustment is not visible in this view);
// after a successful dbi_result_seek_row the answer comes from
// dbi_result_field_is_null_idx for column_number.
1016 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1019 if (row_number == 0) {
1025 if (dbi_result_seek_row(result, row_number)) {
1026 i = dbi_result_field_is_null_idx(result,column_number);
// Return a pointer to the descriptor of the next field of the current
// result: &m_fields[m_field_number], with m_field_number incremented
// afterwards.
// When m_fields is missing or smaller than m_num_fields the array is
// allocated and filled for every field with the name, type and attribute
// flags reported by libdbi, plus max_length, found by scanning all
// m_num_rows rows (a NULL value counts as length 4).
// NOTE(review): the statements that compute dbi_index from i and that release
// cbuf are not visible in this view.
1033 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1041 Dmsg0(500, "sql_fetch_field starts\n");
1043 if (!m_fields || m_fields_size < m_num_fields) {
1048 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1049 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1050 m_fields_size = m_num_fields;
1052 for (i = 0; i < m_num_fields; i++) {
1054 * num_fields is starting at 1, increment i by 1
1057 Dmsg1(500, "filling field %d\n", i);
1058 m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1059 m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1060 m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1063 * For a given column, find the max length.
// Every row is fetched and converted just to measure its text length.
1066 for (j = 0; j < m_num_rows; j++) {
1067 if (dbi_getisnull(m_result, j, dbi_index)) {
1068 this_length = 4; /* "NULL" */
1070 cbuf = dbi_getvalue(m_result, j, dbi_index);
1071 this_length = cstrlen(cbuf);
1073 * cbuf is always free
1078 if (max_length < this_length) {
1079 max_length = this_length;
1082 m_fields[i].max_length = max_length;
1084 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1085 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1090 * Increment field number for the next time around
1092 return &m_fields[m_field_number++];
// Classify a field by switching on field_type.
// NOTE(review): the case labels and return statements are not visible in this
// view; presumably field_type carries the flags stored by sql_fetch_field -
// confirm.
1095 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1097 switch (field_type) {
// Classify a field by switching on field_type.
// NOTE(review): the case labels and return statements are not visible in this
// view; presumably field_type is a libdbi type code - confirm.
1105 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1107 switch (field_type) {
1117 * Escape strings so that PostgreSQL is happy on COPY
1119 * NOTE! len is the length of the old string. Your new
1120 * string must be long enough (max 2*old+1) to hold
1121 * the escaped output.
// Escape src for use as a field of PostgreSQL COPY input, writing to dest.
//   dest - output buffer supplied by the caller
//   src  - input text
//   len  - maximum number of input characters to process
// The loop stops at the first NUL in src or after len characters, whichever
// comes first. The per-character handling and the return value are not
// visible in this view.
1123 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1126 * We have to escape \t, \n, \r, \
1130 while (len > 0 && *src) {
1166 * This can be a bit strange but is the one way to do
1168 * Returns true if OK
// Prepare batch insert mode by creating the temporary table named batch.
// MySQL and SQLite3 only issue the CREATE TEMPORARY TABLE statement.
// The PostgreSQL case additionally resets the query state (m_field_number is
// set to -1), runs a loop of up to 10 iterations and then reads the libdbi
// error state; the statements inside that loop are not visible in this view,
// presumably they submit the COPY statement held in query - confirm.
// On the visible error path errmsg is filled from sql_strerror().
// NOTE(review): m_status is set to 1 after the PostgreSQL success check but to
// 0 on the error path, whereas sql_query uses 0 for success - confirm which
// convention callers rely on.
1171 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1174 const char *query = "COPY batch FROM STDIN";
1176 Dmsg0(500, "sql_batch_start started\n");
1179 switch (m_db_type) {
1180 case SQL_TYPE_MYSQL:
1181 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1182 "FileIndex integer,"
1188 "MarkId integer)")) {
1189 Dmsg0(500, "sql_batch_start failed\n");
1192 Dmsg0(500, "sql_batch_start finishing\n");
1194 case SQL_TYPE_POSTGRESQL:
1195 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1203 Dmsg0(500, "sql_batch_start failed\n");
1208 * We are starting a new query. reset everything.
1212 m_field_number = -1;
1216 for (int i=0; i < 10; i++) {
1224 Dmsg1(50, "Query failed: %s\n", query);
// NOTE(review): NULL is passed as the message destination here, unlike the
// other dbi_conn_error calls in this file - confirm libdbi accepts that.
1228 m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1229 //m_status = DBI_ERROR_NONE;
1231 if (m_status == DBI_ERROR_NONE) {
1233 * How many fields in the set?
1235 m_num_fields = dbi_result_get_numfields(m_result);
1236 m_num_rows = dbi_result_get_numrows(m_result);
1237 m_status = (dbi_error_flag) 1;
1239 Dmsg1(50, "Result status failed: %s\n", query);
1243 Dmsg0(500, "sql_batch_start finishing\n");
1245 case SQL_TYPE_SQLITE3:
1246 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1247 "FileIndex integer,"
1253 "MarkId integer)")) {
1254 Dmsg0(500, "sql_batch_start failed\n");
1257 Dmsg0(500, "sql_batch_start finishing\n");
1262 Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1263 m_status = (dbi_error_flag) 0;
1274 * Set error to something to abort operation
// Finish batch insert mode.
//   error - passed unchanged as the second argument of PQputCopyEnd
// MySQL and SQLite3 only set m_status to 0. For PostgreSQL the driver
// function "PQputCopyEnd" is looked up through libdbi and called on the
// native connection, repeating while it returns 0 and the retry counter has
// not run out.
//
// Change compared with the previous revision: the trace messages named
// sql_batch_start (copied from that function); they now name sql_batch_end
// so that a trace can tell the two apart.
// NOTE(review): a NULL check on custom_function before the call is not visible
// in this view - confirm one exists.
1276 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1280 int (*custom_function)(void*, const char*) = NULL;
1281 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1283 Dmsg0(500, "sql_batch_end started\n");
1285 switch (m_db_type) {
1286 case SQL_TYPE_MYSQL:
1287 m_status = (dbi_error_flag) 0;
1289 case SQL_TYPE_POSTGRESQL:
1290 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1293 res = (*custom_function)(myconn->connection, error);
1294 } while (res == 0 && --count > 0);
1298 m_status = (dbi_error_flag) 1;
1302 Dmsg0(500, "we failed\n");
1303 m_status = (dbi_error_flag) 0;
1304 //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1307 case SQL_TYPE_SQLITE3:
1308 m_status = (dbi_error_flag) 0;
1312 Dmsg0(500, "sql_batch_end finishing\n");
1318 * This function is big and uses a big switch.
1319 * In the near future it would be better to split it into smaller functions
// Add one file attribute record to the batch table.
//   ar - attribute record; FileIndex, JobId, attr, Digest and DeltaSeq are read
// esc_name and esc_path are grown to hold the escaped forms of fname and
// path (2 * length + 1 bytes each).
// MySQL and SQLite3 build an INSERT INTO batch statement from strings escaped
// with db_escape_string and run it through sql_query. PostgreSQL builds one
// tab-separated COPY line, escaped with postgresql_copy_escape, and sends it
// with the driver function "PQputCopyData", repeating while that function
// returns 0 and the retry counter has not run out.
//
// Changes compared with the previous revision:
//  - trace messages that named sql_batch_start now name sql_batch_insert;
//  - the "PQerrorMessage" function pointer is no longer called when the
//    driver lookup returned NULL.
1322 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1326 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1327 int (*custom_function)(void*, const char*, int) = NULL;
1328 char* (*custom_function_error)(void*) = NULL;
1333 Dmsg0(500, "sql_batch_insert started \n");
1335 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1336 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1338 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1341 digest = ar->Digest;
1344 switch (m_db_type) {
1345 case SQL_TYPE_MYSQL:
1346 db_escape_string(jcr, esc_name, fname, fnl);
1347 db_escape_string(jcr, esc_path, path, pnl);
1348 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1349 "(%u,%s,'%s','%s','%s','%s',%u)",
1350 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1351 esc_name, ar->attr, digest, ar->DeltaSeq);
1353 if (!sql_query(cmd))
1355 Dmsg0(500, "sql_batch_insert failed\n");
1359 Dmsg0(500, "sql_batch_insert finishing\n");
1363 case SQL_TYPE_POSTGRESQL:
1364 postgresql_copy_escape(esc_name, fname, fnl);
1365 postgresql_copy_escape(esc_path, path, pnl);
1366 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1367 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1368 esc_name, ar->attr, digest, ar->DeltaSeq);
1371 * libdbi does not support CopyData and we need to call a postgresql
1372 * specific function to do this work
1374 Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1375 custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1376 if (custom_function != NULL) {
1378 res = (*custom_function)(myconn->connection, cmd, len);
1379 } while (res == 0 && --count > 0);
1384 m_status = (dbi_error_flag) 1;
1388 Dmsg0(500, "sql_batch_insert failed\n");
1392 Dmsg0(500, "sql_batch_insert finishing\n");
1396 * Ensure to detect a PQerror
// The lookup can return NULL; only call through the pointer when it is set.
1398 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1399 Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (custom_function_error != NULL) ? (*custom_function_error)(myconn->connection) : "(PQerrorMessage not available)");
1403 case SQL_TYPE_SQLITE3:
1404 db_escape_string(jcr, esc_name, fname, fnl);
1405 db_escape_string(jcr, esc_path, path, pnl);
1406 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1407 "(%u,%s,'%s','%s','%s','%s',%u)",
1408 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1409 esc_name, ar->attr, digest, ar->DeltaSeq);
1411 if (!sql_query(cmd))
1413 Dmsg0(500, "sql_batch_insert failed\n");
1417 Dmsg0(500, "sql_batch_insert finishing\n");
1424 Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1425 m_status = (dbi_error_flag) 0;
1431 * Initialize database data structure. In principle this should
1432 * never have errors, or it is really fatal.
// Create, or reuse, the B_DB_DBI object for a catalog.
// db_driver must have the form "dbi:<type>": at least 5 characters, a colon
// at index 3 and the prefix "dbi" (compared without regard to case);
// otherwise Jmsg is called with M_ABORT. A missing driver or user name is
// also reported (the tests guarding those two messages are not visible in
// this view).
// With the mutex locked, and unless mult_db_connections is set, db_list is
// searched for an object matching driver, name, address and port; a match
// gets its reference count incremented. The visible lines then construct a
// new B_DB_DBI from the arguments.
// NOTE(review): the unlock of the mutex and the return statement are not
// visible in this view.
1434 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1435 const char *db_password, const char *db_address, int db_port,
1436 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1438 B_DB_DBI *mdb = NULL;
1441 Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1444 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1445 Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1449 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1453 P(mutex); /* lock DB queue */
1454 if (db_list && !mult_db_connections) {
1456 * Look to see if DB already open
1458 foreach_dlist(mdb, db_list) {
1459 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1460 Dmsg1(100, "DB REopen %s\n", db_name);
1461 mdb->increment_refcount();
1466 Dmsg0(100, "db_init_database first time\n");
1467 mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1468 db_port, db_socket, mult_db_connections, disable_batch_insert));
1475 #endif /* HAVE_DBI */