2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to DBI
30 * These are DBI specific routines
32 * João Henrique Freitas, December 2007
33 * based upon work done by Dan Langille, December 2003 and
34 * by Kern Sibbald, March 2000
38 * This code only compiles against a recent version of libdbi. The current
39 * release found on the libdbi website (0.8.3) won't work for this code.
41 * You find the libdbi library on http://sourceforge.net/projects/libdbi
43 * A fairly recent version of libdbi from CVS works, so either make sure
44 * your distribution has a fairly recent version of libdbi installed or
45 * clone the CVS repositories from sourceforge and compile that code and
49 * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
50 * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
54 /* The following is necessary so that we do not include
55 * the dummy external definition of DB.
57 #define __SQL_C /* indicate that this is sql.c */
64 /* -----------------------------------------------------------------------
66 * DBI dependent defines and subroutines
68 * -----------------------------------------------------------------------
71 /* List of open databases */
72 static dlist *db_list = NULL;
74 /* Queue of value buffers handed out by my_dbi_getvalue, kept so they can be released later */
75 static dlist *dbi_getvalue_list = NULL;
// Taken with P(mutex) in db_init_database while the database list is examined or extended
77 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
80 * Retrieve database type
89 * Initialize database data structure. In principle this should
90 * never have errors, or it is really fatal.
// db_init_database: prepare the B_DB descriptor used to reach a catalog through libdbi.
//
// Visible flow: choose the libdbi driver name that matches the database type,
// record DBI_DRIVER_DIR as the driver directory, then, with the module mutex
// held, either hand back an entry already present in db_list or allocate a
// fresh one, fill it from the arguments and append it to db_list.
//
// Arguments:
//   jcr                  job control record, used for the M_FATAL job messages
//   db_name, db_user, db_password, db_address, db_port, db_socket
//                        connection settings copied into the new descriptor
//   mult_db_connections  when zero, an existing entry with the same name,
//                        address, driver and port is reused; the value is also
//                        stored in allow_transactions
//
// The already-open descriptor is returned when a match is found. The guard
// conditions around the two fatal messages and the final return are not
// visible in this excerpt.
93 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
94 const char *db_address, int db_port, const char *db_socket,
95 int mult_db_connections)
100 char db_driverdir[256];
102 /* Constraint the db_driver */
104 Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
108 /* Do the correct selection of driver.
109 * Can be one of the varius supported by libdbi
113 bstrncpy(db_driver,"mysql", sizeof(db_driver));
115 case SQL_TYPE_POSTGRESQL:
116 bstrncpy(db_driver,"pgsql", sizeof(db_driver));
118 case SQL_TYPE_SQLITE:
119 bstrncpy(db_driver,"sqlite", sizeof(db_driver));
121 case SQL_TYPE_SQLITE3:
122 bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
126 /* Set db_driverdir to the location of the libdbi drivers */
// NOTE(review): the limit here is the literal 255 while the buffer is declared
// with 256 bytes; the other copies in this function pass sizeof(...) instead.
127 bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
130 Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
133 P(mutex); /* lock DB queue */
// Both lists are created on first use
134 if (db_list == NULL) {
135 db_list = New(dlist(mdb, &mdb->link));
136 dbi_getvalue_list = New(dlist(field, &field->link));
138 if (!mult_db_connections) {
139 /* Look to see if DB already open */
140 foreach_dlist(mdb, db_list) {
141 if (bstrcmp(mdb->db_name, db_name) &&
142 bstrcmp(mdb->db_address, db_address) &&
143 bstrcmp(mdb->db_driver, db_driver) &&
144 mdb->db_port == db_port) {
145 Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
146 dbi_conn_error(mdb->db, NULL));
149 return mdb; /* already open */
153 Dmsg0(100, "db_open first time\n");
// NOTE(review): the malloc result is passed to memset on the next line with no
// NULL check in between.
154 mdb = (B_DB *)malloc(sizeof(B_DB));
155 memset(mdb, 0, sizeof(B_DB));
// The descriptor keeps its own copies of every connection string
156 mdb->db_name = bstrdup(db_name);
157 mdb->db_user = bstrdup(db_user);
159 mdb->db_password = bstrdup(db_password);
162 mdb->db_address = bstrdup(db_address);
165 mdb->db_socket = bstrdup(db_socket);
168 mdb->db_driverdir = bstrdup(db_driverdir);
171 mdb->db_driver = bstrdup(db_driver);
173 mdb->db_type = db_type;
174 mdb->db_port = db_port;
// Working buffers come from the pool allocator; db_close_database returns them
175 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
177 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
178 mdb->cached_path = get_pool_memory(PM_FNAME);
179 mdb->cached_path_id = 0;
181 mdb->fname = get_pool_memory(PM_FNAME);
182 mdb->path = get_pool_memory(PM_FNAME);
183 mdb->esc_name = get_pool_memory(PM_FNAME);
184 mdb->esc_path = get_pool_memory(PM_FNAME);
185 mdb->allow_transactions = mult_db_connections;
186 db_list->append(mdb); /* put db in list */
192 * Now actually open the database. This can generate errors,
193 * which are returned in the errmsg
195 * DO NOT close the database or free(mdb) here !!!!
// db_open_database: connect the descriptor prepared by db_init_database.
//
// Visible flow: initialise the descriptor lock, initialise libdbi from the
// stored driver directory, create a connection object for the stored driver,
// set the connection options appropriate to the database type, try to connect
// in a loop of up to six attempts, verify the catalog tables version, then
// issue a few session settings for MySQL and PostgreSQL.
//
// Arguments:
//   jcr  job control record, passed to check_tables_version
//   mdb  descriptor to connect; errors are reported through mdb->errmsg
//
// The return statements are not visible in this excerpt.
198 db_open_database(JCR *jcr, B_DB *mdb)
206 char *db_name = NULL;
210 if (mdb->connected) {
214 mdb->connected = false;
216 if ((errstat=rwl_init(&mdb->lock)) != 0) {
218 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
219 be.bstrerror(errstat));
// The numeric port is rendered as text into buf
225 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
231 numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
232 if (numdrivers < 0) {
// NOTE(review): the format below contains a single %s conversion but two
// arguments (db_driverdir and numdrivers) are supplied to Mmsg2.
233 Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
234 "db_driverdir=%s. It is probaly not found any drivers\n"),
235 mdb->db_driverdir,numdrivers);
239 mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
240 /* Can be many types of databases */
241 switch (mdb->db_type) {
243 dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
244 dbi_conn_set_option(mdb->db, "port", port); /* default port */
245 dbi_conn_set_option(mdb->db, "username", mdb->db_user); /* login name */
246 dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
247 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name); /* database name */
249 case SQL_TYPE_POSTGRESQL:
250 dbi_conn_set_option(mdb->db, "host", mdb->db_address);
251 dbi_conn_set_option(mdb->db, "port", port);
252 dbi_conn_set_option(mdb->db, "username", mdb->db_user);
253 dbi_conn_set_option(mdb->db, "password", mdb->db_password);
254 dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
256 case SQL_TYPE_SQLITE:
// The SQLite variants use a directory (copied from working_directory) plus a
// file name made of the catalog name with the suffix .db appended. Each buffer
// is sized as the source length plus 5.
// NOTE(review): this branch and the SQLITE3 branch differ only in the option
// name and one debug message; no release of db_dir or db_name is visible in
// this excerpt - confirm they are freed.
257 len = strlen(working_directory) + 5;
258 db_dir = (char *)malloc(len);
259 strcpy(db_dir, working_directory);
261 len = strlen(mdb->db_name) + 5;
262 db_name = (char *)malloc(len);
263 strcpy(db_name, mdb->db_name);
264 strcat(db_name, ".db");
265 dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
266 dbi_conn_set_option(mdb->db, "dbname", db_name);
268 case SQL_TYPE_SQLITE3:
269 len = strlen(working_directory) + 5;
270 db_dir = (char *)malloc(len);
271 strcpy(db_dir, working_directory);
273 len = strlen(mdb->db_name) + 5;
274 db_name = (char *)malloc(len);
275 strcpy(db_name, mdb->db_name);
276 strcat(db_name, ".db");
277 dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
278 dbi_conn_set_option(mdb->db, "dbname", db_name);
279 Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
283 /* If connection fails, try at 5 sec intervals for 30 seconds. */
284 for (int retry=0; retry < 6; retry++) {
286 dbstat = dbi_conn_connect(mdb->db);
291 dbi_conn_error(mdb->db, &errmsg);
292 Dmsg1(50, "dbi error: %s\n", errmsg);
299 Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301 mdb->db_driver, mdb->db_name, mdb->db_user);
306 Dmsg0(50, "dbi_real_connect done\n");
// NOTE(review): the catalog password is written to the debug output here.
307 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
308 mdb->db_user, mdb->db_name,
309 mdb->db_password==NULL?"(NULL)":mdb->db_password);
311 mdb->connected = true;
313 if (!check_tables_version(jcr, mdb)) {
// Per-backend session settings issued once the connection is up
318 switch (mdb->db_type) {
320 /* Set connection timeout to 8 days, especially for batch mode */
321 sql_query(mdb, "SET wait_timeout=691200");
322 sql_query(mdb, "SET interactive_timeout=691200");
324 case SQL_TYPE_POSTGRESQL:
325 /* tell PostgreSQL we are using standard conforming strings
326 and avoid warnings such as:
327 WARNING: nonstandard use of \\ in a string literal
329 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
330 sql_query(mdb, "set standard_conforming_strings=on");
// db_close_database: release a descriptor obtained from db_init_database.
//
// Visible flow: end any open transaction and free the current result; once
// ref_count is zero the descriptor is removed from db_list, libdbi is shut
// down for this instance when the descriptor was connected, the lock is
// destroyed, the pool buffers are returned and the duplicated connection
// strings are freed. A final test checks whether db_list has become empty;
// what happens in that case is not visible in this excerpt.
//
// Arguments:
//   jcr  job control record, passed to db_end_transaction
//   mdb  descriptor to release
346 db_close_database(JCR *jcr, B_DB *mdb)
351 db_end_transaction(jcr, mdb);
353 sql_free_result(mdb);
355 if (mdb->ref_count == 0) {
356 db_list->remove(mdb);
357 if (mdb->connected && mdb->db) {
359 dbi_shutdown_r(mdb->instance);
361 mdb->instance = NULL;
363 rwl_destroy(&mdb->lock);
// Pool buffers obtained in db_init_database
364 free_pool_memory(mdb->errmsg);
365 free_pool_memory(mdb->cmd);
366 free_pool_memory(mdb->cached_path);
367 free_pool_memory(mdb->fname);
368 free_pool_memory(mdb->path);
369 free_pool_memory(mdb->esc_name);
370 free_pool_memory(mdb->esc_path);
// Strings duplicated with bstrdup in db_init_database
377 if (mdb->db_password) {
378 free(mdb->db_password);
380 if (mdb->db_address) {
381 free(mdb->db_address);
383 if (mdb->db_socket) {
384 free(mdb->db_socket);
386 if (mdb->db_driverdir) {
387 free(mdb->db_driverdir);
389 if (mdb->db_driver) {
390 free(mdb->db_driver);
393 if (db_list->size() == 0) {
// db_thread_cleanup: takes no arguments and returns nothing. No body is
// visible in this excerpt; presumably a per-thread cleanup hook - TODO confirm.
401 void db_thread_cleanup()
405 * Return the next unique index (auto-increment) for
406 * the given table. Return NULL on error.
// db_next_index: writes the literal text NULL into the caller buffer index.
// The jcr, mdb and table arguments are not used by the visible line; the
// return value is outside this excerpt. Presumably the database then assigns
// the key itself on insert - TODO confirm.
409 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
411 strcpy(index, "NULL");
417 * Escape strings so that DBI is happy
419 * NOTE! len is the length of the old string. Your new
420 * string must be long enough (max 2*old+1) to hold
421 * the escaped output.
423 * dbi_conn_escape_string_copy receives a pointer to pointer.
424 * We need to copy the value of that pointer to snew because libdbi changes the
// db_escape_string: produce an SQL-escaped copy of the first len bytes of old.
//
// Visible flow: a temporary buffer of len + 1 bytes receives a bounded copy of
// old, libdbi escapes that copy into a string it hands back through pnew, and
// the escaped text is then copied into snew with a limit of 2 * len + 1 bytes.
//
// Arguments:
//   jcr   not used by the visible lines
//   mdb   descriptor whose connection performs the escaping
//   snew  destination buffer supplied by the caller
//   old   source text
//   len   number of bytes of old to escape
//
// NOTE(review): the malloc result is used on the next line without a NULL
// check. Release of inew and pnew is not visible in this excerpt.
428 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
436 /* correct the size of old basead in len
437 * and copy new string to inew
439 inew = (char *)malloc(sizeof(char) * len + 1);
440 bstrncpy(inew,old,len + 1);
441 /* escape the correct size of old */
442 dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
444 /* copy the escaped string to snew */
445 bstrncpy(snew, pnew, 2 * len + 1);
448 Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
453 * Submit a general SQL command (cmd), and for each row returned,
454 * the result_handler is called with the ctx.
// db_sql_query: run query and, when a handler is given, feed it every row.
//
// Arguments:
//   mdb             descriptor to run the query on
//   query           SQL text
//   result_handler  optional callback; invoked as
//                   result_handler(ctx, num_fields, row) for each fetched row
//   ctx             opaque pointer forwarded to the callback
//
// On query failure an error message containing the query and sql_strerror is
// stored in mdb->errmsg. The result set is freed after the row loop. The
// action taken when the handler returns non-zero, and the return statements,
// are not visible in this excerpt.
456 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
460 Dmsg0(500, "db_sql_query started\n");
463 if (sql_query(mdb, query) != 0) {
464 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
466 Dmsg0(500, "db_sql_query failed\n");
469 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
471 if (result_handler != NULL) {
472 Dmsg0(500, "db_sql_query invoking handler\n");
473 if ((mdb->result = sql_store_result(mdb)) != NULL) {
474 int num_fields = sql_num_fields(mdb);
476 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
477 while ((row = sql_fetch_row(mdb)) != NULL) {
479 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
480 if (result_handler(ctx, num_fields, row))
484 sql_free_result(mdb);
489 Dmsg0(500, "db_sql_query finished\n");
// my_dbi_fetch_row: return the next row of the current result as an array of
// strings, or NULL (the initial value of row) when nothing is produced.
//
// Visible flow: when no row array exists yet, or it is smaller than the
// current field count, and the result has rows, a new array of num_fields
// pointers is allocated. If the stored row number is still inside the result
// set, every column is converted to text with my_dbi_getvalue; each returned
// buffer is also wrapped in a DBI_FIELD_GET node and appended to
// dbi_getvalue_list so it can be released later.
496 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
499 DBI_ROW row = NULL; // by default, return NULL
501 Dmsg0(500, "my_dbi_fetch_row start\n");
502 if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
503 int num_fields = mdb->num_fields;
// NOTE(review): the argument is a size_t expression printed with %d.
504 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
507 Dmsg0(500, "my_dbi_fetch_row freeing space\n");
508 Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
509 if (mdb->num_rows != 0) {
510 for(j = 0; j < mdb->num_fields; j++) {
511 Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
519 //num_fields += 20; /* add a bit extra */
520 mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
521 mdb->row_size = num_fields;
523 // now reset the row_number now that we have the space allocated
527 // if still within the result set
528 if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
529 Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
530 // get each value from this row
531 for (j = 0; j < mdb->num_fields; j++) {
532 mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
533 // allocate space to queue row
534 mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
535 // store the pointer in queue
536 mdb->field_get->value = mdb->row[j];
537 Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
538 j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
539 // insert in queue to future free
540 dbi_getvalue_list->append(mdb->field_get);
542 // increment the row number for the next call
547 Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
550 Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
// my_dbi_max_length: scan every row of the current result and track the
// largest text length seen in column field_num. A NULL value counts as 4
// characters; other values are fetched with my_dbi_getvalue and measured with
// cstrlen. The return statement is not visible in this excerpt.
555 int my_dbi_max_length(B_DB *mdb, int field_num) {
557 // for a given column, find the max length
565 for (i = 0; i < mdb->num_rows; i++) {
566 if (my_dbi_getisnull(mdb->result, i, field_num)) {
567 this_length = 4; // "NULL"
569 cbuf = my_dbi_getvalue(mdb->result, i, field_num);
570 this_length = cstrlen(cbuf);
571 // cbuf is always free
575 if (max_length < this_length) {
576 max_length = this_length;
// my_dbi_fetch_field: return a pointer to the descriptor of the next column
// of the current result and advance mdb->field_number.
//
// When no field array exists yet, or it is smaller than the current field
// count, an array of num_fields entries is allocated and every entry is
// filled with the column name, maximum text length (my_dbi_max_length), type
// and attribute flags. libdbi calls are made with dbi_index, which the
// original comment describes as the loop index shifted by one.
583 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
588 Dmsg0(500, "my_dbi_fetch_field starts\n");
590 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
594 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
595 mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
596 mdb->fields_size = mdb->num_fields;
598 for (i = 0; i < mdb->num_fields; i++) {
599 // num_fileds is starting at 1, increment i by 1
601 Dmsg1(500, "filling field %d\n", i);
602 mdb->fields[i].name = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
603 mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
604 mdb->fields[i].type = dbi_result_get_field_type_idx(mdb->result, dbi_index);
605 mdb->fields[i].flags = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
607 Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
608 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
609 mdb->fields[i].flags);
613 // increment field number for the next time around
615 Dmsg0(500, "my_dbi_fetch_field finishes\n");
616 return &mdb->fields[mdb->field_number++];
// my_dbi_data_seek: store row in mdb->row_number so that the next call to
// my_dbi_fetch_row starts from that row.
619 void my_dbi_data_seek(B_DB *mdb, int row)
621 // set the row number to be returned on the next call
622 // to my_dbi_fetch_row
623 mdb->row_number = row;
// my_dbi_field_seek: store field in mdb->field_number, the index that
// my_dbi_fetch_field uses for the next column it returns.
626 void my_dbi_field_seek(B_DB *mdb, int field)
628 mdb->field_number = field;
632 * Note, if this routine returns 1 (failure), Bacula expects
633 * that no result has been stored.
635 * Returns: 0 on success
// my_dbi_query: run query on the connection and cache the result metadata.
//
// Visible flow: the row and field cursors are reset to -1, a result left over
// from an earlier query is freed, the query is sent with dbi_conn_query and
// the connection error flag is read. With no error, the field and row counts
// are cached and status is set to 0; on the failure path the result is freed
// and status is set to 1.
//
// Arguments:
//   mdb    descriptor holding the connection and the cached result
//   query  SQL text to execute
639 int my_dbi_query(B_DB *mdb, const char *query)
642 Dmsg1(500, "my_dbi_query started %s\n", query);
643 // We are starting a new query. reset everything.
645 mdb->row_number = -1;
646 mdb->field_number = -1;
649 dbi_result_free(mdb->result); /* hmm, someone forgot to free?? */
653 mdb->result = (void **)dbi_conn_query(mdb->db, query);
656 Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
660 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
662 if (mdb->status == DBI_ERROR_NONE) {
// NOTE(review): Dmsg1 receives query but the format has no conversion for it.
663 Dmsg1(500, "we have a result\n", query);
665 // how many fields in the set?
666 // num_fields starting at 1
667 mdb->num_fields = dbi_result_get_numfields(mdb->result);
668 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
669 // if no result num_rows is 0
670 mdb->num_rows = dbi_result_get_numrows(mdb->result);
671 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
673 mdb->status = (dbi_error_flag) 0; /* succeed */
675 Dmsg1(50, "Result status failed: %s\n", query);
679 Dmsg0(500, "my_dbi_query finishing\n");
683 mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
684 //dbi_conn_error(mdb->db, &errmsg);
// NOTE(review): mdb->result is a pointer but is passed a second time for the
// %d conversion below.
685 Dmsg4(500, "my_dbi_query we failed dbi error: "
686 "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
687 dbi_result_free(mdb->result);
689 mdb->status = (dbi_error_flag) 1; /* failed */
// my_dbi_free_result: release the libdbi result held by mdb and walk
// dbi_getvalue_list, the queue of value buffers filled by my_dbi_fetch_row.
// The statements that release each queued entry are not visible in this
// excerpt; only the debug trace inside the loop is.
693 void my_dbi_free_result(B_DB *mdb)
699 Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
700 dbi_result_free(mdb->result);
709 /* now is time to free all value return by my_dbi_get_value
710 * this is necessary because libdbi don't free memory return by yours results
711 * and Bacula has some routine wich call more than once time my_dbi_fetch_row
713 * Using a queue to store all pointer allocate is a good way to free all things
716 foreach_dlist(f, dbi_getvalue_list) {
717 Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
729 Dmsg0(500, "my_dbi_free_result finish\n");
// my_dbi_strerror: ask libdbi for the error text of the connection held by
// mdb; dbi_conn_error stores it through errmsg. The return statement is not
// visible in this excerpt.
733 const char *my_dbi_strerror(B_DB *mdb)
737 dbi_conn_error(mdb->db, &errmsg);
742 #ifdef HAVE_BATCH_FILE_INSERT
745 * This can be a bit strange but is the one way to do
// my_dbi_batch_start: create the temporary table named batch and, for
// PostgreSQL, put the connection into COPY mode.
//
// Every backend branch issues CREATE TEMPORARY TABLE batch through
// my_dbi_query and treats a return of 1 as failure. The PostgreSQL branch
// then resets the cursors, frees the result and sends COPY batch FROM STDIN
// in a loop of up to ten iterations; the loop exit condition is not visible
// in this excerpt.
//
// Status convention seen here: mdb->status is set to 1 after the COPY command
// reports DBI_ERROR_NONE and to 0 on the error path, which is the opposite of
// the 0 = success, 1 = failure values that my_dbi_query stores.
//
// Arguments:
//   jcr  not used by the visible lines
//   mdb  descriptor to prepare for batch inserts
750 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
752 const char *query = "COPY batch FROM STDIN";
754 Dmsg0(500, "my_dbi_batch_start started\n");
756 switch (mdb->db_type) {
759 if (my_dbi_query(mdb,
760 "CREATE TEMPORARY TABLE batch ("
766 "MD5 tinyblob)") == 1)
768 Dmsg0(500, "my_dbi_batch_start failed\n");
772 Dmsg0(500, "my_dbi_batch_start finishing\n");
775 case SQL_TYPE_POSTGRESQL:
777 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
783 "md5 varchar)") == 1)
785 Dmsg0(500, "my_dbi_batch_start failed\n");
789 // We are starting a new query. reset everything.
791 mdb->row_number = -1;
792 mdb->field_number = -1;
794 my_dbi_free_result(mdb);
796 for (int i=0; i < 10; i++) {
797 my_dbi_query(mdb, query);
804 Dmsg1(50, "Query failed: %s\n", query);
808 mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
809 //mdb->status = DBI_ERROR_NONE;
811 if (mdb->status == DBI_ERROR_NONE) {
812 // how many fields in the set?
813 mdb->num_fields = dbi_result_get_numfields(mdb->result);
814 mdb->num_rows = dbi_result_get_numrows(mdb->result);
815 mdb->status = (dbi_error_flag) 1;
817 Dmsg1(50, "Result status failed: %s\n", query);
821 Dmsg0(500, "my_postgresql_batch_start finishing\n");
825 case SQL_TYPE_SQLITE:
827 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
833 "MD5 tinyblob)") == 1)
835 Dmsg0(500, "my_dbi_batch_start failed\n");
839 Dmsg0(500, "my_dbi_batch_start finishing\n");
842 case SQL_TYPE_SQLITE3:
844 if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
850 "MD5 tinyblob)") == 1)
852 Dmsg0(500, "my_dbi_batch_start failed\n");
856 Dmsg0(500, "my_dbi_batch_start finishing\n");
// Shared error exit: record the libdbi error text, clear status, drop result
862 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
863 mdb->status = (dbi_error_flag) 0;
864 my_dbi_free_result(mdb);
869 /* set error to something to abort operation */
// my_dbi_batch_end: finish a batch insert session.
//
// For PostgreSQL the driver-specific function PQputCopyEnd is looked up
// through libdbi and called with the raw connection and error, repeating
// while it returns 0 and the retry counter has not run out. The other
// backends only set mdb->status to 0.
//
// Arguments:
//   jcr    not used by the visible lines
//   mdb    descriptor in batch mode
//   error  passed to PQputCopyEnd; per the comment above the function, a
//          non-NULL value aborts the operation
//
// NOTE(review): mdb->db is read in the myconn initialiser before the
// if (!mdb) test further down, so that test cannot protect against a NULL mdb.
870 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
874 int (*custom_function)(void*, const char*) = NULL;
875 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
877 Dmsg0(500, "my_dbi_batch_end started\n");
879 if (!mdb) { /* no files ? */
883 switch (mdb->db_type) {
886 mdb->status = (dbi_error_flag) 0;
889 case SQL_TYPE_POSTGRESQL:
890 custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
894 res = (*custom_function)(myconn->connection, error);
895 } while (res == 0 && --count > 0);
899 mdb->status = (dbi_error_flag) 1;
903 Dmsg0(500, "we failed\n");
904 mdb->status = (dbi_error_flag) 0;
905 //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
908 case SQL_TYPE_SQLITE:
910 mdb->status = (dbi_error_flag) 0;
913 case SQL_TYPE_SQLITE3:
915 mdb->status = (dbi_error_flag) 0;
920 Dmsg0(500, "my_dbi_batch_end finishing\n");
926 * This function is big and uses a big switch.
927 * In the near future it would be better to split it into small functions
// my_dbi_batch_insert: add one file attribute record to the batch table.
//
// The escape buffers are first grown to twice the current name and path
// lengths plus one. For PostgreSQL the name and path are escaped with
// my_postgresql_copy_escape, a tab-separated line is built in mdb->cmd and
// pushed with the driver-specific PQputCopyData, repeating while it returns 0
// and the retry counter has not run out. The other backends escape with
// db_escape_string and run an INSERT INTO batch statement via my_dbi_query,
// treating a return of 1 as failure.
//
// Arguments:
//   jcr  passed to db_escape_string
//   mdb  descriptor; fname, path, fnl and pnl hold the current file name and
//        path and their lengths
//   ar   attribute record supplying FileIndex, JobId, attr and Digest
//
// How digest is chosen when ar->Digest is empty is not visible in this
// excerpt.
931 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
935 dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
936 int (*custom_function)(void*, const char*, int) = NULL;
937 char* (*custom_function_error)(void*) = NULL;
942 Dmsg0(500, "my_dbi_batch_insert started \n");
944 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
945 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
947 if (ar->Digest == NULL || ar->Digest[0] == 0) {
953 switch (mdb->db_type) {
955 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
956 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
957 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
958 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
959 mdb->esc_name, ar->attr, digest);
961 if (my_dbi_query(mdb,mdb->cmd) == 1)
963 Dmsg0(500, "my_dbi_batch_insert failed\n");
967 Dmsg0(500, "my_dbi_batch_insert finishing\n");
971 case SQL_TYPE_POSTGRESQL:
972 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
973 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
974 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
975 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
976 mdb->esc_name, ar->attr, digest);
978 /* libdbi don't support CopyData and we need call a postgresql
979 * specific function to do this work
981 Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
982 if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
983 "PQputCopyData")) != NULL) {
985 res = (*custom_function)(myconn->connection, mdb->cmd, len);
986 } while (res == 0 && --count > 0);
991 mdb->status = (dbi_error_flag) 1;
995 Dmsg0(500, "my_dbi_batch_insert failed\n");
999 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1002 // ensure to detect a PQerror
// NOTE(review): the lookup result is called on the next line with no NULL
// check, unlike the PQputCopyData lookup above.
1003 custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
1004 Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1008 case SQL_TYPE_SQLITE:
1009 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1010 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1011 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1012 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1013 mdb->esc_name, ar->attr, digest);
1014 if (my_dbi_query(mdb,mdb->cmd) == 1)
1016 Dmsg0(500, "my_dbi_batch_insert failed\n");
1020 Dmsg0(500, "my_dbi_batch_insert finishing\n");
1024 case SQL_TYPE_SQLITE3:
1025 db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1026 db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1027 len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1028 ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1029 mdb->esc_name, ar->attr, digest);
1030 if (my_dbi_query(mdb,mdb->cmd) == 1)
1032 Dmsg0(500, "my_dbi_batch_insert failed\n");
1036 Dmsg0(500, "my_dbi_batch_insert finishing\n");
// Shared error exit: record the libdbi error text, clear status, drop result
1043 Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1044 mdb->status = (dbi_error_flag) 0;
1045 my_dbi_free_result(mdb);
1050 * Escape strings so that PostgreSQL is happy on COPY
1052 * NOTE! len is the length of the old string. Your new
1053 * string must be long enough (max 2*old+1) to hold
1054 * the escaped output.
// my_postgresql_copy_escape: copy src into dest for use as COPY data,
// escaping tab, newline, carriage return and backslash as the comment below
// states. The loop runs while len is positive and src has not reached its
// terminator; the loop body and the return are not visible in this excerpt.
1056 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1058 /* we have to escape \t, \n, \r, \ */
1061 while (len > 0 && *src) {
1096 #endif /* HAVE_BATCH_FILE_INSERT */
1100 * int PQgetisnull(const PGresult *res,
1102 * int column_number);
1104 * use dbi_result_seek_row to search in result set
// my_dbi_getisnull: position result on row_number with dbi_result_seek_row
// and, when the seek succeeds, ask libdbi whether column_number is NULL.
// Row number 0 is handled as a special case whose body is not visible here;
// the return statement is also outside this excerpt.
1106 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1109 if(row_number == 0) {
1115 if(dbi_result_seek_row(result, row_number)) {
1117 i = dbi_result_field_is_null_idx(result,column_number);
1128 * char *PQgetvalue(const PGresult *res,
1130 * int column_number);
1132 * use dbi_result_seek_row to search in result set
1133 * use example to return only strings
// my_dbi_getvalue: return the value at (row_number, column_number) of result
// rendered as text in a freshly allocated buffer.
//
// After seeking to the row, the field name, length and libdbi type are read.
// Integers are formatted with edit_int64, strings and binaries are copied
// with bsnprintf limited to field_length + 1 bytes, and datetimes are
// formatted as YYYY-MM-DD HH:MM:SS from localtime_r, with an all-zero
// timestamp written under a condition that is not visible in this excerpt.
//
// The buffer belongs to the caller; my_dbi_fetch_row queues the buffers it
// receives on dbi_getvalue_list. Row number 0 is handled as a special case
// whose body is not visible here.
1135 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1139 const char *field_name;
1140 unsigned short dbitype;
1141 size_t field_length;
1144 /* correct the index for dbi interface
1145 * dbi index begins 1
1146 * I prefer do not change others functions
1148 Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1149 result, row_number, column_number);
1153 if(row_number == 0) {
1157 Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1158 result, row_number, column_number);
1160 if(dbi_result_seek_row(result, row_number)) {
1162 field_name = dbi_result_get_field_name(result, column_number);
1163 field_length = dbi_result_get_field_length(result, field_name);
1164 dbitype = dbi_result_get_field_type_idx(result,column_number);
1166 Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1167 "field_length bytes: '%d' fieldname: '%s'\n",
1168 dbitype, field_length, field_name);
1171 //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1172 buf = (char *)malloc(field_length + 1);
// NOTE(review): this allocation multiplies by sizeof(char *) rather than
// sizeof(char), so it reserves 50 pointer-sized units for a text buffer.
1175 buf = (char *)malloc(sizeof(char *) * 50);
1179 case DBI_TYPE_INTEGER:
1180 num = dbi_result_get_longlong(result, field_name);
1181 edit_int64(num, buf);
1182 field_length = strlen(buf);
1184 case DBI_TYPE_STRING:
1186 field_length = bsnprintf(buf, field_length + 1, "%s",
1187 dbi_result_get_string(result, field_name));
1192 case DBI_TYPE_BINARY:
1193 /* dbi_result_get_binary return a NULL pointer if value is empty
1194 * following, change this to what Bacula espected
1197 field_length = bsnprintf(buf, field_length + 1, "%s",
1198 dbi_result_get_binary(result, field_name));
1203 case DBI_TYPE_DATETIME:
1207 last = dbi_result_get_datetime(result, field_name);
1210 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1212 (void)localtime_r(&last, &tm);
1213 field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1214 (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1215 tm.tm_hour, tm.tm_min, tm.tm_sec);
1221 dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1222 Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1225 Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1226 buf, field_length, buf);
1228 // don't worry about this buf
// my_dbi_sequence_last: fetch the last generated key for table_name through
// dbi_conn_sequence_last.
//
// For PostgreSQL the sequence name is assembled first: basefiles_baseid for
// the basefiles table, otherwise the table name, an underscore, the table
// name again and the letters id; the suffix _seq is then appended. Other
// backends pass table_name itself. The return statement is not visible in
// this excerpt.
1232 static int my_dbi_sequence_last(B_DB *mdb, const char *table_name)
1235 Obtain the current value of the sequence that
1236 provides the serial value for primary key of the table.
1238 currval is local to our session. It is not affected by
1241 Determine the name of the sequence.
1242 PostgreSQL automatically creates a sequence using
1243 <table>_<column>_seq.
1244 At the time of writing, all tables used this format for
1245 for their primary key: <table>id
1246 Except for basefiles which has a primary key on baseid.
1247 Therefore, we need to special case that one table.
1249 everything else can use the PostgreSQL formula.
1255 if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1257 if (strcasecmp(table_name, "basefiles") == 0) {
1258 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1260 bstrncpy(sequence, table_name, sizeof(sequence));
1261 bstrncat(sequence, "_", sizeof(sequence));
1262 bstrncat(sequence, table_name, sizeof(sequence));
1263 bstrncat(sequence, "id", sizeof(sequence));
1266 bstrncat(sequence, "_seq", sizeof(sequence));
1267 id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1269 id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
// my_dbi_sql_insert_id: run an INSERT statement and return the key generated
// for it, obtained from my_dbi_sequence_last for table_name. The affected row
// count is stored in mdb->num_rows and compared against 1; the action taken
// on a mismatch is not visible in this excerpt.
//
// NOTE(review): the comment above my_dbi_query documents 0 as success, yet
// the first test here branches when the call returns 0 - confirm the intended
// sense against the branch body, which is not visible in this excerpt.
1275 int my_dbi_sql_insert_id(B_DB *mdb, const char *query, const char *table_name)
1278 * First execute the insert query and then retrieve the currval.
1280 if (!my_dbi_query(mdb, query)) {
1284 mdb->num_rows = sql_affected_rows(mdb);
1285 if (mdb->num_rows != 1) {
1291 return my_dbi_sequence_last(mdb, table_name);
1294 #ifdef HAVE_BATCH_FILE_INSERT
// Five-entry table of statements that lock the Path table before the batch
// data is merged. Presumably indexed by database type - TODO confirm; only
// two of the five entries are visible in this excerpt.
1295 const char *my_dbi_batch_lock_path_query[5] = {
1297 "LOCK TABLES Path write, batch write, Path as p write",
1299 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
// Five-entry table of statements that lock the Filename table before the
// batch data is merged. Presumably indexed by database type - TODO confirm;
// only two of the five entries are visible in this excerpt.
1308 const char *my_dbi_batch_lock_filename_query[5] = {
1310 "LOCK TABLES Filename write, batch write, Filename as f write",
1312 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
// Five-entry table; by its name, the statements that undo the two lock tables
// above it. None of its entries are visible in this excerpt.
1321 const char *my_dbi_batch_unlock_tables_query[5] = {
// Five-entry table of statements that insert into Path every distinct Path
// value of the batch table that Path does not contain yet. Three entries use
// a NOT EXISTS subquery and two use EXCEPT. Presumably indexed by database
// type - TODO confirm.
1334 const char *my_dbi_batch_fill_path_query[5] = {
1336 "INSERT INTO Path (Path) "
1337 "SELECT a.Path FROM "
1338 "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1339 "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1341 "INSERT INTO Path (Path) "
1342 "SELECT a.Path FROM "
1343 "(SELECT DISTINCT Path FROM batch) AS a "
1344 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1346 "INSERT INTO Path (Path)"
1347 " SELECT DISTINCT Path FROM batch"
1348 " EXCEPT SELECT Path FROM Path",
1350 "INSERT INTO Path (Path)"
1351 " SELECT DISTINCT Path FROM batch"
1352 " EXCEPT SELECT Path FROM Path",
1354 "INSERT INTO Path (Path) "
1355 "SELECT a.Path FROM "
1356 "(SELECT DISTINCT Path FROM batch) AS a "
1357 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
// Five-entry table of statements that insert into Filename every distinct
// Name value of the batch table that Filename does not contain yet. It
// mirrors the Path table above it: the visible text shows three subquery
// forms and two EXCEPT forms. Presumably indexed by database type - TODO
// confirm.
1360 const char *my_dbi_batch_fill_filename_query[5] = {
1362 "INSERT INTO Filename (Name) "
1363 "SELECT a.Name FROM "
1364 "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1365 "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1367 "INSERT INTO Filename (Name) "
1368 "SELECT a.Name FROM "
1369 "(SELECT DISTINCT Name FROM batch) as a "
1371 "(SELECT Name FROM Filename WHERE Name = a.Name)",
1373 "INSERT INTO Filename (Name)"
1374 " SELECT DISTINCT Name FROM batch "
1375 " EXCEPT SELECT Name FROM Filename",
1377 "INSERT INTO Filename (Name)"
1378 " SELECT DISTINCT Name FROM batch "
1379 " EXCEPT SELECT Name FROM Filename",
1381 "INSERT INTO Filename (Name) "
1382 "SELECT a.Name FROM "
1383 "(SELECT DISTINCT Name FROM batch) as a "
1385 "(SELECT Name FROM Filename WHERE Name = a.Name)"
1388 #endif /* HAVE_BATCH_FILE_INSERT */
// Five-entry string table; none of its entries are visible in this excerpt.
// Presumably the pattern-match operator for each database type - TODO confirm.
1390 const char *my_dbi_match[5] = {
1403 #endif /* HAVE_DBI */