2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Bacula Catalog Database routines specific to PostgreSQL
21 * These are PostgreSQL specific routines
23 * Dan Langille, December 2003
24 * based upon work done by Kern Sibbald, March 2000
26 * Note: at one point, this file was changed to class based by a certain
27 * programmer, and other than "wrapping" in a class, which is a trivial
28 * change for a C++ programmer, nothing substantial was done, yet all the
29 * code was recommitted under this programmer's name. Consequently, we
30 * undo those changes here. Unfortunately, it is too difficult to put
31 * back the original author's name (Dan Langille) on the parts he wrote.
36 #ifdef HAVE_POSTGRESQL
40 #include "postgres_ext.h" /* needed for NAMEDATALEN */
41 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
42 #define __BDB_POSTGRESQL_H_ 1
43 #include "bdb_postgresql.h"
45 #define dbglvl_dbg DT_SQL|100
46 #define dbglvl_info DT_SQL|50
47 #define dbglvl_err DT_SQL|10
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
56 /* List of open databases */
57 static dlist *db_list = NULL;
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
63 BDB_POSTGRESQL *mdb = this;
65 if (db_list == NULL) {
66 db_list = New(dlist(mdb, &mdb->m_link));
68 mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
69 mdb->m_db_type = SQL_TYPE_POSTGRESQL;
70 mdb->m_db_driver = bstrdup("PostgreSQL");
72 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
74 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
75 mdb->cached_path = get_pool_memory(PM_FNAME);
76 mdb->cached_path_id = 0;
78 mdb->fname = get_pool_memory(PM_FNAME);
79 mdb->path = get_pool_memory(PM_FNAME);
80 mdb->esc_name = get_pool_memory(PM_FNAME);
81 mdb->esc_path = get_pool_memory(PM_FNAME);
82 mdb->esc_obj = get_pool_memory(PM_FNAME);
83 mdb->m_use_fatal_jmsg = true;
85 /* Initialize the private members. */
86 mdb->m_db_handle = NULL;
88 mdb->m_buf = get_pool_memory(PM_FNAME);
90 db_list->append(this);
93 BDB_POSTGRESQL::~BDB_POSTGRESQL()
98 * Initialize database data structure. In principal this should
99 * never have errors, or it is really fatal.
101 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
102 const char *db_password, const char *db_address, int db_port, const char *db_socket,
103 const char *db_ssl_key, const char *db_ssl_cert, const char *db_ssl_ca,
104 const char *db_ssl_capath, const char *db_ssl_cipher,
105 bool mult_db_connections, bool disable_batch_insert)
107 BDB_POSTGRESQL *mdb = NULL;
110 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
113 P(mutex); /* lock DB queue */
114 if (db_list && !mult_db_connections) {
116 * Look to see if DB already open
118 foreach_dlist(mdb, db_list) {
119 if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
120 Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
121 mdb->increment_refcount();
126 Dmsg0(dbglvl_info, "db_init_database first time\n");
127 /* Create the global Bacula db context */
128 mdb = New(BDB_POSTGRESQL());
129 if (!mdb) goto get_out;
131 /* Initialize the parent class members. */
132 mdb->m_db_name = bstrdup(db_name);
133 mdb->m_db_user = bstrdup(db_user);
135 mdb->m_db_password = bstrdup(db_password);
138 mdb->m_db_address = bstrdup(db_address);
141 mdb->m_db_socket = bstrdup(db_socket);
143 mdb->m_db_port = db_port;
145 if (disable_batch_insert) {
146 mdb->m_disabled_batch_insert = true;
147 mdb->m_have_batch_insert = false;
149 mdb->m_disabled_batch_insert = false;
150 #ifdef USE_BATCH_FILE_INSERT
151 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
152 #ifdef HAVE_PQISTHREADSAFE
153 mdb->m_have_batch_insert = PQisthreadsafe();
155 mdb->m_have_batch_insert = true;
156 #endif /* HAVE_PQISTHREADSAFE */
158 mdb->m_have_batch_insert = true;
159 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
161 mdb->m_have_batch_insert = false;
162 #endif /* USE_BATCH_FILE_INSERT */
164 mdb->m_allow_transactions = mult_db_connections;
166 /* At this time, when mult_db_connections == true, this is for
167 * specific console command such as bvfs or batch mode, and we don't
168 * want to share a batch mode or bvfs. In the future, we can change
169 * the creation function to add this parameter.
171 mdb->m_dedicated = mult_db_connections;
179 /* Check that the database corresponds to the encoding we want */
180 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
185 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
186 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
190 if ((row = mdb->sql_fetch_row()) == NULL) {
191 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
192 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
194 ret = bstrcmp(row[0], "SQL_ASCII");
197 /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
198 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
201 /* Something is wrong with database encoding */
203 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
204 mdb->get_db_name(), row[0]);
205 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
206 Dmsg1(dbglvl_err, "%s", mdb->errmsg);
213 * Now actually open the database. This can generate errors,
214 * which are returned in the errmsg
216 * DO NOT close the database or delete mdb here !!!!
218 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
223 BDB_POSTGRESQL *mdb = this;
226 if (mdb->m_connected) {
231 if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
233 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
234 be.bstrerror(errstat));
238 if (mdb->m_db_port) {
239 bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
245 /* If connection fails, try at 5 sec intervals for 30 seconds. */
246 for (int retry=0; retry < 6; retry++) {
247 /* connect to the database */
248 mdb->m_db_handle = PQsetdbLogin(
249 mdb->m_db_address, /* default = localhost */
250 port, /* default port */
251 NULL, /* pg options */
252 NULL, /* tty, ignored */
253 mdb->m_db_name, /* database name */
254 mdb->m_db_user, /* login name */
255 mdb->m_db_password); /* password */
257 /* If no connect, try once more in case it is a timing problem */
258 if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
264 Dmsg0(dbglvl_info, "pg_real_connect done\n");
265 Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
266 mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
268 if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
269 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
270 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
271 mdb->m_db_name, mdb->m_db_user);
275 mdb->m_connected = true;
276 if (!bdb_check_version(jcr)) {
280 sql_query("SET datestyle TO 'ISO, YMD'");
281 sql_query("SET cursor_tuple_fraction=1");
284 * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
285 * WARNING: nonstandard use of \\ in a string literal
287 sql_query("SET standard_conforming_strings=on");
289 /* Check that encoding is SQL_ASCII */
290 pgsql_check_database_encoding(jcr, mdb);
299 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
301 BDB_POSTGRESQL *mdb = this;
303 if (mdb->m_connected) {
304 bdb_end_transaction(jcr);
308 if (mdb->m_ref_count == 0) {
309 if (mdb->m_connected) {
312 db_list->remove(mdb);
313 if (mdb->m_connected && mdb->m_db_handle) {
314 PQfinish(mdb->m_db_handle);
316 if (is_rwl_valid(&mdb->m_lock)) {
317 rwl_destroy(&mdb->m_lock);
319 free_pool_memory(mdb->errmsg);
320 free_pool_memory(mdb->cmd);
321 free_pool_memory(mdb->cached_path);
322 free_pool_memory(mdb->fname);
323 free_pool_memory(mdb->path);
324 free_pool_memory(mdb->esc_name);
325 free_pool_memory(mdb->esc_path);
326 free_pool_memory(mdb->esc_obj);
327 free_pool_memory(mdb->m_buf);
328 if (mdb->m_db_driver) {
329 free(mdb->m_db_driver);
331 if (mdb->m_db_name) {
332 free(mdb->m_db_name);
334 if (mdb->m_db_user) {
335 free(mdb->m_db_user);
337 if (mdb->m_db_password) {
338 free(mdb->m_db_password);
340 if (mdb->m_db_address) {
341 free(mdb->m_db_address);
343 if (mdb->m_db_socket) {
344 free(mdb->m_db_socket);
347 if (db_list->size() == 0) {
355 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
360 * Escape strings so PostgreSQL is happy
362 * len is the length of the old string. Your new
363 * string must be long enough (max 2*old+1) to hold
364 * the escaped output.
366 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
368 BDB_POSTGRESQL *mdb = this;
371 PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
373 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
374 /* failed on encoding, probably invalid multibyte encoding in the source string
375 see PQescapeStringConn documentation for details. */
376 Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
381 * Escape binary so that PostgreSQL is happy
384 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
388 BDB_POSTGRESQL *mdb = this;
391 obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
393 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
395 mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
396 memcpy(mdb->esc_obj, obj, new_len);
397 mdb->esc_obj[new_len] = 0;
400 return (char *)mdb->esc_obj;
404 * Unescape binary object so that PostgreSQL is happy
407 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
408 POOLMEM **dest, int32_t *dest_len)
419 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
422 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
426 *dest = check_pool_memory_size(*dest, new_len+1);
427 memcpy(*dest, obj, new_len);
432 Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
436 * Start a transaction. This groups inserts and makes things more efficient.
437 * Usually started when inserting file attributes.
439 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
441 BDB_POSTGRESQL *mdb = this;
444 jcr->attr = get_pool_memory(PM_FNAME);
447 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
448 memset(jcr->ar, 0, sizeof(ATTR_DBR));
452 * This is turned off because transactions break if
453 * multiple simultaneous jobs are run.
455 if (!mdb->m_allow_transactions) {
460 /* Allow only 25,000 changes per transaction */
461 if (mdb->m_transaction && changes > 25000) {
462 bdb_end_transaction(jcr);
464 if (!mdb->m_transaction) {
465 sql_query("BEGIN"); /* begin transaction */
466 Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
467 mdb->m_transaction = true;
472 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
474 BDB_POSTGRESQL *mdb = this;
476 if (jcr && jcr->cached_attribute) {
477 Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
478 if (!bdb_create_attributes_record(jcr, jcr->ar)) {
479 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
481 jcr->cached_attribute = false;
484 if (!mdb->m_allow_transactions) {
489 if (mdb->m_transaction) {
490 sql_query("COMMIT"); /* end transaction */
491 mdb->m_transaction = false;
492 Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
500 * Submit a general SQL command, and for each row returned,
501 * the result_handler is called with the ctx.
503 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
504 DB_RESULT_HANDLER *result_handler,
507 BDB_POSTGRESQL *mdb = this;
510 bool in_transaction = mdb->m_transaction;
512 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
515 /* This code handles only SELECT queries */
516 if (strncasecmp(query, "SELECT", 6) != 0) {
517 return bdb_sql_query(query, result_handler, ctx);
520 if (!result_handler) { /* no need of big_query without handler */
526 if (!in_transaction) { /* CURSOR needs transaction */
530 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
532 if (!sql_query(mdb->m_buf)) {
533 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
534 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
539 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
540 Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
541 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
544 while ((row = sql_fetch_row()) != NULL) {
545 Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
546 if (result_handler(ctx, mdb->m_num_fields, row))
549 PQclear(mdb->m_result);
552 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
554 sql_query("CLOSE _bac_cursor");
556 Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
561 if (!in_transaction) {
562 sql_query("COMMIT"); /* end transaction */
570 * Submit a general SQL command, and for each row returned,
571 * the result_handler is called with the ctx.
573 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
577 BDB_POSTGRESQL *mdb = this;
579 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
583 if (!sql_query(query, QF_STORE_RESULT)) {
584 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
585 Dmsg0(dbglvl_err, "db_sql_query failed\n");
590 Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
592 if (result_handler) {
593 Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
594 while ((row = sql_fetch_row())) {
595 Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
596 if (result_handler(ctx, mdb->m_num_fields, row))
602 Dmsg0(dbglvl_info, "db_sql_query finished\n");
610 * If this routine returns false (failure), Bacula expects
611 * that no result has been stored.
612 * This is where QueryDB calls to with Postgresql.
614 * Returns: true on success
618 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
622 BDB_POSTGRESQL *mdb = this;
624 Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
626 /* We are starting a new query. reset everything. */
627 mdb->m_num_rows = -1;
628 mdb->m_row_number = -1;
629 mdb->m_field_number = -1;
632 PQclear(mdb->m_result); /* hmm, someone forgot to free?? */
633 mdb->m_result = NULL;
636 for (i = 0; i < 10; i++) {
637 mdb->m_result = PQexec(mdb->m_db_handle, query);
643 if (!mdb->m_result) {
644 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
648 mdb->m_status = PQresultStatus(mdb->m_result);
649 if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
650 Dmsg0(dbglvl_dbg, "we have a result\n");
652 /* How many fields in the set? */
653 mdb->m_num_fields = (int)PQnfields(mdb->m_result);
654 Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
656 mdb->m_num_rows = PQntuples(mdb->m_result);
657 Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
659 mdb->m_row_number = 0; /* we can start to fetch something */
660 mdb->m_status = 0; /* succeed */
663 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
667 Dmsg0(dbglvl_info, "sql_query finishing\n");
671 Dmsg0(dbglvl_err, "we failed\n");
672 PQclear(mdb->m_result);
673 mdb->m_result = NULL;
674 mdb->m_status = 1; /* failed */
680 void BDB_POSTGRESQL::sql_free_result(void)
682 BDB_POSTGRESQL *mdb = this;
686 PQclear(mdb->m_result);
687 mdb->m_result = NULL;
695 mdb->m_fields = NULL;
697 mdb->m_num_rows = mdb->m_num_fields = 0;
701 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
703 SQL_ROW row = NULL; /* by default, return NULL */
704 BDB_POSTGRESQL *mdb = this;
706 Dmsg0(dbglvl_info, "sql_fetch_row start\n");
708 if (mdb->m_num_fields == 0) { /* No field, no row */
709 Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
713 if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
715 Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
718 Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
719 mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
720 mdb->m_rows_size = mdb->m_num_fields;
722 /* Now reset the row_number now that we have the space allocated */
723 mdb->m_row_number = 0;
726 /* If still within the result set */
727 if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
728 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
730 /* Get each value from this row */
731 for (int j = 0; j < mdb->m_num_fields; j++) {
732 mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
733 Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
735 mdb->m_row_number++; /* Increment the row number for the next call */
738 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
741 Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
746 const char *BDB_POSTGRESQL::sql_strerror(void)
748 BDB_POSTGRESQL *mdb = this;
749 return PQerrorMessage(mdb->m_db_handle);
752 void BDB_POSTGRESQL::sql_data_seek(int row)
754 BDB_POSTGRESQL *mdb = this;
755 /* Set the row number to be returned on the next call to sql_fetch_row */
756 mdb->m_row_number = row;
759 int BDB_POSTGRESQL::sql_affected_rows(void)
761 BDB_POSTGRESQL *mdb = this;
762 return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
765 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
768 char sequence[NAMEDATALEN-1];
769 char getkeyval_query[NAMEDATALEN+50];
771 BDB_POSTGRESQL *mdb = this;
773 /* First execute the insert query and then retrieve the currval. */
774 if (!sql_query(query)) {
778 mdb->m_num_rows = sql_affected_rows();
779 if (mdb->m_num_rows != 1) {
784 * Obtain the current value of the sequence that
785 * provides the serial value for primary key of the table.
787 * currval is local to our session. It is not affected by
788 * other transactions.
790 * Determine the name of the sequence.
791 * PostgreSQL automatically creates a sequence using
792 * <table>_<column>_seq.
793 * At the time of writing, all tables used this format for
794 * for their primary key: <table>id
795 * Except for basefiles which has a primary key on baseid.
796 * Therefore, we need to special case that one table.
798 * everything else can use the PostgreSQL formula.
800 if (strcasecmp(table_name, "basefiles") == 0) {
801 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
803 bstrncpy(sequence, table_name, sizeof(sequence));
804 bstrncat(sequence, "_", sizeof(sequence));
805 bstrncat(sequence, table_name, sizeof(sequence));
806 bstrncat(sequence, "id", sizeof(sequence));
809 bstrncat(sequence, "_seq", sizeof(sequence));
810 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
812 Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
813 for (int i = 0; i < 10; i++) {
814 p_result = PQexec(mdb->m_db_handle, getkeyval_query);
821 Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
825 Dmsg0(dbglvl_dbg, "exec done");
827 if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
828 Dmsg0(dbglvl_dbg, "getting value");
829 id = str_to_uint64(PQgetvalue(p_result, 0, 0));
830 Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
832 Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
833 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
841 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
845 BDB_POSTGRESQL *mdb = this;
847 Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
849 if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
852 mdb->m_fields = NULL;
854 Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
855 mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
856 mdb->m_fields_size = mdb->m_num_fields;
858 for (int i = 0; i < mdb->m_num_fields; i++) {
859 Dmsg1(dbglvl_dbg, "filling field %d\n", i);
860 mdb->m_fields[i].name = PQfname(mdb->m_result, i);
861 mdb->m_fields[i].type = PQftype(mdb->m_result, i);
862 mdb->m_fields[i].flags = 0;
864 /* For a given column, find the max length. */
866 for (int j = 0; j < mdb->m_num_rows; j++) {
867 if (PQgetisnull(mdb->m_result, j, i)) {
868 this_len = 4; /* "NULL" */
870 this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
873 if (max_len < this_len) {
877 mdb->m_fields[i].max_length = max_len;
879 Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
880 mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
884 /* Increment field number for the next time around */
885 return &mdb->m_fields[mdb->m_field_number++];
888 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
890 if (field_type == 1) {
896 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
899 * TEMP: the following is taken from select OID, typname from pg_type;
901 switch (field_type) {
914 * Escape strings so PostgreSQL is happy on COPY
916 * len is the length of the old string. Your new
917 * string must be long enough (max 2*old+1) to hold
918 * the escaped output.
920 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
922 /* we have to escape \t, \n, \r, \ */
925 while (len > 0 && *src) {
960 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
962 BDB_POSTGRESQL *mdb = this;
963 const char *query = "COPY batch FROM STDIN";
965 Dmsg0(dbglvl_info, "sql_batch_start started\n");
967 if (!sql_query("CREATE TEMPORARY TABLE batch ("
974 "DeltaSeq smallint)")) {
975 Dmsg0(dbglvl_err, "sql_batch_start failed\n");
979 /* We are starting a new query. reset everything. */
980 mdb->m_num_rows = -1;
981 mdb->m_row_number = -1;
982 mdb->m_field_number = -1;
986 for (int i=0; i < 10; i++) {
987 mdb->m_result = PQexec(mdb->m_db_handle, query);
993 if (!mdb->m_result) {
994 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
998 mdb->m_status = PQresultStatus(mdb->m_result);
999 if (mdb->m_status == PGRES_COPY_IN) {
1000 /* How many fields in the set? */
1001 mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1002 mdb->m_num_rows = 0;
1005 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1009 Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1014 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1016 PQclear(mdb->m_result);
1017 mdb->m_result = NULL;
1022 * Set error to something to abort the operation
1024 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1029 BDB_POSTGRESQL *mdb = this;
1031 Dmsg0(dbglvl_info, "sql_batch_end started\n");
1034 res = PQputCopyEnd(mdb->m_db_handle, error);
1035 } while (res == 0 && --count > 0);
1038 Dmsg0(dbglvl_dbg, "ok\n");
1044 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1045 Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1048 /* Check command status and return to normal libpq state */
1049 p_result = PQgetResult(mdb->m_db_handle);
1050 if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1051 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1055 /* Get some statistics to compute the best plan */
1056 sql_query("ANALYZE batch");
1060 Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1064 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1071 BDB_POSTGRESQL *mdb = this;
1073 mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1074 pgsql_copy_escape(mdb->esc_name, fname, fnl);
1076 mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1077 pgsql_copy_escape(mdb->esc_path, path, pnl);
1079 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1082 digest = ar->Digest;
1085 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1086 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1087 mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1090 res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1091 } while (res == 0 && --count > 0);
1094 Dmsg0(dbglvl_dbg, "ok\n");
1101 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1102 Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1105 Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1111 #endif /* HAVE_POSTGRESQL */