2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2015 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Bacula Catalog Database routines specific to PostgreSQL
21 * These are PostgreSQL specific routines
23 * Dan Langille, December 2003
24 * based upon work done by Kern Sibbald, March 2000
26 * Note: at one point, this file was changed to class based by a certain
27 * programmer, and other than "wrapping" in a class, which is a trivial
28 * change for a C++ programmer, nothing substantial was done, yet all the
29 * code was recommitted under this programmer's name. Consequently, we
30 * undo those changes here. Unfortunately, it is too difficult to put
31 * back the original author's name (Dan Langille) on the parts he wrote.
36 #ifdef HAVE_POSTGRESQL
40 #include "postgres_ext.h" /* needed for NAMEDATALEN */
41 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
42 #define __BDB_POSTGRESQL_H_ 1
43 #include "bdb_postgresql.h"
45 #define dbglvl_dbg DT_SQL|100
46 #define dbglvl_info DT_SQL|50
47 #define dbglvl_err DT_SQL|10
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
56 /* List of open databases */
57 static dlist *db_list = NULL;
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 BDB_POSTGRESQL::BDB_POSTGRESQL()
63 BDB_POSTGRESQL *mdb = this;
65 if (db_list == NULL) {
66 db_list = New(dlist(mdb, &mdb->m_link));
68 mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
69 mdb->m_db_type = SQL_TYPE_POSTGRESQL;
70 mdb->m_db_driver = bstrdup("PostgreSQL");
72 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
74 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
75 mdb->cached_path = get_pool_memory(PM_FNAME);
76 mdb->cached_path_id = 0;
78 mdb->fname = get_pool_memory(PM_FNAME);
79 mdb->path = get_pool_memory(PM_FNAME);
80 mdb->esc_name = get_pool_memory(PM_FNAME);
81 mdb->esc_path = get_pool_memory(PM_FNAME);
82 mdb->esc_obj = get_pool_memory(PM_FNAME);
83 mdb->m_use_fatal_jmsg = true;
85 /* Initialize the private members. */
86 mdb->m_db_handle = NULL;
88 mdb->m_buf = get_pool_memory(PM_FNAME);
90 db_list->append(this);
93 BDB_POSTGRESQL::~BDB_POSTGRESQL()
98 * Initialize database data structure. In principal this should
99 * never have errors, or it is really fatal.
101 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
102 const char *db_password, const char *db_address, int db_port, const char *db_socket,
103 const char *db_ssl_key, const char *db_ssl_cert, const char *db_ssl_ca,
104 const char *db_ssl_capath, const char *db_ssl_cipher,
105 bool mult_db_connections, bool disable_batch_insert)
107 BDB_POSTGRESQL *mdb = NULL;
110 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
113 P(mutex); /* lock DB queue */
114 if (db_list && !mult_db_connections) {
116 * Look to see if DB already open
118 foreach_dlist(mdb, db_list) {
119 if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
120 Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
121 mdb->increment_refcount();
126 Dmsg0(dbglvl_info, "db_init_database first time\n");
127 /* Create the global Bacula db context */
128 mdb = New(BDB_POSTGRESQL());
129 if (!mdb) goto get_out;
131 /* Initialize the parent class members. */
132 mdb->m_db_name = bstrdup(db_name);
133 mdb->m_db_user = bstrdup(db_user);
135 mdb->m_db_password = bstrdup(db_password);
138 mdb->m_db_address = bstrdup(db_address);
141 mdb->m_db_socket = bstrdup(db_socket);
143 mdb->m_db_port = db_port;
145 if (disable_batch_insert) {
146 mdb->m_disabled_batch_insert = true;
147 mdb->m_have_batch_insert = false;
149 mdb->m_disabled_batch_insert = false;
150 #ifdef USE_BATCH_FILE_INSERT
151 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
152 #ifdef HAVE_PQISTHREADSAFE
153 mdb->m_have_batch_insert = PQisthreadsafe();
155 mdb->m_have_batch_insert = true;
156 #endif /* HAVE_PQISTHREADSAFE */
158 mdb->m_have_batch_insert = true;
159 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
161 mdb->m_have_batch_insert = false;
162 #endif /* USE_BATCH_FILE_INSERT */
164 mdb->m_allow_transactions = mult_db_connections;
166 /* At this time, when mult_db_connections == true, this is for
167 * specific console command such as bvfs or batch mode, and we don't
168 * want to share a batch mode or bvfs. In the future, we can change
169 * the creation function to add this parameter.
171 mdb->m_dedicated = mult_db_connections;
179 /* Check that the database corresponds to the encoding we want */
180 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
185 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
186 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
190 if ((row = mdb->sql_fetch_row()) == NULL) {
191 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
192 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
194 ret = bstrcmp(row[0], "SQL_ASCII");
197 /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
198 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
201 /* Something is wrong with database encoding */
203 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
204 mdb->get_db_name(), row[0]);
205 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
206 Dmsg1(dbglvl_err, "%s", mdb->errmsg);
213 * Now actually open the database. This can generate errors,
214 * which are returned in the errmsg
216 * DO NOT close the database or delete mdb here !!!!
218 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
223 BDB_POSTGRESQL *mdb = this;
226 if (mdb->m_connected) {
231 if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
233 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
234 be.bstrerror(errstat));
238 if (mdb->m_db_port) {
239 bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
245 /* If connection fails, try at 5 sec intervals for 30 seconds. */
246 for (int retry=0; retry < 6; retry++) {
247 /* connect to the database */
248 mdb->m_db_handle = PQsetdbLogin(
249 mdb->m_db_address, /* default = localhost */
250 port, /* default port */
251 NULL, /* pg options */
252 NULL, /* tty, ignored */
253 mdb->m_db_name, /* database name */
254 mdb->m_db_user, /* login name */
255 mdb->m_db_password); /* password */
257 /* If no connect, try once more in case it is a timing problem */
258 if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
264 Dmsg0(dbglvl_info, "pg_real_connect done\n");
265 Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
266 mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
268 if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
269 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
270 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
271 mdb->m_db_name, mdb->m_db_user);
275 mdb->m_connected = true;
276 if (!bdb_check_version(jcr)) {
280 sql_query("SET datestyle TO 'ISO, YMD'");
281 sql_query("SET cursor_tuple_fraction=1");
284 * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
285 * WARNING: nonstandard use of \\ in a string literal
287 sql_query("SET standard_conforming_strings=on");
289 /* Check that encoding is SQL_ASCII */
290 pgsql_check_database_encoding(jcr, mdb);
299 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
301 BDB_POSTGRESQL *mdb = this;
303 if (mdb->m_connected) {
304 bdb_end_transaction(jcr);
308 if (mdb->m_ref_count == 0) {
309 if (mdb->m_connected) {
312 db_list->remove(mdb);
313 if (mdb->m_connected && mdb->m_db_handle) {
314 PQfinish(mdb->m_db_handle);
316 if (is_rwl_valid(&mdb->m_lock)) {
317 rwl_destroy(&mdb->m_lock);
319 free_pool_memory(mdb->errmsg);
320 free_pool_memory(mdb->cmd);
321 free_pool_memory(mdb->cached_path);
322 free_pool_memory(mdb->fname);
323 free_pool_memory(mdb->path);
324 free_pool_memory(mdb->esc_name);
325 free_pool_memory(mdb->esc_path);
326 free_pool_memory(mdb->esc_obj);
327 free_pool_memory(mdb->m_buf);
328 if (mdb->m_db_driver) {
329 free(mdb->m_db_driver);
331 if (mdb->m_db_name) {
332 free(mdb->m_db_name);
334 if (mdb->m_db_user) {
335 free(mdb->m_db_user);
337 if (mdb->m_db_password) {
338 free(mdb->m_db_password);
340 if (mdb->m_db_address) {
341 free(mdb->m_db_address);
343 if (mdb->m_db_socket) {
344 free(mdb->m_db_socket);
347 if (db_list->size() == 0) {
355 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
360 * Escape strings so PostgreSQL is happy
362 * len is the length of the old string. Your new
363 * string must be long enough (max 2*old+1) to hold
364 * the escaped output.
366 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
368 BDB_POSTGRESQL *mdb = this;
371 PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
373 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
374 /* failed on encoding, probably invalid multibyte encoding in the source string
375 see PQescapeStringConn documentation for details. */
376 Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
381 * Escape binary so that PostgreSQL is happy
384 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
388 BDB_POSTGRESQL *mdb = this;
391 obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
393 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
395 mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
396 memcpy(mdb->esc_obj, obj, new_len);
397 mdb->esc_obj[new_len] = 0;
400 return (char *)mdb->esc_obj;
404 * Unescape binary object so that PostgreSQL is happy
407 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
408 POOLMEM **dest, int32_t *dest_len)
419 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
422 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
426 *dest = check_pool_memory_size(*dest, new_len+1);
427 memcpy(*dest, obj, new_len);
432 Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
436 * Start a transaction. This groups inserts and makes things more efficient.
437 * Usually started when inserting file attributes.
439 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
441 BDB_POSTGRESQL *mdb = this;
444 jcr->attr = get_pool_memory(PM_FNAME);
447 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
451 * This is turned off because transactions break if
452 * multiple simultaneous jobs are run.
454 if (!mdb->m_allow_transactions) {
459 /* Allow only 25,000 changes per transaction */
460 if (mdb->m_transaction && changes > 25000) {
461 bdb_end_transaction(jcr);
463 if (!mdb->m_transaction) {
464 sql_query("BEGIN"); /* begin transaction */
465 Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
466 mdb->m_transaction = true;
471 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
473 BDB_POSTGRESQL *mdb = this;
475 if (jcr && jcr->cached_attribute) {
476 Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
477 if (!bdb_create_attributes_record(jcr, jcr->ar)) {
478 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
480 jcr->cached_attribute = false;
483 if (!mdb->m_allow_transactions) {
488 if (mdb->m_transaction) {
489 sql_query("COMMIT"); /* end transaction */
490 mdb->m_transaction = false;
491 Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
499 * Submit a general SQL command, and for each row returned,
500 * the result_handler is called with the ctx.
502 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
503 DB_RESULT_HANDLER *result_handler,
506 BDB_POSTGRESQL *mdb = this;
509 bool in_transaction = mdb->m_transaction;
511 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
514 /* This code handles only SELECT queries */
515 if (strncasecmp(query, "SELECT", 6) != 0) {
516 return bdb_sql_query(query, result_handler, ctx);
519 if (!result_handler) { /* no need of big_query without handler */
525 if (!in_transaction) { /* CURSOR needs transaction */
529 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
531 if (!sql_query(mdb->m_buf)) {
532 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
533 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
538 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
539 Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
540 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
543 while ((row = sql_fetch_row()) != NULL) {
544 Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
545 if (result_handler(ctx, mdb->m_num_fields, row))
548 PQclear(mdb->m_result);
551 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
553 sql_query("CLOSE _bac_cursor");
555 Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
560 if (!in_transaction) {
561 sql_query("COMMIT"); /* end transaction */
569 * Submit a general SQL command, and for each row returned,
570 * the result_handler is called with the ctx.
572 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
576 BDB_POSTGRESQL *mdb = this;
578 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
582 if (!sql_query(query, QF_STORE_RESULT)) {
583 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
584 Dmsg0(dbglvl_err, "db_sql_query failed\n");
589 Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
591 if (result_handler) {
592 Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
593 while ((row = sql_fetch_row())) {
594 Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
595 if (result_handler(ctx, mdb->m_num_fields, row))
601 Dmsg0(dbglvl_info, "db_sql_query finished\n");
609 * If this routine returns false (failure), Bacula expects
610 * that no result has been stored.
611 * This is where QueryDB calls to with Postgresql.
613 * Returns: true on success
617 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
621 BDB_POSTGRESQL *mdb = this;
623 Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
625 /* We are starting a new query. reset everything. */
626 mdb->m_num_rows = -1;
627 mdb->m_row_number = -1;
628 mdb->m_field_number = -1;
631 PQclear(mdb->m_result); /* hmm, someone forgot to free?? */
632 mdb->m_result = NULL;
635 for (i = 0; i < 10; i++) {
636 mdb->m_result = PQexec(mdb->m_db_handle, query);
642 if (!mdb->m_result) {
643 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
647 mdb->m_status = PQresultStatus(mdb->m_result);
648 if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
649 Dmsg0(dbglvl_dbg, "we have a result\n");
651 /* How many fields in the set? */
652 mdb->m_num_fields = (int)PQnfields(mdb->m_result);
653 Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
655 mdb->m_num_rows = PQntuples(mdb->m_result);
656 Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
658 mdb->m_row_number = 0; /* we can start to fetch something */
659 mdb->m_status = 0; /* succeed */
662 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
666 Dmsg0(dbglvl_info, "sql_query finishing\n");
670 Dmsg0(dbglvl_err, "we failed\n");
671 PQclear(mdb->m_result);
672 mdb->m_result = NULL;
673 mdb->m_status = 1; /* failed */
679 void BDB_POSTGRESQL::sql_free_result(void)
681 BDB_POSTGRESQL *mdb = this;
685 PQclear(mdb->m_result);
686 mdb->m_result = NULL;
694 mdb->m_fields = NULL;
696 mdb->m_num_rows = mdb->m_num_fields = 0;
700 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
702 SQL_ROW row = NULL; /* by default, return NULL */
703 BDB_POSTGRESQL *mdb = this;
705 Dmsg0(dbglvl_info, "sql_fetch_row start\n");
707 if (mdb->m_num_fields == 0) { /* No field, no row */
708 Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
712 if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
714 Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
717 Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
718 mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
719 mdb->m_rows_size = mdb->m_num_fields;
721 /* Now reset the row_number now that we have the space allocated */
722 mdb->m_row_number = 0;
725 /* If still within the result set */
726 if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
727 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
729 /* Get each value from this row */
730 for (int j = 0; j < mdb->m_num_fields; j++) {
731 mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
732 Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
734 mdb->m_row_number++; /* Increment the row number for the next call */
737 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
740 Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
745 const char *BDB_POSTGRESQL::sql_strerror(void)
747 BDB_POSTGRESQL *mdb = this;
748 return PQerrorMessage(mdb->m_db_handle);
751 void BDB_POSTGRESQL::sql_data_seek(int row)
753 BDB_POSTGRESQL *mdb = this;
754 /* Set the row number to be returned on the next call to sql_fetch_row */
755 mdb->m_row_number = row;
758 int BDB_POSTGRESQL::sql_affected_rows(void)
760 BDB_POSTGRESQL *mdb = this;
761 return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
764 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
767 char sequence[NAMEDATALEN-1];
768 char getkeyval_query[NAMEDATALEN+50];
770 BDB_POSTGRESQL *mdb = this;
772 /* First execute the insert query and then retrieve the currval. */
773 if (!sql_query(query)) {
777 mdb->m_num_rows = sql_affected_rows();
778 if (mdb->m_num_rows != 1) {
783 * Obtain the current value of the sequence that
784 * provides the serial value for primary key of the table.
786 * currval is local to our session. It is not affected by
787 * other transactions.
789 * Determine the name of the sequence.
790 * PostgreSQL automatically creates a sequence using
791 * <table>_<column>_seq.
792 * At the time of writing, all tables used this format for
793 * for their primary key: <table>id
794 * Except for basefiles which has a primary key on baseid.
795 * Therefore, we need to special case that one table.
797 * everything else can use the PostgreSQL formula.
799 if (strcasecmp(table_name, "basefiles") == 0) {
800 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
802 bstrncpy(sequence, table_name, sizeof(sequence));
803 bstrncat(sequence, "_", sizeof(sequence));
804 bstrncat(sequence, table_name, sizeof(sequence));
805 bstrncat(sequence, "id", sizeof(sequence));
808 bstrncat(sequence, "_seq", sizeof(sequence));
809 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
811 Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
812 for (int i = 0; i < 10; i++) {
813 p_result = PQexec(mdb->m_db_handle, getkeyval_query);
820 Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
824 Dmsg0(dbglvl_dbg, "exec done");
826 if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
827 Dmsg0(dbglvl_dbg, "getting value");
828 id = str_to_uint64(PQgetvalue(p_result, 0, 0));
829 Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
831 Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
832 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
840 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
844 BDB_POSTGRESQL *mdb = this;
846 Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
848 if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
851 mdb->m_fields = NULL;
853 Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
854 mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
855 mdb->m_fields_size = mdb->m_num_fields;
857 for (int i = 0; i < mdb->m_num_fields; i++) {
858 Dmsg1(dbglvl_dbg, "filling field %d\n", i);
859 mdb->m_fields[i].name = PQfname(mdb->m_result, i);
860 mdb->m_fields[i].type = PQftype(mdb->m_result, i);
861 mdb->m_fields[i].flags = 0;
863 /* For a given column, find the max length. */
865 for (int j = 0; j < mdb->m_num_rows; j++) {
866 if (PQgetisnull(mdb->m_result, j, i)) {
867 this_len = 4; /* "NULL" */
869 this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
872 if (max_len < this_len) {
876 mdb->m_fields[i].max_length = max_len;
878 Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
879 mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
883 /* Increment field number for the next time around */
884 return &mdb->m_fields[mdb->m_field_number++];
887 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
889 if (field_type == 1) {
895 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
898 * TEMP: the following is taken from select OID, typname from pg_type;
900 switch (field_type) {
913 * Escape strings so PostgreSQL is happy on COPY
915 * len is the length of the old string. Your new
916 * string must be long enough (max 2*old+1) to hold
917 * the escaped output.
919 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
921 /* we have to escape \t, \n, \r, \ */
924 while (len > 0 && *src) {
959 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
961 BDB_POSTGRESQL *mdb = this;
962 const char *query = "COPY batch FROM STDIN";
964 Dmsg0(dbglvl_info, "sql_batch_start started\n");
966 if (!sql_query("CREATE TEMPORARY TABLE batch ("
973 "DeltaSeq smallint)")) {
974 Dmsg0(dbglvl_err, "sql_batch_start failed\n");
978 /* We are starting a new query. reset everything. */
979 mdb->m_num_rows = -1;
980 mdb->m_row_number = -1;
981 mdb->m_field_number = -1;
985 for (int i=0; i < 10; i++) {
986 mdb->m_result = PQexec(mdb->m_db_handle, query);
992 if (!mdb->m_result) {
993 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
997 mdb->m_status = PQresultStatus(mdb->m_result);
998 if (mdb->m_status == PGRES_COPY_IN) {
999 /* How many fields in the set? */
1000 mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1001 mdb->m_num_rows = 0;
1004 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1008 Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1013 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1015 PQclear(mdb->m_result);
1016 mdb->m_result = NULL;
1021 * Set error to something to abort the operation
1023 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1028 BDB_POSTGRESQL *mdb = this;
1030 Dmsg0(dbglvl_info, "sql_batch_end started\n");
1033 res = PQputCopyEnd(mdb->m_db_handle, error);
1034 } while (res == 0 && --count > 0);
1037 Dmsg0(dbglvl_dbg, "ok\n");
1043 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1044 Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1047 /* Check command status and return to normal libpq state */
1048 p_result = PQgetResult(mdb->m_db_handle);
1049 if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1050 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1054 /* Get some statistics to compute the best plan */
1055 sql_query("ANALYZE batch");
1059 Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1063 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1070 BDB_POSTGRESQL *mdb = this;
1072 mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1073 pgsql_copy_escape(mdb->esc_name, fname, fnl);
1075 mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1076 pgsql_copy_escape(mdb->esc_path, path, pnl);
1078 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1081 digest = ar->Digest;
1084 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1085 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1086 mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1089 res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1090 } while (res == 0 && --count > 0);
1093 Dmsg0(dbglvl_dbg, "ok\n");
1100 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1101 Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1104 Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1110 #endif /* HAVE_POSTGRESQL */