2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
35 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
40 #ifdef HAVE_POSTGRESQL
45 #include "postgres_ext.h" /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
57 * List of open databases
// Registry of B_DB_POSTGRESQL objects: the constructor appends to it and
// db_close_database removes from it. NULL until the first object is created.
59 static dlist *db_list = NULL;
// File-wide mutex; db_init_database takes it with P(mutex) before it
// searches db_list for a database that is already open.
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Constructor: records the connection parameters, decides whether batch
// insert may be used, allocates the pool-memory work buffers and registers
// the new object in db_list.
// NOTE(review): this listing omits some original lines (braces, blank lines
// and possibly guards), so the nesting shown here is incomplete.
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64 const char *db_driver,
67 const char *db_password,
68 const char *db_address,
70 const char *db_socket,
71 bool mult_db_connections,
72 bool disable_batch_insert)
75 * Initialize the parent class members.
77 m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78 m_db_type = SQL_TYPE_POSTGRESQL;
// The driver name is stored as a fixed string; the db_driver argument is not
// used in any line visible here.
79 m_db_driver = bstrdup("PostgreSQL");
80 m_db_name = bstrdup(db_name);
81 m_db_user = bstrdup(db_user);
// NOTE(review): original lines around the next three copies are not shown;
// presumably they guard against NULL arguments - confirm before relying on it.
83 m_db_password = bstrdup(db_password);
86 m_db_address = bstrdup(db_address);
89 m_db_socket = bstrdup(db_socket);
// Batch insert selection: an explicit request to disable it wins; otherwise
// the build-time macros below decide.
92 if (disable_batch_insert) {
93 m_disabled_batch_insert = true;
94 m_have_batch_insert = false;
96 m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
// When libpq offers PQisthreadsafe, its runtime answer decides.
100 m_have_batch_insert = PQisthreadsafe();
102 m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
105 m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
108 m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
// Work buffers; every one of these is released in db_close_database.
111 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
113 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114 cached_path = get_pool_memory(PM_FNAME);
117 fname = get_pool_memory(PM_FNAME);
118 path = get_pool_memory(PM_FNAME);
119 esc_name = get_pool_memory(PM_FNAME);
120 esc_path = get_pool_memory(PM_FNAME);
121 esc_obj = get_pool_memory(PM_FNAME);
122 m_buf = get_pool_memory(PM_FNAME);
// Transactions are allowed exactly when mult_db_connections is set.
123 m_allow_transactions = mult_db_connections;
126 * Initialize the private members.
132 * Put the db in the list.
// The list is created on first use, with m_link as the link member.
134 if (db_list == NULL) {
135 db_list = New(dlist(this, &this->m_link));
137 db_list->append(this);
// Destructor. Its body is not visible in this listing; the connection and
// the pool buffers are released in db_close_database.
140 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
145 * Check that the database correspond to the encoding we want
// Asks the server for getdatabaseencoding() and compares the answer with
// SQL_ASCII. On a match the client encoding is switched to SQL_ASCII as well;
// otherwise a warning is formatted into mdb->errmsg and reported.
// The value kept in ret comes from bstrcmp; the return statements themselves
// are not visible in this listing.
147 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
152 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
153 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
157 if ((row = mdb->sql_fetch_row()) == NULL) {
158 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
159 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
161 ret = bstrcmp(row[0], "SQL_ASCII");
165 * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
// NOTE(review): the result of this SET is not checked in the visible lines.
167 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
171 * Something is wrong with database encoding
174 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
175 mdb->get_db_name(), row[0]);
// Reported as a warning only, and repeated in the debug trace.
176 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
177 Dmsg1(50, "%s", mdb->errmsg);
184 * Now actually open the database. This can generate errors,
185 * which are returned in the errmsg
187 * DO NOT close the database or delete mdb here !!!!
// Opens the connection to the PostgreSQL server: initialises the rwlock,
// formats the port number, attempts PQsetdbLogin up to six times, verifies
// the catalog table version and applies the session settings.
// Errors are formatted into errmsg. Return statements are not visible in
// this listing.
189 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
201 if ((errstat=rwl_init(&m_lock)) != 0) {
203 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
204 be.bstrerror(errstat));
// The port is passed to libpq as text.
209 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
215 /* If connection fails, try at 5 sec intervals for 30 seconds. */
216 for (int retry=0; retry < 6; retry++) {
217 /* connect to the database */
// NOTE(review): no PQfinish of a failed handle is visible before the next
// attempt overwrites m_db_handle - confirm against the omitted lines.
218 m_db_handle = PQsetdbLogin(
219 m_db_address, /* default = localhost */
220 port, /* default port */
221 NULL, /* pg options */
222 NULL, /* tty, ignored */
223 m_db_name, /* database name */
224 m_db_user, /* login name */
225 m_db_password); /* password */
227 /* If no connect, try once more in case it is a timing problem */
228 if (PQstatus(m_db_handle) == CONNECTION_OK) {
234 Dmsg0(50, "pg_real_connect done\n");
// The trace only states whether a password is configured; the clear-text
// password must never be written to debug output.
235 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
236 (m_db_password == NULL) ? "(NULL)" : "(hidden)");
238 if (PQstatus(m_db_handle) != CONNECTION_OK) {
239 Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
240 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
241 m_db_name, m_db_user);
246 if (!check_tables_version(jcr, this)) {
// Session settings; their results are not checked in the visible lines.
250 sql_query("SET datestyle TO 'ISO, YMD'");
251 sql_query("SET cursor_tuple_fraction=1");
254 * Tell PostgreSQL we are using standard conforming strings
255 * and avoid warnings such as:
256 * WARNING: nonstandard use of \\ in a string literal
258 sql_query("SET standard_conforming_strings=on");
261 * Check that encoding is SQL_ASCII
// The boolean result of the encoding check is ignored here.
263 pgsql_check_database_encoding(jcr, this);
// Closes this handle: any open transaction is ended first, and once the
// reference count is zero the object is removed from db_list, the server
// connection is finished and the lock and pool buffers are released.
// NOTE(review): the reference count decrement and several release steps
// fall in original lines that this listing does not show.
272 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
274 db_end_transaction(jcr);
278 if (m_ref_count == 0) {
279 db_list->remove(this);
// PQfinish is only called for a connected, non-null handle.
280 if (m_connected && m_db_handle) {
281 PQfinish(m_db_handle);
283 rwl_destroy(&m_lock);
// Mirror of the get_pool_memory calls made in the constructor.
284 free_pool_memory(errmsg);
285 free_pool_memory(cmd);
286 free_pool_memory(cached_path);
287 free_pool_memory(fname);
288 free_pool_memory(path);
289 free_pool_memory(esc_name);
290 free_pool_memory(esc_path);
291 free_pool_memory(esc_obj);
292 free_pool_memory(m_buf);
// Special handling once the registry is empty; the branch body is not visible.
315 if (db_list->size() == 0) {
// Per-thread cleanup hook. Its body is not visible in this listing.
323 void B_DB_POSTGRESQL::db_thread_cleanup(void)
328 * Escape strings so that PostgreSQL is happy
330 * NOTE! len is the length of the old string. Your new
331 * string must be long enough (max 2*old+1) to hold
332 * the escaped output.
// Escapes len bytes of old into snew with PQescapeStringConn on the current
// connection. The caller supplies snew; the comment above requires room for
// up to 2*len+1 bytes. A failure reported through the error output argument
// is raised as an M_FATAL job message.
334 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
338 PQescapeStringConn(m_db_handle, snew, old, len, &error);
340 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
341 /* error on encoding, probably invalid multibyte encoding in the source string
342 see PQescapeStringConn documentation for details. */
343 Dmsg0(500, "PQescapeStringConn failed\n");
348 * Escape binary so that PostgreSQL is happy
// Escapes len bytes of binary data with PQescapeByteaConn and copies the
// result into the member buffer esc_obj, which is what gets returned. The
// returned pointer therefore stays owned by this object.
351 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
356 obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
358 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
// Grow the pool buffer to new_len+1 bytes before copying.
361 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
362 memcpy(esc_obj, obj, new_len);
// NOTE(review): the release of obj (libpq expects PQfreemem) is not visible
// in this listing - confirm it happens in the omitted lines.
367 return (char *)esc_obj;
371 * Unescape binary object so that PostgreSQL is happy
// Decodes a bytea value (from) with PQunescapeBytea and copies the decoded
// bytes into the pool buffer *dest, growing it to new_len+1 bytes first.
// A NULL result from libpq is raised as an M_FATAL job message.
// expected_len is not referenced in any line visible here.
374 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
375 POOLMEM **dest, int32_t *dest_len)
386 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
// The message names the function that is actually called above.
389 Jmsg(jcr, M_FATAL, 0, _("PQunescapeBytea returned NULL.\n"));
393 *dest = check_pool_memory_size(*dest, new_len+1);
394 memcpy(*dest, obj, new_len);
// NOTE(review): 010 is an octal literal (decimal 8) - confirm that this is
// the intended debug level.
399 Dmsg1(010, "obj size: %d\n", *dest_len);
403 * Start a transaction. This groups inserts and makes things
404 * much more efficient. Usually started when inserting
// Prepares the per-job attribute buffers and, when transactions are allowed,
// makes sure a transaction is open. A transaction that has accumulated more
// than 25000 changes is ended first so that a fresh one is started.
// NOTE(review): the conditions guarding the two allocations are not visible
// in this listing, and the malloc result is not checked in the shown lines.
407 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
410 jcr->attr = get_pool_memory(PM_FNAME);
413 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
417 * This is turned off because transactions break
418 * if multiple simultaneous jobs are run.
420 if (!m_allow_transactions) {
426 * Allow only 25,000 changes per transaction
428 if (m_transaction && changes > 25000) {
429 db_end_transaction(jcr);
431 if (!m_transaction) {
// The result of BEGIN is not checked in the visible lines.
432 sql_query("BEGIN"); /* begin transaction */
// NOTE(review): the trace text below misspells PostgreSQL.
433 Dmsg0(400, "Start PosgreSQL transaction\n");
434 m_transaction = true;
// Flushes a cached attribute record of the job, if there is one, and then
// commits the open transaction. jcr may be NULL; the attribute flush is
// skipped in that case.
// NOTE(review): the condition guarding the COMMIT lies in lines this
// listing does not show.
439 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
441 if (jcr && jcr->cached_attribute) {
442 Dmsg0(400, "Flush last cached attribute.\n");
443 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
444 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
// The flag is cleared after the flush attempt.
446 jcr->cached_attribute = false;
449 if (!m_allow_transactions) {
455 sql_query("COMMIT"); /* end transaction */
456 m_transaction = false;
457 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
465 * Submit a general SQL command (cmd), and for each row returned,
466 * the result_handler is called with the ctx.
// Runs a SELECT through a server-side cursor so that rows reach the handler
// in batches of 100 instead of one large result set. Queries that do not
// start with SELECT (case-insensitive, first six characters) are passed to
// db_sql_query unchanged.
// The transaction state on entry is remembered in in_transaction; the COMMIT
// at the end is only issued when no transaction was open on entry.
468 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query,
469 DB_RESULT_HANDLER *result_handler,
474 bool in_transaction = m_transaction;
// NOTE(review): this trace and the failure trace below say db_sql_query
// although they are emitted from db_big_sql_query.
476 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
478 /* This code handles only SELECT queries */
479 if (strncasecmp(query, "SELECT", 6) != 0) {
480 return db_sql_query(query, result_handler, ctx);
483 if (!result_handler) { /* no need of big_query without handler */
489 if (!in_transaction) { /* CURSOR needs transaction */
// The cursor name is fixed, and the query text is appended verbatim.
493 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
495 if (!sql_query(m_buf)) {
496 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
497 Dmsg0(50, "db_sql_query failed\n");
// Fetch loop: one batch per iteration, until a batch comes back empty.
502 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
505 while ((row = sql_fetch_row()) != NULL) {
506 Dmsg1(500, "Fetching %d rows\n", m_num_rows);
// The branch taken on a nonzero handler result is not visible here.
507 if (result_handler(ctx, m_num_fields, row))
513 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
515 sql_query("CLOSE _bac_cursor");
517 Dmsg0(500, "db_big_sql_query finished\n");
522 if (!in_transaction) {
523 sql_query("COMMIT"); /* end transaction */
531 * Submit a general SQL command (cmd), and for each row returned,
532 * the result_handler is called with the ctx.
// Executes query with the result stored, then, if a handler is supplied,
// calls it once per fetched row with ctx, the field count and the row.
// On a query failure the message is formatted into errmsg.
// Locking and the return statements are not visible in this listing.
534 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
539 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
542 if (!sql_query(query, QF_STORE_RESULT)) {
543 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
544 Dmsg0(500, "db_sql_query failed\n");
549 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
// Without a handler the rows are simply not iterated.
551 if (result_handler != NULL) {
552 Dmsg0(500, "db_sql_query invoking handler\n");
553 while ((row = sql_fetch_row()) != NULL) {
554 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
// The branch taken on a nonzero handler result is not visible here.
555 if (result_handler(ctx, m_num_fields, row))
561 Dmsg0(500, "db_sql_query finished\n");
569 * Note, if this routine returns false (failure), Bacula expects
570 * that no result has been stored.
571 * This is where QUERY_DB comes with Postgresql.
573 * Returns: true on success
// Low-level query execution: clears any previous result, runs the query with
// PQexec inside a loop of at most ten attempts, and on PGRES_TUPLES_OK or
// PGRES_COMMAND_OK caches the field and row counts and rewinds the row
// cursor. m_status ends as 0 on success and 1 on failure.
// flags is not referenced in any line visible here.
577 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
582 Dmsg1(500, "sql_query starts with '%s'\n", query);
584 * We are starting a new query. reset everything.
591 PQclear(m_result); /* hmm, someone forgot to free?? */
// The condition that ends this loop early is in lines not shown.
595 for (i = 0; i < 10; i++) {
596 m_result = PQexec(m_db_handle, query);
603 Dmsg1(50, "Query failed: %s\n", query);
607 m_status = PQresultStatus(m_result);
608 if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
609 Dmsg0(500, "we have a result\n");
612 * How many fields in the set?
614 m_num_fields = (int)PQnfields(m_result);
615 Dmsg1(500, "we have %d fields\n", m_num_fields);
617 m_num_rows = PQntuples(m_result);
618 Dmsg1(500, "we have %d rows\n", m_num_rows);
620 m_row_number = 0; /* we can start to fetch something */
// m_status is reused here as a plain success flag.
621 m_status = 0; /* succeed */
624 Dmsg1(50, "Result status failed: %s\n", query);
628 Dmsg0(500, "sql_query finishing\n");
632 Dmsg0(500, "we failed\n");
635 m_status = 1; /* failed */
// Releases the current result state. Only the final reset of the row and
// field counts is visible in this listing; the release of the result and of
// the row and field arrays presumably sits in the omitted lines - confirm.
641 void B_DB_POSTGRESQL::sql_free_result(void)
656 m_num_rows = m_num_fields = 0;
// Returns the next row of the current result as an array of column value
// pointers, or NULL when there are no fields or the row cursor is outside
// the result. The array m_rows is allocated again whenever it is missing or
// smaller than the current field count, and is reused between calls.
// The pointers come from PQgetvalue and refer into m_result.
660 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
663 SQL_ROW row = NULL; /* by default, return NULL */
665 Dmsg0(500, "sql_fetch_row start\n");
667 if (m_num_fields == 0) { /* No field, no row */
668 Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
672 if (!m_rows || m_rows_size < m_num_fields) {
674 Dmsg0(500, "sql_fetch_row freeing space\n");
// sizeof yields size_t, which does not match the %d conversion; the value is
// cast to int so the argument has the type the format expects.
677 Dmsg1(500, "we need space for %d bytes\n", (int)(sizeof(char *) * m_num_fields));
678 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
679 m_rows_size = m_num_fields;
682 * Now reset the row_number now that we have the space allocated
688 * If still within the result set
690 if (m_row_number >= 0 && m_row_number < m_num_rows) {
691 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
693 * Get each value from this row
695 for (j = 0; j < m_num_fields; j++) {
696 m_rows[j] = PQgetvalue(m_result, m_row_number, j);
697 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
700 * Increment the row number for the next call
705 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
708 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
// Returns the libpq error text for the current connection handle.
713 const char *B_DB_POSTGRESQL::sql_strerror(void)
715 return PQerrorMessage(m_db_handle);
// Positions the row cursor used by sql_fetch_row. The assignment itself is
// not visible in this listing.
718 void B_DB_POSTGRESQL::sql_data_seek(int row)
721 * Set the row number to be returned on the next call to sql_fetch_row
// Returns the affected-row count of the last command, parsed from the text
// that PQcmdTuples reports for m_result.
// NOTE(review): the value is cast to unsigned and then returned as int.
726 int B_DB_POSTGRESQL::sql_affected_rows(void)
728 return (unsigned) str_to_int32(PQcmdTuples(m_result));
// Executes an INSERT and returns the key the table sequence generated for
// it. The insert must affect exactly one row; the key is then read with
// SELECT currval on the sequence derived from table_name.
// Failure paths and return statements are not visible in this listing.
731 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
735 char sequence[NAMEDATALEN-1];
736 char getkeyval_query[NAMEDATALEN+50];
740 * First execute the insert query and then retrieve the currval.
742 if (!sql_query(query)) {
746 m_num_rows = sql_affected_rows();
747 if (m_num_rows != 1) {
754 * Obtain the current value of the sequence that
755 * provides the serial value for primary key of the table.
757 * currval is local to our session. It is not affected by
758 * other transactions.
760 * Determine the name of the sequence.
761 * PostgreSQL automatically creates a sequence using
762 * <table>_<column>_seq.
763 * At the time of writing, all tables used this format for
764 * for their primary key: <table>id
765 * Except for basefiles which has a primary key on baseid.
766 * Therefore, we need to special case that one table.
768 * everything else can use the PostgreSQL formula.
770 if (strcasecmp(table_name, "basefiles") == 0) {
771 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
// General case: the name is built as table, underscore, table, id.
773 bstrncpy(sequence, table_name, sizeof(sequence));
774 bstrncat(sequence, "_", sizeof(sequence));
775 bstrncat(sequence, table_name, sizeof(sequence));
776 bstrncat(sequence, "id", sizeof(sequence));
779 bstrncat(sequence, "_seq", sizeof(sequence));
780 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
782 Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
// Same ten-attempt PQexec loop as in sql_query, with a local result.
783 for (i = 0; i < 10; i++) {
784 pg_result = PQexec(m_db_handle, getkeyval_query);
791 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
795 Dmsg0(500, "exec done");
797 if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
798 Dmsg0(500, "getting value");
799 id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
// id holds a 64-bit value, so it is printed with a 64-bit conversion and an
// explicit cast instead of %d.
800 Dmsg2(500, "got value '%s' which became %llu\n", PQgetvalue(pg_result, 0, 0), (unsigned long long)id);
802 Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
803 Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
// Returns the descriptor of the next field of the current result and
// advances m_field_number. When the descriptor array is missing or too
// small it is allocated again and filled for every field: name and type come
// from libpq, flags is set to 0, and max_length is the largest value length
// found in that column across all rows.
812 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
818 Dmsg0(500, "sql_fetch_field starts\n");
820 if (!m_fields || m_fields_size < m_num_fields) {
825 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
826 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
827 m_fields_size = m_num_fields;
829 for (i = 0; i < m_num_fields; i++) {
830 Dmsg1(500, "filling field %d\n", i);
831 m_fields[i].name = PQfname(m_result, i);
832 m_fields[i].type = PQftype(m_result, i);
833 m_fields[i].flags = 0;
836 * For a given column, find the max length.
// This scans every row of the result for each column.
839 for (j = 0; j < m_num_rows; j++) {
840 if (PQgetisnull(m_result, j, i)) {
841 this_length = 4; /* "NULL" */
843 this_length = cstrlen(PQgetvalue(m_result, j, i));
846 if (max_length < this_length) {
847 max_length = this_length;
850 m_fields[i].max_length = max_length;
852 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
853 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
858 * Increment field number for the next time around
// NOTE(review): no bounds check on m_field_number is visible here.
860 return &m_fields[m_field_number++];
// Classifies a field by switching on field_type. The case labels and the
// returned values are not visible in this listing.
863 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
865 switch (field_type) {
// Decides whether field_type denotes a numeric column by switching on the
// type value; per the note below the values stem from pg_type. The case
// labels themselves are not visible in this listing.
873 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
876 * TEMP: the following is taken from select OID, typname from pg_type;
878 switch (field_type) {
891 * Escape strings so that PostgreSQL is happy on COPY
893 * NOTE! len is the length of the old string. Your new
894 * string must be long enough (max 2*old+1) to hold
895 * the escaped output.
// Escapes src into dest for use in COPY input. The loop consumes at most len
// characters and stops early at a NUL byte; the loop body and the return
// value are not visible in this listing. sql_batch_insert sizes dest as
// 2*len+1 bytes before calling this.
897 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
899 /* we have to escape \t, \n, \r, \ */
902 while (len > 0 && *src) {
// Starts batch mode: creates the temporary table batch and then puts the
// connection into COPY IN state with COPY batch FROM STDIN. The COPY command
// is attempted in a loop of at most ten iterations. On failure an error text
// is formatted into errmsg.
// The column list of the temporary table is not visible in this listing.
937 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
939 const char *query = "COPY batch FROM STDIN";
941 Dmsg0(500, "sql_batch_start started\n");
943 if (!sql_query("CREATE TEMPORARY TABLE batch ("
951 Dmsg0(500, "sql_batch_start failed\n");
956 * We are starting a new query. reset everything.
964 for (int i=0; i < 10; i++) {
965 m_result = PQexec(m_db_handle, query);
972 Dmsg1(50, "Query failed: %s\n", query);
976 m_status = PQresultStatus(m_result);
// Success here means the server is now waiting for COPY data.
977 if (m_status == PGRES_COPY_IN) {
979 * How many fields in the set?
981 m_num_fields = (int) PQnfields(m_result);
985 Dmsg1(50, "Result status failed: %s\n", query);
989 Dmsg0(500, "sql_batch_start finishing\n");
994 Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
1002 * Set error to something to abort operation
// Ends batch mode: sends the end-of-copy marker, retrying while libpq
// returns 0 and the retry counter is not exhausted, then reads the command
// result to bring the connection back to its normal state.
// Per the comment above the signature, a non-NULL error aborts the COPY.
1004 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1008 PGresult *pg_result;
1010 Dmsg0(500, "sql_batch_end started\n");
1013 res = PQputCopyEnd(m_db_handle, error);
1014 } while (res == 0 && --count > 0);
1022 Dmsg0(500, "we failed\n");
1024 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1025 Dmsg1(500, "failure %s\n", errmsg);
1028 /* Check command status and return to normal libpq state */
// NOTE(review): the release of pg_result with PQclear is not visible in this
// listing - confirm it happens in the omitted lines.
1029 pg_result = PQgetResult(m_db_handle);
1030 if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1031 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1036 Dmsg0(500, "sql_batch_end finishing\n");
// Sends one attribute record as a tab-separated COPY data line. File name
// and path are escaped into esc_name and esc_path, each grown to twice the
// source length plus one, and the line is built in cmd.
// The origin of fnl and pnl and the value used for an empty digest are in
// lines this listing does not show.
1041 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1049 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1050 pgsql_copy_escape(esc_name, fname, fnl);
1052 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1053 pgsql_copy_escape(esc_path, path, pnl);
// A missing or empty digest takes the first branch.
1055 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1058 digest = ar->Digest;
// Column order: FileIndex, JobId, path, name, attributes, digest, DeltaSeq.
1061 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1062 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1063 esc_name, ar->attr, digest, ar->DeltaSeq);
// Retried while libpq returns 0 and the retry counter is not exhausted.
1066 res = PQputCopyData(m_db_handle, cmd, len);
1067 } while (res == 0 && --count > 0);
1076 Dmsg0(500, "we failed\n");
1078 Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1079 Dmsg1(500, "failure %s\n", errmsg);
1082 Dmsg0(500, "sql_batch_insert finishing\n");
1088 * Initialize database data structure. In principal this should
1089 * never have errors, or it is really fatal.
// Factory for the PostgreSQL catalog handle. Under the file-wide mutex it
// either reuses an object from db_list that matches driver, name, address
// and port (incrementing its reference count) or creates a new
// B_DB_POSTGRESQL. Reuse is only attempted when mult_db_connections is false.
// The unlock and the return statements are not visible in this listing.
1091 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
1092 const char *db_user, const char *db_password,
1093 const char *db_address, int db_port,
1094 const char *db_socket, bool mult_db_connections,
1095 bool disable_batch_insert)
1097 B_DB_POSTGRESQL *mdb = NULL;
// The guard that leads to this message is in a line not shown; presumably it
// tests db_user for NULL - confirm.
1100 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1103 P(mutex); /* lock DB queue */
1104 if (db_list && !mult_db_connections) {
1106 * Look to see if DB already open
1108 foreach_dlist(mdb, db_list) {
1109 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1110 Dmsg1(100, "DB REopen %s\n", db_name);
1111 mdb->increment_refcount();
1116 Dmsg0(100, "db_init_database first time\n");
// The constructor itself appends the new object to db_list.
1117 mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password,
1118 db_address, db_port, db_socket,
1119 mult_db_connections, disable_batch_insert));
1126 #endif /* HAVE_POSTGRESQL */