2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
36 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
39 * This code only compiles against a recent version of libdbi. The current
40 * release found on the libdbi website (0.8.3) won't work for this code.
42 * You find the libdbi library on http://sourceforge.net/projects/libdbi
44 * A fairly recent version of libdbi from CVS works, so either make sure
45 * your distribution has a fairly recent version of libdbi installed or
46 * clone the CVS repositories from sourceforge and compile that code and
50 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
61 #include <dbi/dbi-dev.h>
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
72 * List of open databases
74 static dlist *db_list = NULL;
77 * Control allocated fields by dbi_getvalue
79 static dlist *dbi_getvalue_list = NULL;
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88 const char *db_driver,
91 const char *db_password,
92 const char *db_address,
94 const char *db_socket,
95 bool mult_db_connections,
96 bool disable_batch_insert)
99 char new_db_driver[10];
100 char db_driverdir[256];
101 DBI_FIELD_GET *field;
103 p = (char *)(db_driver + 4);
104 if (strcasecmp(p, "mysql") == 0) {
105 m_db_type = SQL_TYPE_MYSQL;
106 bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107 } else if (strcasecmp(p, "postgresql") == 0) {
108 m_db_type = SQL_TYPE_POSTGRESQL;
109 bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110 } else if (strcasecmp(p, "sqlite3") == 0) {
111 m_db_type = SQL_TYPE_SQLITE3;
112 bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113 } else if (strcasecmp(p, "ingres") == 0) {
114 m_db_type = SQL_TYPE_INGRES;
115 bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
117 Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
122 * Set db_driverdir to the directory where the libdbi drivers are located
124 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
127 * Initialize the parent class members.
129 m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130 m_db_name = bstrdup(db_name);
131 m_db_user = bstrdup(db_user);
133 m_db_password = bstrdup(db_password);
136 m_db_address = bstrdup(db_address);
139 m_db_socket = bstrdup(db_socket);
142 m_db_driverdir = bstrdup(db_driverdir);
144 m_db_driver = bstrdup(new_db_driver);
146 if (disable_batch_insert) {
147 m_disabled_batch_insert = true;
148 m_have_batch_insert = false;
150 m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153 m_have_batch_insert = true;
155 m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
158 m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
161 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
163 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164 cached_path = get_pool_memory(PM_FNAME);
167 fname = get_pool_memory(PM_FNAME);
168 path = get_pool_memory(PM_FNAME);
169 esc_name = get_pool_memory(PM_FNAME);
170 esc_path = get_pool_memory(PM_FNAME);
171 esc_obj = get_pool_memory(PM_FNAME);
172 m_allow_transactions = mult_db_connections;
174 /* At this time, when mult_db_connections == true, this is for
175 * specific console command such as bvfs or batch mode, and we don't
176 * want to share a batch mode or bvfs. In the future, we can change
177 * the creation function to add this parameter.
179 m_dedicated = mult_db_connections;
182 * Initialize the private members.
189 * Put the db in the list.
191 if (db_list == NULL) {
192 db_list = New(dlist(this, &this->m_link));
193 dbi_getvalue_list = New(dlist(field, &field->link));
195 db_list->append(this);
198 B_DB_DBI::~B_DB_DBI()
203 * Now actually open the database. This can generate errors,
204 * which are returned in the errmsg
206 * DO NOT close the database or delete mdb here !!!!
208 bool B_DB_DBI::db_open_database(JCR *jcr)
214 const char *dbi_errmsg;
217 char *new_db_name = NULL;
218 char *new_db_dir = NULL;
226 if ((errstat=rwl_init(&m_lock)) != 0) {
228 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
229 be.bstrerror(errstat));
234 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
240 numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
241 if (numdrivers < 0) {
242 Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
243 "db_driverdir=%s. It is probaly not found any drivers\n"),
244 m_db_driverdir,numdrivers);
247 m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
249 * Can be many types of databases
253 dbi_conn_set_option(m_db_handle, "host", m_db_address); /* default = localhost */
254 dbi_conn_set_option(m_db_handle, "port", port); /* default port */
255 dbi_conn_set_option(m_db_handle, "username", m_db_user); /* login name */
256 dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
257 dbi_conn_set_option(m_db_handle, "dbname", m_db_name); /* database name */
259 case SQL_TYPE_POSTGRESQL:
260 dbi_conn_set_option(m_db_handle, "host", m_db_address);
261 dbi_conn_set_option(m_db_handle, "port", port);
262 dbi_conn_set_option(m_db_handle, "username", m_db_user);
263 dbi_conn_set_option(m_db_handle, "password", m_db_password);
264 dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
266 case SQL_TYPE_SQLITE3:
267 len = strlen(working_directory) + 5;
268 new_db_dir = (char *)malloc(len);
269 strcpy(new_db_dir, working_directory);
270 strcat(new_db_dir, "/");
271 len = strlen(m_db_name) + 5;
272 new_db_name = (char *)malloc(len);
273 strcpy(new_db_name, m_db_name);
274 strcat(new_db_name, ".db");
275 dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
276 dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
277 Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
284 * If connection fails, try at 5 sec intervals for 30 seconds.
286 for (int retry=0; retry < 6; retry++) {
287 dbstat = dbi_conn_connect(m_db_handle);
292 dbi_conn_error(m_db_handle, &dbi_errmsg);
293 Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
299 Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301 m_db_driver, m_db_name, m_db_user);
305 Dmsg0(50, "dbi_real_connect done\n");
306 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
307 m_db_user, m_db_name,
308 (m_db_password == NULL) ? "(NULL)" : m_db_password);
312 if (!check_tables_version(jcr, this)) {
319 * Set connection timeout to 8 days, especially for batch mode
321 sql_query("SET wait_timeout=691200");
322 sql_query("SET interactive_timeout=691200");
324 case SQL_TYPE_POSTGRESQL:
326 * Tell PostgreSQL we are using standard conforming strings
327 * and avoid warnings such as:
328 * WARNING: nonstandard use of \\ in a string literal
330 sql_query("SET datestyle TO 'ISO, YMD'");
331 sql_query("SET standard_conforming_strings=on");
342 void B_DB_DBI::db_close_database(JCR *jcr)
344 db_end_transaction(jcr);
347 if (m_ref_count == 0) {
349 db_list->remove(this);
350 if (m_connected && m_db_handle) {
351 dbi_shutdown_r(m_instance);
355 rwl_destroy(&m_lock);
356 free_pool_memory(errmsg);
357 free_pool_memory(cmd);
358 free_pool_memory(cached_path);
359 free_pool_memory(fname);
360 free_pool_memory(path);
361 free_pool_memory(esc_name);
362 free_pool_memory(esc_path);
363 free_pool_memory(esc_obj);
382 if (m_db_driverdir) {
383 free(m_db_driverdir);
386 if (db_list->size() == 0) {
394 void B_DB_DBI::db_thread_cleanup(void)
399 * Escape strings so that DBI is happy
401 * NOTE! len is the length of the old string. Your new
402 * string must be long enough (max 2*old+1) to hold
403 * the escaped output.
405 * dbi_conn_quote_string_copy receives a pointer to pointer.
406 * We need to copy the value of the pointer into snew because libdbi changes the
409 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
418 * Correct the size of old based on len and copy the new string to inew
420 inew = (char *)malloc(sizeof(char) * len + 1);
421 bstrncpy(inew,old,len + 1);
423 * Escape the correct size of old
425 dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
428 * Copy the escaped string to snew
430 bstrncpy(snew, pnew, 2 * len + 1);
433 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
437 * Escape binary object so that DBI is happy
438 * Memory is stored in B_DB struct, no need to free it
440 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
448 new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
449 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
450 memcpy(esc_obj, pnew, new_len);
457 * Unescape binary object so that DBI is happy
459 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
460 POOLMEM **dest, int32_t *dest_len)
467 *dest = check_pool_memory_size(*dest, expected_len+1);
468 *dest_len = expected_len;
469 memcpy(*dest, from, expected_len);
470 (*dest)[expected_len]=0;
474 * Start a transaction. This groups inserts and makes things
475 * much more efficient. Usually started when inserting
478 void B_DB_DBI::db_start_transaction(JCR *jcr)
481 jcr->attr = get_pool_memory(PM_FNAME);
484 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
488 case SQL_TYPE_SQLITE3:
489 if (!m_allow_transactions) {
495 * Allow only 10,000 changes per transaction
497 if (m_transaction && changes > 10000) {
498 db_end_transaction(jcr);
500 if (!m_transaction) {
501 sql_query("BEGIN"); /* begin transaction */
502 Dmsg0(400, "Start SQLite transaction\n");
503 m_transaction = true;
507 case SQL_TYPE_POSTGRESQL:
509 * This is turned off because transactions break
510 * if multiple simultaneous jobs are run.
512 if (!m_allow_transactions) {
518 * Allow only 25,000 changes per transaction
520 if (m_transaction && changes > 25000) {
521 db_end_transaction(jcr);
523 if (!m_transaction) {
524 sql_query("BEGIN"); /* begin transaction */
525 Dmsg0(400, "Start PosgreSQL transaction\n");
526 m_transaction = true;
530 case SQL_TYPE_INGRES:
531 if (!m_allow_transactions) {
537 * Allow only 25,000 changes per transaction
539 if (m_transaction && changes > 25000) {
540 db_end_transaction(jcr);
542 if (!m_transaction) {
543 sql_query("BEGIN"); /* begin transaction */
544 Dmsg0(400, "Start Ingres transaction\n");
545 m_transaction = true;
554 void B_DB_DBI::db_end_transaction(JCR *jcr)
556 if (jcr && jcr->cached_attribute) {
557 Dmsg0(400, "Flush last cached attribute.\n");
558 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
559 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
561 jcr->cached_attribute = false;
565 case SQL_TYPE_SQLITE3:
566 if (!m_allow_transactions) {
572 sql_query("COMMIT"); /* end transaction */
573 m_transaction = false;
574 Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
579 case SQL_TYPE_POSTGRESQL:
580 if (!m_allow_transactions) {
586 sql_query("COMMIT"); /* end transaction */
587 m_transaction = false;
588 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
593 case SQL_TYPE_INGRES:
594 if (!m_allow_transactions) {
600 sql_query("COMMIT"); /* end transaction */
601 m_transaction = false;
602 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
613 * Submit a general SQL command (cmd), and for each row returned,
614 * the result_handler is called with the ctx.
616 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
621 Dmsg1(500, "db_sql_query starts with %s\n", query);
624 if (!sql_query(query, QF_STORE_RESULT)) {
625 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
626 Dmsg0(500, "db_sql_query failed\n");
631 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
633 if (result_handler != NULL) {
634 Dmsg0(500, "db_sql_query invoking handler\n");
635 while ((row = sql_fetch_row()) != NULL) {
636 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
637 if (result_handler(ctx, m_num_fields, row))
643 Dmsg0(500, "db_sql_query finished\n");
651 * Note, if this routine returns 1 (failure), Bacula expects
652 * that no result has been stored.
654 * Returns: true on success
657 bool B_DB_DBI::sql_query(const char *query, int flags)
660 const char *dbi_errmsg;
662 Dmsg1(500, "sql_query starts with %s\n", query);
665 * We are starting a new query. reset everything.
672 dbi_result_free(m_result); /* hmm, someone forgot to free?? */
676 m_result = (void **)dbi_conn_query(m_db_handle, query);
679 Dmsg2(50, "Query failed: %s %p\n", query, m_result);
683 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
684 if (m_status == DBI_ERROR_NONE) {
685 Dmsg1(500, "we have a result\n", query);
688 * How many fields in the set?
689 * num_fields starting at 1
691 m_num_fields = dbi_result_get_numfields(m_result);
692 Dmsg1(500, "we have %d fields\n", m_num_fields);
694 * If no result num_rows is 0
696 m_num_rows = dbi_result_get_numrows(m_result);
697 Dmsg1(500, "we have %d rows\n", m_num_rows);
699 m_status = (dbi_error_flag) 0; /* succeed */
701 Dmsg1(50, "Result status failed: %s\n", query);
705 Dmsg0(500, "sql_query finishing\n");
710 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
711 //dbi_conn_error(m_db_handle, &dbi_errmsg);
712 Dmsg4(500, "sql_query we failed dbi error: "
713 "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
714 dbi_result_free(m_result);
716 m_status = (dbi_error_flag) 1; /* failed */
722 void B_DB_DBI::sql_free_result(void)
728 dbi_result_free(m_result);
736 * Now it is time to free all values returned by dbi_getvalue;
737 * this is necessary because libdbi does not free memory returned by its results
738 * and Bacula has some routines which call sql_fetch_row more than once.
740 * Using a queue to store all pointer allocate is a good way to free all things
743 foreach_dlist(f, dbi_getvalue_list) {
751 m_num_rows = m_num_fields = 0;
757 * char *PQgetvalue(const PGresult *res,
759 * int column_number);
761 * use dbi_result_seek_row to search in result set
762 * use example to return only strings
764 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
767 const char *dbi_errmsg;
768 const char *field_name;
769 unsigned short dbitype;
773 /* correct the index for the dbi interface
775 * I prefer not to change the other functions
777 Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
778 result, row_number, column_number);
782 if(row_number == 0) {
786 Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
787 result, row_number, column_number);
789 if(dbi_result_seek_row(result, row_number)) {
791 field_name = dbi_result_get_field_name(result, column_number);
792 field_length = dbi_result_get_field_length(result, field_name);
793 dbitype = dbi_result_get_field_type_idx(result,column_number);
795 Dmsg3(500, "dbi_getvalue start: type: '%d' "
796 "field_length bytes: '%d' fieldname: '%s'\n",
797 dbitype, field_length, field_name);
800 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
801 buf = (char *)malloc(field_length + 1);
806 buf = (char *)malloc(sizeof(char *) * 50);
810 case DBI_TYPE_INTEGER:
811 num = dbi_result_get_longlong(result, field_name);
812 edit_int64(num, buf);
813 field_length = strlen(buf);
815 case DBI_TYPE_STRING:
817 field_length = bsnprintf(buf, field_length + 1, "%s",
818 dbi_result_get_string(result, field_name));
823 case DBI_TYPE_BINARY:
825 * dbi_result_get_binary return a NULL pointer if value is empty
826 * following, change this to what Bacula expects
829 field_length = bsnprintf(buf, field_length + 1, "%s",
830 dbi_result_get_binary(result, field_name));
835 case DBI_TYPE_DATETIME:
839 last = dbi_result_get_datetime(result, field_name);
842 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
844 (void)localtime_r(&last, &tm);
845 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
846 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
847 tm.tm_hour, tm.tm_min, tm.tm_sec);
853 dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
854 Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
857 Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
858 buf, field_length, buf);
861 * Don't worry about this buf
866 SQL_ROW B_DB_DBI::sql_fetch_row(void)
869 SQL_ROW row = NULL; /* by default, return NULL */
871 Dmsg0(500, "sql_fetch_row start\n");
872 if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
874 Dmsg0(500, "sql_fetch_row freeing space\n");
875 Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
876 if (m_num_rows != 0) {
877 for (j = 0; j < m_num_fields; j++) {
878 Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
886 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
887 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
888 m_rows_size = m_num_fields;
891 * Now reset the row_number now that we have the space allocated
897 * If still within the result set
899 if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
900 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
902 * Get each value from this row
904 for (j = 0; j < m_num_fields; j++) {
905 m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
907 * Allocate space to queue row
909 m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
911 * Store the pointer in queue
913 m_field_get->value = m_rows[j];
914 Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
915 j, m_rows[j], m_field_get->value, m_rows[j]);
917 * Insert in queue to future free
919 dbi_getvalue_list->append(m_field_get);
922 * Increment the row number for the next call
928 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
931 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
936 const char *B_DB_DBI::sql_strerror(void)
938 const char *dbi_errmsg;
940 dbi_conn_error(m_db_handle, &dbi_errmsg);
945 void B_DB_DBI::sql_data_seek(int row)
948 * Set the row number to be returned on the next call to sql_fetch_row
953 int B_DB_DBI::sql_affected_rows(void)
956 return dbi_result_get_numrows_affected(result);
962 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
968 * First execute the insert query and then retrieve the currval.
970 if (!sql_query(query)) {
974 m_num_rows = sql_affected_rows();
975 if (m_num_rows != 1) {
982 * Obtain the current value of the sequence that
983 * provides the serial value for primary key of the table.
985 * currval is local to our session. It is not affected by
986 * other transactions.
988 * Determine the name of the sequence.
989 * PostgreSQL automatically creates a sequence using
990 * <table>_<column>_seq.
991 * At the time of writing, all tables used this format for
992 * their primary key: <table>id
993 * Except for basefiles which has a primary key on baseid.
994 * Therefore, we need to special case that one table.
996 * everything else can use the PostgreSQL formula.
998 if (m_db_type == SQL_TYPE_POSTGRESQL) {
999 if (strcasecmp(table_name, "basefiles") == 0) {
1000 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1002 bstrncpy(sequence, table_name, sizeof(sequence));
1003 bstrncat(sequence, "_", sizeof(sequence));
1004 bstrncat(sequence, table_name, sizeof(sequence));
1005 bstrncat(sequence, "id", sizeof(sequence));
1008 bstrncat(sequence, "_seq", sizeof(sequence));
1009 id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1011 id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1019 * int PQgetisnull(const PGresult *res,
1021 * int column_number);
1023 * use dbi_result_seek_row to search in result set
1025 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1028 if (row_number == 0) {
1034 if (dbi_result_seek_row(result, row_number)) {
1035 i = dbi_result_field_is_null_idx(result,column_number);
1042 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1050 Dmsg0(500, "sql_fetch_field starts\n");
1052 if (!m_fields || m_fields_size < m_num_fields) {
1057 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1058 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1059 m_fields_size = m_num_fields;
1061 for (i = 0; i < m_num_fields; i++) {
1063 * num_fields is starting at 1, increment i by 1
1066 Dmsg1(500, "filling field %d\n", i);
1067 m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1068 m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1069 m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1072 * For a given column, find the max length.
1075 for (j = 0; j < m_num_rows; j++) {
1076 if (dbi_getisnull(m_result, j, dbi_index)) {
1077 this_length = 4; /* "NULL" */
1079 cbuf = dbi_getvalue(m_result, j, dbi_index);
1080 this_length = cstrlen(cbuf);
1082 * cbuf is always free
1087 if (max_length < this_length) {
1088 max_length = this_length;
1091 m_fields[i].max_length = max_length;
1093 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1094 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1099 * Increment field number for the next time around
1101 return &m_fields[m_field_number++];
1104 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1106 switch (field_type) {
1114 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1116 switch (field_type) {
1126 * Escape strings so that PostgreSQL is happy on COPY
1128 * NOTE! len is the length of the old string. Your new
1129 * string must be long enough (max 2*old+1) to hold
1130 * the escaped output.
1132 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1135 * We have to escape \t, \n, \r, \
1139 while (len > 0 && *src) {
1175 * This can be a bit strange but it is the only way to do it.
1177 * Returns true if OK
1180 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1183 const char *query = "COPY batch FROM STDIN";
1185 Dmsg0(500, "sql_batch_start started\n");
1188 switch (m_db_type) {
1189 case SQL_TYPE_MYSQL:
1190 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1191 "FileIndex integer,"
1197 "DeltaSeq smallint)")) {
1198 Dmsg0(500, "sql_batch_start failed\n");
1201 Dmsg0(500, "sql_batch_start finishing\n");
1203 case SQL_TYPE_POSTGRESQL:
1204 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1212 Dmsg0(500, "sql_batch_start failed\n");
1217 * We are starting a new query. reset everything.
1221 m_field_number = -1;
1225 for (int i=0; i < 10; i++) {
1233 Dmsg1(50, "Query failed: %s\n", query);
1237 m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1238 //m_status = DBI_ERROR_NONE;
1240 if (m_status == DBI_ERROR_NONE) {
1242 * How many fields in the set?
1244 m_num_fields = dbi_result_get_numfields(m_result);
1245 m_num_rows = dbi_result_get_numrows(m_result);
1246 m_status = (dbi_error_flag) 1;
1248 Dmsg1(50, "Result status failed: %s\n", query);
1252 Dmsg0(500, "sql_batch_start finishing\n");
1254 case SQL_TYPE_SQLITE3:
1255 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1256 "FileIndex integer,"
1262 "DeltaSeq smallint)")) {
1263 Dmsg0(500, "sql_batch_start failed\n");
1266 Dmsg0(500, "sql_batch_start finishing\n");
1271 Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1272 m_status = (dbi_error_flag) 0;
1283 * Set error to something to abort operation
1285 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1289 int (*custom_function)(void*, const char*) = NULL;
1290 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1292 Dmsg0(500, "sql_batch_start started\n");
1294 switch (m_db_type) {
1295 case SQL_TYPE_MYSQL:
1296 m_status = (dbi_error_flag) 0;
1298 case SQL_TYPE_POSTGRESQL:
1299 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1302 res = (*custom_function)(myconn->connection, error);
1303 } while (res == 0 && --count > 0);
1307 m_status = (dbi_error_flag) 1;
1311 Dmsg0(500, "we failed\n");
1312 m_status = (dbi_error_flag) 0;
1313 //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1316 case SQL_TYPE_SQLITE3:
1317 m_status = (dbi_error_flag) 0;
1321 Dmsg0(500, "sql_batch_start finishing\n");
1327 * This function is big and uses a big switch.
1328 * In the near future it would be better to split it into smaller functions.
1331 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1335 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1336 int (*custom_function)(void*, const char*, int) = NULL;
1337 char* (*custom_function_error)(void*) = NULL;
1342 Dmsg0(500, "sql_batch_start started \n");
1344 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1345 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1347 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1350 digest = ar->Digest;
1353 switch (m_db_type) {
1354 case SQL_TYPE_MYSQL:
1355 db_escape_string(jcr, esc_name, fname, fnl);
1356 db_escape_string(jcr, esc_path, path, pnl);
1357 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1358 "(%u,%s,'%s','%s','%s','%s',%u)",
1359 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1360 esc_name, ar->attr, digest, ar->DeltaSeq);
1362 if (!sql_query(cmd))
1364 Dmsg0(500, "sql_batch_start failed\n");
1368 Dmsg0(500, "sql_batch_start finishing\n");
1372 case SQL_TYPE_POSTGRESQL:
1373 postgresql_copy_escape(esc_name, fname, fnl);
1374 postgresql_copy_escape(esc_path, path, pnl);
1375 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1376 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1377 esc_name, ar->attr, digest, ar->DeltaSeq);
1380 * libdbi does not support CopyData, so we need to call a PostgreSQL-
1381 * specific function to do this work
1383 Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1384 custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1385 if (custom_function != NULL) {
1387 res = (*custom_function)(myconn->connection, cmd, len);
1388 } while (res == 0 && --count > 0);
1393 m_status = (dbi_error_flag) 1;
1397 Dmsg0(500, "sql_batch_insert failed\n");
1401 Dmsg0(500, "sql_batch_insert finishing\n");
1405 * Ensure to detect a PQerror
1407 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1408 Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1412 case SQL_TYPE_SQLITE3:
1413 db_escape_string(jcr, esc_name, fname, fnl);
1414 db_escape_string(jcr, esc_path, path, pnl);
1415 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1416 "(%u,%s,'%s','%s','%s','%s',%u)",
1417 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1418 esc_name, ar->attr, digest, ar->DeltaSeq);
1420 if (!sql_query(cmd))
1422 Dmsg0(500, "sql_batch_insert failed\n");
1426 Dmsg0(500, "sql_batch_insert finishing\n");
1433 Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1434 m_status = (dbi_error_flag) 0;
1440 * Initialize database data structure. In principal this should
1441 * never have errors, or it is really fatal.
1443 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1444 const char *db_password, const char *db_address, int db_port,
1445 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1447 B_DB_DBI *mdb = NULL;
1450 Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1453 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1454 Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1458 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1462 P(mutex); /* lock DB queue */
1463 if (db_list && !mult_db_connections) {
1465 * Look to see if DB already open
1467 foreach_dlist(mdb, db_list) {
1468 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1469 Dmsg1(100, "DB REopen %s\n", db_name);
1470 mdb->increment_refcount();
1475 Dmsg0(100, "db_init_database first time\n");
1476 mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1477 db_port, db_socket, mult_db_connections, disable_batch_insert));
1484 #endif /* HAVE_DBI */