2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Bacula Catalog Database routines specific to PostgreSQL
21 * These are PostgreSQL specific routines
23 * Dan Langille, December 2003
24 * based upon work done by Kern Sibbald, March 2000
26 * Note: at one point, this file was changed to class based by a certain
27 * programmer, and other than "wrapping" in a class, which is a trivial
28 * change for a C++ programmer, nothing substantial was done, yet all the
29 * code was recommitted under this programmer's name. Consequently, we
30 * undo those changes here. Unfortunately, it is too difficult to put
31 * back the original author's name (Dan Langille) on the parts he wrote.
36 #ifdef HAVE_POSTGRESQL
40 #include "postgres_ext.h" /* needed for NAMEDATALEN */
41 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
42 #define __BDB_POSTGRESQL_H_ 1
43 #include "bdb_postgresql.h"
// Debug levels handed to the Dmsg macros throughout this file. Each one
// combines the DT_SQL tag with a numeric level: 100 for detailed tracing,
// 50 for informational messages and 10 for error reports.
// NOTE(review): the expansions are not parenthesised; they are only used as
// plain macro arguments in the lines visible here.
45 #define dbglvl_dbg DT_SQL|100
46 #define dbglvl_info DT_SQL|50
47 #define dbglvl_err DT_SQL|10
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
56 /* List of open databases */
// Every BDB_POSTGRESQL instance appends itself to this list in its
// constructor; db_init_database walks it to find an already open database
// and bdb_close_database removes the instance again.
57 static dlist *db_list = NULL;
// Taken with P(mutex) in db_init_database before db_list is examined.
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Constructor: creates the shared db_list on first use, records the driver
// identity, allocates the pool-memory work buffers and finally appends this
// instance to db_list.
// NOTE(review): db_list is tested and created here without this function
// taking mutex itself; db_init_database locks it before constructing an
// instance - confirm that no other caller constructs one unlocked.
61 BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
63 BDB_POSTGRESQL *mdb = this;
// First instance ever: create the list, using m_link as the link member.
65 if (db_list == NULL) {
66 db_list = New(dlist(mdb, &mdb->m_link));
68 mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
69 mdb->m_db_type = SQL_TYPE_POSTGRESQL;
70 mdb->m_db_driver = bstrdup("PostgreSQL");
// Work buffers; all of them are released again in bdb_close_database.
72 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
74 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
75 mdb->cached_path = get_pool_memory(PM_FNAME);
76 mdb->cached_path_id = 0;
78 mdb->fname = get_pool_memory(PM_FNAME);
79 mdb->path = get_pool_memory(PM_FNAME);
80 mdb->esc_name = get_pool_memory(PM_FNAME);
81 mdb->esc_path = get_pool_memory(PM_FNAME);
82 mdb->esc_obj = get_pool_memory(PM_FNAME);
83 mdb->m_use_fatal_jmsg = true;
85 /* Initialize the private members. */
86 mdb->m_db_handle = NULL;
// Scratch buffer used by bdb_big_sql_query to build the cursor statement.
88 mdb->m_buf = get_pool_memory(PM_FNAME);
90 db_list->append(this);
// Destructor. No statements are visible for it in this extract; the buffers
// allocated by the constructor are freed in bdb_close_database.
93 BDB_POSTGRESQL::~BDB_POSTGRESQL()
98 * Initialize database data structure. In principle this should
99 * never have errors, or it is really fatal.
// Return the BDB context for the given catalog parameters.
// When db_list exists and mult_db_connections is false, the open databases
// are searched with bdb_match_database (driver, name, address, port) and a
// match gets its reference count incremented. Otherwise a new BDB_POSTGRESQL
// is created and its connection parameters are copied from the arguments.
// jcr is only used for reporting here. The M_FATAL message near the top
// reports a missing user name (its guard is not visible in this extract).
// NOTE(review): db_ssl_capath and db_ssl_cipher are not referenced in the
// lines visible here - confirm whether that is intentional.
101 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
102 const char *db_password, const char *db_address, int db_port, const char *db_socket,
103 const char *db_ssl_mode, const char *db_ssl_key, const char *db_ssl_cert,
104 const char *db_ssl_ca, const char *db_ssl_capath, const char *db_ssl_cipher,
105 bool mult_db_connections, bool disable_batch_insert)
107 BDB_POSTGRESQL *mdb = NULL;
110 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
113 P(mutex); /* lock DB queue */
// Sharing an existing connection is only attempted for non dedicated use.
114 if (db_list && !mult_db_connections) {
116 * Look to see if DB already open
118 foreach_dlist(mdb, db_list) {
119 if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
120 Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
121 mdb->increment_refcount();
126 Dmsg0(dbglvl_info, "db_init_database first time\n");
127 /* Create the global Bacula db context */
128 mdb = New(BDB_POSTGRESQL());
129 if (!mdb) goto get_out;
131 /* Initialize the parent class members. */
// Each string is duplicated with bstrdup, so the object owns its own copies;
// they are freed in bdb_close_database.
132 mdb->m_db_name = bstrdup(db_name);
133 mdb->m_db_user = bstrdup(db_user);
135 mdb->m_db_password = bstrdup(db_password);
138 mdb->m_db_address = bstrdup(db_address);
141 mdb->m_db_socket = bstrdup(db_socket);
// SSL mode: either the value supplied by the caller or the literal prefer;
// the condition choosing between the two is not visible in this extract.
144 mdb->m_db_ssl_mode = bstrdup(db_ssl_mode);
146 mdb->m_db_ssl_mode = bstrdup("prefer");
149 mdb->m_db_ssl_key = bstrdup(db_ssl_key);
152 mdb->m_db_ssl_cert = bstrdup(db_ssl_cert);
155 mdb->m_db_ssl_ca = bstrdup(db_ssl_ca);
157 mdb->m_db_port = db_port;
// Batch insert: switched off on request, otherwise decided at compile time
// (USE_BATCH_FILE_INSERT) and, where available, by PQisthreadsafe().
159 if (disable_batch_insert) {
160 mdb->m_disabled_batch_insert = true;
161 mdb->m_have_batch_insert = false;
163 mdb->m_disabled_batch_insert = false;
164 #ifdef USE_BATCH_FILE_INSERT
165 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
166 #ifdef HAVE_PQISTHREADSAFE
167 mdb->m_have_batch_insert = PQisthreadsafe();
169 mdb->m_have_batch_insert = true;
170 #endif /* HAVE_PQISTHREADSAFE */
172 mdb->m_have_batch_insert = true;
173 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
175 mdb->m_have_batch_insert = false;
176 #endif /* USE_BATCH_FILE_INSERT */
// Transactions are only enabled on connections that are not shared.
178 mdb->m_allow_transactions = mult_db_connections;
180 /* At this time, when mult_db_connections == true, this is for
181 * specific console command such as bvfs or batch mode, and we don't
182 * want to share a batch mode or bvfs. In the future, we can change
183 * the creation function to add this parameter.
185 mdb->m_dedicated = mult_db_connections;
193 /* Check that the database corresponds to the encoding we want */
// Ask the server for its encoding with SELECT getdatabaseencoding() and
// compare the answer to SQL_ASCII. On a match the client encoding is forced
// to SQL_ASCII as well; on a mismatch an M_WARNING job message is issued.
// A failing query or fetch is reported as M_ERROR.
// The return statements are not visible in this extract; presumably the
// comparison result held in ret is what gets returned - confirm.
194 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
199 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
200 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
204 if ((row = mdb->sql_fetch_row()) == NULL) {
205 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
206 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
// The single column of the single row is the encoding name.
208 ret = bstrcmp(row[0], "SQL_ASCII");
211 /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
212 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
215 /* Something is wrong with database encoding */
217 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
218 mdb->get_db_name(), row[0]);
219 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
220 Dmsg1(dbglvl_err, "%s", mdb->errmsg);
227 * Now actually open the database. This can generate errors,
228 * which are returned in the errmsg
230 * DO NOT close the database or delete mdb here !!!!
// Open the PostgreSQL connection for this BDB.
// Steps visible here: test whether the object is already connected,
// initialise the rwlock m_lock, format the port number, build the keyword
// and value arrays for PQconnectdbParams, attempt the connect in a loop of
// at most 6 iterations, and on success check the catalog version and apply
// the session settings (datestyle, cursor_tuple_fraction,
// standard_conforming_strings) followed by the encoding check.
// Failures are described in errmsg.
// NOTE(review): the Dmsg3 below writes the database password in clear text
// to the debug trace; consider masking it.
// NOTE(review): the return value of pgsql_check_database_encoding is ignored
// on the line visible here.
232 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
237 BDB_POSTGRESQL *mdb = this;
240 if (mdb->m_connected) {
245 if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
247 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
248 be.bstrerror(errstat));
// A port of 0 leaves the choice to libpq (see the default port remark below).
252 if (mdb->m_db_port) {
253 bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
259 /* Tells libpq that the SSL library has already been initialized */
262 /* If connection fails, try at 5 sec intervals for 30 seconds. */
263 for (int retry=0; retry < 6; retry++) {
264 /* connect to the database */
265 const char *keywords[10] = {"host", "port",
267 "password", "sslmode",
269 "sslrootcert", NULL };
270 const char *values[10] = {mdb->m_db_address, /* default localhost */
271 port, /* default port */
280 mdb->m_db_handle = PQconnectdbParams(keywords,
283 /* If no connect, try once more in case it is a timing problem */
284 if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
290 Dmsg0(dbglvl_info, "pg_real_connect done\n");
291 Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
292 mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
295 #define USE_OPENSSL 1
// Trace whether the session is encrypted and with which protocol and cipher.
297 if (PQgetssl(mdb->m_db_handle) != NULL) {
298 Dmsg0(dbglvl_info, "SSL in use\n");
299 ssl = (SSL *)PQgetssl(mdb->m_db_handle);
300 Dmsg2(dbglvl_info, "Version:%s Cipher:%s\n", SSL_get_version(ssl), SSL_get_cipher(ssl));
302 Dmsg0(dbglvl_info, "SSL not in use\n");
306 if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
307 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
308 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
309 mdb->m_db_name, mdb->m_db_user);
// From here on the connection is usable by the sql_query calls below.
313 mdb->m_connected = true;
314 if (!bdb_check_version(jcr)) {
318 sql_query("SET datestyle TO 'ISO, YMD'");
319 sql_query("SET cursor_tuple_fraction=1");
322 * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
323 * WARNING: nonstandard use of \\ in a string literal
325 sql_query("SET standard_conforming_strings=on");
327 /* Check that encoding is SQL_ASCII */
328 pgsql_check_database_encoding(jcr, mdb);
// Release this database object. When connected, an open transaction is ended
// first. Once m_ref_count is 0 the instance is removed from db_list, the
// libpq connection is closed, the lock is destroyed and every buffer and
// string owned by the object is freed. The code that changes m_ref_count is
// not visible in this extract.
// NOTE(review): PQfinish is only called when m_connected is set, so a handle
// left behind by a failed connection attempt would not be released here -
// confirm against the libpq requirement to always call PQfinish.
337 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
339 BDB_POSTGRESQL *mdb = this;
341 if (mdb->m_connected) {
342 bdb_end_transaction(jcr);
346 if (mdb->m_ref_count == 0) {
347 if (mdb->m_connected) {
350 db_list->remove(mdb);
351 if (mdb->m_connected && mdb->m_db_handle) {
352 PQfinish(mdb->m_db_handle);
354 if (is_rwl_valid(&mdb->m_lock)) {
355 rwl_destroy(&mdb->m_lock);
// Pool buffers obtained in the constructor.
357 free_pool_memory(mdb->errmsg);
358 free_pool_memory(mdb->cmd);
359 free_pool_memory(mdb->cached_path);
360 free_pool_memory(mdb->fname);
361 free_pool_memory(mdb->path);
362 free_pool_memory(mdb->esc_name);
363 free_pool_memory(mdb->esc_path);
364 free_pool_memory(mdb->esc_obj);
365 free_pool_memory(mdb->m_buf);
// Strings duplicated with bstrdup in the constructor and db_init_database.
366 if (mdb->m_db_driver) {
367 free(mdb->m_db_driver);
369 if (mdb->m_db_name) {
370 free(mdb->m_db_name);
372 if (mdb->m_db_user) {
373 free(mdb->m_db_user);
375 if (mdb->m_db_password) {
376 free(mdb->m_db_password);
378 if (mdb->m_db_address) {
379 free(mdb->m_db_address);
381 if (mdb->m_db_socket) {
382 free(mdb->m_db_socket);
384 if (mdb->m_db_ssl_mode) {
385 free(mdb->m_db_ssl_mode);
387 if (mdb->m_db_ssl_key) {
388 free(mdb->m_db_ssl_key);
390 if (mdb->m_db_ssl_cert) {
391 free(mdb->m_db_ssl_cert);
393 if (mdb->m_db_ssl_ca) {
394 free(mdb->m_db_ssl_ca);
// Last database gone: what happens to db_list itself is not visible here.
397 if (db_list->size() == 0) {
// Per-thread cleanup hook of the driver interface; no statements are visible
// for it in this extract.
405 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
410 * Escape strings so PostgreSQL is happy
412 * len is the length of the old string. Your new
413 * string must be long enough (max 2*old+1) to hold
414 * the escaped output.
// Escape the len bytes at old into snew by calling PQescapeStringConn on the
// connection handle of this object. snew must provide at least 2*len+1 bytes.
// An error flagged by libpq through failed is reported as an M_FATAL job
// message and traced at error level.
416 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
418 BDB_POSTGRESQL *mdb = this;
421 PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
423 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
424 /* failed on encoding, probably invalid multibyte encoding in the source string
425 see PQescapeStringConn documentation for details. */
426 Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
431 * Escape binary so that PostgreSQL is happy
// Escape a binary object of len bytes with PQescapeByteaConn and copy the
// result, NUL terminated, into the esc_obj pool buffer, which is returned.
// The returned pointer belongs to this object and is reused by later calls.
// A NULL result from libpq is reported as an M_FATAL job message.
434 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
438 BDB_POSTGRESQL *mdb = this;
441 obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
443 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
// Grow esc_obj so that new_len bytes plus the terminator fit.
445 mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
446 memcpy(mdb->esc_obj, obj, new_len);
447 mdb->esc_obj[new_len] = 0;
450 return (char *)mdb->esc_obj;
454 * Unescape binary object so that PostgreSQL is happy
// Decode the bytea text at from with PQunescapeBytea and copy the decoded
// bytes into *dest, after growing that pool buffer to new_len+1 bytes.
// The size is traced through *dest_len at the end.
// NOTE(review): the Jmsg text names PQunescapeByteaConn although the function
// called is PQunescapeBytea.
// NOTE(review): expected_len is not used in the lines visible here, and it is
// not visible whether the NULL case returns before the memcpy - confirm.
457 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
458 POOLMEM **dest, int32_t *dest_len)
469 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
472 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
476 *dest = check_pool_memory_size(*dest, new_len+1);
477 memcpy(*dest, obj, new_len);
482 Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
486 * Start a transaction. This groups inserts and makes things more efficient.
487 * Usually started when inserting file attributes.
// Prepare the job for attribute insertion and begin a transaction.
// jcr->attr and jcr->ar are allocated here (their guards are not visible in
// this extract). Nothing is started when m_allow_transactions is false.
// A running transaction that exceeds 25000 changes is ended first, then
// BEGIN is issued when no transaction is active.
489 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
491 BDB_POSTGRESQL *mdb = this;
494 jcr->attr = get_pool_memory(PM_FNAME);
497 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
498 memset(jcr->ar, 0, sizeof(ATTR_DBR));
502 * This is turned off because transactions break if
503 * multiple simultaneous jobs are run.
505 if (!mdb->m_allow_transactions) {
510 /* Allow only 25,000 changes per transaction */
511 if (mdb->m_transaction && changes > 25000) {
512 bdb_end_transaction(jcr);
514 if (!mdb->m_transaction) {
515 sql_query("BEGIN"); /* begin transaction */
516 Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
517 mdb->m_transaction = true;
// Flush the attribute record cached on the job, if any, and COMMIT the
// current transaction when one is active. jcr may be NULL, in which case the
// flush step is skipped. A failed flush is reported as M_FATAL.
522 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
524 BDB_POSTGRESQL *mdb = this;
526 if (jcr && jcr->cached_attribute) {
527 Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
528 if (!bdb_create_attributes_record(jcr, jcr->ar)) {
529 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
531 jcr->cached_attribute = false;
534 if (!mdb->m_allow_transactions) {
539 if (mdb->m_transaction) {
540 sql_query("COMMIT"); /* end transaction */
541 mdb->m_transaction = false;
542 Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
550 * Submit a general SQL command, and for each row returned,
551 * the result_handler is called with the ctx.
// Run a potentially large SELECT through a cursor named _bac_cursor, fetching
// 100 rows per round trip and handing each row to result_handler with ctx.
// Statements that do not start with SELECT are passed on to bdb_sql_query.
// The cursor needs a transaction: the state on entry is remembered in
// in_transaction, and a COMMIT is issued at the end when none was active.
// NOTE(review): the SELECT test is a plain case insensitive prefix match, so
// a query with leading white space or a WITH clause takes the
// bdb_sql_query path.
553 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
554 DB_RESULT_HANDLER *result_handler,
557 BDB_POSTGRESQL *mdb = this;
560 bool in_transaction = mdb->m_transaction;
562 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
565 /* This code handles only SELECT queries */
566 if (strncasecmp(query, "SELECT", 6) != 0) {
567 return bdb_sql_query(query, result_handler, ctx);
570 if (!result_handler) { /* no need of big_query without handler */
576 if (!in_transaction) { /* CURSOR needs transaction */
// The cursor statement is built in the m_buf scratch buffer.
580 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
582 if (!sql_query(mdb->m_buf)) {
583 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
584 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
589 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
590 Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
591 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
594 while ((row = sql_fetch_row()) != NULL) {
595 Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
596 if (result_handler(ctx, mdb->m_num_fields, row))
// Release the batch just processed before fetching the next one.
599 PQclear(mdb->m_result);
602 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
604 sql_query("CLOSE _bac_cursor");
606 Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
611 if (!in_transaction) {
612 sql_query("COMMIT"); /* end transaction */
620 * Submit a general SQL command, and for each row returned,
621 * the result_handler is called with the ctx.
// Run query with QF_STORE_RESULT and, when a result_handler is supplied, call
// it with ctx, the field count and the row for every row that sql_fetch_row
// returns. A failing query leaves a description in errmsg.
623 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
627 BDB_POSTGRESQL *mdb = this;
629 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
633 if (!sql_query(query, QF_STORE_RESULT)) {
634 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
635 Dmsg0(dbglvl_err, "db_sql_query failed\n");
640 Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
642 if (result_handler) {
643 Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
644 while ((row = sql_fetch_row())) {
645 Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
646 if (result_handler(ctx, mdb->m_num_fields, row))
652 Dmsg0(dbglvl_info, "db_sql_query finished\n");
660 * If this routine returns false (failure), Bacula expects
661 * that no result has been stored.
662 * This is where QueryDB calls end up when using PostgreSQL.
664 * Returns: true on success
// Execute query with PQexec and keep the result in m_result.
// The row, field and status bookkeeping is reset first and a result left over
// from an earlier query is cleared. PQexec is called inside a loop of at most
// 10 iterations (the exit condition is not visible in this extract).
// On PGRES_TUPLES_OK or PGRES_COMMAND_OK the field and row counts are stored,
// m_row_number becomes 0 and m_status becomes 0. On the failure path the
// result is cleared and m_status is set to 1.
// NOTE(review): flags is not referenced in the lines visible here.
668 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
672 BDB_POSTGRESQL *mdb = this;
674 Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
676 /* We are starting a new query. reset everything. */
677 mdb->m_num_rows = -1;
678 mdb->m_row_number = -1;
679 mdb->m_field_number = -1;
682 PQclear(mdb->m_result); /* hmm, someone forgot to free?? */
683 mdb->m_result = NULL;
686 for (i = 0; i < 10; i++) {
687 mdb->m_result = PQexec(mdb->m_db_handle, query);
693 if (!mdb->m_result) {
694 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
698 mdb->m_status = PQresultStatus(mdb->m_result);
699 if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
700 Dmsg0(dbglvl_dbg, "we have a result\n");
702 /* How many fields in the set? */
703 mdb->m_num_fields = (int)PQnfields(mdb->m_result);
704 Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
706 mdb->m_num_rows = PQntuples(mdb->m_result);
707 Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
709 mdb->m_row_number = 0; /* we can start to fetch something */
710 mdb->m_status = 0; /* succeed */
713 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
717 Dmsg0(dbglvl_info, "sql_query finishing\n");
// Failure path: drop the result and flag the error in m_status.
721 Dmsg0(dbglvl_err, "we failed\n");
722 PQclear(mdb->m_result);
723 mdb->m_result = NULL;
724 mdb->m_status = 1; /* failed */
// Drop the cached libpq result and reset the field array pointer and the row
// and field counts. Any locking or freeing of the row and field arrays is not
// visible in this extract.
730 void BDB_POSTGRESQL::sql_free_result(void)
732 BDB_POSTGRESQL *mdb = this;
736 PQclear(mdb->m_result);
737 mdb->m_result = NULL;
745 mdb->m_fields = NULL;
747 mdb->m_num_rows = mdb->m_num_fields = 0;
// Hand out the next row of the cached result. The row is an array of
// m_num_fields C strings taken from PQgetvalue, so the strings belong to the
// libpq result. row starts out as NULL, which is what is returned when there
// are no fields or no rows left.
// The m_rows array is allocated again when it is missing or holds fewer than
// m_num_fields entries.
// NOTE(review): that reallocation also resets m_row_number to 0.
// NOTE(review): the Dmsg1 printing the sizeof product uses %d for a size_t.
751 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
753 SQL_ROW row = NULL; /* by default, return NULL */
754 BDB_POSTGRESQL *mdb = this;
756 Dmsg0(dbglvl_info, "sql_fetch_row start\n");
758 if (mdb->m_num_fields == 0) { /* No field, no row */
759 Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
763 if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
765 Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
768 Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
769 mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
770 mdb->m_rows_size = mdb->m_num_fields;
772 /* Now reset the row_number now that we have the space allocated */
773 mdb->m_row_number = 0;
776 /* If still within the result set */
777 if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
778 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
780 /* Get each value from this row */
781 for (int j = 0; j < mdb->m_num_fields; j++) {
782 mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
783 Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
785 mdb->m_row_number++; /* Increment the row number for the next call */
788 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
791 Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
796 const char *BDB_POSTGRESQL::sql_strerror(void)
798 BDB_POSTGRESQL *mdb = this;
799 return PQerrorMessage(mdb->m_db_handle);
802 void BDB_POSTGRESQL::sql_data_seek(int row)
804 BDB_POSTGRESQL *mdb = this;
805 /* Set the row number to be returned on the next call to sql_fetch_row */
806 mdb->m_row_number = row;
809 int BDB_POSTGRESQL::sql_affected_rows(void)
811 BDB_POSTGRESQL *mdb = this;
812 return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
// Run an INSERT statement and obtain the key generated for the new row.
// After the insert, exactly one affected row is expected. The key is then
// read with SELECT currval on the sequence derived from table_name:
// table_name, an underscore, table_name again, then id and _seq. For the
// basefiles table the fixed prefix basefiles_baseid is used instead.
// The currval query runs in a loop of at most 10 PQexec attempts.
// NOTE(review): table_name goes into the currval statement unescaped; callers
// presumably pass fixed table names - confirm.
// NOTE(review): the Dmsg2 printing id uses %d for a 64 bit value.
815 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
818 char sequence[NAMEDATALEN-1];
819 char getkeyval_query[NAMEDATALEN+50];
821 BDB_POSTGRESQL *mdb = this;
823 /* First execute the insert query and then retrieve the currval. */
824 if (!sql_query(query)) {
828 mdb->m_num_rows = sql_affected_rows();
829 if (mdb->m_num_rows != 1) {
834 * Obtain the current value of the sequence that
835 * provides the serial value for primary key of the table.
837 * currval is local to our session. It is not affected by
838 * other transactions.
840 * Determine the name of the sequence.
841 * PostgreSQL automatically creates a sequence using
842 * <table>_<column>_seq.
843 * At the time of writing, all tables used this format for
844 * for their primary key: <table>id
845 * Except for basefiles which has a primary key on baseid.
846 * Therefore, we need to special case that one table.
848 * everything else can use the PostgreSQL formula.
850 if (strcasecmp(table_name, "basefiles") == 0) {
851 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
853 bstrncpy(sequence, table_name, sizeof(sequence));
854 bstrncat(sequence, "_", sizeof(sequence));
855 bstrncat(sequence, table_name, sizeof(sequence));
856 bstrncat(sequence, "id", sizeof(sequence));
// Common suffix for both naming branches.
859 bstrncat(sequence, "_seq", sizeof(sequence));
860 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
862 Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
863 for (int i = 0; i < 10; i++) {
864 p_result = PQexec(mdb->m_db_handle, getkeyval_query);
871 Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
875 Dmsg0(dbglvl_dbg, "exec done");
877 if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
878 Dmsg0(dbglvl_dbg, "getting value");
879 id = str_to_uint64(PQgetvalue(p_result, 0, 0));
880 Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
882 Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
883 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
// Return the description of the next field of the current result and advance
// m_field_number. The m_fields array is built when it is missing or smaller
// than m_num_fields: for every column the name and type come from libpq,
// flags is set to 0 and max_length is the longest value over all rows, a
// NULL value counting as 4 characters.
// NOTE(review): computing max_length scans every row of every column.
// NOTE(review): m_field_number is used as an index without a bound check in
// the lines visible here.
891 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
895 BDB_POSTGRESQL *mdb = this;
897 Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
899 if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
902 mdb->m_fields = NULL;
904 Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
905 mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
906 mdb->m_fields_size = mdb->m_num_fields;
908 for (int i = 0; i < mdb->m_num_fields; i++) {
909 Dmsg1(dbglvl_dbg, "filling field %d\n", i);
910 mdb->m_fields[i].name = PQfname(mdb->m_result, i);
911 mdb->m_fields[i].type = PQftype(mdb->m_result, i);
912 mdb->m_fields[i].flags = 0;
914 /* For a given column, find the max length. */
916 for (int j = 0; j < mdb->m_num_rows; j++) {
917 if (PQgetisnull(mdb->m_result, j, i)) {
918 this_len = 4; /* "NULL" */
920 this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
923 if (max_len < this_len) {
927 mdb->m_fields[i].max_length = max_len;
929 Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
930 mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
934 /* Increment field number for the next time around */
935 return &mdb->m_fields[mdb->m_field_number++];
// Tell whether a field is declared NOT NULL. The only test visible here
// compares field_type with 1; the values returned on either side of that
// test are not visible in this extract.
938 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
940 if (field_type == 1) {
// Tell whether a field type is numeric by switching on field_type, which per
// the note below holds type OIDs as listed in pg_type. The case labels are
// not visible in this extract.
946 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
949 * TEMP: the following is taken from select OID, typname from pg_type;
951 switch (field_type) {
964 * Escape strings so PostgreSQL is happy on COPY
966 * len is the length of the old string. Your new
967 * string must be long enough (max 2*old+1) to hold
968 * the escaped output.
// Copy src into dest, escaping tab, newline, carriage return and backslash
// as needed for COPY input. Copying stops after len bytes or at the first
// NUL byte of src, whichever comes first. The callers in this file size dest
// as 2*len+1 bytes. The loop body and the return value are not visible in
// this extract.
970 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
972 /* we have to escape \t, \n, \r, \ */
975 while (len > 0 && *src) {
// Start batch mode: create the temporary table batch, then issue
// COPY batch FROM STDIN so that rows can be streamed with sql_batch_insert.
// The COPY command is sent inside a loop of at most 10 PQexec attempts.
// Success is the PGRES_COPY_IN status, after which the field count is stored
// and the row count is set to 0. On the failure path errmsg is filled in and
// the result is cleared.
1010 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
1012 BDB_POSTGRESQL *mdb = this;
1013 const char *query = "COPY batch FROM STDIN";
1015 Dmsg0(dbglvl_info, "sql_batch_start started\n");
1017 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1024 "DeltaSeq smallint)")) {
1025 Dmsg0(dbglvl_err, "sql_batch_start failed\n");
1029 /* We are starting a new query. reset everything. */
1030 mdb->m_num_rows = -1;
1031 mdb->m_row_number = -1;
1032 mdb->m_field_number = -1;
1036 for (int i=0; i < 10; i++) {
1037 mdb->m_result = PQexec(mdb->m_db_handle, query);
1038 if (mdb->m_result) {
1043 if (!mdb->m_result) {
1044 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
1048 mdb->m_status = PQresultStatus(mdb->m_result);
1049 if (mdb->m_status == PGRES_COPY_IN) {
1050 /* How many fields in the set? */
1051 mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1052 mdb->m_num_rows = 0;
1055 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1059 Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
// Failure path: record the libpq error text and drop the result.
1064 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1066 PQclear(mdb->m_result);
1067 mdb->m_result = NULL;
1072 * Set error to something to abort the operation
// Finish the COPY started by sql_batch_start. error is handed to
// PQputCopyEnd unchanged; per the note above this function, setting it
// aborts the operation. PQputCopyEnd is repeated while it returns 0 and the
// retry counter lasts. Afterwards the command status is collected with
// PQgetResult and ANALYZE batch is run to refresh the planner statistics.
// Failures leave a description in errmsg.
1074 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1079 BDB_POSTGRESQL *mdb = this;
1081 Dmsg0(dbglvl_info, "sql_batch_end started\n");
1084 res = PQputCopyEnd(mdb->m_db_handle, error);
1085 } while (res == 0 && --count > 0);
1088 Dmsg0(dbglvl_dbg, "ok\n");
1094 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1095 Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1098 /* Check command status and return to normal libpq state */
1099 p_result = PQgetResult(mdb->m_db_handle);
1100 if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1101 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1105 /* Get some statistics to compute the best plan */
1106 sql_query("ANALYZE batch");
1110 Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
// Send one attribute record to the server as a single tab separated COPY
// line. File name and path are escaped with pgsql_copy_escape into esc_name
// and esc_path, each grown to twice the input length plus one. A NULL or
// empty ar->Digest is replaced by a substitute that is not visible in this
// extract. PQputCopyData is repeated while it returns 0 and the retry
// counter lasts. Failures leave a description in errmsg.
1114 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1121 BDB_POSTGRESQL *mdb = this;
1123 mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1124 pgsql_copy_escape(mdb->esc_name, fname, fnl);
1126 mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1127 pgsql_copy_escape(mdb->esc_path, path, pnl);
1129 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1132 digest = ar->Digest;
// Column order: FileIndex, JobId, Path, Name, attributes, digest, DeltaSeq.
1135 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1136 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1137 mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1140 res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1141 } while (res == 0 && --count > 0);
1144 Dmsg0(dbglvl_dbg, "ok\n");
1151 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1152 Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1155 Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1161 #endif /* HAVE_POSTGRESQL */