2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
35 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
40 #ifdef HAVE_POSTGRESQL
45 #include "postgres_ext.h" /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
57 * List of open databases
/*
 * Every B_DB_POSTGRESQL object appends itself to this list in its
 * constructor; the list itself is created by the first object.
 */
59 static dlist *db_list = NULL;
/* Taken by db_init_database() (P(mutex)) before it examines db_list. */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Construct a PostgreSQL catalog object.
 *
 * Copies the connection parameters into the parent-class members, decides
 * whether batch insert may be used, obtains the pool-memory work buffers
 * and registers the object in the file-level db_list.
 *
 * The stored driver name is always "PostgreSQL"; the db_driver argument is
 * not referenced by the statements visible here.
 * mult_db_connections is stored as m_allow_transactions.
 * disable_batch_insert forces m_have_batch_insert to false.
 *
 * NOTE(review): db_password, db_address and db_socket are passed to
 * bstrdup(); whether a NULL check guards each call is not visible in this
 * listing -- confirm.
 */
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64 const char *db_driver,
67 const char *db_password,
68 const char *db_address,
70 const char *db_socket,
71 bool mult_db_connections,
72 bool disable_batch_insert)
75 * Initialize the parent class members.
77 m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78 m_db_type = SQL_TYPE_POSTGRESQL;
79 m_db_driver = bstrdup("PostgreSQL");
80 m_db_name = bstrdup(db_name);
81 m_db_user = bstrdup(db_user);
83 m_db_password = bstrdup(db_password);
86 m_db_address = bstrdup(db_address);
89 m_db_socket = bstrdup(db_socket);
92 if (disable_batch_insert) {
93 m_disabled_batch_insert = true;
94 m_have_batch_insert = false;
96 m_disabled_batch_insert = false;
/*
 * Batch insert is only considered when USE_BATCH_FILE_INSERT is defined.
 * If PQisthreadsafe() is available its result decides; otherwise batch
 * insert is enabled.
 */
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100 m_have_batch_insert = PQisthreadsafe();
102 m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
105 m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
108 m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
/* Work buffers from the memory pool; db_close_database() frees them. */
111 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
113 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114 cached_path = get_pool_memory(PM_FNAME);
117 fname = get_pool_memory(PM_FNAME);
118 path = get_pool_memory(PM_FNAME);
119 esc_name = get_pool_memory(PM_FNAME);
120 esc_path = get_pool_memory(PM_FNAME);
121 esc_obj = get_pool_memory(PM_FNAME);
122 m_buf = get_pool_memory(PM_FNAME);
123 m_allow_transactions = mult_db_connections;
126 * Initialize the private members.
132 * Put the db in the list.
134 if (db_list == NULL) {
135 db_list = New(dlist(this, &this->m_link));
137 db_list->append(this);
/*
 * Destructor.  No body statements are visible in this listing; the
 * connection and the pool-memory buffers are released in
 * db_close_database().
 */
140 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
145 * Check that the database corresponds to the encoding we want
/*
 * Ask the server for its encoding with "SELECT getdatabaseencoding()" and
 * compare the first column of the returned row with "SQL_ASCII".
 *
 * A failed query or a missing row is reported to the job as M_ERROR.
 * A different encoding is written to mdb->errmsg and reported as M_WARNING.
 *
 * NOTE(review): ret holds the bstrcmp() result and is presumably the
 * return value; the return statements are not visible here -- confirm.
 */
147 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
152 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
153 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
157 if ((row = mdb->sql_fetch_row()) == NULL) {
158 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
159 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
161 ret = bstrcmp(row[0], "SQL_ASCII");
165 * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
167 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
171 * Something is wrong with database encoding
174 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
175 mdb->get_db_name(), row[0]);
176 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
177 Dmsg1(50, "%s", mdb->errmsg);
184 * Now actually open the database. This can generate errors,
185 * which are returned in the errmsg
187 * DO NOT close the database or delete mdb here !!!!
/*
 * Open the connection to the PostgreSQL server for this catalog object.
 *
 * Steps visible here: initialise m_lock, format m_db_port as a string,
 * call PQsetdbLogin() up to six times until PQstatus() reports
 * CONNECTION_OK, verify the catalog schema with check_tables_version(),
 * set the session parameters datestyle, cursor_tuple_fraction and
 * standard_conforming_strings, and check the database encoding.
 *
 * A lock-initialisation or connection failure fills errmsg with a
 * diagnostic text.  The result of pgsql_check_database_encoding() is not
 * used.
 *
 * NOTE(review): the return statements and the delay between connection
 * attempts are not visible in this listing.
 */
189 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
201 if ((errstat=rwl_init(&m_lock)) != 0) {
203 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
204 be.bstrerror(errstat));
209 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
215 /* If connection fails, try at 5 sec intervals for 30 seconds. */
216 for (int retry=0; retry < 6; retry++) {
217 /* connect to the database */
218 m_db_handle = PQsetdbLogin(
219 m_db_address, /* default = localhost */
220 port, /* default port */
221 NULL, /* pg options */
222 NULL, /* tty, ignored */
223 m_db_name, /* database name */
224 m_db_user, /* login name */
225 m_db_password); /* password */
227 /* If no connect, try once more in case it is a timing problem */
228 if (PQstatus(m_db_handle) == CONNECTION_OK) {
234 Dmsg0(50, "pg_real_connect done\n");
/* NOTE(review): this debug message prints the database password in clear text. */
235 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
236 (m_db_password == NULL) ? "(NULL)" : m_db_password);
238 if (PQstatus(m_db_handle) != CONNECTION_OK) {
239 Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
240 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
241 m_db_name, m_db_user);
246 if (!check_tables_version(jcr, this)) {
250 sql_query("SET datestyle TO 'ISO, YMD'");
251 sql_query("SET cursor_tuple_fraction=1");
254 * Tell PostgreSQL we are using standard conforming strings
255 * and avoid warnings such as:
256 * WARNING: nonstandard use of \\ in a string literal
258 sql_query("SET standard_conforming_strings=on");
261 * Check that encoding is SQL_ASCII
263 pgsql_check_database_encoding(jcr, this);
/*
 * Close this catalog object.
 *
 * Any open transaction is ended first.  When m_ref_count is 0 the object
 * is removed from db_list, the connection is closed with PQfinish() (only
 * if connected and a handle exists), the lock is destroyed and the
 * pool-memory buffers obtained in the constructor are freed.
 *
 * NOTE(review): the statement that changes m_ref_count and the action
 * taken once db_list is empty are not visible in this listing.
 */
272 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
274 db_end_transaction(jcr);
278 if (m_ref_count == 0) {
279 db_list->remove(this);
280 if (m_connected && m_db_handle) {
281 PQfinish(m_db_handle);
283 rwl_destroy(&m_lock);
284 free_pool_memory(errmsg);
285 free_pool_memory(cmd);
286 free_pool_memory(cached_path);
287 free_pool_memory(fname);
288 free_pool_memory(path);
289 free_pool_memory(esc_name);
290 free_pool_memory(esc_path);
291 free_pool_memory(esc_obj);
292 free_pool_memory(m_buf);
312 if (db_list->size() == 0) {
/*
 * Thread cleanup entry point.  No body statements are visible in this
 * listing, so nothing can be said here about what it releases.
 */
320 void B_DB_POSTGRESQL::db_thread_cleanup(void)
325 * Escape strings so that PostgreSQL is happy
327 * NOTE! len is the length of the old string. Your new
328 * string must be long enough (max 2*old+1) to hold
329 * the escaped output.
/*
 * Escape len bytes of old into snew with PQescapeStringConn() on this
 * connection.  The caller supplies snew and must make it large enough.
 * A failure signalled through the error output argument is reported to
 * the job as M_FATAL and logged at debug level 500.
 */
331 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
335 PQescapeStringConn(m_db_handle, snew, old, len, &error);
337 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
338 /* error on encoding, probably invalid multibyte encoding in the source string
339 see PQescapeStringConn documentation for details. */
340 Dmsg0(500, "PQescapeStringConn failed\n");
345 * Escape binary so that PostgreSQL is happy
/*
 * Escape len bytes of binary data with PQescapeByteaConn() and copy the
 * result into the esc_obj member buffer, which is grown to new_len+1
 * bytes first.
 *
 * Returns esc_obj.  This is the member buffer, so its content is replaced
 * by the next call on this object.
 * A NULL result from libpq is reported to the job as M_FATAL.
 *
 * NOTE(review): the release of obj (PQfreemem) is not visible in this
 * listing -- confirm it is done.
 */
348 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
353 obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
355 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
358 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
359 memcpy(esc_obj, obj, new_len);
364 return (char *)esc_obj;
368 * Unescape binary object so that PostgreSQL is happy
/*
 * Convert the bytea text in from back to binary with PQunescapeBytea()
 * and copy new_len bytes into *dest, which is resized to new_len+1 bytes.
 *
 * NOTE(review): the M_FATAL text names PQunescapeByteaConn, but the
 * function actually called is PQunescapeBytea.
 * NOTE(review): expected_len is not referenced by the statements visible
 * here, and the assignment of *dest_len is not shown.
 */
371 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
372 POOLMEM **dest, int32_t *dest_len)
383 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
386 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
390 *dest = check_pool_memory_size(*dest, new_len+1);
391 memcpy(*dest, obj, new_len);
/* The debug level 010 is an octal literal, i.e. 8. */
396 Dmsg1(010, "obj size: %d\n", *dest_len);
400 * Start a transaction. This groups inserts and makes things
401 * much more efficient. Usually started when inserting
/*
 * Start a transaction on this connection if none is open.
 *
 * Allocates jcr->attr and jcr->ar (the guarding conditions are not
 * visible here).  If a transaction is open and more than 25000 changes
 * have accumulated it is ended first; "BEGIN" is then issued whenever no
 * transaction is open, and m_transaction is set.
 *
 * NOTE(review): the action taken when m_allow_transactions is false is
 * not visible in this listing -- presumably an early return.
 */
404 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
407 jcr->attr = get_pool_memory(PM_FNAME);
410 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
414 * This is turned off because transactions break
415 * if multiple simultaneous jobs are run.
417 if (!m_allow_transactions) {
423 * Allow only 25,000 changes per transaction
425 if (m_transaction && changes > 25000) {
426 db_end_transaction(jcr);
428 if (!m_transaction) {
429 sql_query("BEGIN"); /* begin transaction */
/* NOTE(review): the debug text below spells "PosgreSQL". */
430 Dmsg0(400, "Start PosgreSQL transaction\n");
431 m_transaction = true;
/*
 * End the current transaction.
 *
 * If the job has a cached attribute it is written first with
 * db_create_attributes_record(); a failure there is reported as M_FATAL
 * and the cached flag is cleared.  "COMMIT" is then issued and
 * m_transaction is reset.
 *
 * jcr may be NULL: the cached-attribute step is skipped in that case.
 *
 * NOTE(review): the conditions around the COMMIT (m_allow_transactions,
 * m_transaction) are only partly visible in this listing.
 */
436 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
438 if (jcr && jcr->cached_attribute) {
439 Dmsg0(400, "Flush last cached attribute.\n");
440 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
441 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
443 jcr->cached_attribute = false;
446 if (!m_allow_transactions) {
452 sql_query("COMMIT"); /* end transaction */
453 m_transaction = false;
454 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
462 * Submit a general SQL command (cmd), and for each row returned,
463 * the result_handler is called with the ctx.
/*
 * Run a SELECT through a server-side cursor so that rows are fetched in
 * batches of 100 instead of all at once.
 *
 * Queries that do not start with "SELECT" (case-insensitive) are passed
 * to db_sql_query() unchanged.  Otherwise the query is wrapped in
 * "DECLARE _bac_cursor CURSOR FOR ...", rows are fetched with
 * "FETCH 100 FROM _bac_cursor" and result_handler is called for each row
 * until a fetch returns no rows; the cursor is then closed.
 *
 * The transaction state on entry is remembered in in_transaction; if no
 * transaction was open, "COMMIT" is issued at the end.
 *
 * NOTE(review): the debug messages below say "db_sql_query" although this
 * is db_big_sql_query.
 */
465 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query,
466 DB_RESULT_HANDLER *result_handler,
471 bool in_transaction = m_transaction;
473 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
475 /* This code handles only SELECT queries */
476 if (strncasecmp(query, "SELECT", 6) != 0) {
477 return db_sql_query(query, result_handler, ctx);
480 if (!result_handler) { /* no need of big_query without handler */
486 if (!in_transaction) { /* CURSOR needs transaction */
490 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
492 if (!sql_query(m_buf)) {
493 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
494 Dmsg0(50, "db_sql_query failed\n");
499 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
502 while ((row = sql_fetch_row()) != NULL) {
503 Dmsg1(500, "Fetching %d rows\n", m_num_rows);
504 if (result_handler(ctx, m_num_fields, row))
510 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
512 sql_query("CLOSE _bac_cursor");
514 Dmsg0(500, "db_big_sql_query finished\n");
519 if (!in_transaction) {
520 sql_query("COMMIT"); /* end transaction */
528 * Submit a general SQL command (cmd), and for each row returned,
529 * the result_handler is called with the ctx.
/*
 * Execute query with sql_query(..., QF_STORE_RESULT).  On failure errmsg
 * receives the query text and the server error.  When result_handler is
 * not NULL it is called with (ctx, m_num_fields, row) for each row
 * returned by sql_fetch_row().
 *
 * NOTE(review): what happens when result_handler returns non-zero, and
 * the return statements, are not visible in this listing.
 */
531 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
536 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
539 if (!sql_query(query, QF_STORE_RESULT)) {
540 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
541 Dmsg0(500, "db_sql_query failed\n");
546 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
548 if (result_handler != NULL) {
549 Dmsg0(500, "db_sql_query invoking handler\n");
550 while ((row = sql_fetch_row()) != NULL) {
551 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
552 if (result_handler(ctx, m_num_fields, row))
558 Dmsg0(500, "db_sql_query finished\n");
566 * Note, if this routine returns false (failure), Bacula expects
567 * that no result has been stored.
568 * This is where QUERY_DB comes with Postgresql.
570 * Returns: true on success
/*
 * Low-level query execution through PQexec().
 *
 * A result left over from a previous query is cleared first.  PQexec() is
 * attempted up to ten times (the retry condition is not visible here).
 * A result status of PGRES_TUPLES_OK or PGRES_COMMAND_OK counts as
 * success: m_num_fields and m_num_rows are taken from the result,
 * m_row_number is reset to 0 and m_status is set to 0.  On failure
 * m_status is set to 1.
 */
574 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
579 Dmsg1(500, "sql_query starts with '%s'\n", query);
581 * We are starting a new query. reset everything.
588 PQclear(m_result); /* hmm, someone forgot to free?? */
592 for (i = 0; i < 10; i++) {
593 m_result = PQexec(m_db_handle, query);
600 Dmsg1(50, "Query failed: %s\n", query);
604 m_status = PQresultStatus(m_result);
605 if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
606 Dmsg0(500, "we have a result\n");
609 * How many fields in the set?
611 m_num_fields = (int)PQnfields(m_result);
612 Dmsg1(500, "we have %d fields\n", m_num_fields);
614 m_num_rows = PQntuples(m_result);
615 Dmsg1(500, "we have %d rows\n", m_num_rows);
617 m_row_number = 0; /* we can start to fetch something */
618 m_status = 0; /* succeed */
621 Dmsg1(50, "Result status failed: %s\n", query);
625 Dmsg0(500, "sql_query finishing\n");
629 Dmsg0(500, "we failed\n");
632 m_status = 1; /* failed */
/*
 * Discard the current result.  Of its statements only the reset of
 * m_num_rows and m_num_fields to 0 is visible in this listing.
 */
638 void B_DB_POSTGRESQL::sql_free_result(void)
653 m_num_rows = m_num_fields = 0;
/*
 * Return the next row of the current result, or NULL.
 *
 * With m_num_fields == 0 there is nothing to return.  The m_rows pointer
 * array is (re)allocated when it is missing or smaller than m_num_fields.
 * While m_row_number lies inside [0, m_num_rows) each m_rows[j] is set to
 * the value PQgetvalue() returns for that row and column; the strings are
 * not copied.
 */
657 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
660 SQL_ROW row = NULL; /* by default, return NULL */
662 Dmsg0(500, "sql_fetch_row start\n");
664 if (m_num_fields == 0) { /* No field, no row */
665 Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
669 if (!m_rows || m_rows_size < m_num_fields) {
671 Dmsg0(500, "sql_fetch_row freeing space\n");
/* NOTE(review): %d below is given a sizeof expression (size_t). */
674 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
675 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
676 m_rows_size = m_num_fields;
679 * Now reset the row_number now that we have the space allocated
685 * If still within the result set
687 if (m_row_number >= 0 && m_row_number < m_num_rows) {
688 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
690 * Get each value from this row
692 for (j = 0; j < m_num_fields; j++) {
693 m_rows[j] = PQgetvalue(m_result, m_row_number, j);
694 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
697 * Increment the row number for the next call
702 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
705 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
/* Return libpq's current error message for this connection. */
710 const char *B_DB_POSTGRESQL::sql_strerror(void)
712 return PQerrorMessage(m_db_handle);
/*
 * Position the result set on the given row.  The assignment itself is not
 * visible in this listing; presumably it stores row in m_row_number, the
 * index that sql_fetch_row() reads -- confirm.
 */
715 void B_DB_POSTGRESQL::sql_data_seek(int row)
718 * Set the row number to be returned on the next call to sql_fetch_row
/*
 * Return the row count of the last command: the string from PQcmdTuples()
 * parsed with str_to_int32().  The value is cast to unsigned and then
 * converted back to the int return type.
 */
723 int B_DB_POSTGRESQL::sql_affected_rows(void)
725 return (unsigned) str_to_int32(PQcmdTuples(m_result));
/*
 * Run an INSERT and return the key generated for the new row.
 *
 * The insert is executed with sql_query() and must affect exactly one
 * row; m_num_rows is overwritten with that count.  The key is then read
 * with "SELECT currval('<sequence>')", where the sequence name is
 * "basefiles_baseid_seq" for table basefiles and
 * "<table>_<table>id_seq" for every other table.
 *
 * The sequence name is built in a buffer of NAMEDATALEN-1 bytes using the
 * size-limited bstrncpy()/bstrncat() helpers.
 *
 * NOTE(review): the failure paths and the return statement are not
 * visible in this listing.
 */
728 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
732 char sequence[NAMEDATALEN-1];
733 char getkeyval_query[NAMEDATALEN+50];
737 * First execute the insert query and then retrieve the currval.
739 if (!sql_query(query)) {
743 m_num_rows = sql_affected_rows();
744 if (m_num_rows != 1) {
751 * Obtain the current value of the sequence that
752 * provides the serial value for primary key of the table.
754 * currval is local to our session. It is not affected by
755 * other transactions.
757 * Determine the name of the sequence.
758 * PostgreSQL automatically creates a sequence using
759 * <table>_<column>_seq.
760 * At the time of writing, all tables used this format for
761 * for their primary key: <table>id
762 * Except for basefiles which has a primary key on baseid.
763 * Therefore, we need to special case that one table.
765 * everything else can use the PostgreSQL formula.
767 if (strcasecmp(table_name, "basefiles") == 0) {
768 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
770 bstrncpy(sequence, table_name, sizeof(sequence));
771 bstrncat(sequence, "_", sizeof(sequence));
772 bstrncat(sequence, table_name, sizeof(sequence));
773 bstrncat(sequence, "id", sizeof(sequence));
776 bstrncat(sequence, "_seq", sizeof(sequence));
777 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
779 Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
/* PQexec() is attempted up to ten times; the retry condition is not visible here. */
780 for (i = 0; i < 10; i++) {
781 pg_result = PQexec(m_db_handle, getkeyval_query);
788 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
792 Dmsg0(500, "exec done");
794 if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
795 Dmsg0(500, "getting value");
796 id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
/* NOTE(review): id comes from str_to_uint64() but is printed with %d -- check its declared type. */
797 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
799 Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
800 Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
/*
 * Return the descriptor of the next column of the current result and
 * advance m_field_number.
 *
 * When the m_fields array is missing or smaller than m_num_fields it is
 * reallocated and filled for every column: name from PQfname(), type from
 * PQftype(), flags 0, and max_length as the longest value found in any
 * row (a NULL value counts as 4, the length of "NULL").  Filling the
 * array therefore visits every row of every column.
 */
809 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
815 Dmsg0(500, "sql_fetch_field starts\n");
817 if (!m_fields || m_fields_size < m_num_fields) {
822 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
823 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
824 m_fields_size = m_num_fields;
826 for (i = 0; i < m_num_fields; i++) {
827 Dmsg1(500, "filling field %d\n", i);
828 m_fields[i].name = PQfname(m_result, i);
829 m_fields[i].type = PQftype(m_result, i);
830 m_fields[i].flags = 0;
833 * For a given column, find the max length.
836 for (j = 0; j < m_num_rows; j++) {
837 if (PQgetisnull(m_result, j, i)) {
838 this_length = 4; /* "NULL" */
840 this_length = cstrlen(PQgetvalue(m_result, j, i));
843 if (max_length < this_length) {
844 max_length = this_length;
847 m_fields[i].max_length = max_length;
849 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
850 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
855 * Increment field number for the next time around
857 return &m_fields[m_field_number++];
/*
 * Decide from field_type whether a column is NOT NULL.  The switch cases
 * are not visible in this listing.
 */
860 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
862 switch (field_type) {
/*
 * Decide from field_type whether a column is numeric.  Per the comment
 * below, the values tested are type OIDs from pg_type; the switch cases
 * themselves are not visible in this listing.
 */
870 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
873 * TEMP: the following is taken from select OID, typname from pg_type;
875 switch (field_type) {
888 * Escape strings so that PostgreSQL is happy on COPY
890 * NOTE! len is the length of the old string. Your new
891 * string must be long enough (max 2*old+1) to hold
892 * the escaped output.
/*
 * Copy src to dest, escaping the characters listed in the comment below.
 * The loop stops after len characters or at the first NUL in src,
 * whichever comes first.  The loop body and the return value are not
 * visible in this listing.
 */
894 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
896 /* we have to escape \t, \n, \r, \ */
899 while (len > 0 && *src) {
/*
 * Begin a batch insert: create the temporary table "batch", then put the
 * connection into COPY-in mode with "COPY batch FROM STDIN".
 *
 * PQexec() is attempted up to ten times (the retry condition is not
 * visible here).  The expected result status is PGRES_COPY_IN; on failure
 * errmsg receives the libpq error text.
 *
 * NOTE(review): the column list of the temporary table and the return
 * statements are not visible in this listing.
 */
934 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
936 const char *query = "COPY batch FROM STDIN";
938 Dmsg0(500, "sql_batch_start started\n");
940 if (!sql_query("CREATE TEMPORARY TABLE batch ("
948 Dmsg0(500, "sql_batch_start failed\n");
953 * We are starting a new query. reset everything.
961 for (int i=0; i < 10; i++) {
962 m_result = PQexec(m_db_handle, query);
969 Dmsg1(50, "Query failed: %s\n", query);
973 m_status = PQresultStatus(m_result);
974 if (m_status == PGRES_COPY_IN) {
976 * How many fields in the set?
978 m_num_fields = (int) PQnfields(m_result);
982 Dmsg1(50, "Result status failed: %s\n", query);
986 Dmsg0(500, "sql_batch_start finishing\n");
991 Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
999 * Set error to something to abort operation
/*
 * Finish the COPY started by sql_batch_start().
 *
 * PQputCopyEnd() is called with error as its message argument and is
 * repeated while it returns 0 and count has not run out.  The command
 * result is then read with PQgetResult() and must be PGRES_COMMAND_OK;
 * otherwise errmsg receives the libpq error text.
 *
 * NOTE(review): no PQclear(pg_result) is visible in this listing --
 * confirm that the result is released.
 */
1001 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1005 PGresult *pg_result;
1007 Dmsg0(500, "sql_batch_end started\n");
1010 res = PQputCopyEnd(m_db_handle, error);
1011 } while (res == 0 && --count > 0);
1019 Dmsg0(500, "we failed\n");
1021 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1022 Dmsg1(500, "failure %s\n", errmsg);
1025 /* Check command status and return to normal libpq state */
1026 pg_result = PQgetResult(m_db_handle);
1027 if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1028 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1033 Dmsg0(500, "sql_batch_end finishing\n");
/*
 * Send one file record to the server during a COPY.
 *
 * fname and path are escaped with pgsql_copy_escape() into esc_name and
 * esc_path, which are first grown to twice the source length plus one.
 * The record is formatted into cmd as seven tab-separated fields ending
 * in a newline: FileIndex, JobId, path, name, attr, digest, DeltaSeq.
 * PQputCopyData() is repeated while it returns 0 and count has not run
 * out; on failure errmsg receives the libpq error text.
 *
 * NOTE(review): the value used for digest when ar->Digest is NULL or
 * empty is not visible in this listing.
 */
1038 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1046 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1047 pgsql_copy_escape(esc_name, fname, fnl);
1049 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1050 pgsql_copy_escape(esc_path, path, pnl);
1052 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1055 digest = ar->Digest;
1058 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1059 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1060 esc_name, ar->attr, digest, ar->DeltaSeq);
1063 res = PQputCopyData(m_db_handle, cmd, len);
1064 } while (res == 0 && --count > 0);
1073 Dmsg0(500, "we failed\n");
1075 Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1076 Dmsg1(500, "failure %s\n", errmsg);
1079 Dmsg0(500, "sql_batch_insert finishing\n");
1085 * Initialize database data structure. In principle this should
1086 * never have errors, or it is really fatal.
/*
 * Return a catalog object for the given connection parameters.
 *
 * A missing user name is reported to the job as M_FATAL (per the message
 * text; the test itself is not visible here).  With the file-level mutex
 * held, and unless mult_db_connections is set, db_list is searched for an
 * object matching driver, name, address and port; a match has its
 * reference count incremented.  Otherwise a new B_DB_POSTGRESQL is
 * created with the same arguments.
 *
 * NOTE(review): the unlock of the mutex and the return statements are not
 * visible in this listing.
 */
1088 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
1089 const char *db_user, const char *db_password,
1090 const char *db_address, int db_port,
1091 const char *db_socket, bool mult_db_connections,
1092 bool disable_batch_insert)
1094 B_DB_POSTGRESQL *mdb = NULL;
1097 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1100 P(mutex); /* lock DB queue */
1101 if (db_list && !mult_db_connections) {
1103 * Look to see if DB already open
1105 foreach_dlist(mdb, db_list) {
1106 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1107 Dmsg1(100, "DB REopen %s\n", db_name);
1108 mdb->increment_refcount();
1113 Dmsg0(100, "db_init_database first time\n");
1114 mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password,
1115 db_address, db_port, db_socket,
1116 mult_db_connections, disable_batch_insert));
1123 #endif /* HAVE_POSTGRESQL */