]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Backport from BEE
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  * Bacula Catalog Database routines specific to PostgreSQL
18  *   These are PostgreSQL specific routines
19  *
20  *    Dan Langille, December 2003
21  *    based upon work done by Kern Sibbald, March 2000
22  *
23  *   Class wrapper by Marco van Wieringen, January 2010
24  */
25
26 #include "bacula.h"
27
28 #ifdef HAVE_POSTGRESQL
29
30 #include "cats.h"
31 #include "bdb_priv.h"
32 #include "libpq-fe.h"
33 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
34 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
35 #include "bdb_postgresql.h"
36
37 /* -----------------------------------------------------------------------
38  *
39  *   PostgreSQL dependent defines and subroutines
40  *
41  * -----------------------------------------------------------------------
42  */
43
44 /*
45  * List of open databases
46  */
47 static dlist *db_list = NULL;
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
52    const char *db_driver,
53    const char *db_name,
54    const char *db_user,
55    const char *db_password,
56    const char *db_address,
57    int db_port,
58    const char *db_socket,
59    bool mult_db_connections,
60    bool disable_batch_insert)
61 {
62    /*
63     * Initialize the parent class members.
64     */
65    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
66    m_db_type = SQL_TYPE_POSTGRESQL;
67    m_db_driver = bstrdup("PostgreSQL");
68    m_db_name = bstrdup(db_name);
69    m_db_user = bstrdup(db_user);
70    if (db_password) {
71       m_db_password = bstrdup(db_password);
72    }
73    if (db_address) {
74       m_db_address = bstrdup(db_address);
75    }
76    if (db_socket) {
77       m_db_socket = bstrdup(db_socket);
78    }
79    m_db_port = db_port;
80    if (disable_batch_insert) {
81       m_disabled_batch_insert = true;
82       m_have_batch_insert = false;
83    } else {
84       m_disabled_batch_insert = false;
85 #if defined(USE_BATCH_FILE_INSERT)
86 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
87 #ifdef HAVE_PQISTHREADSAFE
88       m_have_batch_insert = PQisthreadsafe();
89 #else
90       m_have_batch_insert = true;
91 #endif /* HAVE_PQISTHREADSAFE */
92 #else
93       m_have_batch_insert = true;
94 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
95 #else
96       m_have_batch_insert = false;
97 #endif /* USE_BATCH_FILE_INSERT */
98    }
99    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
100    *errmsg = 0;
101    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
102    cached_path = get_pool_memory(PM_FNAME);
103    cached_path_id = 0;
104    m_ref_count = 1;
105    fname = get_pool_memory(PM_FNAME);
106    path = get_pool_memory(PM_FNAME);
107    esc_name = get_pool_memory(PM_FNAME);
108    esc_path = get_pool_memory(PM_FNAME);
109    esc_obj = get_pool_memory(PM_FNAME);
110    m_buf =  get_pool_memory(PM_FNAME);
111    m_allow_transactions = mult_db_connections;
112
113    /* At this time, when mult_db_connections == true, this is for
114     * specific console command such as bvfs or batch mode, and we don't
115     * want to share a batch mode or bvfs. In the future, we can change
116     * the creation function to add this parameter.
117     */
118    m_dedicated = mult_db_connections;
119
120    /*
121     * Initialize the private members.
122     */
123    m_db_handle = NULL;
124    m_result = NULL;
125
126    /*
127     * Put the db in the list.
128     */
129    if (db_list == NULL) {
130       db_list = New(dlist(this, &this->m_link));
131    }
132    db_list->append(this);
133 }
134
135 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
136 {
137 }
138
139 /*
140  * Check that the database correspond to the encoding we want
141  */
142 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
143 {
144    SQL_ROW row;
145    int ret = false;
146
147    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
148       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
149       return false;
150    }
151
152    if ((row = mdb->sql_fetch_row()) == NULL) {
153       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
154       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
155    } else {
156       ret = bstrcmp(row[0], "SQL_ASCII");
157
158       if (ret) {
159          /*
160           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
161           */
162          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
163
164       } else {
165          /*
166           * Something is wrong with database encoding
167           */
168          Mmsg(mdb->errmsg,
169               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
170               mdb->get_db_name(), row[0]);
171          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
172          Dmsg1(50, "%s", mdb->errmsg);
173       }
174    }
175    return ret;
176 }
177
178 /*
179  * Now actually open the database.  This can generate errors,
180  *   which are returned in the errmsg
181  *
182  * DO NOT close the database or delete mdb here !!!!
183  */
184 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
185 {
186    bool retval = false;
187    int errstat;
188    char buf[10], *port;
189
190    P(mutex);
191    if (m_connected) {
192       retval = true;
193       goto bail_out;
194    }
195
196    if ((errstat=rwl_init(&m_lock)) != 0) {
197       berrno be;
198       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
199             be.bstrerror(errstat));
200       goto bail_out;
201    }
202
203    if (m_db_port) {
204       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
205       port = buf;
206    } else {
207       port = NULL;
208    }
209
210    /* If connection fails, try at 5 sec intervals for 30 seconds. */
211    for (int retry=0; retry < 6; retry++) {
212       /* connect to the database */
213       m_db_handle = PQsetdbLogin(
214            m_db_address,         /* default = localhost */
215            port,                 /* default port */
216            NULL,                 /* pg options */
217            NULL,                 /* tty, ignored */
218            m_db_name,            /* database name */
219            m_db_user,            /* login name */
220            m_db_password);       /* password */
221
222       /* If no connect, try once more in case it is a timing problem */
223       if (PQstatus(m_db_handle) == CONNECTION_OK) {
224          break;
225       }
226       bmicrosleep(5, 0);
227    }
228
229    Dmsg0(50, "pg_real_connect done\n");
230    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
231         (m_db_password == NULL) ? "(NULL)" : m_db_password);
232
233    if (PQstatus(m_db_handle) != CONNECTION_OK) {
234       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
235          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
236          m_db_name, m_db_user);
237       goto bail_out;
238    }
239
240    m_connected = true;
241    if (!check_tables_version(jcr, this)) {
242       goto bail_out;
243    }
244
245    sql_query("SET datestyle TO 'ISO, YMD'");
246    sql_query("SET cursor_tuple_fraction=1");
247
248    /*
249     * Tell PostgreSQL we are using standard conforming strings
250     * and avoid warnings such as:
251     *  WARNING:  nonstandard use of \\ in a string literal
252     */
253    sql_query("SET standard_conforming_strings=on");
254
255    /*
256     * Check that encoding is SQL_ASCII
257     */
258    pgsql_check_database_encoding(jcr, this);
259
260    retval = true;
261
262 bail_out:
263    V(mutex);
264    return retval;
265 }
266
267 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
268 {
269    if (m_connected) {
270       db_end_transaction(jcr);
271    }
272    P(mutex);
273    m_ref_count--;
274    if (m_ref_count == 0) {
275       if (m_connected) {
276          sql_free_result();
277       }
278       db_list->remove(this);
279       if (m_connected && m_db_handle) {
280          PQfinish(m_db_handle);
281       }
282       if (rwl_is_init(&m_lock)) {
283          rwl_destroy(&m_lock);
284       }
285       free_pool_memory(errmsg);
286       free_pool_memory(cmd);
287       free_pool_memory(cached_path);
288       free_pool_memory(fname);
289       free_pool_memory(path);
290       free_pool_memory(esc_name);
291       free_pool_memory(esc_path);
292       free_pool_memory(esc_obj);
293       free_pool_memory(m_buf);
294       if (m_db_driver) {
295          free(m_db_driver);
296       }
297       if (m_db_name) {
298          free(m_db_name);
299       }
300       if (m_db_user) {
301          free(m_db_user);
302       }
303       if (m_db_password) {
304          free(m_db_password);
305       }
306       if (m_db_address) {
307          free(m_db_address);
308       }
309       if (m_db_socket) {
310          free(m_db_socket);
311       }
312       delete this;
313       if (db_list->size() == 0) {
314          delete db_list;
315          db_list = NULL;
316       }
317    }
318    V(mutex);
319 }
320
321 void B_DB_POSTGRESQL::db_thread_cleanup(void)
322 {
323 }
324
325 /*
326  * Escape strings so that PostgreSQL is happy
327  *
328  *   NOTE! len is the length of the old string. Your new
329  *         string must be long enough (max 2*old+1) to hold
330  *         the escaped output.
331  */
332 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
333 {
334    int error;
335
336    PQescapeStringConn(m_db_handle, snew, old, len, &error);
337    if (error) {
338       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
339       /* error on encoding, probably invalid multibyte encoding in the source string
340         see PQescapeStringConn documentation for details. */
341       Dmsg0(500, "PQescapeStringConn failed\n");
342    }
343 }
344
345 /*
346  * Escape binary so that PostgreSQL is happy
347  *
348  */
349 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
350 {
351    size_t new_len;
352    unsigned char *obj;
353
354    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
355    if (!obj) {
356       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
357    }
358
359    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
360    memcpy(esc_obj, obj, new_len);
361    esc_obj[new_len]=0;
362
363    PQfreemem(obj);
364
365    return (char *)esc_obj;
366 }
367
368 /*
369  * Unescape binary object so that PostgreSQL is happy
370  *
371  */
372 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
373                                          POOLMEM **dest, int32_t *dest_len)
374 {
375    size_t new_len;
376    unsigned char *obj;
377
378    if (!from) {
379       *dest[0] = 0;
380       *dest_len = 0;
381       return;
382    }
383
384    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
385
386    if (!obj) {
387       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
388    }
389
390    *dest_len = new_len;
391    *dest = check_pool_memory_size(*dest, new_len+1);
392    memcpy(*dest, obj, new_len);
393    (*dest)[new_len]=0;
394
395    PQfreemem(obj);
396
397    Dmsg1(010, "obj size: %d\n", *dest_len);
398 }
399
400 /*
401  * Start a transaction. This groups inserts and makes things
402  * much more efficient. Usually started when inserting
403  * file attributes.
404  */
405 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
406 {
407    if (!jcr->attr) {
408       jcr->attr = get_pool_memory(PM_FNAME);
409    }
410    if (!jcr->ar) {
411       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
412    }
413
414    /*
415     * This is turned off because transactions break
416     * if multiple simultaneous jobs are run.
417     */
418    if (!m_allow_transactions) {
419       return;
420    }
421
422    db_lock(this);
423    /*
424     * Allow only 25,000 changes per transaction
425     */
426    if (m_transaction && changes > 25000) {
427       db_end_transaction(jcr);
428    }
429    if (!m_transaction) {
430       sql_query("BEGIN");  /* begin transaction */
431       Dmsg0(400, "Start PosgreSQL transaction\n");
432       m_transaction = true;
433    }
434    db_unlock(this);
435 }
436
437 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
438 {
439    if (jcr && jcr->cached_attribute) {
440       Dmsg0(400, "Flush last cached attribute.\n");
441       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
442          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
443       }
444       jcr->cached_attribute = false;
445    }
446
447    if (!m_allow_transactions) {
448       return;
449    }
450
451    db_lock(this);
452    if (m_transaction) {
453       sql_query("COMMIT"); /* end transaction */
454       m_transaction = false;
455       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
456    }
457    changes = 0;
458    db_unlock(this);
459 }
460
461
462 /*
463  * Submit a general SQL command (cmd), and for each row returned,
464  * the result_handler is called with the ctx.
465  */
466 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query,
467                                        DB_RESULT_HANDLER *result_handler,
468                                        void *ctx)
469 {
470    SQL_ROW row;
471    bool retval = false;
472    bool in_transaction = m_transaction;
473
474    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
475
476    /* This code handles only SELECT queries */
477    if (strncasecmp(query, "SELECT", 6) != 0) {
478       return db_sql_query(query, result_handler, ctx);
479    }
480
481    if (!result_handler) {       /* no need of big_query without handler */
482       return false;
483    }
484
485    db_lock(this);
486
487    if (!in_transaction) {       /* CURSOR needs transaction */
488       sql_query("BEGIN");
489    }
490
491    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
492
493    if (!sql_query(m_buf)) {
494       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
495       Dmsg0(50, "db_sql_query failed\n");
496       goto bail_out;
497    }
498
499    do {
500       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
501          goto bail_out;
502       }
503       while ((row = sql_fetch_row()) != NULL) {
504          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
505          if (result_handler(ctx, m_num_fields, row))
506             break;
507       }
508       PQclear(m_result);
509       m_result = NULL;
510
511    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
512
513    sql_query("CLOSE _bac_cursor");
514
515    Dmsg0(500, "db_big_sql_query finished\n");
516    sql_free_result();
517    retval = true;
518
519 bail_out:
520    if (!in_transaction) {
521       sql_query("COMMIT");  /* end transaction */
522    }
523
524    db_unlock(this);
525    return retval;
526 }
527
528 /*
529  * Submit a general SQL command (cmd), and for each row returned,
530  * the result_handler is called with the ctx.
531  */
532 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
533 {
534    SQL_ROW row;
535    bool retval = true;
536
537    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
538
539    db_lock(this);
540    if (!sql_query(query, QF_STORE_RESULT)) {
541       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
542       Dmsg0(500, "db_sql_query failed\n");
543       retval = false;
544       goto bail_out;
545    }
546
547    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
548
549    if (result_handler != NULL) {
550       Dmsg0(500, "db_sql_query invoking handler\n");
551       while ((row = sql_fetch_row()) != NULL) {
552          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
553          if (result_handler(ctx, m_num_fields, row))
554             break;
555       }
556       sql_free_result();
557    }
558
559    Dmsg0(500, "db_sql_query finished\n");
560
561 bail_out:
562    db_unlock(this);
563    return retval;
564 }
565
566 /*
567  * Note, if this routine returns false (failure), Bacula expects
568  * that no result has been stored.
569  * This is where QUERY_DB comes with Postgresql.
570  *
571  *  Returns:  true  on success
572  *            false on failure
573  *
574  */
575 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
576 {
577    int i;
578    bool retval = false;
579
580    Dmsg1(500, "sql_query starts with '%s'\n", query);
581    /*
582     * We are starting a new query. reset everything.
583     */
584    m_num_rows     = -1;
585    m_row_number   = -1;
586    m_field_number = -1;
587
588    if (m_result) {
589       PQclear(m_result);  /* hmm, someone forgot to free?? */
590       m_result = NULL;
591    }
592
593    for (i = 0; i < 10; i++) {
594       m_result = PQexec(m_db_handle, query);
595       if (m_result) {
596          break;
597       }
598       bmicrosleep(5, 0);
599    }
600    if (!m_result) {
601       Dmsg1(50, "Query failed: %s\n", query);
602       goto bail_out;
603    }
604
605    m_status = PQresultStatus(m_result);
606    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
607       Dmsg0(500, "we have a result\n");
608
609       /*
610        * How many fields in the set?
611        */
612       m_num_fields = (int)PQnfields(m_result);
613       Dmsg1(500, "we have %d fields\n", m_num_fields);
614
615       m_num_rows = PQntuples(m_result);
616       Dmsg1(500, "we have %d rows\n", m_num_rows);
617
618       m_row_number = 0;      /* we can start to fetch something */
619       m_status = 0;          /* succeed */
620       retval = true;
621    } else {
622       Dmsg1(50, "Result status failed: %s\n", query);
623       goto bail_out;
624    }
625
626    Dmsg0(500, "sql_query finishing\n");
627    goto ok_out;
628
629 bail_out:
630    Dmsg0(500, "we failed\n");
631    PQclear(m_result);
632    m_result = NULL;
633    m_status = 1;                   /* failed */
634
635 ok_out:
636    return retval;
637 }
638
639 void B_DB_POSTGRESQL::sql_free_result(void)
640 {
641    db_lock(this);
642    if (m_result) {
643       PQclear(m_result);
644       m_result = NULL;
645    }
646    if (m_rows) {
647       free(m_rows);
648       m_rows = NULL;
649    }
650    if (m_fields) {
651       free(m_fields);
652       m_fields = NULL;
653    }
654    m_num_rows = m_num_fields = 0;
655    db_unlock(this);
656 }
657
658 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
659 {
660    int j;
661    SQL_ROW row = NULL; /* by default, return NULL */
662
663    Dmsg0(500, "sql_fetch_row start\n");
664
665    if (m_num_fields == 0) {     /* No field, no row */
666       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
667       return NULL;
668    }
669
670    if (!m_rows || m_rows_size < m_num_fields) {
671       if (m_rows) {
672          Dmsg0(500, "sql_fetch_row freeing space\n");
673          free(m_rows);
674       }
675       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
676       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
677       m_rows_size = m_num_fields;
678
679       /*
680        * Now reset the row_number now that we have the space allocated
681        */
682       m_row_number = 0;
683    }
684
685    /*
686     * If still within the result set
687     */
688    if (m_row_number >= 0 && m_row_number < m_num_rows) {
689       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
690       /*
691        * Get each value from this row
692        */
693       for (j = 0; j < m_num_fields; j++) {
694          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
695          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
696       }
697       /*
698        * Increment the row number for the next call
699        */
700       m_row_number++;
701       row = m_rows;
702    } else {
703       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
704    }
705
706    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
707
708    return row;
709 }
710
711 const char *B_DB_POSTGRESQL::sql_strerror(void)
712 {
713    return PQerrorMessage(m_db_handle);
714 }
715
716 void B_DB_POSTGRESQL::sql_data_seek(int row)
717 {
718    /*
719     * Set the row number to be returned on the next call to sql_fetch_row
720     */
721    m_row_number = row;
722 }
723
724 int B_DB_POSTGRESQL::sql_affected_rows(void)
725 {
726    return (unsigned) str_to_int32(PQcmdTuples(m_result));
727 }
728
729 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
730 {
731    int i;
732    uint64_t id = 0;
733    char sequence[NAMEDATALEN-1];
734    char getkeyval_query[NAMEDATALEN+50];
735    PGresult *pg_result;
736
737    /*
738     * First execute the insert query and then retrieve the currval.
739     */
740    if (!sql_query(query)) {
741       return 0;
742    }
743
744    m_num_rows = sql_affected_rows();
745    if (m_num_rows != 1) {
746       return 0;
747    }
748
749    changes++;
750
751    /*
752     * Obtain the current value of the sequence that
753     * provides the serial value for primary key of the table.
754     *
755     * currval is local to our session.  It is not affected by
756     * other transactions.
757     *
758     * Determine the name of the sequence.
759     * PostgreSQL automatically creates a sequence using
760     * <table>_<column>_seq.
761     * At the time of writing, all tables used this format for
762     * for their primary key: <table>id
763     * Except for basefiles which has a primary key on baseid.
764     * Therefore, we need to special case that one table.
765     *
766     * everything else can use the PostgreSQL formula.
767     */
768    if (strcasecmp(table_name, "basefiles") == 0) {
769       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
770    } else {
771       bstrncpy(sequence, table_name, sizeof(sequence));
772       bstrncat(sequence, "_",        sizeof(sequence));
773       bstrncat(sequence, table_name, sizeof(sequence));
774       bstrncat(sequence, "id",       sizeof(sequence));
775    }
776
777    bstrncat(sequence, "_seq", sizeof(sequence));
778    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
779
780    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
781    for (i = 0; i < 10; i++) {
782       pg_result = PQexec(m_db_handle, getkeyval_query);
783       if (pg_result) {
784          break;
785       }
786       bmicrosleep(5, 0);
787    }
788    if (!pg_result) {
789       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
790       goto bail_out;
791    }
792
793    Dmsg0(500, "exec done");
794
795    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
796       Dmsg0(500, "getting value");
797       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
798       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
799    } else {
800       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
801       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
802    }
803
804 bail_out:
805    PQclear(pg_result);
806
807    return id;
808 }
809
810 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
811 {
812    int i, j;
813    int max_length;
814    int this_length;
815
816    Dmsg0(500, "sql_fetch_field starts\n");
817
818    if (!m_fields || m_fields_size < m_num_fields) {
819       if (m_fields) {
820          free(m_fields);
821          m_fields = NULL;
822       }
823       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
824       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
825       m_fields_size = m_num_fields;
826
827       for (i = 0; i < m_num_fields; i++) {
828          Dmsg1(500, "filling field %d\n", i);
829          m_fields[i].name = PQfname(m_result, i);
830          m_fields[i].type = PQftype(m_result, i);
831          m_fields[i].flags = 0;
832
833          /*
834           * For a given column, find the max length.
835           */
836          max_length = 0;
837          for (j = 0; j < m_num_rows; j++) {
838             if (PQgetisnull(m_result, j, i)) {
839                 this_length = 4;        /* "NULL" */
840             } else {
841                 this_length = cstrlen(PQgetvalue(m_result, j, i));
842             }
843
844             if (max_length < this_length) {
845                max_length = this_length;
846             }
847          }
848          m_fields[i].max_length = max_length;
849
850          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
851                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
852       }
853    }
854
855    /*
856     * Increment field number for the next time around
857     */
858    return &m_fields[m_field_number++];
859 }
860
861 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
862 {
863    switch (field_type) {
864    case 1:
865       return true;
866    default:
867       return false;
868    }
869 }
870
871 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
872 {
873    /*
874     * TEMP: the following is taken from select OID, typname from pg_type;
875     */
876    switch (field_type) {
877    case 20:
878    case 21:
879    case 23:
880    case 700:
881    case 701:
882       return true;
883    default:
884       return false;
885    }
886 }
887
888 /*
889  * Escape strings so that PostgreSQL is happy on COPY
890  *
891  *   NOTE! len is the length of the old string. Your new
892  *         string must be long enough (max 2*old+1) to hold
893  *         the escaped output.
894  */
895 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
896 {
897    /* we have to escape \t, \n, \r, \ */
898    char c = '\0' ;
899
900    while (len > 0 && *src) {
901       switch (*src) {
902       case '\n':
903          c = 'n';
904          break;
905       case '\\':
906          c = '\\';
907          break;
908       case '\t':
909          c = 't';
910          break;
911       case '\r':
912          c = 'r';
913          break;
914       default:
915          c = '\0' ;
916       }
917
918       if (c) {
919          *dest = '\\';
920          dest++;
921          *dest = c;
922       } else {
923          *dest = *src;
924       }
925
926       len--;
927       src++;
928       dest++;
929    }
930
931    *dest = '\0';
932    return dest;
933 }
934
935 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
936 {
937    const char *query = "COPY batch FROM STDIN";
938
939    Dmsg0(500, "sql_batch_start started\n");
940
941    if (!sql_query("CREATE TEMPORARY TABLE batch ("
942                           "FileIndex int,"
943                           "JobId int,"
944                           "Path varchar,"
945                           "Name varchar,"
946                           "LStat varchar,"
947                           "Md5 varchar,"
948                           "DeltaSeq smallint)")) {
949       Dmsg0(500, "sql_batch_start failed\n");
950       return false;
951    }
952
953    /*
954     * We are starting a new query.  reset everything.
955     */
956    m_num_rows     = -1;
957    m_row_number   = -1;
958    m_field_number = -1;
959
960    sql_free_result();
961
962    for (int i=0; i < 10; i++) {
963       m_result = PQexec(m_db_handle, query);
964       if (m_result) {
965          break;
966       }
967       bmicrosleep(5, 0);
968    }
969    if (!m_result) {
970       Dmsg1(50, "Query failed: %s\n", query);
971       goto bail_out;
972    }
973
974    m_status = PQresultStatus(m_result);
975    if (m_status == PGRES_COPY_IN) {
976       /*
977        * How many fields in the set?
978        */
979       m_num_fields = (int) PQnfields(m_result);
980       m_num_rows = 0;
981       m_status = 1;
982    } else {
983       Dmsg1(50, "Result status failed: %s\n", query);
984       goto bail_out;
985    }
986
987    Dmsg0(500, "sql_batch_start finishing\n");
988
989    return true;
990
991 bail_out:
992    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
993    m_status = 0;
994    PQclear(m_result);
995    m_result = NULL;
996    return false;
997 }
998
999 /*
1000  * Set error to something to abort operation
1001  */
1002 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1003 {
1004    int res;
1005    int count=30;
1006    PGresult *pg_result;
1007
1008    Dmsg0(500, "sql_batch_end started\n");
1009
1010    do {
1011       res = PQputCopyEnd(m_db_handle, error);
1012    } while (res == 0 && --count > 0);
1013
1014    if (res == 1) {
1015       Dmsg0(500, "ok\n");
1016       m_status = 0;
1017    }
1018
1019    if (res <= 0) {
1020       Dmsg0(500, "we failed\n");
1021       m_status = 1;
1022       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1023       Dmsg1(500, "failure %s\n", errmsg);
1024    }
1025
1026    /* Check command status and return to normal libpq state */
1027    pg_result = PQgetResult(m_db_handle);
1028    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1029       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1030       m_status = 1;
1031    }
1032
1033    /* Get some statistics to compute the best plan */
1034    sql_query("ANALYZE batch");
1035
1036    PQclear(pg_result);
1037
1038    Dmsg0(500, "sql_batch_end finishing\n");
1039    return true;
1040 }
1041
1042 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1043 {
1044    int res;
1045    int count=30;
1046    size_t len;
1047    const char *digest;
1048    char ed1[50];
1049
1050    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1051    pgsql_copy_escape(esc_name, fname, fnl);
1052
1053    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1054    pgsql_copy_escape(esc_path, path, pnl);
1055
1056    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1057       digest = "0";
1058    } else {
1059       digest = ar->Digest;
1060    }
1061
1062    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1063               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1064               esc_name, ar->attr, digest, ar->DeltaSeq);
1065
1066    do {
1067       res = PQputCopyData(m_db_handle, cmd, len);
1068    } while (res == 0 && --count > 0);
1069
1070    if (res == 1) {
1071       Dmsg0(500, "ok\n");
1072       changes++;
1073       m_status = 1;
1074    }
1075
1076    if (res <= 0) {
1077       Dmsg0(500, "we failed\n");
1078       m_status = 0;
1079       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1080       Dmsg1(500, "failure %s\n", errmsg);
1081    }
1082
1083    Dmsg0(500, "sql_batch_insert finishing\n");
1084
1085    return true;
1086 }
1087
1088 /*
1089  * Initialize database data structure. In principal this should
1090  * never have errors, or it is really fatal.
1091  */
1092 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
1093                        const char *db_user, const char *db_password,
1094                        const char *db_address, int db_port,
1095                        const char *db_socket, bool mult_db_connections,
1096                        bool disable_batch_insert)
1097 {
1098    B_DB_POSTGRESQL *mdb = NULL;
1099
1100    if (!db_user) {
1101       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1102       return NULL;
1103    }
1104    P(mutex);                          /* lock DB queue */
1105    if (db_list && !mult_db_connections) {
1106       /*
1107        * Look to see if DB already open
1108        */
1109       foreach_dlist(mdb, db_list) {
1110          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1111             Dmsg1(100, "DB REopen %s\n", db_name);
1112             mdb->increment_refcount();
1113             goto bail_out;
1114          }
1115       }
1116    }
1117    Dmsg0(100, "db_init_database first time\n");
1118    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password,
1119                              db_address, db_port, db_socket,
1120                              mult_db_connections, disable_batch_insert));
1121
1122 bail_out:
1123    V(mutex);
1124    return mdb;
1125 }
1126
1127 #endif /* HAVE_POSTGRESQL */