2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
38 * This code only compiles against a recent version of libdbi. The current
39 * release found on the libdbi website (0.8.3) won't work for this code.
41 * You can find the libdbi library at http://sourceforge.net/projects/libdbi
43 * A fairly recent version of libdbi from CVS works, so either make sure
44 * your distribution has a fairly recent version of libdbi installed or
45 * clone the CVS repositories from sourceforge and compile that code and
49 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
50 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
54 /* The following is necessary so that we do not include
55 * the dummy external definition of DB.
57 #define __SQL_C /* indicate that this is sql.c */
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
// File-level state shared by every DBI catalog handle in this file.
71 /* List of open databases */
72 static dlist *db_list = NULL;
74 /* Control allocated fields by my_dbi_getvalue */
// my_dbi_fetch_row appends one DBI_FIELD_GET entry per value it hands out;
// my_dbi_free_result later walks this list.
75 static dlist *dbi_getvalue_list = NULL;
// Taken with P(mutex) in db_init_database before db_list is inspected or
// extended.
77 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
80 * Retrieve database type
89 * Initialize database data structure. In principle this should
90 * never have errors, or it is really fatal.
// Build, or reuse, the B_DB descriptor for a DBI-backed catalog.
// The statements below only validate arguments, pick the libdbi driver name,
// and allocate and fill the descriptor; no libdbi connection call appears here.
//   jcr                  job control record used for fatal job messages
//   db_name, db_user     catalog name and login; both are duplicated
//   db_password, db_address, db_socket
//                        duplicated into the descriptor as well
//   db_port              stored as given
//   mult_db_connections  zero: try to reuse a matching open descriptor;
//                        also copied into allow_transactions
93 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
94 const char *db_address, int db_port, const char *db_socket,
95 int mult_db_connections)
100 char db_driverdir[256];
102 /* Constraint the db_driver */
// Reported as M_FATAL: the message states that a DBI driver is required.
104 Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
108 /* Do the correct selection of driver.
109 * Can be one of the varius supported by libdbi
// Each SQL_TYPE_* case copies the matching libdbi driver name into db_driver.
113 bstrncpy(db_driver,"mysql", sizeof(db_driver));
115 case SQL_TYPE_POSTGRESQL:
116 bstrncpy(db_driver,"pgsql", sizeof(db_driver));
118 case SQL_TYPE_SQLITE:
119 bstrncpy(db_driver,"sqlite", sizeof(db_driver));
121 case SQL_TYPE_SQLITE3:
122 bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
126 /* Set db_driverdir whereis is the libdbi drivers */
// NOTE(review): the bound is a literal 255 while the buffer holds 256 bytes;
// sizeof(db_driverdir) would keep the two in sync.
127 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
130 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
133 P(mutex); /* lock DB queue */
// Both lists are created lazily, on the first call, while the mutex is held.
134 if (db_list == NULL) {
135 db_list = New(dlist(mdb, &mdb->link));
136 dbi_getvalue_list = New(dlist(field, &field->link));
// Single-connection mode: an open descriptor with the same name, address,
// driver and port is returned instead of allocating a new one.
138 if (!mult_db_connections) {
139 /* Look to see if DB already open */
140 foreach_dlist(mdb, db_list) {
141 if (bstrcmp(mdb->db_name, db_name) &&
142 bstrcmp(mdb->db_address, db_address) &&
143 bstrcmp(mdb->db_driver, db_driver) &&
144 mdb->db_port == db_port) {
145 Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
146 dbi_conn_error(mdb->db, NULL));
149 return mdb; /* already open */
153 Dmsg0(100, "db_open first time\n");
// Fresh descriptor: zero-filled, then populated from the arguments.
// NOTE(review): the malloc result goes straight to memset with no NULL check.
154 mdb = (B_DB *)malloc(sizeof(B_DB));
155 memset(mdb, 0, sizeof(B_DB));
156 mdb->db_name = bstrdup(db_name);
157 mdb->db_user = bstrdup(db_user);
159 mdb->db_password = bstrdup(db_password);
162 mdb->db_address = bstrdup(db_address);
165 mdb->db_socket = bstrdup(db_socket);
168 mdb->db_driverdir = bstrdup(db_driverdir);
171 mdb->db_driver = bstrdup(db_driver);
173 mdb->db_type = db_type;
174 mdb->db_port = db_port;
// Pool buffers below are the ones released again in db_close_database.
175 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
177 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
178 mdb->cached_path = get_pool_memory(PM_FNAME);
179 mdb->cached_path_id = 0;
181 mdb->fname = get_pool_memory(PM_FNAME);
182 mdb->path = get_pool_memory(PM_FNAME);
183 mdb->esc_name = get_pool_memory(PM_FNAME);
184 mdb->esc_path = get_pool_memory(PM_FNAME);
// Transactions are allowed exactly when multiple connections were requested.
185 mdb->allow_transactions = mult_db_connections;
186 db_list->append(mdb); /* put db in list */
192 * Now actually open the database. This can generate errors,
193 * which are returned in the errmsg
195 * DO NOT close the database or free(mdb) here !!!!
// Open the libdbi connection described by mdb and apply per-backend session
// settings. Failures are described in mdb->errmsg.
//   jcr  job control record, passed on to check_tables_version
//   mdb  descriptor produced by db_init_database
198 db_open_database(JCR *jcr, B_DB *mdb)
206 char *db_name = NULL;
// presumably an already connected descriptor is returned early - TODO confirm
210 if (mdb->connected) {
214 mdb->connected = false;
216 if ((errstat=rwl_init(&mdb->lock)) != 0) {
218 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
219 be.bstrerror(errstat));
// The numeric port is rendered as decimal text.
225 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
// A negative driver count from dbi_initialize_r is treated as failure.
231 numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
232 if (numdrivers < 0) {
// NOTE(review): Mmsg2 receives two arguments but the format has a single %s,
// so numdrivers is never printed.
233 Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
234 "db_driverdir=%s. It is probaly not found any drivers\n"),
235 mdb->db_driverdir,numdrivers);
239 mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
240 /* Can be many types of databases */
// Network backends receive host, port, username, password and dbname;
// the SQLite variants receive a directory option plus a file name.
241 switch (mdb->db_type) {
243 dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
244 dbi_conn_set_option(mdb->db, "port", port); /* default port */
245 dbi_conn_set_option(mdb->db, "username", mdb->db_user); /* login name */
246 dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
247 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name); /* database name */
249 case SQL_TYPE_POSTGRESQL:
250 dbi_conn_set_option(mdb->db, "host", mdb->db_address);
251 dbi_conn_set_option(mdb->db, "port", port);
252 dbi_conn_set_option(mdb->db, "username", mdb->db_user);
253 dbi_conn_set_option(mdb->db, "password", mdb->db_password);
254 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
256 case SQL_TYPE_SQLITE:
257 len = strlen(working_directory) + 5;
258 db_dir = (char *)malloc(len);
259 strcpy(db_dir, working_directory);
// strlen + 5 leaves room for the appended .db suffix and the terminating NUL.
261 len = strlen(mdb->db_name) + 5;
262 db_name = (char *)malloc(len);
263 strcpy(db_name, mdb->db_name);
264 strcat(db_name, ".db");
265 dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
266 dbi_conn_set_option(mdb->db, "dbname", db_name);
268 case SQL_TYPE_SQLITE3:
269 len = strlen(working_directory) + 5;
270 db_dir = (char *)malloc(len);
271 strcpy(db_dir, working_directory);
273 len = strlen(mdb->db_name) + 5;
274 db_name = (char *)malloc(len);
275 strcpy(db_name, mdb->db_name);
276 strcat(db_name, ".db");
277 dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
278 dbi_conn_set_option(mdb->db, "dbname", db_name);
279 Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
283 /* If connection fails, try at 5 sec intervals for 30 seconds. */
284 for (int retry=0; retry < 6; retry++) {
286 dbstat = dbi_conn_connect(mdb->db);
291 dbi_conn_error(mdb->db, &errmsg);
292 Dmsg1(50, "dbi error: %s\n", errmsg);
299 Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301 mdb->db_driver, mdb->db_name, mdb->db_user);
306 Dmsg0(50, "dbi_real_connect done\n");
// NOTE(review): this debug trace prints the database password in clear text.
307 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
308 mdb->db_user, mdb->db_name,
309 mdb->db_password==NULL?"(NULL)":mdb->db_password);
311 mdb->connected = true;
// The connected flag is set before the catalog schema version is checked.
313 if (!check_tables_version(jcr, mdb)) {
// Session tuning that only applies to specific backends.
318 switch (mdb->db_type) {
320 /* Set connection timeout to 8 days specialy for batch mode */
321 sql_query(mdb, "SET wait_timeout=691200");
322 sql_query(mdb, "SET interactive_timeout=691200");
324 case SQL_TYPE_POSTGRESQL:
325 /* tell PostgreSQL we are using standard conforming strings
326 and avoid warnings such as:
327 WARNING: nonstandard use of \\ in a string literal
329 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
330 sql_query(mdb, "set standard_conforming_strings=on");
// Release a catalog handle: end any transaction, free the pending result and,
// when mdb->ref_count is 0, remove the descriptor from db_list and free what
// db_init_database and db_open_database set up.
//   jcr  job control record, passed to db_end_transaction
//   mdb  descriptor to release
346 db_close_database(JCR *jcr, B_DB *mdb)
351 db_end_transaction(jcr, mdb);
353 sql_free_result(mdb);
355 if (mdb->ref_count == 0) {
356 db_list->remove(mdb);
// libdbi is only shut down for a descriptor that is connected and has a
// connection handle.
357 if (mdb->connected && mdb->db) {
359 dbi_shutdown_r(mdb->instance);
361 mdb->instance = NULL;
363 rwl_destroy(&mdb->lock);
// Hand back every pool buffer held by the descriptor.
364 free_pool_memory(mdb->errmsg);
365 free_pool_memory(mdb->cmd);
366 free_pool_memory(mdb->cached_path);
367 free_pool_memory(mdb->fname);
368 free_pool_memory(mdb->path);
369 free_pool_memory(mdb->esc_name);
370 free_pool_memory(mdb->esc_path);
// The duplicated strings below are freed only when they are non-NULL.
377 if (mdb->db_password) {
378 free(mdb->db_password);
380 if (mdb->db_address) {
381 free(mdb->db_address);
383 if (mdb->db_socket) {
384 free(mdb->db_socket);
386 if (mdb->db_driverdir) {
387 free(mdb->db_driverdir);
389 if (mdb->db_driver) {
390 free(mdb->db_driver);
// Tested after this descriptor has been removed from the list.
393 if (db_list->size() == 0) {
401 void db_check_backend_thread_safe()
404 void db_thread_cleanup()
408 * Return the next unique index (auto-increment) for
409 * the given table. Return NULL on error.
// Writes the literal text NULL into the index buffer supplied by the caller.
// The buffer must hold at least 5 bytes (four characters plus terminator).
// jcr, mdb and table are not referenced by the statement below.
412 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
414 strcpy(index, "NULL");
420 * Escape strings so that DBI is happy
422 * NOTE! len is the length of the old string. Your new
423 * string must be long enough (max 2*old+1) to hold
424 * the escaped output.
426 * dbi_conn_escape_string_copy receives a pointer to a pointer.
427 * We need to copy the value of that pointer to snew because libdbi changes the
// Escape the first len bytes of old through libdbi and store the result in
// snew.
//   jcr   not referenced by the statements below
//   mdb   descriptor whose connection performs the escaping
//   snew  destination; at most 2 * len + 1 bytes are written
//   old   source text
//   len   number of bytes of old to escape
431 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
439 /* correct the size of old basead in len
440 * and copy new string to inew
// Private copy limited to len bytes; presumably bstrncpy NUL-terminates it.
// NOTE(review): the malloc result is used without a NULL check.
442 inew = (char *)malloc(sizeof(char) * len + 1);
443 bstrncpy(inew,old,len + 1);
444 /* escape the correct size of old */
// The escaped text is handed back through pnew.
// NOTE(review): the return value of this call is not examined here.
445 dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
447 /* copy the escaped string to snew */
448 bstrncpy(snew, pnew, 2 * len + 1);
451 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
456 * Submit a general SQL command (cmd), and for each row returned,
457 * the result_handler is called with the ctx.
// Run query on mdb and, when result_handler is not NULL, call it once per
// fetched row with ctx, the field count and the row.
//   mdb             catalog descriptor
//   query           SQL text to execute
//   result_handler  optional per-row callback
//   ctx             opaque pointer forwarded to the callback
459 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
463 Dmsg0(500, "db_sql_query started\n");
// A non-zero sql_query result is a failure; the error text goes to errmsg.
466 if (sql_query(mdb, query) != 0) {
467 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
469 Dmsg0(500, "db_sql_query failed\n");
472 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
474 if (result_handler != NULL) {
475 Dmsg0(500, "db_sql_query invoking handler\n");
476 if ((mdb->result = sql_store_result(mdb)) != NULL) {
// The field count is read once, before the row loop.
477 int num_fields = sql_num_fields(mdb);
479 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
480 while ((row = sql_fetch_row(mdb)) != NULL) {
482 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
// presumably a non-zero handler result ends the iteration - TODO confirm
483 if (result_handler(ctx, num_fields, row))
487 sql_free_result(mdb);
492 Dmsg0(500, "db_sql_query finished\n");
// Fetch the row at mdb->row_number from the stored result as an array of
// strings held in mdb->row. The array is allocated when it is missing or
// smaller than mdb->num_fields.
//   mdb  catalog descriptor holding the result and the row cursor
499 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
502 DBI_ROW row = NULL; // by default, return NULL
504 Dmsg0(500, "my_dbi_fetch_row start\n");
// Allocation is only attempted when the result holds at least one row.
505 if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
506 int num_fields = mdb->num_fields;
// NOTE(review): %d is paired with a size_t expression in this trace.
507 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
510 Dmsg0(500, "my_dbi_fetch_row freeing space\n");
511 Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
512 if (mdb->num_rows != 0) {
513 for(j = 0; j < mdb->num_fields; j++) {
514 Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
522 //num_fields += 20; /* add a bit extra */
// NOTE(review): the malloc result is stored without a NULL check.
523 mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
524 mdb->row_size = num_fields;
526 // now reset the row_number now that we have the space allocated
530 // if still within the result set
// NOTE(review): row_number is compared against DBI_ERROR_BADPTR, an error
// constant, as if it were a sentinel row value - confirm this is intended.
531 if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
532 Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
533 // get each value from this row
534 for (j = 0; j < mdb->num_fields; j++) {
535 mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
536 // allocate space to queue row
// One DBI_FIELD_GET node per value records the pointer on dbi_getvalue_list.
537 mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
538 // store the pointer in queue
539 mdb->field_get->value = mdb->row[j];
540 Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
541 j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
542 // insert in queue to future free
543 dbi_getvalue_list->append(mdb->field_get);
545 // increment the row number for the next call
550 Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
553 Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
// Scan every row of the current result and track the longest textual length
// found in column field_num.
//   mdb        catalog descriptor holding the result
//   field_num  column to measure
558 int my_dbi_max_length(B_DB *mdb, int field_num) {
560 // for a given column, find the max length
568 for (i = 0; i < mdb->num_rows; i++) {
// A NULL field counts as four characters.
569 if (my_dbi_getisnull(mdb->result, i, field_num)) {
570 this_length = 4; // "NULL"
// Otherwise the value is fetched as text and measured.
572 cbuf = my_dbi_getvalue(mdb->result, i, field_num);
573 this_length = cstrlen(cbuf);
574 // cbuf is always free
578 if (max_length < this_length) {
579 max_length = this_length;
// Return the descriptor of the field at mdb->field_number and advance that
// cursor. The descriptor table mdb->fields is built when it is missing or
// holds fewer than mdb->num_fields entries.
//   mdb  catalog descriptor holding the result and the field cursor
586 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
591 Dmsg0(500, "my_dbi_fetch_field starts\n");
593 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
597 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
// NOTE(review): the malloc result is stored without a NULL check.
598 mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
599 mdb->fields_size = mdb->num_fields;
601 for (i = 0; i < mdb->num_fields; i++) {
602 // num_fileds is starting at 1, increment i by 1
604 Dmsg1(500, "filling field %d\n", i);
// libdbi lookups use dbi_index, while the local table is indexed by i.
605 mdb->fields[i].name = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
// my_dbi_max_length walks every row of the result for this column.
606 mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
607 mdb->fields[i].type = dbi_result_get_field_type_idx(mdb->result, dbi_index);
608 mdb->fields[i].flags = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
610 Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
611 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
612 mdb->fields[i].flags);
616 // increment field number for the next time around
618 Dmsg0(500, "my_dbi_fetch_field finishes\n");
// Post-increment: the current entry is returned, then the cursor moves on.
619 return &mdb->fields[mdb->field_number++];
// Position the row cursor of mdb.
//   mdb  catalog descriptor
//   row  value stored in mdb->row_number
622 void my_dbi_data_seek(B_DB *mdb, int row)
624 // set the row number to be returned on the next call
625 // to my_dbi_fetch_row
626 mdb->row_number = row;
// Position the field cursor of mdb; my_dbi_fetch_field indexes mdb->fields
// with this value.
//   mdb    catalog descriptor
//   field  value stored in mdb->field_number
629 void my_dbi_field_seek(B_DB *mdb, int field)
631 mdb->field_number = field;
635 * Note, if this routine returns 1 (failure), Bacula expects
636 * that no result has been stored.
638 * Returns: 0 on success
// Execute query on the libdbi connection of mdb. On success the result
// handle, field count and row count are stored in mdb and status is set to 0;
// on failure the result is freed and status is set to 1.
//   mdb    catalog descriptor
//   query  SQL text to execute
642 int my_dbi_query(B_DB *mdb, const char *query)
645 Dmsg1(500, "my_dbi_query started %s\n", query);
646 // We are starting a new query. reset everything.
// Both cursors are set to -1 before the query runs.
648 mdb->row_number = -1;
649 mdb->field_number = -1;
652 dbi_result_free(mdb->result); /* hmm, someone forgot to free?? */
656 mdb->result = (void **)dbi_conn_query(mdb->db, query);
659 Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
// The connection error flag decides whether the result is usable.
663 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
665 if (mdb->status == DBI_ERROR_NONE) {
// NOTE(review): query is passed to Dmsg1 but the format has no conversion.
666 Dmsg1(500, "we have a result\n", query);
668 // how many fields in the set?
669 // num_fields starting at 1
670 mdb->num_fields = dbi_result_get_numfields(mdb->result);
671 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
672 // if no result num_rows is 0
673 mdb->num_rows = dbi_result_get_numrows(mdb->result);
674 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
676 mdb->status = (dbi_error_flag) 0; /* succeed */
678 Dmsg1(50, "Result status failed: %s\n", query);
682 Dmsg0(500, "my_dbi_query finishing\n");
686 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
687 //dbi_conn_error(mdb->db, &errmsg);
// NOTE(review): mdb->result is a pointer yet the third conversion is %d.
688 Dmsg4(500, "my_dbi_query we failed dbi error: "
689 "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
690 dbi_result_free(mdb->result);
692 mdb->status = (dbi_error_flag) 1; /* failed */
// Free the stored libdbi result of mdb and walk dbi_getvalue_list, the queue
// of value pointers recorded by my_dbi_fetch_row.
//   mdb  catalog descriptor whose result is released
696 void my_dbi_free_result(B_DB *mdb)
702 Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
703 dbi_result_free(mdb->result);
712 /* now is time to free all value return by my_dbi_get_value
713 * this is necessary because libdbi don't free memory return by yours results
714 * and Bacula has some routine wich call more than once time my_dbi_fetch_row
716 * Using a queue to store all pointer allocate is a good way to free all things
// Each queue node f carries the value pointer stored by my_dbi_fetch_row.
719 foreach_dlist(f, dbi_getvalue_list) {
720 Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
732 Dmsg0(500, "my_dbi_free_result finish\n");
// Ask libdbi for the current error text of the connection held by mdb.
//   mdb  catalog descriptor
736 const char *my_dbi_strerror(B_DB *mdb)
// dbi_conn_error stores a pointer to the message in errmsg.
740 dbi_conn_error(mdb->db, &errmsg);
745 #ifdef HAVE_BATCH_FILE_INSERT
748 * This can be a bit strange but is the one way to do
// Prepare batch insertion. Every backend branch creates a temporary table
// named batch; the PostgreSQL branch additionally issues the COPY statement
// held in query.
//   jcr  not referenced by the statements below
//   mdb  catalog descriptor
753 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
// Only the PostgreSQL branch uses this statement.
755 const char *query = "COPY batch FROM STDIN";
757 Dmsg0(500, "my_dbi_batch_start started\n");
759 switch (mdb->db_type) {
// my_dbi_query returning 1 means the CREATE statement failed.
762 if (my_dbi_query(mdb,
763 "CREATE TEMPORARY TABLE batch ("
769 "MD5 tinyblob)") == 1)
771 Dmsg0(500, "my_dbi_batch_start failed\n");
775 Dmsg0(500, "my_dbi_batch_start finishing\n");
778 case SQL_TYPE_POSTGRESQL:
780 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
786 "md5 varchar)") == 1)
788 Dmsg0(500, "my_dbi_batch_start failed\n");
792 // We are starting a new query. reset everything.
794 mdb->row_number = -1;
795 mdb->field_number = -1;
797 my_dbi_free_result(mdb);
// The COPY statement is attempted at most ten times.
799 for (int i=0; i < 10; i++) {
800 my_dbi_query(mdb, query);
807 Dmsg1(50, "Query failed: %s\n", query);
811 mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
812 //mdb->status = DBI_ERROR_NONE;
814 if (mdb->status == DBI_ERROR_NONE) {
815 // how many fields in the set?
816 mdb->num_fields = dbi_result_get_numfields(mdb->result);
817 mdb->num_rows = dbi_result_get_numrows(mdb->result);
// NOTE(review): status is set to 1 on this no-error path, whereas
// my_dbi_query stores 0 for success - confirm the convention used here.
818 mdb->status = (dbi_error_flag) 1;
820 Dmsg1(50, "Result status failed: %s\n", query);
824 Dmsg0(500, "my_postgresql_batch_start finishing\n");
828 case SQL_TYPE_SQLITE:
830 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
836 "MD5 tinyblob)") == 1)
838 Dmsg0(500, "my_dbi_batch_start failed\n");
842 Dmsg0(500, "my_dbi_batch_start finishing\n");
845 case SQL_TYPE_SQLITE3:
847 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
853 "MD5 tinyblob)") == 1)
855 Dmsg0(500, "my_dbi_batch_start failed\n");
859 Dmsg0(500, "my_dbi_batch_start finishing\n");
// Error path: record the libdbi message, set status to 0, drop the result.
865 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
866 mdb->status = (dbi_error_flag) 0;
867 my_dbi_free_result(mdb);
872 /* set error to something to abort operation */
// Finish batch insertion. The PostgreSQL branch looks up PQputCopyEnd through
// libdbi and calls it; the other branches only set mdb->status.
//   jcr    not referenced by the statements below
//   mdb    catalog descriptor
//   error  forwarded as the second argument of PQputCopyEnd
873 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
877 int (*custom_function)(void*, const char*) = NULL;
// NOTE(review): mdb is dereferenced here, before the NULL test a few lines
// below, so that test cannot protect this access.
878 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
880 Dmsg0(500, "my_dbi_batch_end started\n");
882 if (!mdb) { /* no files ? */
886 switch (mdb->db_type) {
889 mdb->status = (dbi_error_flag) 0;
892 case SQL_TYPE_POSTGRESQL:
// The libpq entry point is resolved by name through the libdbi driver.
893 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
// The call is repeated while it returns 0 and the retry counter lasts.
897 res = (*custom_function)(myconn->connection, error);
898 } while (res == 0 && --count > 0);
902 mdb->status = (dbi_error_flag) 1;
906 Dmsg0(500, "we failed\n");
907 mdb->status = (dbi_error_flag) 0;
908 //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
911 case SQL_TYPE_SQLITE:
913 mdb->status = (dbi_error_flag) 0;
916 case SQL_TYPE_SQLITE3:
918 mdb->status = (dbi_error_flag) 0;
923 Dmsg0(500, "my_dbi_batch_end finishing\n");
929 * This function is big and uses a big switch.
930 * In the near future it would be better to split it into small functions
// Add one file record to the batch table. Three branches build an INSERT
// statement with escaped name and path; the PostgreSQL branch builds a
// tab-separated line and sends it through PQputCopyData.
//   jcr  forwarded to db_escape_string
//   mdb  catalog descriptor; fname, path, fnl and pnl describe the file
//   ar   attribute record supplying FileIndex, JobId, attr and Digest
934 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
938 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
939 int (*custom_function)(void*, const char*, int) = NULL;
940 char* (*custom_function_error)(void*) = NULL;
945 Dmsg0(500, "my_dbi_batch_insert started \n");
// Escape buffers are sized for twice the source length plus one byte.
947 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
948 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
// A NULL or empty digest is special-cased.
950 if (ar->Digest == NULL || ar->Digest[0] == 0) {
956 switch (mdb->db_type) {
958 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
959 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
// NOTE(review): ar->attr and digest are placed between quotes without
// escaping; presumably they never contain a quote character - verify.
960 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
961 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
962 mdb->esc_name, ar->attr, digest);
964 if (my_dbi_query(mdb,mdb->cmd) == 1)
966 Dmsg0(500, "my_dbi_batch_insert failed\n");
970 Dmsg0(500, "my_dbi_batch_insert finishing\n");
974 case SQL_TYPE_POSTGRESQL:
// COPY input has its own escaping rules, hence the dedicated helper.
975 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
976 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
977 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
978 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
979 mdb->esc_name, ar->attr, digest);
981 /* libdbi don't support CopyData and we need call a postgresql
982 * specific function to do this work
984 Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
985 if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
986 "PQputCopyData")) != NULL) {
// The call is repeated while it returns 0 and the retry counter lasts.
988 res = (*custom_function)(myconn->connection, mdb->cmd, len);
989 } while (res == 0 && --count > 0);
994 mdb->status = (dbi_error_flag) 1;
998 Dmsg0(500, "my_dbi_batch_insert failed\n");
1002 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1005 // ensure to detect a PQerror
// NOTE(review): the looked-up pointer is called on the next statement
// without a NULL check.
1006 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
1007 Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1011 case SQL_TYPE_SQLITE:
1012 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1013 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1014 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1015 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1016 mdb->esc_name, ar->attr, digest);
1017 if (my_dbi_query(mdb,mdb->cmd) == 1)
1019 Dmsg0(500, "my_dbi_batch_insert failed\n");
1023 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1027 case SQL_TYPE_SQLITE3:
1028 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1029 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1030 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1031 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1032 mdb->esc_name, ar->attr, digest);
1033 if (my_dbi_query(mdb,mdb->cmd) == 1)
1035 Dmsg0(500, "my_dbi_batch_insert failed\n");
1039 Dmsg0(500, "my_dbi_batch_insert finishing\n");
// Error path: record the libdbi message, set status to 0, drop the result.
1046 Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1047 mdb->status = (dbi_error_flag) 0;
1048 my_dbi_free_result(mdb);
1053 * Escape strings so that PostgreSQL is happy on COPY
1055 * NOTE! len is the length of the old string. Your new
1056 * string must be long enough (max 2*old+1) to hold
1057 * the escaped output.
// Escape src for use as PostgreSQL COPY input, writing into dest.
//   dest  output buffer
//   src   input text
//   len   upper bound on the number of src bytes examined
1059 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1061 /* we have to escape \t, \n, \r, \ */
// Scanning stops at the first NUL in src or when len reaches zero.
1064 while (len > 0 && *src) {
1099 #endif /* HAVE_BATCH_FILE_INSERT */
1103 * int PQgetisnull(const PGresult *res,
1105 * int column_number);
1107 * use dbi_result_seek_row to search in result set
// Seek to row_number in result and ask libdbi whether column_number is NULL.
//   result         libdbi result handle
//   row_number     row to inspect; the value 0 is special-cased below
//   column_number  field index passed to dbi_result_field_is_null_idx
1109 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1112 if(row_number == 0) {
// The NULL test runs only when the seek reports success.
1118 if(dbi_result_seek_row(result, row_number)) {
1120 i = dbi_result_field_is_null_idx(result,column_number);
1131 * char *PQgetvalue(const PGresult *res,
1133 * int column_number);
1135 * use dbi_result_seek_row to search in result set
1136 * use example to return only strings
// Read the field at row_number and column_number of result and render it as
// text in a malloc-allocated buffer, converting by libdbi field type.
// my_dbi_fetch_row records the pointers it receives from this function on
// dbi_getvalue_list.
//   result         libdbi result handle
//   row_number     row to read; the value 0 is special-cased below
//   column_number  field index
1138 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1142 const char *field_name;
1143 unsigned short dbitype;
1144 size_t field_length;
1147 /* correct the index for dbi interface
1148 * dbi index begins 1
1149 * I prefer do not change others functions
1151 Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1152 result, row_number, column_number);
1156 if(row_number == 0) {
1160 Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1161 result, row_number, column_number);
1163 if(dbi_result_seek_row(result, row_number)) {
// Name, byte length and type of the field drive the conversion below.
1165 field_name = dbi_result_get_field_name(result, column_number);
1166 field_length = dbi_result_get_field_length(result, field_name);
1167 dbitype = dbi_result_get_field_type_idx(result,column_number);
1169 Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1170 "field_length bytes: '%d' fieldname: '%s'\n",
1171 dbitype, field_length, field_name);
1174 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1175 buf = (char *)malloc(field_length + 1);
// NOTE(review): this size multiplies by sizeof(char *), not sizeof(char),
// so the buffer is larger than 50 characters; confirm that is intended.
1178 buf = (char *)malloc(sizeof(char *) * 50);
1182 case DBI_TYPE_INTEGER:
// Integers are read as long long and rendered as text by edit_int64.
1183 num = dbi_result_get_longlong(result, field_name);
1184 edit_int64(num, buf);
1185 field_length = strlen(buf);
1187 case DBI_TYPE_STRING:
1189 field_length = bsnprintf(buf, field_length + 1, "%s",
1190 dbi_result_get_string(result, field_name));
1195 case DBI_TYPE_BINARY:
1196 /* dbi_result_get_binary return a NULL pointer if value is empty
1197 * following, change this to what Bacula espected
// NOTE(review): binary data is formatted with %s, so the copy ends at the
// first NUL byte.
1200 field_length = bsnprintf(buf, field_length + 1, "%s",
1201 dbi_result_get_binary(result, field_name));
1206 case DBI_TYPE_DATETIME:
1210 last = dbi_result_get_datetime(result, field_name);
// Either an all-zero timestamp or the local time broken out by localtime_r;
// 20 bytes hold the 19-character form plus terminator.
1213 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1215 (void)localtime_r(&last, &tm);
1216 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1217 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1218 tm.tm_hour, tm.tm_min, tm.tm_sec);
1224 dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1225 Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1228 Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1229 buf, field_length, buf);
1231 // don't worry about this buf
// Ask libdbi for the last generated key of table_name. For PostgreSQL a
// sequence name is assembled first; otherwise the table name itself is
// passed to dbi_conn_sequence_last.
//   mdb         catalog descriptor
//   table_name  table whose last key is wanted
1235 static uint64_t my_dbi_sequence_last(B_DB *mdb, const char *table_name)
1238 Obtain the current value of the sequence that
1239 provides the serial value for primary key of the table.
1241 currval is local to our session. It is not affected by
1244 Determine the name of the sequence.
1245 PostgreSQL automatically creates a sequence using
1246 <table>_<column>_seq.
1247 At the time of writing, all tables used this format for
1248 for their primary key: <table>id
1249 Except for basefiles which has a primary key on baseid.
1250 Therefore, we need to special case that one table.
1252 everything else can use the PostgreSQL formula.
1258 if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
// basefiles is matched case-insensitively and gets a fixed prefix.
1260 if (strcasecmp(table_name, "basefiles") == 0) {
1261 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
// General form: the table name, an underscore, the table name again, then id.
1263 bstrncpy(sequence, table_name, sizeof(sequence));
1264 bstrncat(sequence, "_", sizeof(sequence));
1265 bstrncat(sequence, table_name, sizeof(sequence));
1266 bstrncat(sequence, "id", sizeof(sequence));
// Both forms end with the _seq suffix.
1269 bstrncat(sequence, "_seq", sizeof(sequence));
1270 id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1272 id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
// Run an INSERT statement and return the key generated for it, as reported
// by my_dbi_sequence_last.
//   mdb         catalog descriptor
//   query       INSERT statement to execute
//   table_name  table the row is inserted into
1278 uint64_t my_dbi_insert_autokey_record(B_DB *mdb, const char *query, const char *table_name)
1281 * First execute the insert query and then retrieve the currval.
// A non-zero result from my_dbi_query is the failure case.
1283 if (my_dbi_query(mdb, query)) {
// Anything other than exactly one affected row is handled separately.
1287 mdb->num_rows = sql_affected_rows(mdb);
1288 if (mdb->num_rows != 1) {
1294 return my_dbi_sequence_last(mdb, table_name);
1297 #ifdef HAVE_BATCH_FILE_INSERT
// Statement tables used by batch insertion. Each array has five slots;
// presumably one per backend, indexed by database type - TODO confirm.
// Lock statements for the Path table.
1298 const char *my_dbi_batch_lock_path_query[5] = {
1300 "LOCK TABLES Path write, batch write, Path as p write",
1302 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
// Lock statements for the Filename table.
1311 const char *my_dbi_batch_lock_filename_query[5] = {
1313 "LOCK TABLES Filename write, batch write, Filename as f write",
1315 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1324 const char *my_dbi_batch_unlock_tables_query[5] = {
// Each statement inserts into Path the distinct batch paths that Path does
// not contain yet, using either NOT EXISTS or EXCEPT.
1337 const char *my_dbi_batch_fill_path_query[5] = {
1339 "INSERT INTO Path (Path) "
1340 "SELECT a.Path FROM "
1341 "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1342 "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1344 "INSERT INTO Path (Path) "
1345 "SELECT a.Path FROM "
1346 "(SELECT DISTINCT Path FROM batch) AS a "
1347 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1349 "INSERT INTO Path (Path)"
1350 " SELECT DISTINCT Path FROM batch"
1351 " EXCEPT SELECT Path FROM Path",
1353 "INSERT INTO Path (Path)"
1354 " SELECT DISTINCT Path FROM batch"
1355 " EXCEPT SELECT Path FROM Path",
1357 "INSERT INTO Path (Path) "
1358 "SELECT a.Path FROM "
1359 "(SELECT DISTINCT Path FROM batch) AS a "
1360 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
// Same pattern for Filename: insert the distinct batch names that are not
// present in Filename yet.
1363 const char *my_dbi_batch_fill_filename_query[5] = {
1365 "INSERT INTO Filename (Name) "
1366 "SELECT a.Name FROM "
1367 "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1368 "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1370 "INSERT INTO Filename (Name) "
1371 "SELECT a.Name FROM "
1372 "(SELECT DISTINCT Name FROM batch) as a "
1374 "(SELECT Name FROM Filename WHERE Name = a.Name)",
1376 "INSERT INTO Filename (Name)"
1377 " SELECT DISTINCT Name FROM batch "
1378 " EXCEPT SELECT Name FROM Filename",
1380 "INSERT INTO Filename (Name)"
1381 " SELECT DISTINCT Name FROM batch "
1382 " EXCEPT SELECT Name FROM Filename",
1384 "INSERT INTO Filename (Name) "
1385 "SELECT a.Name FROM "
1386 "(SELECT DISTINCT Name FROM batch) as a "
1388 "(SELECT Name FROM Filename WHERE Name = a.Name)"
1391 #endif /* HAVE_BATCH_FILE_INSERT */
1393 const char *my_dbi_match[5] = {
1406 #endif /* HAVE_DBI */