2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
35 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
40 #ifdef HAVE_POSTGRESQL
45 #include "postgres_ext.h" /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
57 * List of open databases
59 static dlist *db_list = NULL;
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64 const char *db_driver,
67 const char *db_password,
68 const char *db_address,
70 const char *db_socket,
71 bool mult_db_connections,
72 bool disable_batch_insert)
75 * Initialize the parent class members.
77 m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78 m_db_type = SQL_TYPE_POSTGRESQL;
79 m_db_driver = bstrdup("PostgreSQL");
80 m_db_name = bstrdup(db_name);
81 m_db_user = bstrdup(db_user);
83 m_db_password = bstrdup(db_password);
86 m_db_address = bstrdup(db_address);
89 m_db_socket = bstrdup(db_socket);
92 if (disable_batch_insert) {
93 m_disabled_batch_insert = true;
94 m_have_batch_insert = false;
96 m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100 m_have_batch_insert = PQisthreadsafe();
102 m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
105 m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
108 m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
111 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
113 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114 cached_path = get_pool_memory(PM_FNAME);
117 fname = get_pool_memory(PM_FNAME);
118 path = get_pool_memory(PM_FNAME);
119 esc_name = get_pool_memory(PM_FNAME);
120 esc_path = get_pool_memory(PM_FNAME);
121 esc_obj = get_pool_memory(PM_FNAME);
122 m_buf = get_pool_memory(PM_FNAME);
123 m_allow_transactions = mult_db_connections;
125 /* At this time, when mult_db_connections == true, this is for
126 * specific console command such as bvfs or batch mode, and we don't
127 * want to share a batch mode or bvfs. In the future, we can change
128 * the creation function to add this parameter.
130 m_dedicated = mult_db_connections;
133 * Initialize the private members.
139 * Put the db in the list.
141 if (db_list == NULL) {
142 db_list = New(dlist(this, &this->m_link));
144 db_list->append(this);
147 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
152 * Check that the database correspond to the encoding we want
154 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
159 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
160 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
164 if ((row = mdb->sql_fetch_row()) == NULL) {
165 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
166 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
168 ret = bstrcmp(row[0], "SQL_ASCII");
172 * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
174 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
178 * Something is wrong with database encoding
181 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
182 mdb->get_db_name(), row[0]);
183 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
184 Dmsg1(50, "%s", mdb->errmsg);
191 * Now actually open the database. This can generate errors,
192 * which are returned in the errmsg
194 * DO NOT close the database or delete mdb here !!!!
196 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
208 if ((errstat=rwl_init(&m_lock)) != 0) {
210 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
211 be.bstrerror(errstat));
216 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
222 /* If connection fails, try at 5 sec intervals for 30 seconds. */
223 for (int retry=0; retry < 6; retry++) {
224 /* connect to the database */
225 m_db_handle = PQsetdbLogin(
226 m_db_address, /* default = localhost */
227 port, /* default port */
228 NULL, /* pg options */
229 NULL, /* tty, ignored */
230 m_db_name, /* database name */
231 m_db_user, /* login name */
232 m_db_password); /* password */
234 /* If no connect, try once more in case it is a timing problem */
235 if (PQstatus(m_db_handle) == CONNECTION_OK) {
241 Dmsg0(50, "pg_real_connect done\n");
242 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
243 (m_db_password == NULL) ? "(NULL)" : m_db_password);
245 if (PQstatus(m_db_handle) != CONNECTION_OK) {
246 Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
247 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
248 m_db_name, m_db_user);
253 if (!check_tables_version(jcr, this)) {
257 sql_query("SET datestyle TO 'ISO, YMD'");
258 sql_query("SET cursor_tuple_fraction=1");
261 * Tell PostgreSQL we are using standard conforming strings
262 * and avoid warnings such as:
263 * WARNING: nonstandard use of \\ in a string literal
265 sql_query("SET standard_conforming_strings=on");
268 * Check that encoding is SQL_ASCII
270 pgsql_check_database_encoding(jcr, this);
279 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
281 db_end_transaction(jcr);
284 if (m_ref_count == 0) {
286 db_list->remove(this);
287 if (m_connected && m_db_handle) {
288 PQfinish(m_db_handle);
290 rwl_destroy(&m_lock);
291 free_pool_memory(errmsg);
292 free_pool_memory(cmd);
293 free_pool_memory(cached_path);
294 free_pool_memory(fname);
295 free_pool_memory(path);
296 free_pool_memory(esc_name);
297 free_pool_memory(esc_path);
298 free_pool_memory(esc_obj);
299 free_pool_memory(m_buf);
319 if (db_list->size() == 0) {
327 void B_DB_POSTGRESQL::db_thread_cleanup(void)
332 * Escape strings so that PostgreSQL is happy
334 * NOTE! len is the length of the old string. Your new
335 * string must be long enough (max 2*old+1) to hold
336 * the escaped output.
338 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
342 PQescapeStringConn(m_db_handle, snew, old, len, &error);
344 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
345 /* error on encoding, probably invalid multibyte encoding in the source string
346 see PQescapeStringConn documentation for details. */
347 Dmsg0(500, "PQescapeStringConn failed\n");
352 * Escape binary so that PostgreSQL is happy
355 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
360 obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
362 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
365 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
366 memcpy(esc_obj, obj, new_len);
371 return (char *)esc_obj;
375 * Unescape binary object so that PostgreSQL is happy
378 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
379 POOLMEM **dest, int32_t *dest_len)
390 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
393 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
397 *dest = check_pool_memory_size(*dest, new_len+1);
398 memcpy(*dest, obj, new_len);
403 Dmsg1(010, "obj size: %d\n", *dest_len);
407 * Start a transaction. This groups inserts and makes things
408 * much more efficient. Usually started when inserting
411 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
414 jcr->attr = get_pool_memory(PM_FNAME);
417 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
421 * This is turned off because transactions break
422 * if multiple simultaneous jobs are run.
424 if (!m_allow_transactions) {
430 * Allow only 25,000 changes per transaction
432 if (m_transaction && changes > 25000) {
433 db_end_transaction(jcr);
435 if (!m_transaction) {
436 sql_query("BEGIN"); /* begin transaction */
437 Dmsg0(400, "Start PosgreSQL transaction\n");
438 m_transaction = true;
443 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
445 if (jcr && jcr->cached_attribute) {
446 Dmsg0(400, "Flush last cached attribute.\n");
447 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
448 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
450 jcr->cached_attribute = false;
453 if (!m_allow_transactions) {
459 sql_query("COMMIT"); /* end transaction */
460 m_transaction = false;
461 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
469 * Submit a general SQL command (cmd), and for each row returned,
470 * the result_handler is called with the ctx.
472 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query,
473 DB_RESULT_HANDLER *result_handler,
478 bool in_transaction = m_transaction;
480 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
482 /* This code handles only SELECT queries */
483 if (strncasecmp(query, "SELECT", 6) != 0) {
484 return db_sql_query(query, result_handler, ctx);
487 if (!result_handler) { /* no need of big_query without handler */
493 if (!in_transaction) { /* CURSOR needs transaction */
497 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
499 if (!sql_query(m_buf)) {
500 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
501 Dmsg0(50, "db_sql_query failed\n");
506 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
509 while ((row = sql_fetch_row()) != NULL) {
510 Dmsg1(500, "Fetching %d rows\n", m_num_rows);
511 if (result_handler(ctx, m_num_fields, row))
517 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
519 sql_query("CLOSE _bac_cursor");
521 Dmsg0(500, "db_big_sql_query finished\n");
526 if (!in_transaction) {
527 sql_query("COMMIT"); /* end transaction */
535 * Submit a general SQL command (cmd), and for each row returned,
536 * the result_handler is called with the ctx.
538 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
543 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
546 if (!sql_query(query, QF_STORE_RESULT)) {
547 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
548 Dmsg0(500, "db_sql_query failed\n");
553 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
555 if (result_handler != NULL) {
556 Dmsg0(500, "db_sql_query invoking handler\n");
557 while ((row = sql_fetch_row()) != NULL) {
558 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
559 if (result_handler(ctx, m_num_fields, row))
565 Dmsg0(500, "db_sql_query finished\n");
573 * Note, if this routine returns false (failure), Bacula expects
574 * that no result has been stored.
575 * This is where QUERY_DB comes with Postgresql.
577 * Returns: true on success
581 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
586 Dmsg1(500, "sql_query starts with '%s'\n", query);
588 * We are starting a new query. reset everything.
595 PQclear(m_result); /* hmm, someone forgot to free?? */
599 for (i = 0; i < 10; i++) {
600 m_result = PQexec(m_db_handle, query);
607 Dmsg1(50, "Query failed: %s\n", query);
611 m_status = PQresultStatus(m_result);
612 if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
613 Dmsg0(500, "we have a result\n");
616 * How many fields in the set?
618 m_num_fields = (int)PQnfields(m_result);
619 Dmsg1(500, "we have %d fields\n", m_num_fields);
621 m_num_rows = PQntuples(m_result);
622 Dmsg1(500, "we have %d rows\n", m_num_rows);
624 m_row_number = 0; /* we can start to fetch something */
625 m_status = 0; /* succeed */
628 Dmsg1(50, "Result status failed: %s\n", query);
632 Dmsg0(500, "sql_query finishing\n");
636 Dmsg0(500, "we failed\n");
639 m_status = 1; /* failed */
645 void B_DB_POSTGRESQL::sql_free_result(void)
660 m_num_rows = m_num_fields = 0;
664 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
667 SQL_ROW row = NULL; /* by default, return NULL */
669 Dmsg0(500, "sql_fetch_row start\n");
671 if (m_num_fields == 0) { /* No field, no row */
672 Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
676 if (!m_rows || m_rows_size < m_num_fields) {
678 Dmsg0(500, "sql_fetch_row freeing space\n");
681 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
682 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
683 m_rows_size = m_num_fields;
686 * Now reset the row_number now that we have the space allocated
692 * If still within the result set
694 if (m_row_number >= 0 && m_row_number < m_num_rows) {
695 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
697 * Get each value from this row
699 for (j = 0; j < m_num_fields; j++) {
700 m_rows[j] = PQgetvalue(m_result, m_row_number, j);
701 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
704 * Increment the row number for the next call
709 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
712 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
717 const char *B_DB_POSTGRESQL::sql_strerror(void)
719 return PQerrorMessage(m_db_handle);
722 void B_DB_POSTGRESQL::sql_data_seek(int row)
725 * Set the row number to be returned on the next call to sql_fetch_row
730 int B_DB_POSTGRESQL::sql_affected_rows(void)
732 return (unsigned) str_to_int32(PQcmdTuples(m_result));
735 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
739 char sequence[NAMEDATALEN-1];
740 char getkeyval_query[NAMEDATALEN+50];
744 * First execute the insert query and then retrieve the currval.
746 if (!sql_query(query)) {
750 m_num_rows = sql_affected_rows();
751 if (m_num_rows != 1) {
758 * Obtain the current value of the sequence that
759 * provides the serial value for primary key of the table.
761 * currval is local to our session. It is not affected by
762 * other transactions.
764 * Determine the name of the sequence.
765 * PostgreSQL automatically creates a sequence using
766 * <table>_<column>_seq.
767 * At the time of writing, all tables used this format for
768 * for their primary key: <table>id
769 * Except for basefiles which has a primary key on baseid.
770 * Therefore, we need to special case that one table.
772 * everything else can use the PostgreSQL formula.
774 if (strcasecmp(table_name, "basefiles") == 0) {
775 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
777 bstrncpy(sequence, table_name, sizeof(sequence));
778 bstrncat(sequence, "_", sizeof(sequence));
779 bstrncat(sequence, table_name, sizeof(sequence));
780 bstrncat(sequence, "id", sizeof(sequence));
783 bstrncat(sequence, "_seq", sizeof(sequence));
784 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
786 Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
787 for (i = 0; i < 10; i++) {
788 pg_result = PQexec(m_db_handle, getkeyval_query);
795 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
799 Dmsg0(500, "exec done");
801 if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
802 Dmsg0(500, "getting value");
803 id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
804 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
806 Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
807 Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
816 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
822 Dmsg0(500, "sql_fetch_field starts\n");
824 if (!m_fields || m_fields_size < m_num_fields) {
829 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
830 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
831 m_fields_size = m_num_fields;
833 for (i = 0; i < m_num_fields; i++) {
834 Dmsg1(500, "filling field %d\n", i);
835 m_fields[i].name = PQfname(m_result, i);
836 m_fields[i].type = PQftype(m_result, i);
837 m_fields[i].flags = 0;
840 * For a given column, find the max length.
843 for (j = 0; j < m_num_rows; j++) {
844 if (PQgetisnull(m_result, j, i)) {
845 this_length = 4; /* "NULL" */
847 this_length = cstrlen(PQgetvalue(m_result, j, i));
850 if (max_length < this_length) {
851 max_length = this_length;
854 m_fields[i].max_length = max_length;
856 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
857 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
862 * Increment field number for the next time around
864 return &m_fields[m_field_number++];
867 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
869 switch (field_type) {
877 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
880 * TEMP: the following is taken from select OID, typname from pg_type;
882 switch (field_type) {
895 * Escape strings so that PostgreSQL is happy on COPY
897 * NOTE! len is the length of the old string. Your new
898 * string must be long enough (max 2*old+1) to hold
899 * the escaped output.
901 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
903 /* we have to escape \t, \n, \r, \ */
906 while (len > 0 && *src) {
941 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
943 const char *query = "COPY batch FROM STDIN";
945 Dmsg0(500, "sql_batch_start started\n");
947 if (!sql_query("CREATE TEMPORARY TABLE batch ("
954 "DeltaSeq smallint)")) {
955 Dmsg0(500, "sql_batch_start failed\n");
960 * We are starting a new query. reset everything.
968 for (int i=0; i < 10; i++) {
969 m_result = PQexec(m_db_handle, query);
976 Dmsg1(50, "Query failed: %s\n", query);
980 m_status = PQresultStatus(m_result);
981 if (m_status == PGRES_COPY_IN) {
983 * How many fields in the set?
985 m_num_fields = (int) PQnfields(m_result);
989 Dmsg1(50, "Result status failed: %s\n", query);
993 Dmsg0(500, "sql_batch_start finishing\n");
998 Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
1006 * Set error to something to abort operation
1008 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1012 PGresult *pg_result;
1014 Dmsg0(500, "sql_batch_end started\n");
1017 res = PQputCopyEnd(m_db_handle, error);
1018 } while (res == 0 && --count > 0);
1026 Dmsg0(500, "we failed\n");
1028 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1029 Dmsg1(500, "failure %s\n", errmsg);
1032 /* Check command status and return to normal libpq state */
1033 pg_result = PQgetResult(m_db_handle);
1034 if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1035 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1040 Dmsg0(500, "sql_batch_end finishing\n");
1045 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1053 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1054 pgsql_copy_escape(esc_name, fname, fnl);
1056 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1057 pgsql_copy_escape(esc_path, path, pnl);
1059 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1062 digest = ar->Digest;
1065 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1066 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1067 esc_name, ar->attr, digest, ar->DeltaSeq);
1070 res = PQputCopyData(m_db_handle, cmd, len);
1071 } while (res == 0 && --count > 0);
1080 Dmsg0(500, "we failed\n");
1082 Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1083 Dmsg1(500, "failure %s\n", errmsg);
1086 Dmsg0(500, "sql_batch_insert finishing\n");
1092 * Initialize database data structure. In principal this should
1093 * never have errors, or it is really fatal.
1095 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
1096 const char *db_user, const char *db_password,
1097 const char *db_address, int db_port,
1098 const char *db_socket, bool mult_db_connections,
1099 bool disable_batch_insert)
1101 B_DB_POSTGRESQL *mdb = NULL;
1104 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1107 P(mutex); /* lock DB queue */
1108 if (db_list && !mult_db_connections) {
1110 * Look to see if DB already open
1112 foreach_dlist(mdb, db_list) {
1113 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1114 Dmsg1(100, "DB REopen %s\n", db_name);
1115 mdb->increment_refcount();
1120 Dmsg0(100, "db_init_database first time\n");
1121 mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password,
1122 db_address, db_port, db_socket,
1123 mult_db_connections, disable_batch_insert));
1130 #endif /* HAVE_POSTGRESQL */