2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
36 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
39 * This code only compiles against a recent version of libdbi. The current
40 * release found on the libdbi website (0.8.3) won't work for this code.
42 * You find the libdbi library on http://sourceforge.net/projects/libdbi
44 * A fairly recent version of libdbi from CVS works, so either make sure
45 * your distribution has a fairly recent version of libdbi installed or
46 * clone the CVS repositories from sourceforge and compile that code and
50 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
61 #include <dbi/dbi-dev.h>
// File-scope state shared by every B_DB_DBI instance plus typedefs for
// driver-specific libdbi entry points resolved at runtime (see sql_batch_*).
// NOTE(review): original line numbers jump in this listing; intervening
// declarations are not visible here.
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
72 * List of open databases
74 static dlist *db_list = NULL;
77 * Control allocated fields by dbi_getvalue
// Queue of heap buffers handed out by dbi_getvalue(); freed in sql_free_result().
79 static dlist *dbi_getvalue_list = NULL;
// Guards db_list (locked in db_init_database with P(mutex)).
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Signatures of PostgreSQL-specific functions fetched via
// dbi_driver_specific_function(): PQputCopyData / PQerrorMessage / PQputCopyEnd.
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
// Constructor: map the "dbi:<type>" driver string onto an internal SQL type,
// duplicate all connection parameters, decide batch-insert capability, and
// register this connection in the global db_list.
// NOTE(review): several parameter lines (db_name, db_user, db_port) and some
// statements are missing from this listing — gaps in the original numbering.
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88 const char *db_driver,
91 const char *db_password,
92 const char *db_address,
94 const char *db_socket,
95 bool mult_db_connections,
96 bool disable_batch_insert)
99 char new_db_driver[10];
100 char db_driverdir[256];
101 DBI_FIELD_GET *field;
// Skip the "dbi:" prefix (4 chars) to get the backend name.
103 p = (char *)(db_driver + 4);
104 if (strcasecmp(p, "mysql") == 0) {
105 m_db_type = SQL_TYPE_MYSQL;
106 bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107 } else if (strcasecmp(p, "postgresql") == 0) {
// libdbi's PostgreSQL driver is named "pgsql", not "postgresql".
108 m_db_type = SQL_TYPE_POSTGRESQL;
109 bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110 } else if (strcasecmp(p, "sqlite3") == 0) {
111 m_db_type = SQL_TYPE_SQLITE3;
112 bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113 } else if (strcasecmp(p, "ingres") == 0) {
114 m_db_type = SQL_TYPE_INGRES;
115 bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
// Unknown backend is fatal: M_ABORT terminates the daemon.
117 Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
122 * Set db_driverdir whereis is the libdbi drivers
124 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
127 * Initialize the parent class members.
129 m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130 m_db_name = bstrdup(db_name);
131 m_db_user = bstrdup(db_user);
133 m_db_password = bstrdup(db_password);
136 m_db_address = bstrdup(db_address);
139 m_db_socket = bstrdup(db_socket);
142 m_db_driverdir = bstrdup(db_driverdir);
144 m_db_driver = bstrdup(new_db_driver);
// Batch insert: explicitly disabled, or gated by the two build-time defines.
146 if (disable_batch_insert) {
147 m_disabled_batch_insert = true;
148 m_have_batch_insert = false;
150 m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153 m_have_batch_insert = true;
155 m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
158 m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
// Pool-memory working buffers owned by this object; released in
// db_close_database().
161 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
163 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164 cached_path = get_pool_memory(PM_FNAME);
167 fname = get_pool_memory(PM_FNAME);
168 path = get_pool_memory(PM_FNAME);
169 esc_name = get_pool_memory(PM_FNAME);
170 esc_path = get_pool_memory(PM_FNAME);
171 esc_obj = get_pool_memory(PM_FNAME);
172 m_allow_transactions = mult_db_connections;
175 * Initialize the private members.
182 * Put the db in the list.
// First connection also creates the dbi_getvalue free-list.
// NOTE(review): 'field' is uninitialized here; dlist's ctor appears to use it
// only for the link offset — confirm against the dlist API.
184 if (db_list == NULL) {
185 db_list = New(dlist(this, &this->m_link));
186 dbi_getvalue_list = New(dlist(field, &field->link));
188 db_list->append(this);
191 B_DB_DBI::~B_DB_DBI()
196 * Now actually open the database. This can generate errors,
197 * which are returned in the errmsg
199 * DO NOT close the database or delete mdb here !!!!
// Open the connection described by the constructor's parameters: init the
// rwlock, load the libdbi driver, set per-backend connection options, retry
// the connect up to 6 times at intervals, then verify the catalog schema and
// apply per-backend session settings. Returns true on success (return paths
// are outside this listing's visible lines).
201 bool B_DB_DBI::db_open_database(JCR *jcr)
207 const char *dbi_errmsg;
210 char *new_db_name = NULL;
211 char *new_db_dir = NULL;
// Per-connection read/write lock used by the db_lock()/db_unlock() layer.
219 if ((errstat=rwl_init(&m_lock)) != 0) {
221 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
222 be.bstrerror(errstat));
// libdbi wants the port as a string option.
227 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
233 numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
234 if (numdrivers < 0) {
235 Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
236 "db_driverdir=%s. It is probaly not found any drivers\n"),
237 m_db_driverdir,numdrivers);
240 m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
242 * Can be many types of databases
// MySQL branch (case label not visible in this listing).
246 dbi_conn_set_option(m_db_handle, "host", m_db_address); /* default = localhost */
247 dbi_conn_set_option(m_db_handle, "port", port); /* default port */
248 dbi_conn_set_option(m_db_handle, "username", m_db_user); /* login name */
249 dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
250 dbi_conn_set_option(m_db_handle, "dbname", m_db_name); /* database name */
252 case SQL_TYPE_POSTGRESQL:
253 dbi_conn_set_option(m_db_handle, "host", m_db_address);
254 dbi_conn_set_option(m_db_handle, "port", port);
255 dbi_conn_set_option(m_db_handle, "username", m_db_user);
256 dbi_conn_set_option(m_db_handle, "password", m_db_password);
257 dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
259 case SQL_TYPE_SQLITE3:
// SQLite3: build "<working_directory>/" and "<db_name>.db" strings
// (+5 leaves room for the suffix and NUL).
260 len = strlen(working_directory) + 5;
261 new_db_dir = (char *)malloc(len);
262 strcpy(new_db_dir, working_directory);
263 strcat(new_db_dir, "/");
264 len = strlen(m_db_name) + 5;
265 new_db_name = (char *)malloc(len);
266 strcpy(new_db_name, m_db_name);
267 strcat(new_db_name, ".db");
268 dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
269 dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
270 Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
277 * If connection fails, try at 5 sec intervals for 30 seconds.
279 for (int retry=0; retry < 6; retry++) {
280 dbstat = dbi_conn_connect(m_db_handle);
285 dbi_conn_error(m_db_handle, &dbi_errmsg);
286 Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
292 Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
293 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
294 m_db_driver, m_db_name, m_db_user);
298 Dmsg0(50, "dbi_real_connect done\n");
299 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
300 m_db_user, m_db_name,
301 (m_db_password == NULL) ? "(NULL)" : m_db_password);
// Refuse to proceed if the catalog schema version doesn't match this binary.
305 if (!check_tables_version(jcr, this)) {
312 * Set connection timeout to 8 days specialy for batch mode
// MySQL session settings (case label not visible in this listing).
314 sql_query("SET wait_timeout=691200");
315 sql_query("SET interactive_timeout=691200");
317 case SQL_TYPE_POSTGRESQL:
319 * Tell PostgreSQL we are using standard conforming strings
320 * and avoid warnings such as:
321 * WARNING: nonstandard use of \\ in a string literal
323 sql_query("SET datestyle TO 'ISO, YMD'");
324 sql_query("SET standard_conforming_strings=on");
// Close this catalog connection: flush any open transaction, and when the
// reference count drops to zero remove it from db_list, shut libdbi down,
// release the rwlock and all pool-memory buffers, and free duplicated strings.
// NOTE(review): ref-count decrement, lock acquisition and the frees between
// lines 357-374 are outside this listing's visible lines.
335 void B_DB_DBI::db_close_database(JCR *jcr)
337 db_end_transaction(jcr);
341 if (m_ref_count == 0) {
342 db_list->remove(this);
343 if (m_connected && m_db_handle) {
344 dbi_shutdown_r(m_instance);
348 rwl_destroy(&m_lock);
349 free_pool_memory(errmsg);
350 free_pool_memory(cmd);
351 free_pool_memory(cached_path);
352 free_pool_memory(fname);
353 free_pool_memory(path);
354 free_pool_memory(esc_name);
355 free_pool_memory(esc_path);
356 free_pool_memory(esc_obj);
375 if (m_db_driverdir) {
376 free(m_db_driverdir);
// Last connection gone: presumably the global lists are destroyed here —
// the statements under this test are not visible. TODO confirm.
379 if (db_list->size() == 0) {
387 void B_DB_DBI::db_thread_cleanup(void)
392 * Escape strings so that DBI is happy
394 * NOTE! len is the length of the old string. Your new
395 * string must be long enough (max 2*old+1) to hold
396 * the escaped output.
398 * dbi_conn_quote_string_copy receives a pointer to pointer.
399 * We need copy the value of pointer to snew because libdbi change the
// SQL-escape 'old' (first 'len' bytes) into caller-provided 'snew' via
// libdbi. A working copy 'inew' is made because libdbi replaces the
// pointer it is given (see comment above).
402 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
411 * Correct the size of old basead in len and copy new string to inew
413 inew = (char *)malloc(sizeof(char) * len + 1);
414 bstrncpy(inew,old,len + 1);
416 * Escape the correct size of old
418 dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
421 * Copy the escaped string to snew
// Caller guarantees snew can hold 2*len+1 bytes (see header comment).
423 bstrncpy(snew, pnew, 2 * len + 1);
426 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
430 * Escape binary object so that DBI is happy
431 * Memory is stored in B_DB struct, no need to free it
// Escape a binary blob into the object's esc_obj pool buffer and return it;
// the buffer is owned by this B_DB_DBI (freed in db_close_database).
433 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
441 new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
// Grow the pool buffer to fit the escaped result, then copy it in.
442 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
443 memcpy(esc_obj, pnew, new_len);
450 * Unescape binary object so that DBI is happy
// Copy 'expected_len' bytes from 'from' into *dest (a pool buffer grown as
// needed), NUL-terminate, and report the length in *dest_len. No actual
// unescaping is performed in the visible code — it is a plain copy.
452 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
453 POOLMEM **dest, int32_t *dest_len)
460 *dest = check_pool_memory_size(*dest, expected_len+1);
461 *dest_len = expected_len;
462 memcpy(*dest, from, expected_len);
463 (*dest)[expected_len]=0;
467 * Start a transaction. This groups inserts and makes things
468 * much more efficient. Usually started when inserting
// Begin a backend transaction if allowed. Each backend caps the number of
// changes per transaction (10,000 for SQLite3; 25,000 for PostgreSQL and
// Ingres) and commits/restarts when the cap is exceeded. Also lazily
// allocates the JCR attribute-spooling buffers.
471 void B_DB_DBI::db_start_transaction(JCR *jcr)
474 jcr->attr = get_pool_memory(PM_FNAME);
477 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
481 case SQL_TYPE_SQLITE3:
482 if (!m_allow_transactions) {
488 * Allow only 10,000 changes per transaction
490 if (m_transaction && changes > 10000) {
491 db_end_transaction(jcr);
493 if (!m_transaction) {
494 sql_query("BEGIN"); /* begin transaction */
495 Dmsg0(400, "Start SQLite transaction\n");
496 m_transaction = true;
500 case SQL_TYPE_POSTGRESQL:
502 * This is turned off because transactions break
503 * if multiple simultaneous jobs are run.
505 if (!m_allow_transactions) {
511 * Allow only 25,000 changes per transaction
513 if (m_transaction && changes > 25000) {
514 db_end_transaction(jcr);
516 if (!m_transaction) {
517 sql_query("BEGIN"); /* begin transaction */
518 Dmsg0(400, "Start PosgreSQL transaction\n");
519 m_transaction = true;
523 case SQL_TYPE_INGRES:
524 if (!m_allow_transactions) {
530 * Allow only 25,000 changes per transaction
532 if (m_transaction && changes > 25000) {
533 db_end_transaction(jcr);
535 if (!m_transaction) {
536 sql_query("BEGIN"); /* begin transaction */
537 Dmsg0(400, "Start Ingres transaction\n");
538 m_transaction = true;
// Commit the open transaction (per backend) after flushing any cached
// attribute record that was spooled on the JCR.
547 void B_DB_DBI::db_end_transaction(JCR *jcr)
549 if (jcr && jcr->cached_attribute) {
550 Dmsg0(400, "Flush last cached attribute.\n");
551 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
552 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
554 jcr->cached_attribute = false;
558 case SQL_TYPE_SQLITE3:
559 if (!m_allow_transactions) {
565 sql_query("COMMIT"); /* end transaction */
566 m_transaction = false;
567 Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
572 case SQL_TYPE_POSTGRESQL:
573 if (!m_allow_transactions) {
579 sql_query("COMMIT"); /* end transaction */
580 m_transaction = false;
581 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
586 case SQL_TYPE_INGRES:
587 if (!m_allow_transactions) {
593 sql_query("COMMIT"); /* end transaction */
594 m_transaction = false;
595 Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
606 * Submit a general SQL command (cmd), and for each row returned,
607 * the result_handler is called with the ctx.
// Run 'query' with stored results; on failure record the reason in errmsg.
// When a handler is supplied, iterate the rows and stop early if the
// handler returns nonzero.
609 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
614 Dmsg1(500, "db_sql_query starts with %s\n", query);
617 if (!sql_query(query, QF_STORE_RESULT)) {
618 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
619 Dmsg0(500, "db_sql_query failed\n");
624 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
626 if (result_handler != NULL) {
627 Dmsg0(500, "db_sql_query invoking handler\n");
628 while ((row = sql_fetch_row()) != NULL) {
629 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
// Nonzero handler return aborts row iteration (break not visible here).
630 if (result_handler(ctx, m_num_fields, row))
636 Dmsg0(500, "db_sql_query finished\n");
644 * Note, if this routine returns 1 (failure), Bacula expects
645 * that no result has been stored.
647 * Returns: true on success
// Execute a query through dbi_conn_query(), freeing any stale result first,
// then cache the field and row counts. On error the libdbi error flag is
// captured into m_status and the result (if any) is freed.
650 bool B_DB_DBI::sql_query(const char *query, int flags)
653 const char *dbi_errmsg;
655 Dmsg1(500, "sql_query starts with %s\n", query);
658 * We are starting a new query. reset everything.
665 dbi_result_free(m_result); /* hmm, someone forgot to free?? */
669 m_result = (void **)dbi_conn_query(m_db_handle, query);
672 Dmsg2(50, "Query failed: %s %p\n", query, m_result);
676 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
677 if (m_status == DBI_ERROR_NONE) {
678 Dmsg1(500, "we have a result\n", query);
681 * How many fields in the set?
682 * num_fields starting at 1
684 m_num_fields = dbi_result_get_numfields(m_result);
685 Dmsg1(500, "we have %d fields\n", m_num_fields);
687 * If no result num_rows is 0
689 m_num_rows = dbi_result_get_numrows(m_result);
690 Dmsg1(500, "we have %d rows\n", m_num_rows);
// 0 == success in the convention used by this m_status field.
692 m_status = (dbi_error_flag) 0; /* succeed */
694 Dmsg1(50, "Result status failed: %s\n", query);
698 Dmsg0(500, "sql_query finishing\n");
// Error path: fetch the libdbi message, free the partial result, flag failure.
703 m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
704 //dbi_conn_error(m_db_handle, &dbi_errmsg);
705 Dmsg4(500, "sql_query we failed dbi error: "
706 "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
707 dbi_result_free(m_result);
709 m_status = (dbi_error_flag) 1; /* failed */
// Release the current libdbi result and every string buffer that
// dbi_getvalue() allocated for it (tracked in dbi_getvalue_list), then
// reset the cached row/field counts.
715 void B_DB_DBI::sql_free_result(void)
721 dbi_result_free(m_result);
729 * Now is time to free all value return by dbi_get_value
730 * this is necessary because libdbi don't free memory return by yours results
731 * and Bacula has some routine wich call more than once time sql_fetch_row
733 * Using a queue to store all pointer allocate is a good way to free all things
736 foreach_dlist(f, dbi_getvalue_list) {
744 m_num_rows = m_num_fields = 0;
750 * char *PQgetvalue(const PGresult *res,
752 * int column_number);
754 * use dbi_result_seek_row to search in result set
755 * use example to return only strings
// PQgetvalue-style accessor on top of libdbi: seek to 'row_number', read
// column 'column_number', and return the value as a freshly malloc'ed
// string regardless of its native type (integer, string, binary, datetime).
// The caller (sql_fetch_row) queues the buffer for later freeing.
757 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
760 const char *dbi_errmsg;
761 const char *field_name;
762 unsigned short dbitype;
766 /* correct the index for dbi interface
768 * I prefer do not change others functions
// libdbi rows/columns are 1-based while callers pass 0-based indices.
770 Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
771 result, row_number, column_number);
775 if(row_number == 0) {
779 Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
780 result, row_number, column_number);
782 if(dbi_result_seek_row(result, row_number)) {
784 field_name = dbi_result_get_field_name(result, column_number);
785 field_length = dbi_result_get_field_length(result, field_name);
786 dbitype = dbi_result_get_field_type_idx(result,column_number);
788 Dmsg3(500, "dbi_getvalue start: type: '%d' "
789 "field_length bytes: '%d' fieldname: '%s'\n",
790 dbitype, field_length, field_name);
793 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
794 buf = (char *)malloc(field_length + 1);
// Fallback buffer when the reported length is unusable (condition not
// visible in this listing): 50 * sizeof(char *) bytes.
799 buf = (char *)malloc(sizeof(char *) * 50);
803 case DBI_TYPE_INTEGER:
804 num = dbi_result_get_longlong(result, field_name);
805 edit_int64(num, buf);
806 field_length = strlen(buf);
808 case DBI_TYPE_STRING:
810 field_length = bsnprintf(buf, field_length + 1, "%s",
811 dbi_result_get_string(result, field_name));
816 case DBI_TYPE_BINARY:
818 * dbi_result_get_binary return a NULL pointer if value is empty
819 * following, change this to what Bacula espected
822 field_length = bsnprintf(buf, field_length + 1, "%s",
823 dbi_result_get_binary(result, field_name));
828 case DBI_TYPE_DATETIME:
832 last = dbi_result_get_datetime(result, field_name);
// Zero/invalid timestamps render as the catalog's null-date placeholder.
835 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
837 (void)localtime_r(&last, &tm);
838 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
839 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
840 tm.tm_hour, tm.tm_min, tm.tm_sec);
// Seek failed: report the libdbi error.
846 dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
847 Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
850 Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
851 buf, field_length, buf);
854 * Don't worry about this buf
// Return the next row of the current result as an array of C strings.
// Re-allocates the m_rows pointer array when the field count grows, fills
// each cell via dbi_getvalue(), and queues every returned buffer on
// dbi_getvalue_list so sql_free_result() can free them later.
859 SQL_ROW B_DB_DBI::sql_fetch_row(void)
862 SQL_ROW row = NULL; /* by default, return NULL */
864 Dmsg0(500, "sql_fetch_row start\n")
865 if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
867 Dmsg0(500, "sql_fetch_row freeing space\n");
868 Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
869 if (m_num_rows != 0) {
870 for (j = 0; j < m_num_fields; j++) {
871 Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
879 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
880 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
881 m_rows_size = m_num_fields;
884 * Now reset the row_number now that we have the space allocated
890 * If still within the result set
892 if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
893 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
895 * Get each value from this row
897 for (j = 0; j < m_num_fields; j++) {
898 m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
900 * Allocate space to queue row
902 m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
904 * Store the pointer in queue
906 m_field_get->value = m_rows[j];
907 Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
908 j, m_rows[j], m_field_get->value, m_rows[j]);
910 * Insert in queue to future free
912 dbi_getvalue_list->append(m_field_get);
915 * Increment the row number for the next call
921 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
924 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
// Return the last error message reported by libdbi for this connection.
929 const char *B_DB_DBI::sql_strerror(void)
931 const char *dbi_errmsg;
933 dbi_conn_error(m_db_handle, &dbi_errmsg);
// Position the cursor so the next sql_fetch_row() returns 'row'.
938 void B_DB_DBI::sql_data_seek(int row)
941 * Set the row number to be returned on the next call to sql_fetch_row
// Number of rows touched by the last INSERT/UPDATE/DELETE.
// NOTE(review): uses 'result' rather than 'm_result' — confirm which member
// this refers to; the declaration is not visible in this listing.
946 int B_DB_DBI::sql_affected_rows(void)
949 return dbi_result_get_numrows_affected(result);
// Run an INSERT and return the auto-generated primary key. For PostgreSQL
// the sequence name is derived as <table>_<table>id_seq (basefiles is the
// special case); other backends ask libdbi for the last sequence value
// keyed by table name.
955 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
961 * First execute the insert query and then retrieve the currval.
963 if (!sql_query(query)) {
// Exactly one row must have been inserted, otherwise fail.
967 m_num_rows = sql_affected_rows();
968 if (m_num_rows != 1) {
975 * Obtain the current value of the sequence that
976 * provides the serial value for primary key of the table.
978 * currval is local to our session. It is not affected by
979 * other transactions.
981 * Determine the name of the sequence.
982 * PostgreSQL automatically creates a sequence using
983 * <table>_<column>_seq.
984 * At the time of writing, all tables used this format for
985 * for their primary key: <table>id
986 * Except for basefiles which has a primary key on baseid.
987 * Therefore, we need to special case that one table.
989 * everything else can use the PostgreSQL formula.
991 if (m_db_type == SQL_TYPE_POSTGRESQL) {
992 if (strcasecmp(table_name, "basefiles") == 0) {
993 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
995 bstrncpy(sequence, table_name, sizeof(sequence));
996 bstrncat(sequence, "_", sizeof(sequence));
997 bstrncat(sequence, table_name, sizeof(sequence));
998 bstrncat(sequence, "id", sizeof(sequence));
1001 bstrncat(sequence, "_seq", sizeof(sequence));
1002 id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1004 id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1012 * int PQgetisnull(const PGresult *res,
1014 * int column_number);
1016 * use dbi_result_seek_row to search in result set
// PQgetisnull-style helper: seek to the row and report whether the column
// is NULL (libdbi rows are 1-based, hence the row_number == 0 adjustment).
1018 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1021 if (row_number == 0) {
1027 if (dbi_result_seek_row(result, row_number)) {
1028 i = dbi_result_field_is_null_idx(result,column_number);
// Return metadata (name, type, flags, max display length) for the next
// field of the current result, growing the m_fields cache on first use.
// The max length is computed by scanning every row's value in that column.
1035 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1043 Dmsg0(500, "sql_fetch_field starts\n");
1045 if (!m_fields || m_fields_size < m_num_fields) {
1050 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1051 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1052 m_fields_size = m_num_fields;
1054 for (i = 0; i < m_num_fields; i++) {
1056 * num_fields is starting at 1, increment i by 1
1059 Dmsg1(500, "filling field %d\n", i);
1060 m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1061 m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1062 m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1065 * For a given column, find the max length.
1068 for (j = 0; j < m_num_rows; j++) {
1069 if (dbi_getisnull(m_result, j, dbi_index)) {
1070 this_length = 4; /* "NULL" */
1072 cbuf = dbi_getvalue(m_result, j, dbi_index);
1073 this_length = cstrlen(cbuf);
1075 * cbuf is always free
1080 if (max_length < this_length) {
1081 max_length = this_length;
1084 m_fields[i].max_length = max_length;
1086 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1087 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1092 * Increment field number for the next time around
1094 return &m_fields[m_field_number++];
// Classify a libdbi field-attribute value as NOT NULL. Switch cases are
// outside this listing's visible lines.
1097 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1099 switch (field_type) {
// Classify a libdbi field type as numeric. Switch cases are outside this
// listing's visible lines.
1107 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1109 switch (field_type) {
1119 * Escape strings so that PostgreSQL is happy on COPY
1121 * NOTE! len is the length of the old string. Your new
1122 * string must be long enough (max 2*old+1) to hold
1123 * the escaped output.
// Escape src (len bytes) into dest for PostgreSQL COPY-FROM-STDIN input;
// per the comment below, \t, \n, \r and backslash are escaped. Loop body
// largely outside this listing's visible lines.
1125 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1128 * We have to escape \t, \n, \r, \
1132 while (len > 0 && *src) {
1168 * This can be a bit strang but is the one way to do
1170 * Returns true if OK
// Start batch-insert mode: create a temporary 'batch' table per backend.
// For PostgreSQL, additionally issue "COPY batch FROM STDIN" (retried up to
// 10 times) so rows can be streamed via PQputCopyData in sql_batch_insert.
1173 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1176 const char *query = "COPY batch FROM STDIN";
1178 Dmsg0(500, "sql_batch_start started\n");
1181 switch (m_db_type) {
1182 case SQL_TYPE_MYSQL:
1183 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1184 "FileIndex integer,"
1190 "DeltaSeq smallint)")) {
1191 Dmsg0(500, "sql_batch_start failed\n");
1194 Dmsg0(500, "sql_batch_start finishing\n");
1196 case SQL_TYPE_POSTGRESQL:
1197 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1205 Dmsg0(500, "sql_batch_start failed\n");
1210 * We are starting a new query. reset everything.
1214 m_field_number = -1;
1218 for (int i=0; i < 10; i++) {
1226 Dmsg1(50, "Query failed: %s\n", query);
1230 m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1231 //m_status = DBI_ERROR_NONE;
1233 if (m_status == DBI_ERROR_NONE) {
1235 * How many fields in the set?
1237 m_num_fields = dbi_result_get_numfields(m_result);
1238 m_num_rows = dbi_result_get_numrows(m_result);
// NOTE(review): here 1 is assigned on the NO-error path, unlike sql_query()
// where 0 means success — the m_status convention appears inconsistent;
// confirm against the full source before changing.
1239 m_status = (dbi_error_flag) 1;
1241 Dmsg1(50, "Result status failed: %s\n", query);
1245 Dmsg0(500, "sql_batch_start finishing\n");
1247 case SQL_TYPE_SQLITE3:
1248 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1249 "FileIndex integer,"
1255 "DeltaSeq smallint)")) {
1256 Dmsg0(500, "sql_batch_start failed\n");
1259 Dmsg0(500, "sql_batch_start finishing\n");
1264 Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1265 m_status = (dbi_error_flag) 0;
1278 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1282 int (*custom_function)(void*, const char*) = NULL;
1283 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1285 Dmsg0(500, "sql_batch_start started\n");
1287 switch (m_db_type) {
1288 case SQL_TYPE_MYSQL:
1289 m_status = (dbi_error_flag) 0;
1291 case SQL_TYPE_POSTGRESQL:
1292 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1295 res = (*custom_function)(myconn->connection, error);
1296 } while (res == 0 && --count > 0);
1300 m_status = (dbi_error_flag) 1;
1304 Dmsg0(500, "we failed\n");
1305 m_status = (dbi_error_flag) 0;
1306 //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1309 case SQL_TYPE_SQLITE3:
1310 m_status = (dbi_error_flag) 0;
1314 Dmsg0(500, "sql_batch_start finishing\n");
1320 * This function is big and use a big switch.
1321 * In near future is better split in small functions
// Insert one attribute record in batch mode. MySQL/SQLite3 use an escaped
// "INSERT INTO batch VALUES (...)" statement; PostgreSQL formats a
// tab-separated COPY line and streams it through the driver-specific
// PQputCopyData function, reporting errors via PQerrorMessage.
1324 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1328 dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1329 int (*custom_function)(void*, const char*, int) = NULL;
1330 char* (*custom_function_error)(void*) = NULL;
1335 Dmsg0(500, "sql_batch_start started \n");
// Worst-case escaping doubles the length, +1 for the NUL.
1337 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1338 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1340 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1343 digest = ar->Digest;
1346 switch (m_db_type) {
1347 case SQL_TYPE_MYSQL:
1348 db_escape_string(jcr, esc_name, fname, fnl);
1349 db_escape_string(jcr, esc_path, path, pnl);
1350 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1351 "(%u,%s,'%s','%s','%s','%s',%u)",
1352 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1353 esc_name, ar->attr, digest, ar->DeltaSeq);
1355 if (!sql_query(cmd))
1357 Dmsg0(500, "sql_batch_start failed\n");
1361 Dmsg0(500, "sql_batch_start finishing\n");
1365 case SQL_TYPE_POSTGRESQL:
// COPY data uses its own escaping, not SQL-string escaping.
1366 postgresql_copy_escape(esc_name, fname, fnl);
1367 postgresql_copy_escape(esc_path, path, pnl);
1368 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1369 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1370 esc_name, ar->attr, digest, ar->DeltaSeq);
1373 * libdbi don't support CopyData and we need call a postgresql
1374 * specific function to do this work
1376 Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1377 custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1378 if (custom_function != NULL) {
// PQputCopyData returns 0 when it would block; retry until count expires.
1380 res = (*custom_function)(myconn->connection, cmd, len);
1381 } while (res == 0 && --count > 0);
1386 m_status = (dbi_error_flag) 1;
1390 Dmsg0(500, "sql_batch_insert failed\n");
1394 Dmsg0(500, "sql_batch_insert finishing\n");
1398 * Ensure to detect a PQerror
1400 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1401 Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1405 case SQL_TYPE_SQLITE3:
1406 db_escape_string(jcr, esc_name, fname, fnl);
1407 db_escape_string(jcr, esc_path, path, pnl);
1408 len = Mmsg(cmd, "INSERT INTO batch VALUES "
1409 "(%u,%s,'%s','%s','%s','%s',%u)",
1410 ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path,
1411 esc_name, ar->attr, digest, ar->DeltaSeq);
1413 if (!sql_query(cmd))
1415 Dmsg0(500, "sql_batch_insert failed\n");
1419 Dmsg0(500, "sql_batch_insert finishing\n");
1426 Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1427 m_status = (dbi_error_flag) 0;
1433 * Initialize database data structure. In principal this should
1434 * never have errors, or it is really fatal.
// Factory entry point: validate the "dbi:<type>" driver string and user
// name, then (under the global mutex) either reuse an already-open matching
// connection (bumping its refcount) or construct a new B_DB_DBI.
1436 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1437 const char *db_password, const char *db_address, int db_port,
1438 const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1440 B_DB_DBI *mdb = NULL;
1443 Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
// Must look like "dbi:<type>" — at least 5 chars with ':' at index 3.
1446 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1447 Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1451 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1455 P(mutex); /* lock DB queue */
1456 if (db_list && !mult_db_connections) {
1458 * Look to see if DB already open
1460 foreach_dlist(mdb, db_list) {
1461 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1462 Dmsg1(100, "DB REopen %s\n", db_name);
1463 mdb->increment_refcount();
1468 Dmsg0(100, "db_init_database first time\n");
1469 mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1470 db_port, db_socket, mult_db_connections, disable_batch_insert));
1477 #endif /* HAVE_DBI */