2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
39 * This code only compiles against a recent version of libdbi. The current
40 * release found on the libdbi website (0.8.3) won't work for this code.
42 * You find the libdbi library on http://sourceforge.net/projects/libdbi
44 * A fairly recent version of libdbi from CVS works, so either make sure
45 * your distribution has a fairly recent version of libdbi installed or
46 * clone the CVS repositories from sourceforge and compile that code and
50 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
55 /* The following is necessary so that we do not include
56 * the dummy external definition of DB.
58 #define __SQL_C /* indicate that this is sql.c */
65 /* -----------------------------------------------------------------------
67 * DBI dependent defines and subroutines
69 * -----------------------------------------------------------------------
72 /* List of open databases */
73 static BQUEUE db_list = {&db_list, &db_list};
75 /* Control allocated fields by my_dbi_getvalue */
76 static BQUEUE dbi_getvalue_list = {&dbi_getvalue_list, &dbi_getvalue_list};
78 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
81 * Retrieve database type
90 * Initialize database data structure. In principal this should
91 * never have errors, or it is really fatal.
94 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
95 const char *db_address, int db_port, const char *db_socket,
96 int mult_db_connections)
100 char db_driverdir[256];
102 /* Constraint the db_driver */
104 Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
108 /* Do the correct selection of driver.
109 * Can be one of the varius supported by libdbi
113 bstrncpy(db_driver,"mysql", sizeof(db_driver));
115 case SQL_TYPE_POSTGRESQL:
116 bstrncpy(db_driver,"pgsql", sizeof(db_driver));
118 case SQL_TYPE_SQLITE:
119 bstrncpy(db_driver,"sqlite", sizeof(db_driver));
121 case SQL_TYPE_SQLITE3:
122 bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
126 /* Set db_driverdir whereis is the libdbi drivers */
127 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
130 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
133 P(mutex); /* lock DB queue */
134 if (!mult_db_connections) {
135 /* Look to see if DB already open */
136 for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
137 if (bstrcmp(mdb->db_name, db_name) &&
138 bstrcmp(mdb->db_address, db_address) &&
139 bstrcmp(mdb->db_driver, db_driver) &&
140 mdb->db_port == db_port) {
141 Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
142 dbi_conn_error(mdb->db, NULL));
145 return mdb; /* already open */
149 Dmsg0(100, "db_open first time\n");
150 mdb = (B_DB *)malloc(sizeof(B_DB));
151 memset(mdb, 0, sizeof(B_DB));
152 mdb->db_name = bstrdup(db_name);
153 mdb->db_user = bstrdup(db_user);
155 mdb->db_password = bstrdup(db_password);
158 mdb->db_address = bstrdup(db_address);
161 mdb->db_socket = bstrdup(db_socket);
164 mdb->db_driverdir = bstrdup(db_driverdir);
167 mdb->db_driver = bstrdup(db_driver);
169 mdb->db_type = db_type;
170 mdb->db_port = db_port;
171 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
173 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
174 mdb->cached_path = get_pool_memory(PM_FNAME);
175 mdb->cached_path_id = 0;
177 mdb->fname = get_pool_memory(PM_FNAME);
178 mdb->path = get_pool_memory(PM_FNAME);
179 mdb->esc_name = get_pool_memory(PM_FNAME);
180 mdb->esc_path = get_pool_memory(PM_FNAME);
181 mdb->allow_transactions = mult_db_connections;
182 qinsert(&db_list, &mdb->bq); /* put db in list */
188 * Now actually open the database. This can generate errors,
189 * which are returned in the errmsg
191 * DO NOT close the database or free(mdb) here !!!!
194 db_open_database(JCR *jcr, B_DB *mdb)
202 char *db_name = NULL;
206 if (mdb->connected) {
210 mdb->connected = false;
212 if ((errstat=rwl_init(&mdb->lock)) != 0) {
214 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
215 be.bstrerror(errstat));
221 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
227 numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
228 if (numdrivers < 0) {
229 Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
230 "db_driverdir=%s. It is probaly not found any drivers\n"),
231 mdb->db_driverdir,numdrivers);
235 mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
236 /* Can be many types of databases */
237 switch (mdb->db_type) {
239 dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
240 dbi_conn_set_option(mdb->db, "port", port); /* default port */
241 dbi_conn_set_option(mdb->db, "username", mdb->db_user); /* login name */
242 dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
243 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name); /* database name */
245 case SQL_TYPE_POSTGRESQL:
246 dbi_conn_set_option(mdb->db, "host", mdb->db_address);
247 dbi_conn_set_option(mdb->db, "port", port);
248 dbi_conn_set_option(mdb->db, "username", mdb->db_user);
249 dbi_conn_set_option(mdb->db, "password", mdb->db_password);
250 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
252 case SQL_TYPE_SQLITE:
253 len = strlen(working_directory) + 5;
254 db_dir = (char *)malloc(len);
255 strcpy(db_dir, working_directory);
257 len = strlen(mdb->db_name) + 5;
258 db_name = (char *)malloc(len);
259 strcpy(db_name, mdb->db_name);
260 strcat(db_name, ".db");
261 dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
262 dbi_conn_set_option(mdb->db, "dbname", db_name);
264 case SQL_TYPE_SQLITE3:
265 len = strlen(working_directory) + 5;
266 db_dir = (char *)malloc(len);
267 strcpy(db_dir, working_directory);
269 len = strlen(mdb->db_name) + 5;
270 db_name = (char *)malloc(len);
271 strcpy(db_name, mdb->db_name);
272 strcat(db_name, ".db");
273 dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
274 dbi_conn_set_option(mdb->db, "dbname", db_name);
275 Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
279 /* If connection fails, try at 5 sec intervals for 30 seconds. */
280 for (int retry=0; retry < 6; retry++) {
282 dbstat = dbi_conn_connect(mdb->db);
287 dbi_conn_error(mdb->db, &errmsg);
288 Dmsg1(50, "dbi error: %s\n", errmsg);
295 Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
296 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
297 mdb->db_driver, mdb->db_name, mdb->db_user);
302 Dmsg0(50, "dbi_real_connect done\n");
303 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
304 mdb->db_user, mdb->db_name,
305 mdb->db_password==NULL?"(NULL)":mdb->db_password);
307 mdb->connected = true;
309 if (!check_tables_version(jcr, mdb)) {
314 switch (mdb->db_type) {
316 /* Set connection timeout to 8 days specialy for batch mode */
317 sql_query(mdb, "SET wait_timeout=691200");
318 sql_query(mdb, "SET interactive_timeout=691200");
320 case SQL_TYPE_POSTGRESQL:
321 /* tell PostgreSQL we are using standard conforming strings
322 and avoid warnings such as:
323 WARNING: nonstandard use of \\ in a string literal
325 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
326 sql_query(mdb, "set standard_conforming_strings=on");
342 db_close_database(JCR *jcr, B_DB *mdb)
347 db_end_transaction(jcr, mdb);
349 sql_free_result(mdb);
351 if (mdb->ref_count == 0) {
353 if (mdb->connected && mdb->db) {
355 dbi_shutdown_r(mdb->instance);
357 mdb->instance = NULL;
359 rwl_destroy(&mdb->lock);
360 free_pool_memory(mdb->errmsg);
361 free_pool_memory(mdb->cmd);
362 free_pool_memory(mdb->cached_path);
363 free_pool_memory(mdb->fname);
364 free_pool_memory(mdb->path);
365 free_pool_memory(mdb->esc_name);
366 free_pool_memory(mdb->esc_path);
373 if (mdb->db_password) {
374 free(mdb->db_password);
376 if (mdb->db_address) {
377 free(mdb->db_address);
379 if (mdb->db_socket) {
380 free(mdb->db_socket);
382 if (mdb->db_driverdir) {
383 free(mdb->db_driverdir);
385 if (mdb->db_driver) {
386 free(mdb->db_driver);
393 void db_check_backend_thread_safe()
396 void db_thread_cleanup()
400 * Return the next unique index (auto-increment) for
401 * the given table. Return NULL on error.
404 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
406 strcpy(index, "NULL");
412 * Escape strings so that DBI is happy
414 * NOTE! len is the length of the old string. Your new
415 * string must be long enough (max 2*old+1) to hold
416 * the escaped output.
418 * dbi_conn_quote_string_copy receives a pointer to pointer.
419 * We need copy the value of pointer to snew because libdbi change the
423 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
431 /* correct the size of old basead in len
432 * and copy new string to inew
434 inew = (char *)malloc(sizeof(char) * len + 1);
435 bstrncpy(inew,old,len + 1);
436 /* escape the correct size of old */
437 dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
439 /* copy the escaped string to snew */
440 bstrncpy(snew, pnew, 2 * len + 1);
443 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
448 * Submit a general SQL command (cmd), and for each row returned,
449 * the sqlite_handler is called with the ctx.
451 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
455 Dmsg0(500, "db_sql_query started\n");
458 if (sql_query(mdb, query) != 0) {
459 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
461 Dmsg0(500, "db_sql_query failed\n");
464 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
466 if (result_handler != NULL) {
467 Dmsg0(500, "db_sql_query invoking handler\n");
468 if ((mdb->result = sql_store_result(mdb)) != NULL) {
469 int num_fields = sql_num_fields(mdb);
471 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
472 while ((row = sql_fetch_row(mdb)) != NULL) {
474 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
475 if (result_handler(ctx, num_fields, row))
479 sql_free_result(mdb);
484 Dmsg0(500, "db_sql_query finished\n");
491 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
494 DBI_ROW row = NULL; // by default, return NULL
496 Dmsg0(500, "my_dbi_fetch_row start\n");
497 if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
498 int num_fields = mdb->num_fields;
499 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
502 Dmsg0(500, "my_dbi_fetch_row freeing space\n");
503 Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
504 if (mdb->num_rows != 0) {
505 for(j = 0; j < mdb->num_fields; j++) {
506 Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
514 //num_fields += 20; /* add a bit extra */
515 mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
516 mdb->row_size = num_fields;
518 // now reset the row_number now that we have the space allocated
522 // if still within the result set
523 if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
524 Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
525 // get each value from this row
526 for (j = 0; j < mdb->num_fields; j++) {
527 mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
528 // allocate space to queue row
529 mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
530 // store the pointer in queue
531 mdb->field_get->value = mdb->row[j];
532 Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
533 j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
534 // insert in queue to future free
535 qinsert(&dbi_getvalue_list, &mdb->field_get->bq);
537 // increment the row number for the next call
542 Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
545 Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
550 int my_dbi_max_length(B_DB *mdb, int field_num) {
552 // for a given column, find the max length
560 for (i = 0; i < mdb->num_rows; i++) {
561 if (my_dbi_getisnull(mdb->result, i, field_num)) {
562 this_length = 4; // "NULL"
564 cbuf = my_dbi_getvalue(mdb->result, i, field_num);
565 this_length = cstrlen(cbuf);
566 // cbuf is always free
570 if (max_length < this_length) {
571 max_length = this_length;
578 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
583 Dmsg0(500, "my_dbi_fetch_field starts\n");
585 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
589 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
590 mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
591 mdb->fields_size = mdb->num_fields;
593 for (i = 0; i < mdb->num_fields; i++) {
594 // num_fileds is starting at 1, increment i by 1
596 Dmsg1(500, "filling field %d\n", i);
597 mdb->fields[i].name = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
598 mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
599 mdb->fields[i].type = dbi_result_get_field_type_idx(mdb->result, dbi_index);
600 mdb->fields[i].flags = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
602 Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
603 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
604 mdb->fields[i].flags);
608 // increment field number for the next time around
610 Dmsg0(500, "my_dbi_fetch_field finishes\n");
611 return &mdb->fields[mdb->field_number++];
614 void my_dbi_data_seek(B_DB *mdb, int row)
616 // set the row number to be returned on the next call
617 // to my_dbi_fetch_row
618 mdb->row_number = row;
621 void my_dbi_field_seek(B_DB *mdb, int field)
623 mdb->field_number = field;
627 * Note, if this routine returns 1 (failure), Bacula expects
628 * that no result has been stored.
630 * Returns: 0 on success
634 int my_dbi_query(B_DB *mdb, const char *query)
637 Dmsg1(500, "my_dbi_query started %s\n", query);
638 // We are starting a new query. reset everything.
640 mdb->row_number = -1;
641 mdb->field_number = -1;
644 dbi_result_free(mdb->result); /* hmm, someone forgot to free?? */
648 mdb->result = (void **)dbi_conn_query(mdb->db, query);
651 Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
655 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
657 if (mdb->status == DBI_ERROR_NONE) {
658 Dmsg1(500, "we have a result\n", query);
660 // how many fields in the set?
661 // num_fields starting at 1
662 mdb->num_fields = dbi_result_get_numfields(mdb->result);
663 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
664 // if no result num_rows is 0
665 mdb->num_rows = dbi_result_get_numrows(mdb->result);
666 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
668 mdb->status = (dbi_error_flag) 0; /* succeed */
670 Dmsg1(50, "Result status failed: %s\n", query);
674 Dmsg0(500, "my_dbi_query finishing\n");
678 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
679 //dbi_conn_error(mdb->db, &errmsg);
680 Dmsg4(500, "my_dbi_query we failed dbi error: "
681 "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
682 dbi_result_free(mdb->result);
684 mdb->status = (dbi_error_flag) 1; /* failed */
688 void my_dbi_free_result(B_DB *mdb)
694 Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
695 dbi_result_free(mdb->result);
704 /* now is time to free all value return by my_dbi_get_value
705 * this is necessary because libdbi don't free memory return by yours results
706 * and Bacula has some routine wich call more than once time my_dbi_fetch_row
708 * Using a queue to store all pointer allocate is a good way to free all things
711 while((f=(DBI_FIELD_GET *)qremove(&dbi_getvalue_list))) {
712 Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
724 Dmsg0(500, "my_dbi_free_result finish\n");
728 const char *my_dbi_strerror(B_DB *mdb)
732 dbi_conn_error(mdb->db, &errmsg);
737 #ifdef HAVE_BATCH_FILE_INSERT
740 * This can be a bit strang but is the one way to do
745 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
747 char *query = "COPY batch FROM STDIN";
749 Dmsg0(500, "my_dbi_batch_start started\n");
751 switch (mdb->db_type) {
754 if (my_dbi_query(mdb,
755 "CREATE TEMPORARY TABLE batch ("
761 "MD5 tinyblob)") == 1)
763 Dmsg0(500, "my_dbi_batch_start failed\n");
767 Dmsg0(500, "my_dbi_batch_start finishing\n");
770 case SQL_TYPE_POSTGRESQL:
772 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
778 "md5 varchar)") == 1)
780 Dmsg0(500, "my_dbi_batch_start failed\n");
784 // We are starting a new query. reset everything.
786 mdb->row_number = -1;
787 mdb->field_number = -1;
789 my_dbi_free_result(mdb);
791 for (int i=0; i < 10; i++) {
792 my_dbi_query(mdb, query);
799 Dmsg1(50, "Query failed: %s\n", query);
803 mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
804 //mdb->status = DBI_ERROR_NONE;
806 if (mdb->status == DBI_ERROR_NONE) {
807 // how many fields in the set?
808 mdb->num_fields = dbi_result_get_numfields(mdb->result);
809 mdb->num_rows = dbi_result_get_numrows(mdb->result);
810 mdb->status = (dbi_error_flag) 1;
812 Dmsg1(50, "Result status failed: %s\n", query);
816 Dmsg0(500, "my_postgresql_batch_start finishing\n");
820 case SQL_TYPE_SQLITE:
822 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
828 "MD5 tinyblob)") == 1)
830 Dmsg0(500, "my_dbi_batch_start failed\n");
834 Dmsg0(500, "my_dbi_batch_start finishing\n");
837 case SQL_TYPE_SQLITE3:
839 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
845 "MD5 tinyblob)") == 1)
847 Dmsg0(500, "my_dbi_batch_start failed\n");
851 Dmsg0(500, "my_dbi_batch_start finishing\n");
857 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
858 mdb->status = (dbi_error_flag) 0;
859 my_dbi_free_result(mdb);
864 /* set error to something to abort operation */
865 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
869 int (*custom_function)(void*, const char*) = NULL;
870 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
872 Dmsg0(500, "my_dbi_batch_end started\n");
874 if (!mdb) { /* no files ? */
878 switch (mdb->db_type) {
881 mdb->status = (dbi_error_flag) 0;
884 case SQL_TYPE_POSTGRESQL:
885 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
889 res = (*custom_function)(myconn->connection, error);
890 } while (res == 0 && --count > 0);
894 mdb->status = (dbi_error_flag) 1;
898 Dmsg0(500, "we failed\n");
899 mdb->status = (dbi_error_flag) 0;
900 //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
903 case SQL_TYPE_SQLITE:
905 mdb->status = (dbi_error_flag) 0;
908 case SQL_TYPE_SQLITE3:
910 mdb->status = (dbi_error_flag) 0;
915 Dmsg0(500, "my_dbi_batch_end finishing\n");
921 * This function is big and use a big switch.
922 * In near future is better split in small functions
926 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
930 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
931 int (*custom_function)(void*, const char*, int) = NULL;
932 char* (*custom_function_error)(void*) = NULL;
937 Dmsg0(500, "my_dbi_batch_insert started \n");
939 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
940 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
942 if (ar->Digest == NULL || ar->Digest[0] == 0) {
948 switch (mdb->db_type) {
950 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
951 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
952 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
953 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
954 mdb->esc_name, ar->attr, digest);
956 if (my_dbi_query(mdb,mdb->cmd) == 1)
958 Dmsg0(500, "my_dbi_batch_insert failed\n");
962 Dmsg0(500, "my_dbi_batch_insert finishing\n");
966 case SQL_TYPE_POSTGRESQL:
967 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
968 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
969 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
970 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
971 mdb->esc_name, ar->attr, digest);
973 /* libdbi don't support CopyData and we need call a postgresql
974 * specific function to do this work
976 Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
977 if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
978 "PQputCopyData")) != NULL) {
980 res = (*custom_function)(myconn->connection, mdb->cmd, len);
981 } while (res == 0 && --count > 0);
986 mdb->status = (dbi_error_flag) 1;
990 Dmsg0(500, "my_dbi_batch_insert failed\n");
994 Dmsg0(500, "my_dbi_batch_insert finishing\n");
997 // ensure to detect a PQerror
998 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
999 Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1003 case SQL_TYPE_SQLITE:
1004 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1005 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1006 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1007 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1008 mdb->esc_name, ar->attr, digest);
1009 if (my_dbi_query(mdb,mdb->cmd) == 1)
1011 Dmsg0(500, "my_dbi_batch_insert failed\n");
1015 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1019 case SQL_TYPE_SQLITE3:
1020 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1021 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1022 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1023 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1024 mdb->esc_name, ar->attr, digest);
1025 if (my_dbi_query(mdb,mdb->cmd) == 1)
1027 Dmsg0(500, "my_dbi_batch_insert failed\n");
1031 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1038 Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1039 mdb->status = (dbi_error_flag) 0;
1040 my_dbi_free_result(mdb);
1045 * Escape strings so that PostgreSQL is happy on COPY
1047 * NOTE! len is the length of the old string. Your new
1048 * string must be long enough (max 2*old+1) to hold
1049 * the escaped output.
1051 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1053 /* we have to escape \t, \n, \r, \ */
1056 while (len > 0 && *src) {
1091 #endif /* HAVE_BATCH_FILE_INSERT */
1095 * int PQgetisnull(const PGresult *res,
1097 * int column_number);
1099 * use dbi_result_seek_row to search in result set
1101 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1104 if(row_number == 0) {
1110 if(dbi_result_seek_row(result, row_number)) {
1112 i = dbi_result_field_is_null_idx(result,column_number);
1123 * char *PQgetvalue(const PGresult *res,
1125 * int column_number);
1127 * use dbi_result_seek_row to search in result set
1128 * use example to return only strings
1130 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1134 const char *field_name;
1135 unsigned short dbitype;
1136 size_t field_length;
1139 /* correct the index for dbi interface
1140 * dbi index begins 1
1141 * I prefer do not change others functions
1143 Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1144 result, row_number, column_number);
1148 if(row_number == 0) {
1152 Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1153 result, row_number, column_number);
1155 if(dbi_result_seek_row(result, row_number)) {
1157 field_name = dbi_result_get_field_name(result, column_number);
1158 field_length = dbi_result_get_field_length(result, field_name);
1159 dbitype = dbi_result_get_field_type_idx(result,column_number);
1161 Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1162 "field_length bytes: '%d' fieldname: '%s'\n",
1163 dbitype, field_length, field_name);
1166 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1167 buf = (char *)malloc(field_length + 1);
1170 buf = (char *)malloc(sizeof(char *) * 50);
1174 case DBI_TYPE_INTEGER:
1175 num = dbi_result_get_longlong(result, field_name);
1176 edit_int64(num, buf);
1177 field_length = strlen(buf);
1179 case DBI_TYPE_STRING:
1181 field_length = bsnprintf(buf, field_length + 1, "%s",
1182 dbi_result_get_string(result, field_name));
1187 case DBI_TYPE_BINARY:
1188 /* dbi_result_get_binary return a NULL pointer if value is empty
1189 * following, change this to what Bacula espected
1192 field_length = bsnprintf(buf, field_length + 1, "%s",
1193 dbi_result_get_binary(result, field_name));
1198 case DBI_TYPE_DATETIME:
1202 last = dbi_result_get_datetime(result, field_name);
1205 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1207 (void)localtime_r(&last, &tm);
1208 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1209 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1210 tm.tm_hour, tm.tm_min, tm.tm_sec);
1216 dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1217 Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1220 Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1221 buf, field_length, buf);
1223 // don't worry about this buf
1227 static int my_dbi_sequence_last(B_DB *mdb, const char *table_name)
1230 Obtain the current value of the sequence that
1231 provides the serial value for primary key of the table.
1233 currval is local to our session. It is not affected by
1236 Determine the name of the sequence.
1237 PostgreSQL automatically creates a sequence using
1238 <table>_<column>_seq.
1239 At the time of writing, all tables used this format for
1240 for their primary key: <table>id
1241 Except for basefiles which has a primary key on baseid.
1242 Therefore, we need to special case that one table.
1244 everything else can use the PostgreSQL formula.
1250 if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1252 if (strcasecmp(table_name, "basefiles") == 0) {
1253 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1255 bstrncpy(sequence, table_name, sizeof(sequence));
1256 bstrncat(sequence, "_", sizeof(sequence));
1257 bstrncat(sequence, table_name, sizeof(sequence));
1258 bstrncat(sequence, "id", sizeof(sequence));
1261 bstrncat(sequence, "_seq", sizeof(sequence));
1262 id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1264 id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1270 int my_dbi_sql_insert_id(B_DB *mdb, const char *query, const char *table_name)
1273 * First execute the insert query and then retrieve the currval.
1275 if (my_dbi_query(mdb, query)) {
1279 mdb->num_rows = sql_affected_rows(mdb);
1280 if (mdb->num_rows != 1) {
1286 return my_dbi_sequence_last(mdb, table_name);
1289 #ifdef HAVE_BATCH_FILE_INSERT
1290 const char *my_dbi_batch_lock_path_query[4] = {
1292 "LOCK TABLES Path write, batch write, Path as p write",
1294 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1303 const char *my_dbi_batch_lock_filename_query[4] = {
1305 "LOCK TABLES Filename write, batch write, Filename as f write",
1307 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1316 const char *my_dbi_batch_unlock_tables_query[4] = {
1326 const char *my_dbi_batch_fill_path_query[4] = {
1328 "INSERT INTO Path (Path) "
1329 "SELECT a.Path FROM "
1330 "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1331 "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1333 "INSERT INTO Path (Path) "
1334 "SELECT a.Path FROM "
1335 "(SELECT DISTINCT Path FROM batch) AS a "
1336 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1338 "INSERT INTO Path (Path)"
1339 " SELECT DISTINCT Path FROM batch"
1340 " EXCEPT SELECT Path FROM Path",
1342 "INSERT INTO Path (Path)"
1343 " SELECT DISTINCT Path FROM batch"
1344 " EXCEPT SELECT Path FROM Path",
1346 "INSERT INTO Path (Path) "
1347 "SELECT a.Path FROM "
1348 "(SELECT DISTINCT Path FROM batch) AS a "
1349 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
1352 const char *my_dbi_batch_fill_filename_query[4] = {
1354 "INSERT INTO Filename (Name) "
1355 "SELECT a.Name FROM "
1356 "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1357 "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1359 "INSERT INTO Filename (Name) "
1360 "SELECT a.Name FROM "
1361 "(SELECT DISTINCT Name FROM batch) as a "
1363 "(SELECT Name FROM Filename WHERE Name = a.Name)",
1365 "INSERT INTO Filename (Name)"
1366 " SELECT DISTINCT Name FROM batch "
1367 " EXCEPT SELECT Name FROM Filename",
1369 "INSERT INTO Filename (Name)"
1370 " SELECT DISTINCT Name FROM batch "
1371 " EXCEPT SELECT Name FROM Filename",
1373 "INSERT INTO Filename (Name) "
1374 "SELECT a.Name FROM "
1375 "(SELECT DISTINCT Name FROM batch) as a "
1377 "(SELECT Name FROM Filename WHERE Name = a.Name)"
1380 #endif /* HAVE_BATCH_FILE_INSERT */
1382 const char *my_dbi_match[4] = {
1393 #endif /* HAVE_DBI */