2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
38 * This code only compiles against a recent version of libdbi. The current
39 * release found on the libdbi website (0.8.3) won't work for this code.
41 * You can find the libdbi library at http://sourceforge.net/projects/libdbi
43 * A fairly recent version of libdbi from CVS works, so either make sure
44 * your distribution has a fairly recent version of libdbi installed or
45 * clone the CVS repositories from sourceforge and compile that code and
49 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
50 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
54 /* The following is necessary so that we do not include
55 * the dummy external definition of DB.
57 #define __SQL_C /* indicate that this is sql.c */
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
71 /* List of open databases */
// db_init_database appends every new B_DB here and searches it when mult_db_connections is false.
72 static dlist *db_list = NULL;
74 /* Control allocated fields by my_dbi_getvalue */
// my_dbi_fetch_row appends one DBI_FIELD_GET per fetched value; my_dbi_free_result walks the list.
75 static dlist *dbi_getvalue_list = NULL;
// Taken with P(mutex) in db_init_database before db_list is examined.
77 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
80 * Retrieve database type
89 * Initialize database data structure. In principle this should
90 * never have errors, or it is really fatal.
// db_init_database: build (or reuse) the B_DB record describing a catalog reached through libdbi.
// Behaviour visible in this excerpt:
//  - emits M_FATAL job messages when no dbi driver or no user name is supplied;
//  - picks a libdbi driver name (mysql, pgsql, sqlite or sqlite3) for the database type;
//  - under the file-level mutex, creates db_list and dbi_getvalue_list on first use;
//  - when mult_db_connections is false, returns an already listed B_DB whose name,
//    address, driver and port all match;
//  - otherwise mallocs and zeroes a new B_DB, duplicates the connection strings,
//    obtains pool-memory work buffers and appends the record to db_list.
// NOTE(review): the guarding conditions, the mutex release and the final return are on
// lines not visible in this excerpt - confirm against the full file.
93 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
94 const char *db_address, int db_port, const char *db_socket,
95 int mult_db_connections)
100 char db_driverdir[256];
102 /* Constraint the db_driver */
104 Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
108 /* Do the correct selection of driver.
109 * Can be one of the various drivers supported by libdbi
113 bstrncpy(db_driver,"mysql", sizeof(db_driver));
115 case SQL_TYPE_POSTGRESQL:
116 bstrncpy(db_driver,"pgsql", sizeof(db_driver));
118 case SQL_TYPE_SQLITE:
119 bstrncpy(db_driver,"sqlite", sizeof(db_driver));
121 case SQL_TYPE_SQLITE3:
122 bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
126 /* Set db_driverdir to the directory holding the libdbi drivers */
// NOTE(review): the bound is the literal 255 although db_driverdir is declared with 256
// bytes; sizeof(db_driverdir) would follow the declaration automatically.
127 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
130 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
133 P(mutex); /* lock DB queue */
134 if (db_list == NULL) {
135 db_list = New(dlist(mdb, &mdb->link));
136 dbi_getvalue_list = New(dlist(field, &field->link));
138 if (!mult_db_connections) {
139 /* Look to see if DB already open */
140 foreach_dlist(mdb, db_list) {
141 if (bstrcmp(mdb->db_name, db_name) &&
142 bstrcmp(mdb->db_address, db_address) &&
143 bstrcmp(mdb->db_driver, db_driver) &&
144 mdb->db_port == db_port) {
145 Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
146 dbi_conn_error(mdb->db, NULL));
149 return mdb; /* already open */
153 Dmsg0(100, "db_open first time\n");
// NOTE(review): no NULL check on this malloc is visible before the memset that follows.
154 mdb = (B_DB *)malloc(sizeof(B_DB));
155 memset(mdb, 0, sizeof(B_DB));
156 mdb->db_name = bstrdup(db_name);
157 mdb->db_user = bstrdup(db_user);
159 mdb->db_password = bstrdup(db_password);
162 mdb->db_address = bstrdup(db_address);
165 mdb->db_socket = bstrdup(db_socket);
168 mdb->db_driverdir = bstrdup(db_driverdir);
171 mdb->db_driver = bstrdup(db_driver);
173 mdb->db_type = db_type;
174 mdb->db_port = db_port;
175 mdb->have_insert_id = TRUE;
176 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
178 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
179 mdb->cached_path = get_pool_memory(PM_FNAME);
180 mdb->cached_path_id = 0;
182 mdb->fname = get_pool_memory(PM_FNAME);
183 mdb->path = get_pool_memory(PM_FNAME);
184 mdb->esc_name = get_pool_memory(PM_FNAME);
185 mdb->esc_path = get_pool_memory(PM_FNAME);
// allow_transactions simply mirrors the mult_db_connections argument.
186 mdb->allow_transactions = mult_db_connections;
187 db_list->append(mdb); /* put db in list */
193 * Now actually open the database. This can generate errors,
194 * which are returned in the errmsg
196 * DO NOT close the database or free(mdb) here !!!!
// db_open_database: connect the B_DB prepared by db_init_database to its server through libdbi.
// Steps visible in this excerpt, in order:
//  1. special handling when mdb->connected is already set;
//  2. rwl_init of mdb->lock;
//  3. dbi_initialize_r on mdb->db_driverdir, then dbi_conn_new_r for mdb->db_driver;
//  4. connection options chosen per database type;
//  5. up to 6 calls to dbi_conn_connect;
//  6. check_tables_version, then per-type session settings.
// Failures are described in mdb->errmsg through the Mmsg calls below.
// NOTE(review): return statements, case labels and the retry delay are on lines not
// visible here - confirm against the full file.
199 db_open_database(JCR *jcr, B_DB *mdb)
207 char *db_name = NULL;
211 if (mdb->connected) {
215 mdb->connected = false;
217 if ((errstat=rwl_init(&mdb->lock)) != 0) {
219 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
220 be.bstrerror(errstat));
// The port number is rendered as text because dbi_conn_set_option is given string values below.
226 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
232 numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
233 if (numdrivers < 0) {
// NOTE(review): the format string has a single %s conversion but Mmsg2 is handed two
// arguments (db_driverdir and numdrivers); numdrivers is never printed.
234 Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
235 "db_driverdir=%s. It is probaly not found any drivers\n"),
236 mdb->db_driverdir,numdrivers);
240 mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
241 /* Can be many types of databases */
242 switch (mdb->db_type) {
244 dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
245 dbi_conn_set_option(mdb->db, "port", port); /* default port */
246 dbi_conn_set_option(mdb->db, "username", mdb->db_user); /* login name */
247 dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
248 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name); /* database name */
250 case SQL_TYPE_POSTGRESQL:
251 dbi_conn_set_option(mdb->db, "host", mdb->db_address);
252 dbi_conn_set_option(mdb->db, "port", port);
253 dbi_conn_set_option(mdb->db, "username", mdb->db_user);
254 dbi_conn_set_option(mdb->db, "password", mdb->db_password);
255 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
257 case SQL_TYPE_SQLITE:
// SQLite branches: db_dir starts as a copy of working_directory and db_name is
// mdb->db_name with .db appended; both are passed to libdbi as options.
// NOTE(review): neither malloc result is checked in the visible lines and no matching
// free is visible - confirm ownership in the full file.
258 len = strlen(working_directory) + 5;
259 db_dir = (char *)malloc(len);
260 strcpy(db_dir, working_directory);
262 len = strlen(mdb->db_name) + 5;
263 db_name = (char *)malloc(len);
264 strcpy(db_name, mdb->db_name);
265 strcat(db_name, ".db");
266 dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
267 dbi_conn_set_option(mdb->db, "dbname", db_name);
269 case SQL_TYPE_SQLITE3:
270 len = strlen(working_directory) + 5;
271 db_dir = (char *)malloc(len);
272 strcpy(db_dir, working_directory);
274 len = strlen(mdb->db_name) + 5;
275 db_name = (char *)malloc(len);
276 strcpy(db_name, mdb->db_name);
277 strcat(db_name, ".db");
278 dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
279 dbi_conn_set_option(mdb->db, "dbname", db_name);
280 Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
284 /* If connection fails, try at 5 sec intervals for 30 seconds. */
285 for (int retry=0; retry < 6; retry++) {
287 dbstat = dbi_conn_connect(mdb->db);
292 dbi_conn_error(mdb->db, &errmsg);
293 Dmsg1(50, "dbi error: %s\n", errmsg);
300 Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
301 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
302 mdb->db_driver, mdb->db_name, mdb->db_user);
307 Dmsg0(50, "dbi_real_connect done\n");
// NOTE(review): this debug message prints the database password in clear text.
308 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
309 mdb->db_user, mdb->db_name,
310 mdb->db_password==NULL?"(NULL)":mdb->db_password);
312 mdb->connected = true;
314 if (!check_tables_version(jcr, mdb)) {
319 switch (mdb->db_type) {
321 /* Set connection timeout to 8 days specialy for batch mode */
322 sql_query(mdb, "SET wait_timeout=691200");
323 sql_query(mdb, "SET interactive_timeout=691200");
325 case SQL_TYPE_POSTGRESQL:
326 /* tell PostgreSQL we are using standard conforming strings
327 and avoid warnings such as:
328 WARNING: nonstandard use of \\ in a string literal
330 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
331 sql_query(mdb, "set standard_conforming_strings=on");
// db_close_database: end any open transaction, free the current result and, once
// mdb->ref_count is 0, tear the handle down: remove it from db_list, shut the libdbi
// instance down, destroy the lock, return the pool-memory buffers and free the
// duplicated connection strings.
// NOTE(review): the ref_count decrement, the locking and the action taken when db_list
// becomes empty are on lines not visible in this excerpt.
347 db_close_database(JCR *jcr, B_DB *mdb)
352 db_end_transaction(jcr, mdb);
354 sql_free_result(mdb);
356 if (mdb->ref_count == 0) {
357 db_list->remove(mdb);
358 if (mdb->connected && mdb->db) {
360 dbi_shutdown_r(mdb->instance);
362 mdb->instance = NULL;
364 rwl_destroy(&mdb->lock);
365 free_pool_memory(mdb->errmsg);
366 free_pool_memory(mdb->cmd);
367 free_pool_memory(mdb->cached_path);
368 free_pool_memory(mdb->fname);
369 free_pool_memory(mdb->path);
370 free_pool_memory(mdb->esc_name);
371 free_pool_memory(mdb->esc_path);
378 if (mdb->db_password) {
379 free(mdb->db_password);
381 if (mdb->db_address) {
382 free(mdb->db_address);
384 if (mdb->db_socket) {
385 free(mdb->db_socket);
387 if (mdb->db_driverdir) {
388 free(mdb->db_driverdir);
390 if (mdb->db_driver) {
391 free(mdb->db_driver);
394 if (db_list->size() == 0) {
402 void db_thread_cleanup()
406 * Return the next unique index (auto-increment) for
407 * the given table. Return NULL on error.
// db_next_index: writes the literal text NULL into the caller-supplied index buffer.
// Presumably this lets the database assign the auto-increment value itself - TODO confirm.
// NOTE(review): the comment above this function speaks of returning NULL on error, but the
// declared return type is int; the return statement is not visible in this excerpt.
410 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
412 strcpy(index, "NULL");
418 * Escape strings so that DBI is happy
420 * NOTE! len is the length of the old string. Your new
421 * string must be long enough (max 2*old+1) to hold
422 * the escaped output.
424 * dbi_conn_quote_string_copy receives a pointer to a pointer.
425 * We need to copy the value of that pointer to snew because libdbi changes the
// db_escape_string: escape the first len bytes of old through libdbi and copy the
// escaped text into snew.
//   snew - destination; must hold 2*len+1 bytes (the bound given to the final bstrncpy)
//   old  - source text; only len bytes of it are considered
//   len  - length of old
// NOTE(review): handling of len == 0 and the release of inew and pnew are on lines not
// visible in this excerpt - confirm both temporaries are freed.
429 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
437 /* correct the size of old based on len
438 * and copy new string to inew
// NOTE(review): the malloc result is used on the next line without a NULL check.
440 inew = (char *)malloc(sizeof(char) * len + 1);
441 bstrncpy(inew,old,len + 1);
442 /* escape the correct size of old */
// NOTE(review): the return value of dbi_conn_escape_string_copy is not inspected, so
// pnew is assumed valid when it is read below.
443 dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
445 /* copy the escaped string to snew */
446 bstrncpy(snew, pnew, 2 * len + 1);
449 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
454 * Submit a general SQL command (cmd), and for each row returned,
455 * the result_handler is called with the ctx.
// db_sql_query: run query and, when result_handler is not NULL, store the result and
// call the handler once per fetched row with ctx, the field count and the row.
// When sql_query reports failure the text is placed in mdb->errmsg.
// The stored result is released with sql_free_result after the row loop.
// NOTE(review): locking and the return statements are on lines not visible here.
457 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
461 Dmsg0(500, "db_sql_query started\n");
464 if (sql_query(mdb, query) != 0) {
465 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
467 Dmsg0(500, "db_sql_query failed\n");
470 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
472 if (result_handler != NULL) {
473 Dmsg0(500, "db_sql_query invoking handler\n");
474 if ((mdb->result = sql_store_result(mdb)) != NULL) {
475 int num_fields = sql_num_fields(mdb);
477 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
478 while ((row = sql_fetch_row(mdb)) != NULL) {
480 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
// A non-zero handler result is tested here; the action taken is on a line not
// visible (presumably it ends the row loop - TODO confirm).
481 if (result_handler(ctx, num_fields, row))
485 sql_free_result(mdb);
490 Dmsg0(500, "db_sql_query finished\n");
// my_dbi_fetch_row: produce the next row of mdb->result as an array of C strings.
// The local row starts as NULL, which is the value reported when no row is available.
// mdb->row is (re)allocated when it is missing or holds fewer than num_fields slots;
// each column is then read with my_dbi_getvalue and every returned buffer is recorded
// in dbi_getvalue_list so it can be released later.
// NOTE(review): the row_number updates, the frees of the old row and the final return
// are on lines not visible in this excerpt.
497 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
500 DBI_ROW row = NULL; // by default, return NULL
502 Dmsg0(500, "my_dbi_fetch_row start\n");
503 if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
504 int num_fields = mdb->num_fields;
// NOTE(review): the argument is a size_t expression but the conversion is %d.
505 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
508 Dmsg0(500, "my_dbi_fetch_row freeing space\n");
509 Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
510 if (mdb->num_rows != 0) {
511 for(j = 0; j < mdb->num_fields; j++) {
512 Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
520 //num_fields += 20; /* add a bit extra */
521 mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
522 mdb->row_size = num_fields;
524 // now reset the row_number now that we have the space allocated
528 // if still within the result set
// NOTE(review): row_number is compared with DBI_ERROR_BADPTR, an error constant rather
// than a row index - confirm this is intended.
529 if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
530 Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
531 // get each value from this row
532 for (j = 0; j < mdb->num_fields; j++) {
533 mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
534 // allocate space to queue row
// NOTE(review): this malloc result is dereferenced below without a NULL check.
535 mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
536 // store the pointer in queue
537 mdb->field_get->value = mdb->row[j];
538 Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
539 j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
540 // insert in queue to future free
541 dbi_getvalue_list->append(mdb->field_get);
543 // increment the row number for the next call
548 Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
551 Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
// my_dbi_max_length: walk all mdb->num_rows rows of column field_num and keep the
// largest value length seen. A NULL field counts as 4, the length of the text NULL;
// other fields are fetched with my_dbi_getvalue and measured with cstrlen.
// Presumably returns max_length (the return line is not visible here).
556 int my_dbi_max_length(B_DB *mdb, int field_num) {
558 // for a given column, find the max length
566 for (i = 0; i < mdb->num_rows; i++) {
567 if (my_dbi_getisnull(mdb->result, i, field_num)) {
568 this_length = 4; // "NULL"
570 cbuf = my_dbi_getvalue(mdb->result, i, field_num);
571 this_length = cstrlen(cbuf);
572 // cbuf is always free
576 if (max_length < this_length) {
577 max_length = this_length;
// my_dbi_fetch_field: return a pointer to the descriptor at index mdb->field_number
// and post-increment that counter.
// When mdb->fields is missing or has fewer than num_fields entries, the table is
// allocated and every entry is filled with the field name, maximum value length, type
// and attribute flags taken from the current result.
584 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
589 Dmsg0(500, "my_dbi_fetch_field starts\n");
591 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
595 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
596 mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
597 mdb->fields_size = mdb->num_fields;
599 for (i = 0; i < mdb->num_fields; i++) {
600 // num_fileds is starting at 1, increment i by 1
602 Dmsg1(500, "filling field %d\n", i);
603 mdb->fields[i].name = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
// my_dbi_max_length loops over every row, so this costs one pass over the result per field.
604 mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
605 mdb->fields[i].type = dbi_result_get_field_type_idx(mdb->result, dbi_index);
606 mdb->fields[i].flags = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
// The value labelled IsNull in this message is the attribute flags word stored above.
608 Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
609 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
610 mdb->fields[i].flags);
614 // increment field number for the next time around
616 Dmsg0(500, "my_dbi_fetch_field finishes\n");
617 return &mdb->fields[mdb->field_number++];
// my_dbi_data_seek: position the row cursor; stores row in mdb->row_number and does
// nothing else in the visible lines.
620 void my_dbi_data_seek(B_DB *mdb, int row)
622 // set the row number to be returned on the next call
623 // to my_dbi_fetch_row
624 mdb->row_number = row;
// my_dbi_field_seek: position the field cursor; stores field in mdb->field_number,
// the index my_dbi_fetch_field uses for the descriptor it returns.
627 void my_dbi_field_seek(B_DB *mdb, int field)
629 mdb->field_number = field;
633 * Note, if this routine returns 1 (failure), Bacula expects
634 * that no result has been stored.
636 * Returns: 0 on success
// my_dbi_query: start a new query on mdb->db.
// Resets row_number and field_number to -1, frees a previous result, runs query with
// dbi_conn_query and, when libdbi reports DBI_ERROR_NONE, caches the field and row
// counts of the new result.
// mdb->status is set to 0 on the success path and to 1 on the failure path (see the
// assignments below); on failure the result is freed again.
// NOTE(review): the return statements and the conditions guarding some of these lines
// are not visible in this excerpt.
640 int my_dbi_query(B_DB *mdb, const char *query)
643 Dmsg1(500, "my_dbi_query started %s\n", query);
644 // We are starting a new query. reset everything.
646 mdb->row_number = -1;
647 mdb->field_number = -1;
650 dbi_result_free(mdb->result); /* hmm, someone forgot to free?? */
654 mdb->result = (void **)dbi_conn_query(mdb->db, query);
657 Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
661 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
663 if (mdb->status == DBI_ERROR_NONE) {
// NOTE(review): the format string has no conversion, yet query is passed as an argument.
664 Dmsg1(500, "we have a result\n", query);
666 // how many fields in the set?
667 // num_fields starting at 1
668 mdb->num_fields = dbi_result_get_numfields(mdb->result);
669 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
670 // if no result num_rows is 0
671 mdb->num_rows = dbi_result_get_numrows(mdb->result);
672 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
674 mdb->status = (dbi_error_flag) 0; /* succeed */
676 Dmsg1(50, "Result status failed: %s\n", query);
680 Dmsg0(500, "my_dbi_query finishing\n");
684 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
685 //dbi_conn_error(mdb->db, &errmsg);
// NOTE(review): mdb->result, a pointer, is also passed for the %d conversion below.
686 Dmsg4(500, "my_dbi_query we failed dbi error: "
687 "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
688 dbi_result_free(mdb->result);
690 mdb->status = (dbi_error_flag) 1; /* failed */
// my_dbi_free_result: release the current libdbi result with dbi_result_free and walk
// dbi_getvalue_list, the queue of value buffers recorded by my_dbi_fetch_row.
// NOTE(review): the statements that actually free each queued entry are on lines not
// visible in this excerpt - confirm against the full file.
694 void my_dbi_free_result(B_DB *mdb)
700 Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
701 dbi_result_free(mdb->result);
710 /* Now it is time to free all values returned by my_dbi_getvalue
711 * this is necessary because libdbi does not free the memory returned by its results
712 * and Bacula has some routines which call my_dbi_fetch_row more than once
714 * Using a queue to store every allocated pointer is a good way to free everything
717 foreach_dlist(f, dbi_getvalue_list) {
718 Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
730 Dmsg0(500, "my_dbi_free_result finish\n");
// my_dbi_strerror: ask libdbi for the text of the current error on mdb->db.
// Presumably returns errmsg as filled in by dbi_conn_error (return line not visible).
734 const char *my_dbi_strerror(B_DB *mdb)
738 dbi_conn_error(mdb->db, &errmsg);
743 #ifdef HAVE_BATCH_FILE_INSERT
746 * This may look a bit strange, but it is the one way to do
// my_dbi_batch_start: prepare batch insertion for the active database type.
// Every branch creates a temporary table named batch with my_dbi_query. The PostgreSQL
// branch then frees the result and issues COPY batch FROM STDIN inside a loop of up to
// 10 iterations, caching field and row counts when libdbi reports DBI_ERROR_NONE.
// On the error path the message is stored in mdb->errmsg and the result is freed.
// NOTE(review): here status 1 is assigned when libdbi reports no error and 0 on the error
// path, which appears to be the reverse of the 0/1 meaning in my_dbi_query - confirm that
// callers expect this. Return statements and table columns are not visible here.
751 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
753 const char *query = "COPY batch FROM STDIN";
755 Dmsg0(500, "my_dbi_batch_start started\n");
757 switch (mdb->db_type) {
760 if (my_dbi_query(mdb,
761 "CREATE TEMPORARY TABLE batch ("
767 "MD5 tinyblob)") == 1)
769 Dmsg0(500, "my_dbi_batch_start failed\n");
773 Dmsg0(500, "my_dbi_batch_start finishing\n");
776 case SQL_TYPE_POSTGRESQL:
778 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
784 "md5 varchar)") == 1)
786 Dmsg0(500, "my_dbi_batch_start failed\n");
790 // We are starting a new query. reset everything.
792 mdb->row_number = -1;
793 mdb->field_number = -1;
795 my_dbi_free_result(mdb);
797 for (int i=0; i < 10; i++) {
798 my_dbi_query(mdb, query);
805 Dmsg1(50, "Query failed: %s\n", query);
809 mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
810 //mdb->status = DBI_ERROR_NONE;
812 if (mdb->status == DBI_ERROR_NONE) {
813 // how many fields in the set?
814 mdb->num_fields = dbi_result_get_numfields(mdb->result);
815 mdb->num_rows = dbi_result_get_numrows(mdb->result);
816 mdb->status = (dbi_error_flag) 1;
818 Dmsg1(50, "Result status failed: %s\n", query);
822 Dmsg0(500, "my_postgresql_batch_start finishing\n");
826 case SQL_TYPE_SQLITE:
828 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
834 "MD5 tinyblob)") == 1)
836 Dmsg0(500, "my_dbi_batch_start failed\n");
840 Dmsg0(500, "my_dbi_batch_start finishing\n");
843 case SQL_TYPE_SQLITE3:
845 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
851 "MD5 tinyblob)") == 1)
853 Dmsg0(500, "my_dbi_batch_start failed\n");
857 Dmsg0(500, "my_dbi_batch_start finishing\n");
863 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
864 mdb->status = (dbi_error_flag) 0;
865 my_dbi_free_result(mdb);
870 /* set error to something to abort operation */
// my_dbi_batch_end: finish batch mode for the active database type.
// For PostgreSQL the driver function PQputCopyEnd is looked up through
// dbi_driver_specific_function and called with error on the raw connection until it
// returns non-zero or the retry counter runs out; status then becomes 1, or 0 on the
// failure path. The other visible branches only set status to 0.
// error - passed straight to PQputCopyEnd; per the comment above, setting it aborts
//         the operation.
871 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
875 int (*custom_function)(void*, const char*) = NULL;
// NOTE(review): mdb->db is read here, before the !mdb check a few lines below.
876 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
878 Dmsg0(500, "my_dbi_batch_end started\n");
880 if (!mdb) { /* no files ? */
884 switch (mdb->db_type) {
887 mdb->status = (dbi_error_flag) 0;
890 case SQL_TYPE_POSTGRESQL:
891 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
895 res = (*custom_function)(myconn->connection, error);
896 } while (res == 0 && --count > 0);
900 mdb->status = (dbi_error_flag) 1;
904 Dmsg0(500, "we failed\n");
905 mdb->status = (dbi_error_flag) 0;
906 //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
909 case SQL_TYPE_SQLITE:
911 mdb->status = (dbi_error_flag) 0;
914 case SQL_TYPE_SQLITE3:
916 mdb->status = (dbi_error_flag) 0;
921 Dmsg0(500, "my_dbi_batch_end finishing\n");
927 * This function is big and uses a big switch.
928 * In the near future it would be better to split it into smaller functions
// my_dbi_batch_insert: add one file attribute record (ar) to the batch table.
// The escape buffers are first grown to 2*length+1 bytes. Then, per database type:
//  - the first branch (case label not visible, presumably MySQL) and both SQLite
//    branches escape mdb->fname and mdb->path with db_escape_string and run an
//    INSERT INTO batch statement through my_dbi_query;
//  - PostgreSQL escapes with my_postgresql_copy_escape, formats one tab-separated line
//    and hands it to the driver function PQputCopyData, repeating while it returns 0
//    and the retry counter allows.
// On the error path the message goes to mdb->errmsg, status becomes 0 and the result
// is freed.
// NOTE(review): how digest is chosen and the return statements are on lines not visible.
932 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
936 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
937 int (*custom_function)(void*, const char*, int) = NULL;
938 char* (*custom_function_error)(void*) = NULL;
943 Dmsg0(500, "my_dbi_batch_insert started \n");
945 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
946 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
948 if (ar->Digest == NULL || ar->Digest[0] == 0) {
954 switch (mdb->db_type) {
956 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
957 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
958 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
959 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
960 mdb->esc_name, ar->attr, digest);
962 if (my_dbi_query(mdb,mdb->cmd) == 1)
964 Dmsg0(500, "my_dbi_batch_insert failed\n");
968 Dmsg0(500, "my_dbi_batch_insert finishing\n");
972 case SQL_TYPE_POSTGRESQL:
973 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
974 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
975 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
976 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
977 mdb->esc_name, ar->attr, digest);
979 /* libdbi don't support CopyData and we need call a postgresql
980 * specific function to do this work
982 Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
983 if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
984 "PQputCopyData")) != NULL) {
986 res = (*custom_function)(myconn->connection, mdb->cmd, len);
987 } while (res == 0 && --count > 0);
992 mdb->status = (dbi_error_flag) 1;
996 Dmsg0(500, "my_dbi_batch_insert failed\n");
1000 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1003 // ensure to detect a PQerror
// NOTE(review): custom_function_error is called below without a visible NULL check on
// the pointer returned by dbi_driver_specific_function.
1004 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
1005 Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1009 case SQL_TYPE_SQLITE:
1010 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1011 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1012 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1013 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1014 mdb->esc_name, ar->attr, digest);
1015 if (my_dbi_query(mdb,mdb->cmd) == 1)
1017 Dmsg0(500, "my_dbi_batch_insert failed\n");
1021 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1025 case SQL_TYPE_SQLITE3:
1026 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1027 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1028 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1029 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1030 mdb->esc_name, ar->attr, digest);
1031 if (my_dbi_query(mdb,mdb->cmd) == 1)
1033 Dmsg0(500, "my_dbi_batch_insert failed\n");
1037 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1044 Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1045 mdb->status = (dbi_error_flag) 0;
1046 my_dbi_free_result(mdb);
1051 * Escape strings so that PostgreSQL is happy on COPY
1053 * NOTE! len is the length of the old string. Your new
1054 * string must be long enough (max 2*old+1) to hold
1055 * the escaped output.
// my_postgresql_copy_escape: copy src into dest for use as COPY input, escaping tab,
// newline, carriage return and backslash as the comment below states.
// The loop runs while len bytes remain and src has not reached its terminating zero byte.
// NOTE(review): the loop body and the return value are on lines not visible here.
1057 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1059 /* we have to escape \t, \n, \r, \ */
1062 while (len > 0 && *src) {
1097 #endif /* HAVE_BATCH_FILE_INSERT */
1101 * int PQgetisnull(const PGresult *res,
1103 * int column_number);
1105 * use dbi_result_seek_row to search in result set
// my_dbi_getisnull: seek result to row_number and, when the seek succeeds, ask libdbi
// whether column_number is NULL in that row.
// NOTE(review): the special handling of row_number 0, the index adjustment and the
// return statement are on lines not visible in this excerpt.
1107 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1110 if(row_number == 0) {
1116 if(dbi_result_seek_row(result, row_number)) {
1118 i = dbi_result_field_is_null_idx(result,column_number);
1129 * char *PQgetvalue(const PGresult *res,
1131 * int column_number);
1133 * use dbi_result_seek_row to search in result set
1134 * use example to return only strings
// my_dbi_getvalue: return one field of result rendered as text in a buffer obtained
// from malloc.
// After seeking to row_number, the field name, length and type are read from libdbi
// and the value is formatted according to its type:
//   integer  - edit_int64 of the long long value
//   string   - copied with bsnprintf
//   binary   - copied with bsnprintf
//   datetime - YYYY-MM-DD HH:MM:SS in local time, or an all-zero timestamp
// The buffer is not freed here; callers in this file either queue it for later release
// (my_dbi_fetch_row) or free it themselves (see the note in my_dbi_max_length).
// NOTE(review): the conditions choosing between the two mallocs and the return
// statement are on lines not visible in this excerpt.
1136 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1140 const char *field_name;
1141 unsigned short dbitype;
1142 size_t field_length;
1145 /* correct the index for dbi interface
1146 * dbi index begins 1
1147 * I prefer do not change others functions
1149 Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1150 result, row_number, column_number);
1154 if(row_number == 0) {
1158 Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1159 result, row_number, column_number);
1161 if(dbi_result_seek_row(result, row_number)) {
1163 field_name = dbi_result_get_field_name(result, column_number);
1164 field_length = dbi_result_get_field_length(result, field_name);
1165 dbitype = dbi_result_get_field_type_idx(result,column_number);
1167 Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1168 "field_length bytes: '%d' fieldname: '%s'\n",
1169 dbitype, field_length, field_name);
1172 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1173 buf = (char *)malloc(field_length + 1);
// NOTE(review): this requests 50 pointer-sized units rather than 50 chars for a text
// buffer - confirm which was meant. Neither malloc result is checked in the visible lines.
1176 buf = (char *)malloc(sizeof(char *) * 50);
1180 case DBI_TYPE_INTEGER:
1181 num = dbi_result_get_longlong(result, field_name);
1182 edit_int64(num, buf);
1183 field_length = strlen(buf);
1185 case DBI_TYPE_STRING:
1187 field_length = bsnprintf(buf, field_length + 1, "%s",
1188 dbi_result_get_string(result, field_name));
1193 case DBI_TYPE_BINARY:
1194 /* dbi_result_get_binary returns a NULL pointer if value is empty
1195 * so change this to what Bacula expects
// NOTE(review): binary data is formatted with %s, so it is cut at the first zero byte.
1198 field_length = bsnprintf(buf, field_length + 1, "%s",
1199 dbi_result_get_binary(result, field_name));
1204 case DBI_TYPE_DATETIME:
1208 last = dbi_result_get_datetime(result, field_name);
// The bound of 20 covers the 19 characters of the timestamp plus the terminator.
1211 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1213 (void)localtime_r(&last, &tm);
1214 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1215 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1216 tm.tm_hour, tm.tm_min, tm.tm_sec);
1222 dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1223 Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1226 Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1227 buf, field_length, buf);
1229 // don't worry about this buf
// my_dbi_sql_insert_id: obtain the last generated id for table_name from libdbi.
// For PostgreSQL a sequence name is assembled in the local buffer sequence:
// basefiles_baseid for the basefiles table (compared case-insensitively), otherwise
// <table>_<table>id; the suffix _seq is then appended and the name is passed to
// dbi_conn_sequence_last. Other database types pass table_name itself.
// Presumably returns id (the return statement is not visible in this excerpt).
1233 int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
1236 Obtain the current value of the sequence that
1237 provides the serial value for primary key of the table.
1239 currval is local to our session. It is not affected by
1242 Determine the name of the sequence.
1243 PostgreSQL automatically creates a sequence using
1244 <table>_<column>_seq.
1245 At the time of writing, all tables used this format for
1246 for their primary key: <table>id
1247 Except for basefiles which has a primary key on baseid.
1248 Therefore, we need to special case that one table.
1250 everything else can use the PostgreSQL formula.
1256 if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1258 if (strcasecmp(table_name, "basefiles") == 0) {
1259 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1261 bstrncpy(sequence, table_name, sizeof(sequence));
1262 bstrncat(sequence, "_", sizeof(sequence));
1263 bstrncat(sequence, table_name, sizeof(sequence));
1264 bstrncat(sequence, "id", sizeof(sequence));
1267 bstrncat(sequence, "_seq", sizeof(sequence));
1268 id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1270 id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1276 #ifdef HAVE_BATCH_FILE_INSERT
// Five-slot table of statements that lock the Path table; only two entries are visible
// in this excerpt. Presumably one slot per supported database type - TODO confirm the
// index order against the full file.
1277 const char *my_dbi_batch_lock_path_query[5] = {
1279 "LOCK TABLES Path write, batch write, Path as p write",
1281 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
// Five-slot table of statements that lock the Filename table; only two entries are
// visible in this excerpt. Presumably one slot per supported database type - TODO
// confirm the index order against the full file.
1290 const char *my_dbi_batch_lock_filename_query[5] = {
1292 "LOCK TABLES Filename write, batch write, Filename as f write",
1294 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1303 const char *my_dbi_batch_unlock_tables_query[5] = {
// Five-slot table of statements that insert into Path the distinct Path values of the
// batch table that are not already present, written either with NOT EXISTS or with
// EXCEPT. Presumably one slot per supported database type - TODO confirm the index
// order against the full file.
1316 const char *my_dbi_batch_fill_path_query[5] = {
1318 "INSERT INTO Path (Path) "
1319 "SELECT a.Path FROM "
1320 "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1321 "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1323 "INSERT INTO Path (Path) "
1324 "SELECT a.Path FROM "
1325 "(SELECT DISTINCT Path FROM batch) AS a "
1326 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1328 "INSERT INTO Path (Path)"
1329 " SELECT DISTINCT Path FROM batch"
1330 " EXCEPT SELECT Path FROM Path",
1332 "INSERT INTO Path (Path)"
1333 " SELECT DISTINCT Path FROM batch"
1334 " EXCEPT SELECT Path FROM Path",
1336 "INSERT INTO Path (Path) "
1337 "SELECT a.Path FROM "
1338 "(SELECT DISTINCT Path FROM batch) AS a "
1339 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
// Five-slot table of statements that insert into Filename the distinct Name values of
// the batch table that are not already present. Presumably one slot per supported
// database type - TODO confirm the index order against the full file.
// NOTE(review): in this excerpt two of the statements have a fragment on a line that is
// not visible (between the AS a subquery and the final subquery).
1342 const char *my_dbi_batch_fill_filename_query[5] = {
1344 "INSERT INTO Filename (Name) "
1345 "SELECT a.Name FROM "
1346 "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1347 "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1349 "INSERT INTO Filename (Name) "
1350 "SELECT a.Name FROM "
1351 "(SELECT DISTINCT Name FROM batch) as a "
1353 "(SELECT Name FROM Filename WHERE Name = a.Name)",
1355 "INSERT INTO Filename (Name)"
1356 " SELECT DISTINCT Name FROM batch "
1357 " EXCEPT SELECT Name FROM Filename",
1359 "INSERT INTO Filename (Name)"
1360 " SELECT DISTINCT Name FROM batch "
1361 " EXCEPT SELECT Name FROM Filename",
1363 "INSERT INTO Filename (Name) "
1364 "SELECT a.Name FROM "
1365 "(SELECT DISTINCT Name FROM batch) as a "
1367 "(SELECT Name FROM Filename WHERE Name = a.Name)"
1370 #endif /* HAVE_BATCH_FILE_INSERT */
1372 const char *my_dbi_match[5] = {
1385 #endif /* HAVE_DBI */