]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Implement db_big_sql_query() that uses cursor on PostgreSQL and limit memory usage...
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    m_buf =  get_pool_memory(PM_FNAME);
122    m_allow_transactions = mult_db_connections;
123
124    /*
125     * Initialize the private members.
126     */
127    m_db_handle = NULL;
128    m_result = NULL;
129
130    /*
131     * Put the db in the list.
132     */
133    if (db_list == NULL) {
134       db_list = New(dlist(this, &this->m_link));
135    }
136    db_list->append(this);
137 }
138
139 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
140 {
141 }
142
143 /*
144  * Check that the database correspond to the encoding we want
145  */
146 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
147 {
148    SQL_ROW row;
149    int ret = false;
150
151    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
152       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
153       return false;
154    }
155
156    if ((row = mdb->sql_fetch_row()) == NULL) {
157       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
158       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
159    } else {
160       ret = bstrcmp(row[0], "SQL_ASCII");
161
162       if (ret) {
163          /*
164           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
165           */
166          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
167
168       } else {
169          /*
170           * Something is wrong with database encoding
171           */
172          Mmsg(mdb->errmsg, 
173               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
174               mdb->get_db_name(), row[0]);
175          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
176          Dmsg1(50, "%s", mdb->errmsg);
177       } 
178    }
179    return ret;
180 }
181
182 /*
183  * Now actually open the database.  This can generate errors,
184  *   which are returned in the errmsg
185  *
186  * DO NOT close the database or delete mdb here !!!!
187  */
188 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
189 {
190    bool retval = false;
191    int errstat;
192    char buf[10], *port;
193
194    P(mutex);
195    if (m_connected) {
196       retval = true;
197       goto bail_out;
198    }
199
200    if ((errstat=rwl_init(&m_lock)) != 0) {
201       berrno be;
202       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
203             be.bstrerror(errstat));
204       goto bail_out;
205    }
206
207    if (m_db_port) {
208       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
209       port = buf;
210    } else {
211       port = NULL;
212    }
213
214    /* If connection fails, try at 5 sec intervals for 30 seconds. */
215    for (int retry=0; retry < 6; retry++) {
216       /* connect to the database */
217       m_db_handle = PQsetdbLogin(
218            m_db_address,         /* default = localhost */
219            port,                 /* default port */
220            NULL,                 /* pg options */
221            NULL,                 /* tty, ignored */
222            m_db_name,            /* database name */
223            m_db_user,            /* login name */
224            m_db_password);       /* password */
225
226       /* If no connect, try once more in case it is a timing problem */
227       if (PQstatus(m_db_handle) == CONNECTION_OK) {
228          break;
229       }
230       bmicrosleep(5, 0);
231    }
232
233    Dmsg0(50, "pg_real_connect done\n");
234    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
235         (m_db_password == NULL) ? "(NULL)" : m_db_password);
236
237    if (PQstatus(m_db_handle) != CONNECTION_OK) {
238       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
239          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
240          m_db_name, m_db_user);
241       goto bail_out;
242    }
243
244    m_connected = true;
245    if (!check_tables_version(jcr, this)) {
246       goto bail_out;
247    }
248
249    sql_query("SET datestyle TO 'ISO, YMD'");
250    sql_query("SET cursor_tuple_fraction=1");
251    
252    /*
253     * Tell PostgreSQL we are using standard conforming strings
254     * and avoid warnings such as:
255     *  WARNING:  nonstandard use of \\ in a string literal
256     */
257    sql_query("SET standard_conforming_strings=on");
258
259    /*
260     * Check that encoding is SQL_ASCII
261     */
262    pgsql_check_database_encoding(jcr, this);
263
264    retval = true;
265
266 bail_out:
267    V(mutex);
268    return retval;
269 }
270
271 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
272 {
273    db_end_transaction(jcr);
274    P(mutex);
275    sql_free_result();
276    m_ref_count--;
277    if (m_ref_count == 0) {
278       db_list->remove(this);
279       if (m_connected && m_db_handle) {
280          PQfinish(m_db_handle);
281       }
282       rwl_destroy(&m_lock);
283       free_pool_memory(errmsg);
284       free_pool_memory(cmd);
285       free_pool_memory(cached_path);
286       free_pool_memory(fname);
287       free_pool_memory(path);
288       free_pool_memory(esc_name);
289       free_pool_memory(esc_path);
290       free_pool_memory(m_buf);
291       if (m_db_driver) {
292          free(m_db_driver);
293       }
294       if (m_db_name) {
295          free(m_db_name);
296       }
297       if (m_db_user) {
298          free(m_db_user);
299       }
300       if (m_db_password) {
301          free(m_db_password);
302       }
303       if (m_db_address) {
304          free(m_db_address);
305       }
306       if (m_db_socket) {
307          free(m_db_socket);
308       }
309       if (esc_obj) {
310          PQfreemem(esc_obj);
311       }
312       delete this;
313       if (db_list->size() == 0) {
314          delete db_list;
315          db_list = NULL;
316       }
317    }
318    V(mutex);
319 }
320
321 void B_DB_POSTGRESQL::db_thread_cleanup(void)
322 {
323 }
324
325 /*
326  * Escape strings so that PostgreSQL is happy
327  *
328  *   NOTE! len is the length of the old string. Your new
329  *         string must be long enough (max 2*old+1) to hold
330  *         the escaped output.
331  */
332 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
333 {
334    int error;
335   
336    PQescapeStringConn(m_db_handle, snew, old, len, &error);
337    if (error) {
338       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
339       /* error on encoding, probably invalid multibyte encoding in the source string
340         see PQescapeStringConn documentation for details. */
341       Dmsg0(500, "PQescapeStringConn failed\n");
342    }
343 }
344
345 /*
346  * Escape binary so that PostgreSQL is happy
347  *
348  */
349 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
350 {
351    size_t new_len;
352    unsigned char *obj;
353
354    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
355    if (!obj) {
356       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
357    }
358
359    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
360    memcpy(esc_obj, obj, new_len);
361    esc_obj[new_len]=0;
362
363    PQfreemem(obj);
364
365    return (char *)esc_obj;
366 }
367
368 /*
369  * Unescape binary object so that PostgreSQL is happy
370  *
371  */
372 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
373                                          POOLMEM **dest, int32_t *dest_len)
374 {
375    size_t new_len;
376    unsigned char *obj;
377
378    if (!from) {
379       *dest[0] = 0;
380       *dest_len = 0;
381       return;
382    }
383
384    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
385
386    if (!obj) {
387       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
388    }
389
390    *dest_len = new_len;
391    *dest = check_pool_memory_size(*dest, new_len+1);
392    memcpy(*dest, obj, new_len);
393    (*dest)[new_len]=0;
394    
395    PQfreemem(obj);
396
397    Dmsg1(010, "obj size: %d\n", *dest_len);
398 }
399
400 /*
401  * Start a transaction. This groups inserts and makes things
402  * much more efficient. Usually started when inserting
403  * file attributes.
404  */
405 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
406 {
407    if (!jcr->attr) {
408       jcr->attr = get_pool_memory(PM_FNAME);
409    }
410    if (!jcr->ar) {
411       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
412    }
413
414    /*
415     * This is turned off because transactions break
416     * if multiple simultaneous jobs are run.
417     */
418    if (!m_allow_transactions) {
419       return;
420    }
421
422    db_lock(this);
423    /*
424     * Allow only 25,000 changes per transaction
425     */
426    if (m_transaction && changes > 25000) {
427       db_end_transaction(jcr);
428    }
429    if (!m_transaction) {
430       sql_query("BEGIN");  /* begin transaction */
431       Dmsg0(400, "Start PosgreSQL transaction\n");
432       m_transaction = true;
433    }
434    db_unlock(this);
435 }
436
437 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
438 {
439    if (jcr && jcr->cached_attribute) {
440       Dmsg0(400, "Flush last cached attribute.\n");
441       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
442          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
443       }
444       jcr->cached_attribute = false;
445    }
446
447    if (!m_allow_transactions) {
448       return;
449    }
450
451    db_lock(this);
452    if (m_transaction) {
453       sql_query("COMMIT"); /* end transaction */
454       m_transaction = false;
455       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
456    }
457    changes = 0;
458    db_unlock(this);
459 }
460
461
462 /*
463  * Submit a general SQL command (cmd), and for each row returned,
464  * the result_handler is called with the ctx.
465  */
466 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
467                                        DB_RESULT_HANDLER *result_handler, 
468                                        void *ctx)
469 {
470    SQL_ROW row;
471    bool retval = false;
472    bool in_transaction = m_transaction;
473    
474    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
475
476    /* This code handles only SELECT queries */
477    if (strncasecmp(query, "SELECT", 6) != 0) {
478       return db_sql_query(query, result_handler, ctx);
479    }
480
481    db_lock(this);
482
483    if (!result_handler) {       /* no need of big_query without handler */
484       goto bail_out;
485    }
486
487    if (!in_transaction) {       /* CURSOR needs transaction */
488       sql_query("BEGIN");
489    }
490
491    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
492
493    if (!sql_query(m_buf)) {
494       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
495       Dmsg0(50, "db_sql_query failed\n");
496       goto bail_out;
497    }
498
499    do {
500       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
501          goto bail_out;
502       }
503       while ((row = sql_fetch_row()) != NULL) {
504          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
505          if (result_handler(ctx, m_num_fields, row))
506             break;
507       }
508       PQclear(m_result);
509       m_result = NULL;
510       
511    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
512
513    sql_free_result();
514
515    if (!in_transaction) {
516       sql_query("COMMIT");  /* end transaction */
517    }
518
519    Dmsg0(500, "db_big_sql_query finished\n");
520    retval = true;
521
522 bail_out:
523    db_unlock(this);
524    return retval;
525 }
526
527 /*
528  * Submit a general SQL command (cmd), and for each row returned,
529  * the result_handler is called with the ctx.
530  */
531 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
532 {
533    SQL_ROW row;
534    bool retval = true;
535
536    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
537
538    db_lock(this);
539    if (!sql_query(query, QF_STORE_RESULT)) {
540       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
541       Dmsg0(500, "db_sql_query failed\n");
542       retval = false;
543       goto bail_out;
544    }
545
546    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
547
548    if (result_handler != NULL) {
549       Dmsg0(500, "db_sql_query invoking handler\n");
550       while ((row = sql_fetch_row()) != NULL) {
551          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
552          if (result_handler(ctx, m_num_fields, row))
553             break;
554       }
555       sql_free_result();
556    }
557
558    Dmsg0(500, "db_sql_query finished\n");
559
560 bail_out:
561    db_unlock(this);
562    return retval;
563 }
564
565 /*
566  * Note, if this routine returns false (failure), Bacula expects
567  * that no result has been stored.
568  * This is where QUERY_DB comes with Postgresql.
569  *
570  *  Returns:  true  on success
571  *            false on failure
572  *
573  */
574 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
575 {
576    int i;
577    bool retval = false;
578
579    Dmsg1(500, "sql_query starts with '%s'\n", query);
580    /*
581     * We are starting a new query. reset everything.
582     */
583    m_num_rows     = -1;
584    m_row_number   = -1;
585    m_field_number = -1;
586
587    if (m_result) {
588       PQclear(m_result);  /* hmm, someone forgot to free?? */
589       m_result = NULL;
590    }
591
592    for (i = 0; i < 10; i++) {
593       m_result = PQexec(m_db_handle, query);
594       if (m_result) {
595          break;
596       }
597       bmicrosleep(5, 0);
598    }
599    if (!m_result) {
600       Dmsg1(50, "Query failed: %s\n", query);
601       goto bail_out;
602    }
603
604    m_status = PQresultStatus(m_result);
605    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
606       Dmsg0(500, "we have a result\n");
607
608       /*
609        * How many fields in the set?
610        */
611       m_num_fields = (int)PQnfields(m_result);
612       Dmsg1(500, "we have %d fields\n", m_num_fields);
613
614       m_num_rows = PQntuples(m_result);
615       Dmsg1(500, "we have %d rows\n", m_num_rows);
616
617       m_row_number = 0;      /* we can start to fetch something */
618       m_status = 0;          /* succeed */
619       retval = true;
620    } else {
621       Dmsg1(50, "Result status failed: %s\n", query);
622       goto bail_out;
623    }
624
625    Dmsg0(500, "sql_query finishing\n");
626    goto ok_out;
627
628 bail_out:
629    Dmsg0(500, "we failed\n");
630    PQclear(m_result);
631    m_result = NULL;
632    m_status = 1;                   /* failed */
633
634 ok_out:
635    return retval;
636 }
637
638 void B_DB_POSTGRESQL::sql_free_result(void)
639 {
640    db_lock(this);
641    if (m_result) {
642       PQclear(m_result);
643       m_result = NULL;
644    }
645    if (m_rows) {
646       free(m_rows);
647       m_rows = NULL;
648    }
649    if (m_fields) {
650       free(m_fields);
651       m_fields = NULL;
652    }
653    m_num_rows = m_num_fields = 0;
654    db_unlock(this);
655 }
656
657 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
658 {
659    int j;
660    SQL_ROW row = NULL; /* by default, return NULL */
661
662    Dmsg0(500, "sql_fetch_row start\n");
663
664    if (m_num_fields == 0) {     /* No field, no row */
665       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
666       return NULL;
667    }
668
669    if (!m_rows || m_rows_size < m_num_fields) {
670       if (m_rows) {
671          Dmsg0(500, "sql_fetch_row freeing space\n");
672          free(m_rows);
673       }
674       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
675       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
676       m_rows_size = m_num_fields;
677
678       /*
679        * Now reset the row_number now that we have the space allocated
680        */
681       m_row_number = 0;
682    }
683
684    /*
685     * If still within the result set
686     */
687    if (m_row_number >= 0 && m_row_number < m_num_rows) {
688       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
689       /*
690        * Get each value from this row
691        */
692       for (j = 0; j < m_num_fields; j++) {
693          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
694          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
695       }
696       /*
697        * Increment the row number for the next call
698        */
699       m_row_number++;
700       row = m_rows;
701    } else {
702       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
703    }
704
705    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
706
707    return row;
708 }
709
710 const char *B_DB_POSTGRESQL::sql_strerror(void)
711 {
712    return PQerrorMessage(m_db_handle);
713 }
714
715 void B_DB_POSTGRESQL::sql_data_seek(int row)
716 {
717    /*
718     * Set the row number to be returned on the next call to sql_fetch_row
719     */
720    m_row_number = row;
721 }
722
723 int B_DB_POSTGRESQL::sql_affected_rows(void)
724 {
725    return (unsigned) str_to_int32(PQcmdTuples(m_result));
726 }
727
728 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
729 {
730    int i;
731    uint64_t id = 0;
732    char sequence[NAMEDATALEN-1];
733    char getkeyval_query[NAMEDATALEN+50];
734    PGresult *pg_result;
735
736    /*
737     * First execute the insert query and then retrieve the currval.
738     */
739    if (!sql_query(query)) {
740       return 0;
741    }
742
743    m_num_rows = sql_affected_rows();
744    if (m_num_rows != 1) {
745       return 0;
746    }
747
748    changes++;
749
750    /*
751     * Obtain the current value of the sequence that
752     * provides the serial value for primary key of the table.
753     *
754     * currval is local to our session.  It is not affected by
755     * other transactions.
756     *
757     * Determine the name of the sequence.
758     * PostgreSQL automatically creates a sequence using
759     * <table>_<column>_seq.
760     * At the time of writing, all tables used this format for
761     * for their primary key: <table>id
762     * Except for basefiles which has a primary key on baseid.
763     * Therefore, we need to special case that one table.
764     *
765     * everything else can use the PostgreSQL formula.
766     */
767    if (strcasecmp(table_name, "basefiles") == 0) {
768       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
769    } else {
770       bstrncpy(sequence, table_name, sizeof(sequence));
771       bstrncat(sequence, "_",        sizeof(sequence));
772       bstrncat(sequence, table_name, sizeof(sequence));
773       bstrncat(sequence, "id",       sizeof(sequence));
774    }
775
776    bstrncat(sequence, "_seq", sizeof(sequence));
777    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
778
779    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
780    for (i = 0; i < 10; i++) {
781       pg_result = PQexec(m_db_handle, getkeyval_query);
782       if (pg_result) {
783          break;
784       }
785       bmicrosleep(5, 0);
786    }
787    if (!pg_result) {
788       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
789       goto bail_out;
790    }
791
792    Dmsg0(500, "exec done");
793
794    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
795       Dmsg0(500, "getting value");
796       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
797       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
798    } else {
799       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
800       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
801    }
802
803 bail_out:
804    PQclear(pg_result);
805
806    return id;
807 }
808
809 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
810 {
811    int i, j;
812    int max_length;
813    int this_length;
814
815    Dmsg0(500, "sql_fetch_field starts\n");
816
817    if (!m_fields || m_fields_size < m_num_fields) {
818       if (m_fields) {
819          free(m_fields);
820          m_fields = NULL;
821       }
822       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
823       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
824       m_fields_size = m_num_fields;
825
826       for (i = 0; i < m_num_fields; i++) {
827          Dmsg1(500, "filling field %d\n", i);
828          m_fields[i].name = PQfname(m_result, i);
829          m_fields[i].type = PQftype(m_result, i);
830          m_fields[i].flags = 0;
831
832          /*
833           * For a given column, find the max length.
834           */
835          max_length = 0;
836          for (j = 0; j < m_num_rows; j++) {
837             if (PQgetisnull(m_result, j, i)) {
838                 this_length = 4;        /* "NULL" */
839             } else {
840                 this_length = cstrlen(PQgetvalue(m_result, j, i));
841             }
842          
843             if (max_length < this_length) {
844                max_length = this_length;
845             }
846          }
847          m_fields[i].max_length = max_length;
848
849          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
850                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
851       }
852    }
853
854    /*
855     * Increment field number for the next time around
856     */
857    return &m_fields[m_field_number++];
858 }
859
860 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
861 {
862    switch (field_type) {
863    case 1:
864       return true;
865    default:
866       return false;
867    }
868 }
869
870 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
871 {
872    /*
873     * TEMP: the following is taken from select OID, typname from pg_type;
874     */
875    switch (field_type) {
876    case 20:
877    case 21:
878    case 23:
879    case 700:
880    case 701:
881       return true;
882    default:
883       return false;
884    }
885 }
886
887 /*
888  * Escape strings so that PostgreSQL is happy on COPY
889  *
890  *   NOTE! len is the length of the old string. Your new
891  *         string must be long enough (max 2*old+1) to hold
892  *         the escaped output.
893  */
894 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
895 {
896    /* we have to escape \t, \n, \r, \ */
897    char c = '\0' ;
898
899    while (len > 0 && *src) {
900       switch (*src) {
901       case '\n':
902          c = 'n';
903          break;
904       case '\\':
905          c = '\\';
906          break;
907       case '\t':
908          c = 't';
909          break;
910       case '\r':
911          c = 'r';
912          break;
913       default:
914          c = '\0' ;
915       }
916
917       if (c) {
918          *dest = '\\';
919          dest++;
920          *dest = c;
921       } else {
922          *dest = *src;
923       }
924
925       len--;
926       src++;
927       dest++;
928    }
929
930    *dest = '\0';
931    return dest;
932 }
933
934 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
935 {
936    const char *query = "COPY batch FROM STDIN";
937
938    Dmsg0(500, "sql_batch_start started\n");
939
940    if (!sql_query("CREATE TEMPORARY TABLE batch ("
941                           "fileindex int,"
942                           "jobid int,"
943                           "path varchar,"
944                           "name varchar,"
945                           "lstat varchar,"
946                           "md5 varchar,"
947                           "markid int)")) {
948       Dmsg0(500, "sql_batch_start failed\n");
949       return false;
950    }
951    
952    /*
953     * We are starting a new query.  reset everything.
954     */
955    m_num_rows     = -1;
956    m_row_number   = -1;
957    m_field_number = -1;
958
959    sql_free_result();
960
961    for (int i=0; i < 10; i++) {
962       m_result = PQexec(m_db_handle, query);
963       if (m_result) {
964          break;
965       }
966       bmicrosleep(5, 0);
967    }
968    if (!m_result) {
969       Dmsg1(50, "Query failed: %s\n", query);
970       goto bail_out;
971    }
972
973    m_status = PQresultStatus(m_result);
974    if (m_status == PGRES_COPY_IN) {
975       /*
976        * How many fields in the set?
977        */
978       m_num_fields = (int) PQnfields(m_result);
979       m_num_rows = 0;
980       m_status = 1;
981    } else {
982       Dmsg1(50, "Result status failed: %s\n", query);
983       goto bail_out;
984    }
985
986    Dmsg0(500, "sql_batch_start finishing\n");
987
988    return true;
989
990 bail_out:
991    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
992    m_status = 0;
993    PQclear(m_result);
994    m_result = NULL;
995    return false;
996 }
997
998 /*
999  * Set error to something to abort operation
1000  */
1001 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1002 {
1003    int res;
1004    int count=30;
1005    PGresult *pg_result;
1006
1007    Dmsg0(500, "sql_batch_end started\n");
1008
1009    do { 
1010       res = PQputCopyEnd(m_db_handle, error);
1011    } while (res == 0 && --count > 0);
1012
1013    if (res == 1) {
1014       Dmsg0(500, "ok\n");
1015       m_status = 1;
1016    }
1017    
1018    if (res <= 0) {
1019       Dmsg0(500, "we failed\n");
1020       m_status = 0;
1021       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1022       Dmsg1(500, "failure %s\n", errmsg);
1023    }
1024
1025    /* Check command status and return to normal libpq state */
1026    pg_result = PQgetResult(m_db_handle);
1027    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1028       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1029       m_status = 0;
1030    }
1031    PQclear(pg_result); 
1032
1033    Dmsg0(500, "sql_batch_end finishing\n");
1034
1035    return true;
1036 }
1037
1038 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1039 {
1040    int res;
1041    int count=30;
1042    size_t len;
1043    const char *digest;
1044    char ed1[50];
1045
1046    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1047    pgsql_copy_escape(esc_name, fname, fnl);
1048
1049    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1050    pgsql_copy_escape(esc_path, path, pnl);
1051
1052    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1053       digest = "0";
1054    } else {
1055       digest = ar->Digest;
1056    }
1057
1058    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
1059               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
1060               esc_name, ar->attr, digest, ar->DeltaSeq);
1061
1062    do { 
1063       res = PQputCopyData(m_db_handle, cmd, len);
1064    } while (res == 0 && --count > 0);
1065
1066    if (res == 1) {
1067       Dmsg0(500, "ok\n");
1068       changes++;
1069       m_status = 1;
1070    }
1071
1072    if (res <= 0) {
1073       Dmsg0(500, "we failed\n");
1074       m_status = 0;
1075       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1076       Dmsg1(500, "failure %s\n", errmsg);
1077    }
1078
1079    Dmsg0(500, "sql_batch_insert finishing\n");
1080
1081    return true;
1082 }
1083
1084 /*
1085  * Initialize database data structure. In principal this should
1086  * never have errors, or it is really fatal.
1087  */
1088 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1089                        const char *db_user, const char *db_password, 
1090                        const char *db_address, int db_port, 
1091                        const char *db_socket, bool mult_db_connections, 
1092                        bool disable_batch_insert)
1093 {
1094    B_DB_POSTGRESQL *mdb = NULL;
1095
1096    if (!db_user) {
1097       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1098       return NULL;
1099    }
1100    P(mutex);                          /* lock DB queue */
1101    if (db_list && !mult_db_connections) {
1102       /*
1103        * Look to see if DB already open
1104        */
1105       foreach_dlist(mdb, db_list) {
1106          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1107             Dmsg1(100, "DB REopen %s\n", db_name);
1108             mdb->increment_refcount();
1109             goto bail_out;
1110          }
1111       }
1112    }
1113    Dmsg0(100, "db_init_database first time\n");
1114    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1115                              db_address, db_port, db_socket, 
1116                              mult_db_connections, disable_batch_insert));
1117
1118 bail_out:
1119    V(mutex);
1120    return mdb;
1121 }
1122
1123 #endif /* HAVE_POSTGRESQL */