2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
36 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
39 * This code only compiles against a recent version of libdbi. The current
40 * release found on the libdbi website (0.8.3) won't work for this code.
42 * You find the libdbi library on http://sourceforge.net/projects/libdbi
44 * A fairly recent version of libdbi from CVS works, so either make sure
45 * your distribution has a fairly recent version of libdbi installed or
46 * clone the CVS repositories from sourceforge and compile that code and
50 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
61 #include <dbi/dbi-dev.h>
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
72 * List of open databases
74 static dlist *db_list = NULL;
77 * Control allocated fields by dbi_getvalue
79 static dlist *dbi_getvalue_list = NULL;
/* NOTE(review): this file is a fragmentary extract -- each line begins with
 * its original source line number and the gaps in that numbering mean many
 * lines (braces, declarations, case labels) are missing from this view. */
/* Serializes creation of / lookups in db_list (taken in db_init_database). */
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Signatures of PostgreSQL driver-specific entry points resolved at runtime
 * through dbi_driver_specific_function(): used for PQputCopyData (insert),
 * PQerrorMessage (error) and PQputCopyEnd (end) in the batch-mode code. */
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
/*
 * Constructor: parse the backend name out of the "dbi:<type>" driver string,
 * record the SQL backend type, duplicate the connection parameters into
 * member storage, decide whether batch inserts are available, allocate the
 * pool-memory work buffers and register this connection in the global
 * db_list.
 * NOTE(review): fragmentary listing -- the db_name, db_user and db_port
 * parameter lines and several closing braces are not visible here.
 */
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88 const char *db_driver,
91 const char *db_password,
92 const char *db_address,
94 const char *db_socket,
95 bool mult_db_connections,
96 bool disable_batch_insert)
99 char new_db_driver[10];
100 char db_driverdir[256];
101 DBI_FIELD_GET *field;
/* Skip the leading "dbi:" prefix (format is validated in db_init_database). */
103 p = (char *)(db_driver + 4);
104 if (strcasecmp(p, "mysql") == 0) {
105 m_db_type = SQL_TYPE_MYSQL;
106 bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107 } else if (strcasecmp(p, "postgresql") == 0) {
108 m_db_type = SQL_TYPE_POSTGRESQL;
/* libdbi calls the PostgreSQL driver "pgsql", not "postgresql". */
109 bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110 } else if (strcasecmp(p, "sqlite3") == 0) {
111 m_db_type = SQL_TYPE_SQLITE3;
112 bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113 } else if (strcasecmp(p, "ingres") == 0) {
114 m_db_type = SQL_TYPE_INGRES;
115 bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
/* Unknown backend type aborts the daemon. */
117 Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
122 * Set db_driverdir whereis is the libdbi drivers
124 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
127 * Initialize the parent class members.
129 m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130 m_db_name = bstrdup(db_name);
131 m_db_user = bstrdup(db_user);
133 m_db_password = bstrdup(db_password);
136 m_db_address = bstrdup(db_address);
139 m_db_socket = bstrdup(db_socket);
142 m_db_driverdir = bstrdup(db_driverdir);
144 m_db_driver = bstrdup(new_db_driver);
/* Batch insert is only enabled when compiled in and not disabled by config. */
146 if (disable_batch_insert) {
147 m_disabled_batch_insert = true;
148 m_have_batch_insert = false;
150 m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153 m_have_batch_insert = true;
155 m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
158 m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
161 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
163 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164 cached_path = get_pool_memory(PM_FNAME);
167 fname = get_pool_memory(PM_FNAME);
168 path = get_pool_memory(PM_FNAME);
169 esc_name = get_pool_memory(PM_FNAME);
170 esc_path = get_pool_memory(PM_FNAME);
171 esc_obj = get_pool_memory(PM_FNAME);
172 m_allow_transactions = mult_db_connections;
174 /* At this time, when mult_db_connections == true, this is for
175 * specific console command such as bvfs or batch mode, and we don't
176 * want to share a batch mode or bvfs. In the future, we can change
177 * the creation function to add this parameter.
179 m_dedicated = mult_db_connections;
182 * Initialize the private members.
189 * Put the db in the list.
/* First connection also creates the global lists. */
191 if (db_list == NULL) {
192 db_list = New(dlist(this, &this->m_link));
/* NOTE(review): 'field' is used here uninitialized as the dlist anchor
 * template -- dlist appears to use it only for offset computation, but this
 * cannot be confirmed from this extract. */
193 dbi_getvalue_list = New(dlist(field, &field->link));
195 db_list->append(this);
/* Destructor -- its body is not visible in this extract (cleanup is done in
 * db_close_database; presumably the destructor is empty or minimal -- TODO
 * confirm against the full source). */
198 B_DB_DBI::~B_DB_DBI()
203 * Now actually open the database. This can generate errors,
204 * which are returned in the errmsg
206 * DO NOT close the database or delete mdb here !!!!
/*
 * Open the catalog connection: initialize the per-connection rwlock, load
 * the libdbi drivers from m_db_driverdir, set per-backend connection
 * options (MySQL / PostgreSQL / SQLite3), connect with retries, check the
 * catalog table versions and apply per-backend session settings.
 * Errors are formatted into errmsg.
 */
208 bool B_DB_DBI::db_open_database(JCR *jcr)
214 const char *dbi_errmsg;
217 char *new_db_name = NULL;
218 char *new_db_dir = NULL;
226 if ((errstat=rwl_init(&m_lock)) != 0) {
228 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
229 be.bstrerror(errstat));
/* libdbi takes the port as a string option. */
234 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
240 numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
241 if (numdrivers < 0) {
242 Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
243 "db_driverdir=%s. It is probaly not found any drivers\n"),
244 m_db_driverdir,numdrivers);
247 m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
249 * Can be many types of databases
/* MySQL branch (switch header not visible in this extract). */
253 dbi_conn_set_option(m_db_handle, "host", m_db_address); /* default = localhost */
254 dbi_conn_set_option(m_db_handle, "port", port); /* default port */
255 dbi_conn_set_option(m_db_handle, "username", m_db_user); /* login name */
256 dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
257 dbi_conn_set_option(m_db_handle, "dbname", m_db_name); /* database name */
259 case SQL_TYPE_POSTGRESQL:
260 dbi_conn_set_option(m_db_handle, "host", m_db_address);
261 dbi_conn_set_option(m_db_handle, "port", port);
262 dbi_conn_set_option(m_db_handle, "username", m_db_user);
263 dbi_conn_set_option(m_db_handle, "password", m_db_password);
264 dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
266 case SQL_TYPE_SQLITE3:
/* SQLite3: build "<working_directory>/" and "<db_name>.db" strings
 * (+5 leaves room for the appended "/" or ".db" plus the NUL). */
267 len = strlen(working_directory) + 5;
268 new_db_dir = (char *)malloc(len);
269 strcpy(new_db_dir, working_directory);
270 strcat(new_db_dir, "/");
271 len = strlen(m_db_name) + 5;
272 new_db_name = (char *)malloc(len);
273 strcpy(new_db_name, m_db_name);
274 strcat(new_db_name, ".db");
275 dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
276 dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
277 Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
284 * If connection fails, try at 5 sec intervals for 30 seconds.
286 for (int retry=0; retry < 6; retry++) {
287 dbstat = dbi_conn_connect(m_db_handle);
292 dbi_conn_error(m_db_handle, &dbi_errmsg);
293 Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
299 Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301 m_db_driver, m_db_name, m_db_user);
305 Dmsg0(50, "dbi_real_connect done\n");
306 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
307 m_db_user, m_db_name,
308 (m_db_password == NULL) ? "(NULL)" : m_db_password);
312 if (!check_tables_version(jcr, this)) {
319 * Set connection timeout to 8 days specialy for batch mode
/* MySQL session settings (691200 s = 8 days). */
321 sql_query("SET wait_timeout=691200");
322 sql_query("SET interactive_timeout=691200");
324 case SQL_TYPE_POSTGRESQL:
326 * Tell PostgreSQL we are using standard conforming strings
327 * and avoid warnings such as:
328 * WARNING: nonstandard use of \\ in a string literal
330 sql_query("SET datestyle TO 'ISO, YMD'");
331 sql_query("SET standard_conforming_strings=on");
/*
 * Close this catalog connection: flush any pending transaction, and once
 * the reference count reaches zero remove the object from db_list, shut
 * down the libdbi instance and release all pool-memory buffers and the
 * strings duplicated by the constructor.
 */
342 void B_DB_DBI::db_close_database(JCR *jcr)
345 db_end_transaction(jcr);
349 if (m_ref_count == 0) {
353 db_list->remove(this);
354 if (m_connected && m_db_handle) {
355 dbi_shutdown_r(m_instance);
359 if (rwl_is_init(&m_lock)) {
360 rwl_destroy(&m_lock);
362 free_pool_memory(errmsg);
363 free_pool_memory(cmd);
364 free_pool_memory(cached_path);
365 free_pool_memory(fname);
366 free_pool_memory(path);
367 free_pool_memory(esc_name);
368 free_pool_memory(esc_path);
369 free_pool_memory(esc_obj);
/* Free strings bstrdup'ed in the constructor (other frees not visible in
 * this extract -- original lines 370-387 are missing). */
388 if (m_db_driverdir) {
389 free(m_db_driverdir);
/* Last connection gone: presumably the global lists are deleted here --
 * the body of this branch is not visible in this extract. */
392 if (db_list->size() == 0) {
/* Per-thread cleanup hook -- body not visible in this extract. */
400 void B_DB_DBI::db_thread_cleanup(void)
405 * Escape strings so that DBI is happy
407 * NOTE! len is the length of the old string. Your new
408 * string must be long enough (max 2*old+1) to hold
409 * the escaped output.
411 * dbi_conn_quote_string_copy receives a pointer to pointer.
412 * We need copy the value of pointer to snew because libdbi change the
415 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
424 * Correct the size of old basead in len and copy new string to inew
/* Work on a length-bounded private copy of 'old'. */
426 inew = (char *)malloc(sizeof(char) * len + 1);
427 bstrncpy(inew,old,len + 1);
429 * Escape the correct size of old
/* libdbi allocates the escaped string into pnew. */
431 dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
434 * Copy the escaped string to snew
436 bstrncpy(snew, pnew, 2 * len + 1);
/* NOTE(review): this extract does not show inew/pnew being freed --
 * verify against the full source. */
439 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
443 * Escape binary object so that DBI is happy
444 * Memory is stored in B_DB struct, no need to free it
/* Escapes 'old' into the reusable esc_obj pool buffer and returns it;
 * the caller must not free the result. */
446 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
454 new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
455 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
456 memcpy(esc_obj, pnew, new_len);
463 * Unescape binary object so that DBI is happy
/* For DBI "unescaping" is a plain copy: grow *dest to expected_len, copy
 * the bytes verbatim and NUL-terminate. *dest_len reports the length. */
465 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
466 POOLMEM **dest, int32_t *dest_len)
473 *dest = check_pool_memory_size(*dest, expected_len+1);
474 *dest_len = expected_len;
475 memcpy(*dest, from, expected_len);
476 (*dest)[expected_len]=0;
480 * Start a transaction. This groups inserts and makes things
481 * much more efficient. Usually started when inserting
/*
 * Begin (or roll over) an explicit transaction for the current backend.
 * Only active when m_allow_transactions is set; each backend branch caps
 * the number of changes per transaction (10,000 for SQLite3, 25,000 for
 * PostgreSQL and Ingres) before committing and starting a new one.
 */
484 void B_DB_DBI::db_start_transaction(JCR *jcr)
487 jcr->attr = get_pool_memory(PM_FNAME);
490 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
494 case SQL_TYPE_SQLITE3:
495 if (!m_allow_transactions) {
501 * Allow only 10,000 changes per transaction
503 if (m_transaction && changes > 10000) {
504 db_end_transaction(jcr);
506 if (!m_transaction) {
507 sql_query("BEGIN"); /* begin transaction */
508 Dmsg0(400, "Start SQLite transaction\n");
509 m_transaction = true;
513 case SQL_TYPE_POSTGRESQL:
515 * This is turned off because transactions break
516 * if multiple simultaneous jobs are run.
518 if (!m_allow_transactions) {
524 * Allow only 25,000 changes per transaction
526 if (m_transaction && changes > 25000) {
527 db_end_transaction(jcr);
529 if (!m_transaction) {
530 sql_query("BEGIN"); /* begin transaction */
531 Dmsg0(400, "Start PosgreSQL transaction\n");
532 m_transaction = true;
536 case SQL_TYPE_INGRES:
537 if (!m_allow_transactions) {
543 * Allow only 25,000 changes per transaction
545 if (m_transaction && changes > 25000) {
546 db_end_transaction(jcr);
548 if (!m_transaction) {
549 sql_query("BEGIN"); /* begin transaction */
550 Dmsg0(400, "Start Ingres transaction\n");
551 m_transaction = true;
/*
 * Commit the open transaction (if any).  First flushes the last cached
 * attribute record, then issues COMMIT for the backends that use explicit
 * transactions (SQLite3, PostgreSQL, Ingres) when m_allow_transactions is
 * set.
 */
560 void B_DB_DBI::db_end_transaction(JCR *jcr)
562 if (jcr && jcr->cached_attribute) {
563 Dmsg0(400, "Flush last cached attribute.\n")
564 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
565 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
567 jcr->cached_attribute = false;
571 case SQL_TYPE_SQLITE3:
572 if (!m_allow_transactions) {
578 sql_query("COMMIT"); /* end transaction */
579 m_transaction = false;
580 Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
585 case SQL_TYPE_POSTGRESQL:
586 if (!m_allow_transactions) {
592 sql_query("COMMIT"); /* end transaction */
593 m_transaction = false;
594 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
599 case SQL_TYPE_INGRES:
600 if (!m_allow_transactions) {
606 sql_query("COMMIT"); /* end transaction */
607 m_transaction = false;
608 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
619 * Submit a general SQL command (cmd), and for each row returned,
620 * the result_handler is called with the ctx.
/* Runs 'query' with a stored result; on failure formats errmsg.  When a
 * result_handler is given, it is invoked per fetched row with (ctx,
 * m_num_fields, row); a non-zero handler return stops iteration. */
622 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
627 Dmsg1(500, "db_sql_query starts with %s\n", query);
630 if (!sql_query(query, QF_STORE_RESULT)) {
631 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
632 Dmsg0(500, "db_sql_query failed\n");
637 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
639 if (result_handler != NULL) {
640 Dmsg0(500, "db_sql_query invoking handler\n");
641 while ((row = sql_fetch_row()) != NULL) {
642 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
643 if (result_handler(ctx, m_num_fields, row))
649 Dmsg0(500, "db_sql_query finished\n");
657 * Note, if this routine returns 1 (failure), Bacula expects
658 * that no result has been stored.
660 * Returns: true on success
/* Low-level query executor: resets per-query state, frees any leftover
 * result, runs the query through dbi_conn_query and caches the field and
 * row counts on success.  On failure the result is freed and m_status is
 * set to a failure flag. */
663 bool B_DB_DBI::sql_query(const char *query, int flags)
666 const char *dbi_errmsg;
668 Dmsg1(500, "sql_query starts with %s\n", query);
671 * We are starting a new query. reset everything.
678 dbi_result_free(m_result); /* hmm, someone forgot to free?? */
682 m_result = (void **)dbi_conn_query(m_db_handle, query);
685 Dmsg2(50, "Query failed: %s %p\n", query, m_result);
689 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
690 if (m_status == DBI_ERROR_NONE) {
691 Dmsg1(500, "we have a result\n", query);
694 * How many fields in the set?
695 * num_fields starting at 1
697 m_num_fields = dbi_result_get_numfields(m_result);
698 Dmsg1(500, "we have %d fields\n", m_num_fields);
700 * If no result num_rows is 0
702 m_num_rows = dbi_result_get_numrows(m_result);
703 Dmsg1(500, "we have %d rows\n", m_num_rows);
705 m_status = (dbi_error_flag) 0; /* succeed */
707 Dmsg1(50, "Result status failed: %s\n", query);
711 Dmsg0(500, "sql_query finishing\n");
/* Error path: record the dbi error and release the failed result. */
716 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
717 //dbi_conn_error(m_db_handle, &dbi_errmsg);
718 Dmsg4(500, "sql_query we failed dbi error: "
719 "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
720 dbi_result_free(m_result);
722 m_status = (dbi_error_flag) 1; /* failed */
/* Release the current libdbi result and every value buffer queued by
 * dbi_getvalue (libdbi does not own those copies), then zero the cached
 * row/field counts. */
728 void B_DB_DBI::sql_free_result(void)
734 dbi_result_free(m_result);
742 * Now is time to free all value return by dbi_get_value
743 * this is necessary because libdbi don't free memory return by yours results
744 * and Bacula has some routine wich call more than once time sql_fetch_row
746 * Using a queue to store all pointer allocate is a good way to free all things
749 foreach_dlist(f, dbi_getvalue_list) {
757 m_num_rows = m_num_fields = 0;
763 * char *PQgetvalue(const PGresult *res,
765 * int column_number);
767 * use dbi_result_seek_row to search in result set
768 * use example to return only strings
/*
 * PQgetvalue-style accessor on top of libdbi: seek to the requested row,
 * then render the field at column_number as a freshly malloc'ed C string
 * regardless of its native type (integer, string, binary or datetime).
 * The caller owns the returned buffer; sql_fetch_row queues each pointer
 * on dbi_getvalue_list so sql_free_result can free them later.
 */
770 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
773 const char *dbi_errmsg;
774 const char *field_name;
775 unsigned short dbitype;
779 /* correct the index for dbi interface
781 * I prefer do not change others functions
783 Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
784 result, row_number, column_number);
/* libdbi rows are 1-based; translate a 0-based caller index. */
788 if(row_number == 0) {
792 Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
793 result, row_number, column_number);
795 if(dbi_result_seek_row(result, row_number)) {
797 field_name = dbi_result_get_field_name(result, column_number);
798 field_length = dbi_result_get_field_length(result, field_name);
799 dbitype = dbi_result_get_field_type_idx(result,column_number);
801 Dmsg3(500, "dbi_getvalue start: type: '%d' "
802 "field_length bytes: '%d' fieldname: '%s'\n",
803 dbitype, field_length, field_name);
806 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
807 buf = (char *)malloc(field_length + 1);
/* Zero-length field: fall back to a fixed 50-byte buffer (enough for the
 * numeric and datetime renderings below). */
812 buf = (char *)malloc(sizeof(char *) * 50);
816 case DBI_TYPE_INTEGER:
817 num = dbi_result_get_longlong(result, field_name);
818 edit_int64(num, buf);
819 field_length = strlen(buf);
821 case DBI_TYPE_STRING:
823 field_length = bsnprintf(buf, field_length + 1, "%s",
824 dbi_result_get_string(result, field_name));
829 case DBI_TYPE_BINARY:
831 * dbi_result_get_binary return a NULL pointer if value is empty
832 * following, change this to what Bacula espected
835 field_length = bsnprintf(buf, field_length + 1, "%s",
836 dbi_result_get_binary(result, field_name));
841 case DBI_TYPE_DATETIME:
/* Render as "YYYY-MM-DD HH:MM:SS" (19 chars + NUL = 20). */
845 last = dbi_result_get_datetime(result, field_name);
848 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
850 (void)localtime_r(&last, &tm);
851 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
852 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
853 tm.tm_hour, tm.tm_min, tm.tm_sec);
/* Seek failed: report the libdbi error. */
859 dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
860 Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
863 Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
864 buf, field_length, buf);
867 * Don't worry about this buf
/*
 * Fetch the next row of the stored result as an array of C strings.
 * (Re)allocates the m_rows pointer array when needed, fills each slot via
 * dbi_getvalue and queues every returned buffer on dbi_getvalue_list so
 * sql_free_result can release it.  Returns NULL past the end of the set.
 */
872 SQL_ROW B_DB_DBI::sql_fetch_row(void)
875 SQL_ROW row = NULL; /* by default, return NULL */
877 Dmsg0(500, "sql_fetch_row start\n");
878 if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
880 Dmsg0(500, "sql_fetch_row freeing space\n");
881 Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
882 if (m_num_rows != 0) {
883 for (j = 0; j < m_num_fields; j++) {
884 Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
892 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
893 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
894 m_rows_size = m_num_fields;
897 * Now reset the row_number now that we have the space allocated
903 * If still within the result set
905 if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
906 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
908 * Get each value from this row
910 for (j = 0; j < m_num_fields; j++) {
911 m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
913 * Allocate space to queue row
915 m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
917 * Store the pointer in queue
919 m_field_get->value = m_rows[j];
920 Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
921 j, m_rows[j], m_field_get->value, m_rows[j]);
923 * Insert in queue to future free
925 dbi_getvalue_list->append(m_field_get);
928 * Increment the row number for the next call
934 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
937 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
/* Returns the last libdbi connection error message. */
942 const char *B_DB_DBI::sql_strerror(void)
944 const char *dbi_errmsg;
946 dbi_conn_error(m_db_handle, &dbi_errmsg);
/* Positions the cursor so the next sql_fetch_row returns row 'row'. */
951 void B_DB_DBI::sql_data_seek(int row)
954 * Set the row number to be returned on the next call to sql_fetch_row
/* Number of rows affected by the last statement.
 * NOTE(review): uses 'result', not 'm_result' -- cannot tell from this
 * extract whether that is a member alias or a bug; verify. */
959 int B_DB_DBI::sql_affected_rows(void)
962 return dbi_result_get_numrows_affected(result);
/*
 * Run an INSERT and return the auto-generated primary key.  Exactly one
 * row must be affected.  For PostgreSQL the sequence name is derived as
 * <table>_<table>id_seq (special-cased to basefiles_baseid_seq for the
 * basefiles table); other backends ask libdbi for the last sequence value
 * by table name.
 */
968 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
974 * First execute the insert query and then retrieve the currval.
976 if (!sql_query(query)) {
980 m_num_rows = sql_affected_rows();
981 if (m_num_rows != 1) {
988 * Obtain the current value of the sequence that
989 * provides the serial value for primary key of the table.
991 * currval is local to our session. It is not affected by
992 * other transactions.
994 * Determine the name of the sequence.
995 * PostgreSQL automatically creates a sequence using
996 * <table>_<column>_seq.
997 * At the time of writing, all tables used this format for
998 * for their primary key: <table>id
999 * Except for basefiles which has a primary key on baseid.
1000 * Therefore, we need to special case that one table.
1002 * everything else can use the PostgreSQL formula.
1004 if (m_db_type == SQL_TYPE_POSTGRESQL) {
1005 if (strcasecmp(table_name, "basefiles") == 0) {
1006 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1008 bstrncpy(sequence, table_name, sizeof(sequence));
1009 bstrncat(sequence, "_", sizeof(sequence));
1010 bstrncat(sequence, table_name, sizeof(sequence));
1011 bstrncat(sequence, "id", sizeof(sequence));
1014 bstrncat(sequence, "_seq", sizeof(sequence));
1015 id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1017 id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1025 * int PQgetisnull(const PGresult *res,
1027 * int column_number);
1029 * use dbi_result_seek_row to search in result set
/* PQgetisnull-style helper: seek to the row (translating a 0-based index
 * to libdbi's 1-based rows) and report whether the column is NULL. */
1031 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1034 if (row_number == 0) {
1040 if (dbi_result_seek_row(result, row_number)) {
1041 i = dbi_result_field_is_null_idx(result,column_number);
/*
 * Return metadata for the next field of the stored result.  Lazily
 * (re)allocates the m_fields array, fills in name/type/flags from libdbi
 * and computes max_length by scanning every row of the column (NULL
 * counts as length 4, for the string "NULL").
 */
1048 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1056 Dmsg0(500, "sql_fetch_field starts\n");
1058 if (!m_fields || m_fields_size < m_num_fields) {
1063 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1064 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1065 m_fields_size = m_num_fields;
1067 for (i = 0; i < m_num_fields; i++) {
1069 * num_fields is starting at 1, increment i by 1
1072 Dmsg1(500, "filling field %d\n", i);
1073 m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1074 m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1075 m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1078 * For a given column, find the max length.
1081 for (j = 0; j < m_num_rows; j++) {
1082 if (dbi_getisnull(m_result, j, dbi_index)) {
1083 this_length = 4; /* "NULL" */
1085 cbuf = dbi_getvalue(m_result, j, dbi_index);
1086 this_length = cstrlen(cbuf);
1088 * cbuf is always free
1093 if (max_length < this_length) {
1094 max_length = this_length;
1097 m_fields[i].max_length = max_length;
1099 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1100 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1105 * Increment field number for the next time around
1107 return &m_fields[m_field_number++];
/* Field-attribute predicates -- the case labels of both switches are not
 * visible in this extract. */
1110 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1112 switch (field_type) {
/* True when the given libdbi field type is numeric. */
1120 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1122 switch (field_type) {
1132 * Escape strings so that PostgreSQL is happy on COPY
1134 * NOTE! len is the length of the old string. Your new
1135 * string must be long enough (max 2*old+1) to hold
1136 * the escaped output.
/* Escapes src into dest for PostgreSQL COPY input.  The loop body (the
 * actual per-character escaping) is not visible in this extract. */
1138 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1141 * We have to escape \t, \n, \r, \
1145 while (len > 0 && *src) {
1181 * This can be a bit strang but is the one way to do
1183 * Returns true if OK
/*
 * Begin batch insert mode: create the temporary 'batch' table for the
 * current backend.  For PostgreSQL additionally start a COPY ... FROM
 * STDIN so rows can be streamed in sql_batch_insert.
 */
1186 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1189 const char *query = "COPY batch FROM STDIN";
1191 Dmsg0(500, "sql_batch_start started\n");
1194 switch (m_db_type) {
1195 case SQL_TYPE_MYSQL:
1196 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1197 "FileIndex integer,"
1203 "DeltaSeq smallint)")) {
1204 Dmsg0(500, "sql_batch_start failed\n");
1207 Dmsg0(500, "sql_batch_start finishing\n");
1209 case SQL_TYPE_POSTGRESQL:
1210 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1218 Dmsg0(500, "sql_batch_start failed\n");
1223 * We are starting a new query. reset everything.
1227 m_field_number = -1;
/* Issue the COPY statement, retrying up to 10 times. */
1231 for (int i=0; i < 10; i++) {
1239 Dmsg1(50, "Query failed: %s\n", query);
1243 m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1244 //m_status = DBI_ERROR_NONE;
1246 if (m_status == DBI_ERROR_NONE) {
1248 * How many fields in the set?
1250 m_num_fields = dbi_result_get_numfields(m_result);
1251 m_num_rows = dbi_result_get_numrows(m_result);
1252 m_status = (dbi_error_flag) 1;
1254 Dmsg1(50, "Result status failed: %s\n", query);
1258 Dmsg0(500, "sql_batch_start finishing\n");
1260 case SQL_TYPE_SQLITE3:
1261 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1262 "FileIndex integer,"
1268 "DeltaSeq smallint)")) {
1269 Dmsg0(500, "sql_batch_start failed\n");
1272 Dmsg0(500, "sql_batch_start finishing\n");
1277 Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1278 m_status = (dbi_error_flag) 0;
1289 * Set error to something to abort operation
/*
 * End batch insert mode.  MySQL and SQLite3 need no action; PostgreSQL
 * must terminate the COPY stream by calling the driver-specific
 * PQputCopyEnd (resolved via dbi_driver_specific_function), retrying
 * while it reports "would block" (returns 0).
 */
1291 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1295 int (*custom_function)(void*, const char*) = NULL;
1296 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1298 Dmsg0(500, "sql_batch_start started\n");
1300 switch (m_db_type) {
1301 case SQL_TYPE_MYSQL:
1302 m_status = (dbi_error_flag) 0;
1304 case SQL_TYPE_POSTGRESQL:
1305 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1308 res = (*custom_function)(myconn->connection, error);
1309 } while (res == 0 && --count > 0);
/* PQputCopyEnd returns 1 on success. */
1313 m_status = (dbi_error_flag) 1;
1317 Dmsg0(500, "we failed\n");
1318 m_status = (dbi_error_flag) 0;
1319 //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1322 case SQL_TYPE_SQLITE3:
1323 m_status = (dbi_error_flag) 0;
1327 Dmsg0(500, "sql_batch_start finishing\n");
1333 * This function is big and use a big switch.
1334 * In near future is better split in small functions
/*
 * Insert one attribute record in batch mode.  MySQL and SQLite3 build an
 * escaped INSERT INTO batch statement; PostgreSQL formats a tab-separated
 * COPY line and streams it through the driver-specific PQputCopyData
 * (retrying while it returns 0 = would block), reporting failures via
 * PQerrorMessage.
 */
1337 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1341 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1342 int (*custom_function)(void*, const char*, int) = NULL;
1343 char* (*custom_function_error)(void*) = NULL;
1348 Dmsg0(500, "sql_batch_start started \n");
/* Ensure the escape buffers can hold worst-case 2*len+1 escaped output. */
1350 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1351 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1353 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1356 digest = ar->Digest;
1359 switch (m_db_type) {
1360 case SQL_TYPE_MYSQL:
1361 db_escape_string(jcr, esc_name, fname, fnl);
1362 db_escape_string(jcr, esc_path, path, pnl);
1363 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1364 "(%u,%s,'%s','%s','%s','%s',%u)",
1365 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1366 esc_name, ar->attr, digest, ar->DeltaSeq);
1368 if (!sql_query(cmd))
1370 Dmsg0(500, "sql_batch_start failed\n");
1374 Dmsg0(500, "sql_batch_start finishing\n");
1378 case SQL_TYPE_POSTGRESQL:
1379 postgresql_copy_escape(esc_name, fname, fnl);
1380 postgresql_copy_escape(esc_path, path, pnl);
/* One tab-separated COPY input line, newline-terminated. */
1381 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1382 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1383 esc_name, ar->attr, digest, ar->DeltaSeq);
1386 * libdbi don't support CopyData and we need call a postgresql
1387 * specific function to do this work
1389 Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1390 custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1391 if (custom_function != NULL) {
1393 res = (*custom_function)(myconn->connection, cmd, len);
1394 } while (res == 0 && --count > 0);
1399 m_status = (dbi_error_flag) 1;
1403 Dmsg0(500, "sql_batch_insert failed\n");
1407 Dmsg0(500, "sql_batch_insert finishing\n");
1411 * Ensure to detect a PQerror
1413 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1414 Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1418 case SQL_TYPE_SQLITE3:
1419 db_escape_string(jcr, esc_name, fname, fnl);
1420 db_escape_string(jcr, esc_path, path, pnl);
1421 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1422 "(%u,%s,'%s','%s','%s','%s',%u)",
1423 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1424 esc_name, ar->attr, digest, ar->DeltaSeq);
1426 if (!sql_query(cmd))
1428 Dmsg0(500, "sql_batch_insert failed\n");
1432 Dmsg0(500, "sql_batch_insert finishing\n");
1439 Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1440 m_status = (dbi_error_flag) 0;
1446 * Initialize database data structure. In principal this should
1447 * never have errors, or it is really fatal.
/*
 * Factory entry point: validate the "dbi:<type>" driver string and the
 * user name, then (under the global mutex) either return an existing
 * matching connection from db_list with its refcount bumped, or construct
 * a new B_DB_DBI.  Aborts the daemon on malformed driver strings.
 */
1449 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1450 const char *db_password, const char *db_address, int db_port,
1451 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1453 B_DB_DBI *mdb = NULL;
1456 Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1459 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1460 Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1464 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1468 P(mutex); /* lock DB queue */
1469 if (db_list && !mult_db_connections) {
1471 * Look to see if DB already open
1473 foreach_dlist(mdb, db_list) {
1474 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1475 Dmsg1(100, "DB REopen %s\n", db_name);
1476 mdb->increment_refcount();
1481 Dmsg0(100, "db_init_database first time\n");
1482 mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1483 db_port, db_socket, mult_db_connections, disable_batch_insert));
/* NOTE(review): V(mutex) unlock is not visible in this extract -- verify
 * the mutex is released on all return paths in the full source. */
1490 #endif /* HAVE_DBI */