]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Add simple way to add string elements to db_list_ctx
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    m_buf =  get_pool_memory(PM_FNAME);
122    m_allow_transactions = mult_db_connections;
123
124    /*
125     * Initialize the private members.
126     */
127    m_db_handle = NULL;
128    m_result = NULL;
129
130    /*
131     * Put the db in the list.
132     */
133    if (db_list == NULL) {
134       db_list = New(dlist(this, &this->m_link));
135    }
136    db_list->append(this);
137 }
138
139 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
140 {
141 }
142
143 /*
144  * Check that the database correspond to the encoding we want
145  */
146 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
147 {
148    SQL_ROW row;
149    int ret = false;
150
151    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
152       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
153       return false;
154    }
155
156    if ((row = mdb->sql_fetch_row()) == NULL) {
157       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
158       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
159    } else {
160       ret = bstrcmp(row[0], "SQL_ASCII");
161
162       if (ret) {
163          /*
164           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
165           */
166          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
167
168       } else {
169          /*
170           * Something is wrong with database encoding
171           */
172          Mmsg(mdb->errmsg, 
173               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
174               mdb->get_db_name(), row[0]);
175          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
176          Dmsg1(50, "%s", mdb->errmsg);
177       } 
178    }
179    return ret;
180 }
181
182 /*
183  * Now actually open the database.  This can generate errors,
184  *   which are returned in the errmsg
185  *
186  * DO NOT close the database or delete mdb here !!!!
187  */
188 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
189 {
190    bool retval = false;
191    int errstat;
192    char buf[10], *port;
193
194    P(mutex);
195    if (m_connected) {
196       retval = true;
197       goto bail_out;
198    }
199
200    if ((errstat=rwl_init(&m_lock)) != 0) {
201       berrno be;
202       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
203             be.bstrerror(errstat));
204       goto bail_out;
205    }
206
207    if (m_db_port) {
208       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
209       port = buf;
210    } else {
211       port = NULL;
212    }
213
214    /* If connection fails, try at 5 sec intervals for 30 seconds. */
215    for (int retry=0; retry < 6; retry++) {
216       /* connect to the database */
217       m_db_handle = PQsetdbLogin(
218            m_db_address,         /* default = localhost */
219            port,                 /* default port */
220            NULL,                 /* pg options */
221            NULL,                 /* tty, ignored */
222            m_db_name,            /* database name */
223            m_db_user,            /* login name */
224            m_db_password);       /* password */
225
226       /* If no connect, try once more in case it is a timing problem */
227       if (PQstatus(m_db_handle) == CONNECTION_OK) {
228          break;
229       }
230       bmicrosleep(5, 0);
231    }
232
233    Dmsg0(50, "pg_real_connect done\n");
234    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
235         (m_db_password == NULL) ? "(NULL)" : m_db_password);
236
237    if (PQstatus(m_db_handle) != CONNECTION_OK) {
238       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
239          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
240          m_db_name, m_db_user);
241       goto bail_out;
242    }
243
244    m_connected = true;
245    if (!check_tables_version(jcr, this)) {
246       goto bail_out;
247    }
248
249    sql_query("SET datestyle TO 'ISO, YMD'");
250    sql_query("SET cursor_tuple_fraction=1");
251    
252    /*
253     * Tell PostgreSQL we are using standard conforming strings
254     * and avoid warnings such as:
255     *  WARNING:  nonstandard use of \\ in a string literal
256     */
257    sql_query("SET standard_conforming_strings=on");
258
259    /*
260     * Check that encoding is SQL_ASCII
261     */
262    pgsql_check_database_encoding(jcr, this);
263
264    retval = true;
265
266 bail_out:
267    V(mutex);
268    return retval;
269 }
270
271 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
272 {
273    db_end_transaction(jcr);
274    P(mutex);
275    sql_free_result();
276    m_ref_count--;
277    if (m_ref_count == 0) {
278       db_list->remove(this);
279       if (m_connected && m_db_handle) {
280          PQfinish(m_db_handle);
281       }
282       rwl_destroy(&m_lock);
283       free_pool_memory(errmsg);
284       free_pool_memory(cmd);
285       free_pool_memory(cached_path);
286       free_pool_memory(fname);
287       free_pool_memory(path);
288       free_pool_memory(esc_name);
289       free_pool_memory(esc_path);
290       free_pool_memory(m_buf);
291       if (m_db_driver) {
292          free(m_db_driver);
293       }
294       if (m_db_name) {
295          free(m_db_name);
296       }
297       if (m_db_user) {
298          free(m_db_user);
299       }
300       if (m_db_password) {
301          free(m_db_password);
302       }
303       if (m_db_address) {
304          free(m_db_address);
305       }
306       if (m_db_socket) {
307          free(m_db_socket);
308       }
309       if (esc_obj) {
310          PQfreemem(esc_obj);
311       }
312       delete this;
313       if (db_list->size() == 0) {
314          delete db_list;
315          db_list = NULL;
316       }
317    }
318    V(mutex);
319 }
320
321 void B_DB_POSTGRESQL::db_thread_cleanup(void)
322 {
323 }
324
325 /*
326  * Escape strings so that PostgreSQL is happy
327  *
328  *   NOTE! len is the length of the old string. Your new
329  *         string must be long enough (max 2*old+1) to hold
330  *         the escaped output.
331  */
332 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
333 {
334    int error;
335   
336    PQescapeStringConn(m_db_handle, snew, old, len, &error);
337    if (error) {
338       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
339       /* error on encoding, probably invalid multibyte encoding in the source string
340         see PQescapeStringConn documentation for details. */
341       Dmsg0(500, "PQescapeStringConn failed\n");
342    }
343 }
344
345 /*
346  * Escape binary so that PostgreSQL is happy
347  *
348  */
349 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
350 {
351    size_t new_len;
352    unsigned char *obj;
353
354    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
355    if (!obj) {
356       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
357    }
358
359    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
360    memcpy(esc_obj, obj, new_len);
361    esc_obj[new_len]=0;
362
363    PQfreemem(obj);
364
365    return (char *)esc_obj;
366 }
367
368 /*
369  * Unescape binary object so that PostgreSQL is happy
370  *
371  */
372 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
373                                          POOLMEM **dest, int32_t *dest_len)
374 {
375    size_t new_len;
376    unsigned char *obj;
377
378    if (!from) {
379       *dest[0] = 0;
380       *dest_len = 0;
381       return;
382    }
383
384    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
385
386    if (!obj) {
387       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
388    }
389
390    *dest_len = new_len;
391    *dest = check_pool_memory_size(*dest, new_len+1);
392    memcpy(*dest, obj, new_len);
393    (*dest)[new_len]=0;
394    
395    PQfreemem(obj);
396
397    Dmsg1(010, "obj size: %d\n", *dest_len);
398 }
399
400 /*
401  * Start a transaction. This groups inserts and makes things
402  * much more efficient. Usually started when inserting
403  * file attributes.
404  */
405 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
406 {
407    if (!jcr->attr) {
408       jcr->attr = get_pool_memory(PM_FNAME);
409    }
410    if (!jcr->ar) {
411       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
412    }
413
414    /*
415     * This is turned off because transactions break
416     * if multiple simultaneous jobs are run.
417     */
418    if (!m_allow_transactions) {
419       return;
420    }
421
422    db_lock(this);
423    /*
424     * Allow only 25,000 changes per transaction
425     */
426    if (m_transaction && changes > 25000) {
427       db_end_transaction(jcr);
428    }
429    if (!m_transaction) {
430       sql_query("BEGIN");  /* begin transaction */
431       Dmsg0(400, "Start PosgreSQL transaction\n");
432       m_transaction = true;
433    }
434    db_unlock(this);
435 }
436
437 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
438 {
439    if (jcr && jcr->cached_attribute) {
440       Dmsg0(400, "Flush last cached attribute.\n");
441       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
442          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
443       }
444       jcr->cached_attribute = false;
445    }
446
447    if (!m_allow_transactions) {
448       return;
449    }
450
451    db_lock(this);
452    if (m_transaction) {
453       sql_query("COMMIT"); /* end transaction */
454       m_transaction = false;
455       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
456    }
457    changes = 0;
458    db_unlock(this);
459 }
460
461
462 /*
463  * Submit a general SQL command (cmd), and for each row returned,
464  * the result_handler is called with the ctx.
465  */
466 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
467                                        DB_RESULT_HANDLER *result_handler, 
468                                        void *ctx)
469 {
470    SQL_ROW row;
471    bool retval = false;
472    bool in_transaction = m_transaction;
473    
474    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
475
476    /* This code handles only SELECT queries */
477    if (strncasecmp(query, "SELECT", 6) != 0) {
478       return db_sql_query(query, result_handler, ctx);
479    }
480
481    if (!result_handler) {       /* no need of big_query without handler */
482       return false;
483    }
484
485    db_lock(this);
486
487    if (!in_transaction) {       /* CURSOR needs transaction */
488       sql_query("BEGIN");
489    }
490
491    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
492
493    if (!sql_query(m_buf)) {
494       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
495       Dmsg0(50, "db_sql_query failed\n");
496       goto bail_out;
497    }
498
499    do {
500       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
501          goto bail_out;
502       }
503       while ((row = sql_fetch_row()) != NULL) {
504          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
505          if (result_handler(ctx, m_num_fields, row))
506             break;
507       }
508       PQclear(m_result);
509       m_result = NULL;
510       
511    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
512
513    sql_query("CLOSE _bac_cursor");
514
515    Dmsg0(500, "db_big_sql_query finished\n");
516    sql_free_result();
517    retval = true;
518
519 bail_out:
520    if (!in_transaction) {
521       sql_query("COMMIT");  /* end transaction */
522    }
523
524    db_unlock(this);
525    return retval;
526 }
527
528 /*
529  * Submit a general SQL command (cmd), and for each row returned,
530  * the result_handler is called with the ctx.
531  */
532 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
533 {
534    SQL_ROW row;
535    bool retval = true;
536
537    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
538
539    db_lock(this);
540    if (!sql_query(query, QF_STORE_RESULT)) {
541       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
542       Dmsg0(500, "db_sql_query failed\n");
543       retval = false;
544       goto bail_out;
545    }
546
547    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
548
549    if (result_handler != NULL) {
550       Dmsg0(500, "db_sql_query invoking handler\n");
551       while ((row = sql_fetch_row()) != NULL) {
552          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
553          if (result_handler(ctx, m_num_fields, row))
554             break;
555       }
556       sql_free_result();
557    }
558
559    Dmsg0(500, "db_sql_query finished\n");
560
561 bail_out:
562    db_unlock(this);
563    return retval;
564 }
565
566 /*
567  * Note, if this routine returns false (failure), Bacula expects
568  * that no result has been stored.
569  * This is where QUERY_DB comes with Postgresql.
570  *
571  *  Returns:  true  on success
572  *            false on failure
573  *
574  */
575 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
576 {
577    int i;
578    bool retval = false;
579
580    Dmsg1(500, "sql_query starts with '%s'\n", query);
581    /*
582     * We are starting a new query. reset everything.
583     */
584    m_num_rows     = -1;
585    m_row_number   = -1;
586    m_field_number = -1;
587
588    if (m_result) {
589       PQclear(m_result);  /* hmm, someone forgot to free?? */
590       m_result = NULL;
591    }
592
593    for (i = 0; i < 10; i++) {
594       m_result = PQexec(m_db_handle, query);
595       if (m_result) {
596          break;
597       }
598       bmicrosleep(5, 0);
599    }
600    if (!m_result) {
601       Dmsg1(50, "Query failed: %s\n", query);
602       goto bail_out;
603    }
604
605    m_status = PQresultStatus(m_result);
606    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
607       Dmsg0(500, "we have a result\n");
608
609       /*
610        * How many fields in the set?
611        */
612       m_num_fields = (int)PQnfields(m_result);
613       Dmsg1(500, "we have %d fields\n", m_num_fields);
614
615       m_num_rows = PQntuples(m_result);
616       Dmsg1(500, "we have %d rows\n", m_num_rows);
617
618       m_row_number = 0;      /* we can start to fetch something */
619       m_status = 0;          /* succeed */
620       retval = true;
621    } else {
622       Dmsg1(50, "Result status failed: %s\n", query);
623       goto bail_out;
624    }
625
626    Dmsg0(500, "sql_query finishing\n");
627    goto ok_out;
628
629 bail_out:
630    Dmsg0(500, "we failed\n");
631    PQclear(m_result);
632    m_result = NULL;
633    m_status = 1;                   /* failed */
634
635 ok_out:
636    return retval;
637 }
638
639 void B_DB_POSTGRESQL::sql_free_result(void)
640 {
641    db_lock(this);
642    if (m_result) {
643       PQclear(m_result);
644       m_result = NULL;
645    }
646    if (m_rows) {
647       free(m_rows);
648       m_rows = NULL;
649    }
650    if (m_fields) {
651       free(m_fields);
652       m_fields = NULL;
653    }
654    m_num_rows = m_num_fields = 0;
655    db_unlock(this);
656 }
657
658 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
659 {
660    int j;
661    SQL_ROW row = NULL; /* by default, return NULL */
662
663    Dmsg0(500, "sql_fetch_row start\n");
664
665    if (m_num_fields == 0) {     /* No field, no row */
666       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
667       return NULL;
668    }
669
670    if (!m_rows || m_rows_size < m_num_fields) {
671       if (m_rows) {
672          Dmsg0(500, "sql_fetch_row freeing space\n");
673          free(m_rows);
674       }
675       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
676       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
677       m_rows_size = m_num_fields;
678
679       /*
680        * Now reset the row_number now that we have the space allocated
681        */
682       m_row_number = 0;
683    }
684
685    /*
686     * If still within the result set
687     */
688    if (m_row_number >= 0 && m_row_number < m_num_rows) {
689       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
690       /*
691        * Get each value from this row
692        */
693       for (j = 0; j < m_num_fields; j++) {
694          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
695          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
696       }
697       /*
698        * Increment the row number for the next call
699        */
700       m_row_number++;
701       row = m_rows;
702    } else {
703       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
704    }
705
706    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
707
708    return row;
709 }
710
711 const char *B_DB_POSTGRESQL::sql_strerror(void)
712 {
713    return PQerrorMessage(m_db_handle);
714 }
715
716 void B_DB_POSTGRESQL::sql_data_seek(int row)
717 {
718    /*
719     * Set the row number to be returned on the next call to sql_fetch_row
720     */
721    m_row_number = row;
722 }
723
724 int B_DB_POSTGRESQL::sql_affected_rows(void)
725 {
726    return (unsigned) str_to_int32(PQcmdTuples(m_result));
727 }
728
729 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
730 {
731    int i;
732    uint64_t id = 0;
733    char sequence[NAMEDATALEN-1];
734    char getkeyval_query[NAMEDATALEN+50];
735    PGresult *pg_result;
736
737    /*
738     * First execute the insert query and then retrieve the currval.
739     */
740    if (!sql_query(query)) {
741       return 0;
742    }
743
744    m_num_rows = sql_affected_rows();
745    if (m_num_rows != 1) {
746       return 0;
747    }
748
749    changes++;
750
751    /*
752     * Obtain the current value of the sequence that
753     * provides the serial value for primary key of the table.
754     *
755     * currval is local to our session.  It is not affected by
756     * other transactions.
757     *
758     * Determine the name of the sequence.
759     * PostgreSQL automatically creates a sequence using
760     * <table>_<column>_seq.
761     * At the time of writing, all tables used this format for
762     * for their primary key: <table>id
763     * Except for basefiles which has a primary key on baseid.
764     * Therefore, we need to special case that one table.
765     *
766     * everything else can use the PostgreSQL formula.
767     */
768    if (strcasecmp(table_name, "basefiles") == 0) {
769       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
770    } else {
771       bstrncpy(sequence, table_name, sizeof(sequence));
772       bstrncat(sequence, "_",        sizeof(sequence));
773       bstrncat(sequence, table_name, sizeof(sequence));
774       bstrncat(sequence, "id",       sizeof(sequence));
775    }
776
777    bstrncat(sequence, "_seq", sizeof(sequence));
778    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
779
780    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
781    for (i = 0; i < 10; i++) {
782       pg_result = PQexec(m_db_handle, getkeyval_query);
783       if (pg_result) {
784          break;
785       }
786       bmicrosleep(5, 0);
787    }
788    if (!pg_result) {
789       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
790       goto bail_out;
791    }
792
793    Dmsg0(500, "exec done");
794
795    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
796       Dmsg0(500, "getting value");
797       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
798       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
799    } else {
800       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
801       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
802    }
803
804 bail_out:
805    PQclear(pg_result);
806
807    return id;
808 }
809
810 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
811 {
812    int i, j;
813    int max_length;
814    int this_length;
815
816    Dmsg0(500, "sql_fetch_field starts\n");
817
818    if (!m_fields || m_fields_size < m_num_fields) {
819       if (m_fields) {
820          free(m_fields);
821          m_fields = NULL;
822       }
823       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
824       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
825       m_fields_size = m_num_fields;
826
827       for (i = 0; i < m_num_fields; i++) {
828          Dmsg1(500, "filling field %d\n", i);
829          m_fields[i].name = PQfname(m_result, i);
830          m_fields[i].type = PQftype(m_result, i);
831          m_fields[i].flags = 0;
832
833          /*
834           * For a given column, find the max length.
835           */
836          max_length = 0;
837          for (j = 0; j < m_num_rows; j++) {
838             if (PQgetisnull(m_result, j, i)) {
839                 this_length = 4;        /* "NULL" */
840             } else {
841                 this_length = cstrlen(PQgetvalue(m_result, j, i));
842             }
843          
844             if (max_length < this_length) {
845                max_length = this_length;
846             }
847          }
848          m_fields[i].max_length = max_length;
849
850          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
851                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
852       }
853    }
854
855    /*
856     * Increment field number for the next time around
857     */
858    return &m_fields[m_field_number++];
859 }
860
861 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
862 {
863    switch (field_type) {
864    case 1:
865       return true;
866    default:
867       return false;
868    }
869 }
870
871 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
872 {
873    /*
874     * TEMP: the following is taken from select OID, typname from pg_type;
875     */
876    switch (field_type) {
877    case 20:
878    case 21:
879    case 23:
880    case 700:
881    case 701:
882       return true;
883    default:
884       return false;
885    }
886 }
887
888 /*
889  * Escape strings so that PostgreSQL is happy on COPY
890  *
891  *   NOTE! len is the length of the old string. Your new
892  *         string must be long enough (max 2*old+1) to hold
893  *         the escaped output.
894  */
895 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
896 {
897    /* we have to escape \t, \n, \r, \ */
898    char c = '\0' ;
899
900    while (len > 0 && *src) {
901       switch (*src) {
902       case '\n':
903          c = 'n';
904          break;
905       case '\\':
906          c = '\\';
907          break;
908       case '\t':
909          c = 't';
910          break;
911       case '\r':
912          c = 'r';
913          break;
914       default:
915          c = '\0' ;
916       }
917
918       if (c) {
919          *dest = '\\';
920          dest++;
921          *dest = c;
922       } else {
923          *dest = *src;
924       }
925
926       len--;
927       src++;
928       dest++;
929    }
930
931    *dest = '\0';
932    return dest;
933 }
934
935 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
936 {
937    const char *query = "COPY batch FROM STDIN";
938
939    Dmsg0(500, "sql_batch_start started\n");
940
941    if (!sql_query("CREATE TEMPORARY TABLE batch ("
942                           "fileindex int,"
943                           "jobid int,"
944                           "path varchar,"
945                           "name varchar,"
946                           "lstat varchar,"
947                           "md5 varchar,"
948                           "markid int)")) {
949       Dmsg0(500, "sql_batch_start failed\n");
950       return false;
951    }
952    
953    /*
954     * We are starting a new query.  reset everything.
955     */
956    m_num_rows     = -1;
957    m_row_number   = -1;
958    m_field_number = -1;
959
960    sql_free_result();
961
962    for (int i=0; i < 10; i++) {
963       m_result = PQexec(m_db_handle, query);
964       if (m_result) {
965          break;
966       }
967       bmicrosleep(5, 0);
968    }
969    if (!m_result) {
970       Dmsg1(50, "Query failed: %s\n", query);
971       goto bail_out;
972    }
973
974    m_status = PQresultStatus(m_result);
975    if (m_status == PGRES_COPY_IN) {
976       /*
977        * How many fields in the set?
978        */
979       m_num_fields = (int) PQnfields(m_result);
980       m_num_rows = 0;
981       m_status = 1;
982    } else {
983       Dmsg1(50, "Result status failed: %s\n", query);
984       goto bail_out;
985    }
986
987    Dmsg0(500, "sql_batch_start finishing\n");
988
989    return true;
990
991 bail_out:
992    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
993    m_status = 0;
994    PQclear(m_result);
995    m_result = NULL;
996    return false;
997 }
998
999 /*
1000  * Set error to something to abort operation
1001  */
1002 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1003 {
1004    int res;
1005    int count=30;
1006    PGresult *pg_result;
1007
1008    Dmsg0(500, "sql_batch_end started\n");
1009
1010    do { 
1011       res = PQputCopyEnd(m_db_handle, error);
1012    } while (res == 0 && --count > 0);
1013
1014    if (res == 1) {
1015       Dmsg0(500, "ok\n");
1016       m_status = 1;
1017    }
1018    
1019    if (res <= 0) {
1020       Dmsg0(500, "we failed\n");
1021       m_status = 0;
1022       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1023       Dmsg1(500, "failure %s\n", errmsg);
1024    }
1025
1026    /* Check command status and return to normal libpq state */
1027    pg_result = PQgetResult(m_db_handle);
1028    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1029       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1030       m_status = 0;
1031    }
1032    PQclear(pg_result); 
1033
1034    Dmsg0(500, "sql_batch_end finishing\n");
1035
1036    return true;
1037 }
1038
1039 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1040 {
1041    int res;
1042    int count=30;
1043    size_t len;
1044    const char *digest;
1045    char ed1[50];
1046
1047    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1048    pgsql_copy_escape(esc_name, fname, fnl);
1049
1050    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1051    pgsql_copy_escape(esc_path, path, pnl);
1052
1053    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1054       digest = "0";
1055    } else {
1056       digest = ar->Digest;
1057    }
1058
1059    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
1060               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
1061               esc_name, ar->attr, digest, ar->DeltaSeq);
1062
1063    do { 
1064       res = PQputCopyData(m_db_handle, cmd, len);
1065    } while (res == 0 && --count > 0);
1066
1067    if (res == 1) {
1068       Dmsg0(500, "ok\n");
1069       changes++;
1070       m_status = 1;
1071    }
1072
1073    if (res <= 0) {
1074       Dmsg0(500, "we failed\n");
1075       m_status = 0;
1076       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1077       Dmsg1(500, "failure %s\n", errmsg);
1078    }
1079
1080    Dmsg0(500, "sql_batch_insert finishing\n");
1081
1082    return true;
1083 }
1084
1085 /*
1086  * Initialize database data structure. In principal this should
1087  * never have errors, or it is really fatal.
1088  */
1089 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1090                        const char *db_user, const char *db_password, 
1091                        const char *db_address, int db_port, 
1092                        const char *db_socket, bool mult_db_connections, 
1093                        bool disable_batch_insert)
1094 {
1095    B_DB_POSTGRESQL *mdb = NULL;
1096
1097    if (!db_user) {
1098       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1099       return NULL;
1100    }
1101    P(mutex);                          /* lock DB queue */
1102    if (db_list && !mult_db_connections) {
1103       /*
1104        * Look to see if DB already open
1105        */
1106       foreach_dlist(mdb, db_list) {
1107          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1108             Dmsg1(100, "DB REopen %s\n", db_name);
1109             mdb->increment_refcount();
1110             goto bail_out;
1111          }
1112       }
1113    }
1114    Dmsg0(100, "db_init_database first time\n");
1115    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1116                              db_address, db_port, db_socket, 
1117                              mult_db_connections, disable_batch_insert));
1118
1119 bail_out:
1120    V(mutex);
1121    return mdb;
1122 }
1123
1124 #endif /* HAVE_POSTGRESQL */