2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2015 Kern Sibbald
5 Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
7 The original author of Bacula is Kern Sibbald, with contributions
8 from many others, a complete list can be found in the file AUTHORS.
10 You may use this file and others of this release according to the
11 license defined in the LICENSE file, which includes the Affero General
12 Public License, v3.0 ("AGPLv3") and some additional permissions and
13 terms pursuant to its AGPLv3 Section 7.
15 This notice must be preserved when any source code is
16 conveyed and/or propagated.
18 Bacula(R) is a registered trademark of Kern Sibbald.
21 * Bacula Catalog Database routines specific to PostgreSQL
22 * These are PostgreSQL specific routines
24 * Dan Langille, December 2003
25 * based upon work done by Kern Sibbald, March 2000
27 * Note: at one point, this file was changed to class based by a certain
28 * programmer, and other than "wrapping" in a class, which is a trivial
29 * change for a C++ programmer, nothing substantial was done, yet all the
30 * code was recommitted under this programmer's name. Consequently, we
31 * undo those changes here. Unfortunately, it is too difficult to put
32 * back the original author's name (Dan Langille) on the parts he wrote.
37 #ifdef HAVE_POSTGRESQL
41 #include "postgres_ext.h" /* needed for NAMEDATALEN */
42 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
43 #define __BDB_POSTGRESQL_H_ 1
44 #include "bdb_postgresql.h"
46 #define dbglvl_dbg DT_SQL|100
47 #define dbglvl_info DT_SQL|50
48 #define dbglvl_err DT_SQL|10
50 /* -----------------------------------------------------------------------
52 * PostgreSQL dependent defines and subroutines
54 * -----------------------------------------------------------------------
57 /* List of open databases */
58 static dlist *db_list = NULL;
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 BDB_POSTGRESQL::BDB_POSTGRESQL()
64 BDB_POSTGRESQL *mdb = this;
66 if (db_list == NULL) {
67 db_list = New(dlist(mdb, &mdb->m_link));
69 mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
70 mdb->m_db_type = SQL_TYPE_POSTGRESQL;
71 mdb->m_db_driver = bstrdup("PostgreSQL");
73 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
75 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
76 mdb->cached_path = get_pool_memory(PM_FNAME);
77 mdb->cached_path_id = 0;
79 mdb->fname = get_pool_memory(PM_FNAME);
80 mdb->path = get_pool_memory(PM_FNAME);
81 mdb->esc_name = get_pool_memory(PM_FNAME);
82 mdb->esc_path = get_pool_memory(PM_FNAME);
83 mdb->esc_obj = get_pool_memory(PM_FNAME);
84 mdb->m_use_fatal_jmsg = true;
86 /* Initialize the private members. */
87 mdb->m_db_handle = NULL;
89 mdb->m_buf = get_pool_memory(PM_FNAME);
91 db_list->append(this);
94 BDB_POSTGRESQL::~BDB_POSTGRESQL()
99 * Initialize database data structure. In principal this should
100 * never have errors, or it is really fatal.
102 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
103 const char *db_user, const char *db_password,
104 const char *db_address, int db_port,
105 const char *db_socket, bool mult_db_connections,
106 bool disable_batch_insert)
108 BDB_POSTGRESQL *mdb = NULL;
111 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
114 P(mutex); /* lock DB queue */
115 if (db_list && !mult_db_connections) {
117 * Look to see if DB already open
119 foreach_dlist(mdb, db_list) {
120 if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
121 Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
122 mdb->increment_refcount();
127 Dmsg0(dbglvl_info, "db_init_database first time\n");
128 /* Create the global Bacula db context */
129 mdb = New(BDB_POSTGRESQL());
130 if (!mdb) goto get_out;
133 * Initialize the parent class members.
135 mdb->m_db_name = bstrdup(db_name);
136 mdb->m_db_user = bstrdup(db_user);
138 mdb->m_db_password = bstrdup(db_password);
141 mdb->m_db_address = bstrdup(db_address);
144 mdb->m_db_socket = bstrdup(db_socket);
146 mdb->m_db_port = db_port;
148 if (disable_batch_insert) {
149 mdb->m_disabled_batch_insert = true;
150 mdb->m_have_batch_insert = false;
152 mdb->m_disabled_batch_insert = false;
153 #ifdef USE_BATCH_FILE_INSERT
154 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
155 #ifdef HAVE_PQISTHREADSAFE
156 mdb->m_have_batch_insert = PQisthreadsafe();
158 mdb->m_have_batch_insert = true;
159 #endif /* HAVE_PQISTHREADSAFE */
161 mdb->m_have_batch_insert = true;
162 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
164 mdb->m_have_batch_insert = false;
165 #endif /* USE_BATCH_FILE_INSERT */
167 mdb->m_allow_transactions = mult_db_connections;
169 /* At this time, when mult_db_connections == true, this is for
170 * specific console command such as bvfs or batch mode, and we don't
171 * want to share a batch mode or bvfs. In the future, we can change
172 * the creation function to add this parameter.
174 mdb->m_dedicated = mult_db_connections;
182 /* Check that the database corresponds to the encoding we want */
183 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
188 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
189 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
193 if ((row = mdb->sql_fetch_row()) == NULL) {
194 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
195 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
197 ret = bstrcmp(row[0], "SQL_ASCII");
200 /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
201 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
204 /* Something is wrong with database encoding */
206 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
207 mdb->get_db_name(), row[0]);
208 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
209 Dmsg1(dbglvl_err, "%s", mdb->errmsg);
216 * Now actually open the database. This can generate errors,
217 * which are returned in the errmsg
219 * DO NOT close the database or delete mdb here !!!!
221 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
226 BDB_POSTGRESQL *mdb = this;
229 if (mdb->m_connected) {
234 if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
236 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
237 be.bstrerror(errstat));
241 if (mdb->m_db_port) {
242 bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
248 /* If connection fails, try at 5 sec intervals for 30 seconds. */
249 for (int retry=0; retry < 6; retry++) {
250 /* connect to the database */
251 mdb->m_db_handle = PQsetdbLogin(
252 mdb->m_db_address, /* default = localhost */
253 port, /* default port */
254 NULL, /* pg options */
255 NULL, /* tty, ignored */
256 mdb->m_db_name, /* database name */
257 mdb->m_db_user, /* login name */
258 mdb->m_db_password); /* password */
260 /* If no connect, try once more in case it is a timing problem */
261 if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
267 Dmsg0(dbglvl_info, "pg_real_connect done\n");
268 Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
269 mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
271 if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
272 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
273 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
274 mdb->m_db_name, mdb->m_db_user);
278 mdb->m_connected = true;
279 if (!bdb_check_version(jcr)) {
283 sql_query("SET datestyle TO 'ISO, YMD'");
284 sql_query("SET cursor_tuple_fraction=1");
287 * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
288 * WARNING: nonstandard use of \\ in a string literal
290 sql_query("SET standard_conforming_strings=on");
292 /* Check that encoding is SQL_ASCII */
293 pgsql_check_database_encoding(jcr, mdb);
302 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
304 BDB_POSTGRESQL *mdb = this;
306 if (mdb->m_connected) {
307 bdb_end_transaction(jcr);
311 if (mdb->m_ref_count == 0) {
312 if (mdb->m_connected) {
315 db_list->remove(mdb);
316 if (mdb->m_connected && mdb->m_db_handle) {
317 PQfinish(mdb->m_db_handle);
319 if (is_rwl_valid(&mdb->m_lock)) {
320 rwl_destroy(&mdb->m_lock);
322 free_pool_memory(mdb->errmsg);
323 free_pool_memory(mdb->cmd);
324 free_pool_memory(mdb->cached_path);
325 free_pool_memory(mdb->fname);
326 free_pool_memory(mdb->path);
327 free_pool_memory(mdb->esc_name);
328 free_pool_memory(mdb->esc_path);
329 free_pool_memory(mdb->esc_obj);
330 free_pool_memory(mdb->m_buf);
331 if (mdb->m_db_driver) {
332 free(mdb->m_db_driver);
334 if (mdb->m_db_name) {
335 free(mdb->m_db_name);
337 if (mdb->m_db_user) {
338 free(mdb->m_db_user);
340 if (mdb->m_db_password) {
341 free(mdb->m_db_password);
343 if (mdb->m_db_address) {
344 free(mdb->m_db_address);
346 if (mdb->m_db_socket) {
347 free(mdb->m_db_socket);
350 if (db_list->size() == 0) {
358 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
363 * Escape strings so PostgreSQL is happy
365 * len is the length of the old string. Your new
366 * string must be long enough (max 2*old+1) to hold
367 * the escaped output.
369 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
371 BDB_POSTGRESQL *mdb = this;
374 PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
376 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
377 /* failed on encoding, probably invalid multibyte encoding in the source string
378 see PQescapeStringConn documentation for details. */
379 Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
384 * Escape binary so that PostgreSQL is happy
387 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
391 BDB_POSTGRESQL *mdb = this;
394 obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
396 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
398 mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
399 memcpy(mdb->esc_obj, obj, new_len);
400 mdb->esc_obj[new_len] = 0;
403 return (char *)mdb->esc_obj;
407 * Unescape binary object so that PostgreSQL is happy
410 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
411 POOLMEM **dest, int32_t *dest_len)
422 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
425 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
429 *dest = check_pool_memory_size(*dest, new_len+1);
430 memcpy(*dest, obj, new_len);
435 Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
439 * Start a transaction. This groups inserts and makes things more efficient.
440 * Usually started when inserting file attributes.
442 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
444 BDB_POSTGRESQL *mdb = this;
447 jcr->attr = get_pool_memory(PM_FNAME);
450 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
454 * This is turned off because transactions break if
455 * multiple simultaneous jobs are run.
457 if (!mdb->m_allow_transactions) {
462 /* Allow only 25,000 changes per transaction */
463 if (mdb->m_transaction && changes > 25000) {
464 bdb_end_transaction(jcr);
466 if (!mdb->m_transaction) {
467 sql_query("BEGIN"); /* begin transaction */
468 Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
469 mdb->m_transaction = true;
474 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
476 BDB_POSTGRESQL *mdb = this;
478 if (jcr && jcr->cached_attribute) {
479 Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
480 if (!bdb_create_attributes_record(jcr, jcr->ar)) {
481 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
483 jcr->cached_attribute = false;
486 if (!mdb->m_allow_transactions) {
491 if (mdb->m_transaction) {
492 sql_query("COMMIT"); /* end transaction */
493 mdb->m_transaction = false;
494 Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
502 * Submit a general SQL command, and for each row returned,
503 * the result_handler is called with the ctx.
505 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
506 DB_RESULT_HANDLER *result_handler,
509 BDB_POSTGRESQL *mdb = this;
512 bool in_transaction = mdb->m_transaction;
514 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
517 /* This code handles only SELECT queries */
518 if (strncasecmp(query, "SELECT", 6) != 0) {
519 return bdb_sql_query(query, result_handler, ctx);
522 if (!result_handler) { /* no need of big_query without handler */
528 if (!in_transaction) { /* CURSOR needs transaction */
532 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
534 if (!sql_query(mdb->m_buf)) {
535 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
536 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
541 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
542 Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
543 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
546 while ((row = sql_fetch_row()) != NULL) {
547 Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
548 if (result_handler(ctx, mdb->m_num_fields, row))
551 PQclear(mdb->m_result);
554 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
556 sql_query("CLOSE _bac_cursor");
558 Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
563 if (!in_transaction) {
564 sql_query("COMMIT"); /* end transaction */
572 * Submit a general SQL command, and for each row returned,
573 * the result_handler is called with the ctx.
575 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
579 BDB_POSTGRESQL *mdb = this;
581 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
585 if (!sql_query(query, QF_STORE_RESULT)) {
586 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
587 Dmsg0(dbglvl_err, "db_sql_query failed\n");
592 Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
594 if (result_handler) {
595 Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
596 while ((row = sql_fetch_row())) {
597 Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
598 if (result_handler(ctx, mdb->m_num_fields, row))
604 Dmsg0(dbglvl_info, "db_sql_query finished\n");
612 * If this routine returns false (failure), Bacula expects
613 * that no result has been stored.
614 * This is where QueryDB calls to with Postgresql.
616 * Returns: true on success
620 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
624 BDB_POSTGRESQL *mdb = this;
626 Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
628 /* We are starting a new query. reset everything. */
629 mdb->m_num_rows = -1;
630 mdb->m_row_number = -1;
631 mdb->m_field_number = -1;
634 PQclear(mdb->m_result); /* hmm, someone forgot to free?? */
635 mdb->m_result = NULL;
638 for (i = 0; i < 10; i++) {
639 mdb->m_result = PQexec(mdb->m_db_handle, query);
645 if (!mdb->m_result) {
646 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
650 mdb->m_status = PQresultStatus(mdb->m_result);
651 if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
652 Dmsg0(dbglvl_dbg, "we have a result\n");
654 /* How many fields in the set? */
655 mdb->m_num_fields = (int)PQnfields(mdb->m_result);
656 Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
658 mdb->m_num_rows = PQntuples(mdb->m_result);
659 Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
661 mdb->m_row_number = 0; /* we can start to fetch something */
662 mdb->m_status = 0; /* succeed */
665 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
669 Dmsg0(dbglvl_info, "sql_query finishing\n");
673 Dmsg0(dbglvl_err, "we failed\n");
674 PQclear(mdb->m_result);
675 mdb->m_result = NULL;
676 mdb->m_status = 1; /* failed */
682 void BDB_POSTGRESQL::sql_free_result(void)
684 BDB_POSTGRESQL *mdb = this;
688 PQclear(mdb->m_result);
689 mdb->m_result = NULL;
697 mdb->m_fields = NULL;
699 mdb->m_num_rows = mdb->m_num_fields = 0;
703 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
705 SQL_ROW row = NULL; /* by default, return NULL */
706 BDB_POSTGRESQL *mdb = this;
708 Dmsg0(dbglvl_info, "sql_fetch_row start\n");
710 if (mdb->m_num_fields == 0) { /* No field, no row */
711 Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
715 if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
717 Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
720 Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
721 mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
722 mdb->m_rows_size = mdb->m_num_fields;
724 /* Now reset the row_number now that we have the space allocated */
725 mdb->m_row_number = 0;
728 /* If still within the result set */
729 if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
730 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
732 /* Get each value from this row */
733 for (int j = 0; j < mdb->m_num_fields; j++) {
734 mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
735 Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
737 mdb->m_row_number++; /* Increment the row number for the next call */
740 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
743 Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
748 const char *BDB_POSTGRESQL::sql_strerror(void)
750 BDB_POSTGRESQL *mdb = this;
751 return PQerrorMessage(mdb->m_db_handle);
754 void BDB_POSTGRESQL::sql_data_seek(int row)
756 BDB_POSTGRESQL *mdb = this;
757 /* Set the row number to be returned on the next call to sql_fetch_row */
758 mdb->m_row_number = row;
761 int BDB_POSTGRESQL::sql_affected_rows(void)
763 BDB_POSTGRESQL *mdb = this;
764 return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
767 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
770 char sequence[NAMEDATALEN-1];
771 char getkeyval_query[NAMEDATALEN+50];
773 BDB_POSTGRESQL *mdb = this;
775 /* First execute the insert query and then retrieve the currval. */
776 if (!sql_query(query)) {
780 mdb->m_num_rows = sql_affected_rows();
781 if (mdb->m_num_rows != 1) {
786 * Obtain the current value of the sequence that
787 * provides the serial value for primary key of the table.
789 * currval is local to our session. It is not affected by
790 * other transactions.
792 * Determine the name of the sequence.
793 * PostgreSQL automatically creates a sequence using
794 * <table>_<column>_seq.
795 * At the time of writing, all tables used this format for
796 * for their primary key: <table>id
797 * Except for basefiles which has a primary key on baseid.
798 * Therefore, we need to special case that one table.
800 * everything else can use the PostgreSQL formula.
802 if (strcasecmp(table_name, "basefiles") == 0) {
803 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
805 bstrncpy(sequence, table_name, sizeof(sequence));
806 bstrncat(sequence, "_", sizeof(sequence));
807 bstrncat(sequence, table_name, sizeof(sequence));
808 bstrncat(sequence, "id", sizeof(sequence));
811 bstrncat(sequence, "_seq", sizeof(sequence));
812 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
814 Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
815 for (int i = 0; i < 10; i++) {
816 p_result = PQexec(mdb->m_db_handle, getkeyval_query);
823 Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
827 Dmsg0(dbglvl_dbg, "exec done");
829 if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
830 Dmsg0(dbglvl_dbg, "getting value");
831 id = str_to_uint64(PQgetvalue(p_result, 0, 0));
832 Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
834 Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
835 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
843 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
847 BDB_POSTGRESQL *mdb = this;
849 Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
851 if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
854 mdb->m_fields = NULL;
856 Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
857 mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
858 mdb->m_fields_size = mdb->m_num_fields;
860 for (int i = 0; i < mdb->m_num_fields; i++) {
861 Dmsg1(dbglvl_dbg, "filling field %d\n", i);
862 mdb->m_fields[i].name = PQfname(mdb->m_result, i);
863 mdb->m_fields[i].type = PQftype(mdb->m_result, i);
864 mdb->m_fields[i].flags = 0;
866 /* For a given column, find the max length. */
868 for (int j = 0; j < mdb->m_num_rows; j++) {
869 if (PQgetisnull(mdb->m_result, j, i)) {
870 this_len = 4; /* "NULL" */
872 this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
875 if (max_len < this_len) {
879 mdb->m_fields[i].max_length = max_len;
881 Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
882 mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
886 /* Increment field number for the next time around */
887 return &mdb->m_fields[mdb->m_field_number++];
890 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
892 if (field_type == 1) {
898 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
901 * TEMP: the following is taken from select OID, typname from pg_type;
903 switch (field_type) {
916 * Escape strings so PostgreSQL is happy on COPY
918 * len is the length of the old string. Your new
919 * string must be long enough (max 2*old+1) to hold
920 * the escaped output.
922 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
924 /* we have to escape \t, \n, \r, \ */
927 while (len > 0 && *src) {
962 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
964 BDB_POSTGRESQL *mdb = this;
965 const char *query = "COPY batch FROM STDIN";
967 Dmsg0(dbglvl_info, "sql_batch_start started\n");
969 if (!sql_query("CREATE TEMPORARY TABLE batch ("
976 "DeltaSeq smallint)")) {
977 Dmsg0(dbglvl_err, "sql_batch_start failed\n");
981 /* We are starting a new query. reset everything. */
982 mdb->m_num_rows = -1;
983 mdb->m_row_number = -1;
984 mdb->m_field_number = -1;
988 for (int i=0; i < 10; i++) {
989 mdb->m_result = PQexec(mdb->m_db_handle, query);
995 if (!mdb->m_result) {
996 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
1000 mdb->m_status = PQresultStatus(mdb->m_result);
1001 if (mdb->m_status == PGRES_COPY_IN) {
1002 /* How many fields in the set? */
1003 mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1004 mdb->m_num_rows = 0;
1007 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1011 Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1016 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1018 PQclear(mdb->m_result);
1019 mdb->m_result = NULL;
1024 * Set error to something to abort the operation
1026 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1031 BDB_POSTGRESQL *mdb = this;
1033 Dmsg0(dbglvl_info, "sql_batch_end started\n");
1036 res = PQputCopyEnd(mdb->m_db_handle, error);
1037 } while (res == 0 && --count > 0);
1040 Dmsg0(dbglvl_dbg, "ok\n");
1046 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1047 Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1050 /* Check command status and return to normal libpq state */
1051 p_result = PQgetResult(mdb->m_db_handle);
1052 if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1053 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1057 /* Get some statistics to compute the best plan */
1058 sql_query("ANALYZE batch");
1062 Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1066 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1073 BDB_POSTGRESQL *mdb = this;
1075 mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1076 pgsql_copy_escape(mdb->esc_name, fname, fnl);
1078 mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1079 pgsql_copy_escape(mdb->esc_path, path, pnl);
1081 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1084 digest = ar->Digest;
1087 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1088 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1089 mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1092 res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1093 } while (res == 0 && --count > 0);
1096 Dmsg0(dbglvl_dbg, "ok\n");
1103 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1104 Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1107 Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1113 #endif /* HAVE_POSTGRESQL */