]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Add %D option to edit_job_code, simplify callbacks on director side
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    esc_obj = get_pool_memory(PM_FNAME);
122    m_buf =  get_pool_memory(PM_FNAME);
123    m_allow_transactions = mult_db_connections;
124
125    /*
126     * Initialize the private members.
127     */
128    m_db_handle = NULL;
129    m_result = NULL;
130
131    /*
132     * Put the db in the list.
133     */
134    if (db_list == NULL) {
135       db_list = New(dlist(this, &this->m_link));
136    }
137    db_list->append(this);
138 }
139
140 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
141 {
142 }
143
144 /*
145  * Check that the database correspond to the encoding we want
146  */
147 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
148 {
149    SQL_ROW row;
150    int ret = false;
151
152    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
153       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
154       return false;
155    }
156
157    if ((row = mdb->sql_fetch_row()) == NULL) {
158       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
159       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
160    } else {
161       ret = bstrcmp(row[0], "SQL_ASCII");
162
163       if (ret) {
164          /*
165           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
166           */
167          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
168
169       } else {
170          /*
171           * Something is wrong with database encoding
172           */
173          Mmsg(mdb->errmsg, 
174               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
175               mdb->get_db_name(), row[0]);
176          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
177          Dmsg1(50, "%s", mdb->errmsg);
178       } 
179    }
180    return ret;
181 }
182
183 /*
184  * Now actually open the database.  This can generate errors,
185  *   which are returned in the errmsg
186  *
187  * DO NOT close the database or delete mdb here !!!!
188  */
189 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
190 {
191    bool retval = false;
192    int errstat;
193    char buf[10], *port;
194
195    P(mutex);
196    if (m_connected) {
197       retval = true;
198       goto bail_out;
199    }
200
201    if ((errstat=rwl_init(&m_lock)) != 0) {
202       berrno be;
203       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
204             be.bstrerror(errstat));
205       goto bail_out;
206    }
207
208    if (m_db_port) {
209       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
210       port = buf;
211    } else {
212       port = NULL;
213    }
214
215    /* If connection fails, try at 5 sec intervals for 30 seconds. */
216    for (int retry=0; retry < 6; retry++) {
217       /* connect to the database */
218       m_db_handle = PQsetdbLogin(
219            m_db_address,         /* default = localhost */
220            port,                 /* default port */
221            NULL,                 /* pg options */
222            NULL,                 /* tty, ignored */
223            m_db_name,            /* database name */
224            m_db_user,            /* login name */
225            m_db_password);       /* password */
226
227       /* If no connect, try once more in case it is a timing problem */
228       if (PQstatus(m_db_handle) == CONNECTION_OK) {
229          break;
230       }
231       bmicrosleep(5, 0);
232    }
233
234    Dmsg0(50, "pg_real_connect done\n");
235    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
236         (m_db_password == NULL) ? "(NULL)" : m_db_password);
237
238    if (PQstatus(m_db_handle) != CONNECTION_OK) {
239       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
240          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
241          m_db_name, m_db_user);
242       goto bail_out;
243    }
244
245    m_connected = true;
246    if (!check_tables_version(jcr, this)) {
247       goto bail_out;
248    }
249
250    sql_query("SET datestyle TO 'ISO, YMD'");
251    sql_query("SET cursor_tuple_fraction=1");
252    
253    /*
254     * Tell PostgreSQL we are using standard conforming strings
255     * and avoid warnings such as:
256     *  WARNING:  nonstandard use of \\ in a string literal
257     */
258    sql_query("SET standard_conforming_strings=on");
259
260    /*
261     * Check that encoding is SQL_ASCII
262     */
263    pgsql_check_database_encoding(jcr, this);
264
265    retval = true;
266
267 bail_out:
268    V(mutex);
269    return retval;
270 }
271
272 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
273 {
274    db_end_transaction(jcr);
275    P(mutex);
276    m_ref_count--;
277    if (m_ref_count == 0) {
278       sql_free_result();
279       db_list->remove(this);
280       if (m_connected && m_db_handle) {
281          PQfinish(m_db_handle);
282       }
283       rwl_destroy(&m_lock);
284       free_pool_memory(errmsg);
285       free_pool_memory(cmd);
286       free_pool_memory(cached_path);
287       free_pool_memory(fname);
288       free_pool_memory(path);
289       free_pool_memory(esc_name);
290       free_pool_memory(esc_path);
291       free_pool_memory(esc_obj);
292       free_pool_memory(m_buf);
293       if (m_db_driver) {
294          free(m_db_driver);
295       }
296       if (m_db_name) {
297          free(m_db_name);
298       }
299       if (m_db_user) {
300          free(m_db_user);
301       }
302       if (m_db_password) {
303          free(m_db_password);
304       }
305       if (m_db_address) {
306          free(m_db_address);
307       }
308       if (m_db_socket) {
309          free(m_db_socket);
310       }
311       delete this;
312       if (db_list->size() == 0) {
313          delete db_list;
314          db_list = NULL;
315       }
316    }
317    V(mutex);
318 }
319
320 void B_DB_POSTGRESQL::db_thread_cleanup(void)
321 {
322 }
323
324 /*
325  * Escape strings so that PostgreSQL is happy
326  *
327  *   NOTE! len is the length of the old string. Your new
328  *         string must be long enough (max 2*old+1) to hold
329  *         the escaped output.
330  */
331 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
332 {
333    int error;
334   
335    PQescapeStringConn(m_db_handle, snew, old, len, &error);
336    if (error) {
337       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
338       /* error on encoding, probably invalid multibyte encoding in the source string
339         see PQescapeStringConn documentation for details. */
340       Dmsg0(500, "PQescapeStringConn failed\n");
341    }
342 }
343
344 /*
345  * Escape binary so that PostgreSQL is happy
346  *
347  */
348 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
349 {
350    size_t new_len;
351    unsigned char *obj;
352
353    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
354    if (!obj) {
355       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
356    }
357
358    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
359    memcpy(esc_obj, obj, new_len);
360    esc_obj[new_len]=0;
361
362    PQfreemem(obj);
363
364    return (char *)esc_obj;
365 }
366
367 /*
368  * Unescape binary object so that PostgreSQL is happy
369  *
370  */
371 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
372                                          POOLMEM **dest, int32_t *dest_len)
373 {
374    size_t new_len;
375    unsigned char *obj;
376
377    if (!from) {
378       *dest[0] = 0;
379       *dest_len = 0;
380       return;
381    }
382
383    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
384
385    if (!obj) {
386       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
387    }
388
389    *dest_len = new_len;
390    *dest = check_pool_memory_size(*dest, new_len+1);
391    memcpy(*dest, obj, new_len);
392    (*dest)[new_len]=0;
393    
394    PQfreemem(obj);
395
396    Dmsg1(010, "obj size: %d\n", *dest_len);
397 }
398
399 /*
400  * Start a transaction. This groups inserts and makes things
401  * much more efficient. Usually started when inserting
402  * file attributes.
403  */
404 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
405 {
406    if (!jcr->attr) {
407       jcr->attr = get_pool_memory(PM_FNAME);
408    }
409    if (!jcr->ar) {
410       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
411    }
412
413    /*
414     * This is turned off because transactions break
415     * if multiple simultaneous jobs are run.
416     */
417    if (!m_allow_transactions) {
418       return;
419    }
420
421    db_lock(this);
422    /*
423     * Allow only 25,000 changes per transaction
424     */
425    if (m_transaction && changes > 25000) {
426       db_end_transaction(jcr);
427    }
428    if (!m_transaction) {
429       sql_query("BEGIN");  /* begin transaction */
430       Dmsg0(400, "Start PosgreSQL transaction\n");
431       m_transaction = true;
432    }
433    db_unlock(this);
434 }
435
436 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
437 {
438    if (jcr && jcr->cached_attribute) {
439       Dmsg0(400, "Flush last cached attribute.\n");
440       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
441          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
442       }
443       jcr->cached_attribute = false;
444    }
445
446    if (!m_allow_transactions) {
447       return;
448    }
449
450    db_lock(this);
451    if (m_transaction) {
452       sql_query("COMMIT"); /* end transaction */
453       m_transaction = false;
454       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
455    }
456    changes = 0;
457    db_unlock(this);
458 }
459
460
461 /*
462  * Submit a general SQL command (cmd), and for each row returned,
463  * the result_handler is called with the ctx.
464  */
465 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
466                                        DB_RESULT_HANDLER *result_handler, 
467                                        void *ctx)
468 {
469    SQL_ROW row;
470    bool retval = false;
471    bool in_transaction = m_transaction;
472    
473    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
474
475    /* This code handles only SELECT queries */
476    if (strncasecmp(query, "SELECT", 6) != 0) {
477       return db_sql_query(query, result_handler, ctx);
478    }
479
480    if (!result_handler) {       /* no need of big_query without handler */
481       return false;
482    }
483
484    db_lock(this);
485
486    if (!in_transaction) {       /* CURSOR needs transaction */
487       sql_query("BEGIN");
488    }
489
490    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
491
492    if (!sql_query(m_buf)) {
493       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
494       Dmsg0(50, "db_sql_query failed\n");
495       goto bail_out;
496    }
497
498    do {
499       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
500          goto bail_out;
501       }
502       while ((row = sql_fetch_row()) != NULL) {
503          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
504          if (result_handler(ctx, m_num_fields, row))
505             break;
506       }
507       PQclear(m_result);
508       m_result = NULL;
509       
510    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
511
512    sql_query("CLOSE _bac_cursor");
513
514    Dmsg0(500, "db_big_sql_query finished\n");
515    sql_free_result();
516    retval = true;
517
518 bail_out:
519    if (!in_transaction) {
520       sql_query("COMMIT");  /* end transaction */
521    }
522
523    db_unlock(this);
524    return retval;
525 }
526
527 /*
528  * Submit a general SQL command (cmd), and for each row returned,
529  * the result_handler is called with the ctx.
530  */
531 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
532 {
533    SQL_ROW row;
534    bool retval = true;
535
536    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
537
538    db_lock(this);
539    if (!sql_query(query, QF_STORE_RESULT)) {
540       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
541       Dmsg0(500, "db_sql_query failed\n");
542       retval = false;
543       goto bail_out;
544    }
545
546    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
547
548    if (result_handler != NULL) {
549       Dmsg0(500, "db_sql_query invoking handler\n");
550       while ((row = sql_fetch_row()) != NULL) {
551          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
552          if (result_handler(ctx, m_num_fields, row))
553             break;
554       }
555       sql_free_result();
556    }
557
558    Dmsg0(500, "db_sql_query finished\n");
559
560 bail_out:
561    db_unlock(this);
562    return retval;
563 }
564
565 /*
566  * Note, if this routine returns false (failure), Bacula expects
567  * that no result has been stored.
568  * This is where QUERY_DB comes with Postgresql.
569  *
570  *  Returns:  true  on success
571  *            false on failure
572  *
573  */
574 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
575 {
576    int i;
577    bool retval = false;
578
579    Dmsg1(500, "sql_query starts with '%s'\n", query);
580    /*
581     * We are starting a new query. reset everything.
582     */
583    m_num_rows     = -1;
584    m_row_number   = -1;
585    m_field_number = -1;
586
587    if (m_result) {
588       PQclear(m_result);  /* hmm, someone forgot to free?? */
589       m_result = NULL;
590    }
591
592    for (i = 0; i < 10; i++) {
593       m_result = PQexec(m_db_handle, query);
594       if (m_result) {
595          break;
596       }
597       bmicrosleep(5, 0);
598    }
599    if (!m_result) {
600       Dmsg1(50, "Query failed: %s\n", query);
601       goto bail_out;
602    }
603
604    m_status = PQresultStatus(m_result);
605    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
606       Dmsg0(500, "we have a result\n");
607
608       /*
609        * How many fields in the set?
610        */
611       m_num_fields = (int)PQnfields(m_result);
612       Dmsg1(500, "we have %d fields\n", m_num_fields);
613
614       m_num_rows = PQntuples(m_result);
615       Dmsg1(500, "we have %d rows\n", m_num_rows);
616
617       m_row_number = 0;      /* we can start to fetch something */
618       m_status = 0;          /* succeed */
619       retval = true;
620    } else {
621       Dmsg1(50, "Result status failed: %s\n", query);
622       goto bail_out;
623    }
624
625    Dmsg0(500, "sql_query finishing\n");
626    goto ok_out;
627
628 bail_out:
629    Dmsg0(500, "we failed\n");
630    PQclear(m_result);
631    m_result = NULL;
632    m_status = 1;                   /* failed */
633
634 ok_out:
635    return retval;
636 }
637
638 void B_DB_POSTGRESQL::sql_free_result(void)
639 {
640    db_lock(this);
641    if (m_result) {
642       PQclear(m_result);
643       m_result = NULL;
644    }
645    if (m_rows) {
646       free(m_rows);
647       m_rows = NULL;
648    }
649    if (m_fields) {
650       free(m_fields);
651       m_fields = NULL;
652    }
653    m_num_rows = m_num_fields = 0;
654    db_unlock(this);
655 }
656
657 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
658 {
659    int j;
660    SQL_ROW row = NULL; /* by default, return NULL */
661
662    Dmsg0(500, "sql_fetch_row start\n");
663
664    if (m_num_fields == 0) {     /* No field, no row */
665       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
666       return NULL;
667    }
668
669    if (!m_rows || m_rows_size < m_num_fields) {
670       if (m_rows) {
671          Dmsg0(500, "sql_fetch_row freeing space\n");
672          free(m_rows);
673       }
674       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
675       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
676       m_rows_size = m_num_fields;
677
678       /*
679        * Now reset the row_number now that we have the space allocated
680        */
681       m_row_number = 0;
682    }
683
684    /*
685     * If still within the result set
686     */
687    if (m_row_number >= 0 && m_row_number < m_num_rows) {
688       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
689       /*
690        * Get each value from this row
691        */
692       for (j = 0; j < m_num_fields; j++) {
693          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
694          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
695       }
696       /*
697        * Increment the row number for the next call
698        */
699       m_row_number++;
700       row = m_rows;
701    } else {
702       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
703    }
704
705    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
706
707    return row;
708 }
709
710 const char *B_DB_POSTGRESQL::sql_strerror(void)
711 {
712    return PQerrorMessage(m_db_handle);
713 }
714
715 void B_DB_POSTGRESQL::sql_data_seek(int row)
716 {
717    /*
718     * Set the row number to be returned on the next call to sql_fetch_row
719     */
720    m_row_number = row;
721 }
722
723 int B_DB_POSTGRESQL::sql_affected_rows(void)
724 {
725    return (unsigned) str_to_int32(PQcmdTuples(m_result));
726 }
727
728 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
729 {
730    int i;
731    uint64_t id = 0;
732    char sequence[NAMEDATALEN-1];
733    char getkeyval_query[NAMEDATALEN+50];
734    PGresult *pg_result;
735
736    /*
737     * First execute the insert query and then retrieve the currval.
738     */
739    if (!sql_query(query)) {
740       return 0;
741    }
742
743    m_num_rows = sql_affected_rows();
744    if (m_num_rows != 1) {
745       return 0;
746    }
747
748    changes++;
749
750    /*
751     * Obtain the current value of the sequence that
752     * provides the serial value for primary key of the table.
753     *
754     * currval is local to our session.  It is not affected by
755     * other transactions.
756     *
757     * Determine the name of the sequence.
758     * PostgreSQL automatically creates a sequence using
759     * <table>_<column>_seq.
760     * At the time of writing, all tables used this format for
761     * for their primary key: <table>id
762     * Except for basefiles which has a primary key on baseid.
763     * Therefore, we need to special case that one table.
764     *
765     * everything else can use the PostgreSQL formula.
766     */
767    if (strcasecmp(table_name, "basefiles") == 0) {
768       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
769    } else {
770       bstrncpy(sequence, table_name, sizeof(sequence));
771       bstrncat(sequence, "_",        sizeof(sequence));
772       bstrncat(sequence, table_name, sizeof(sequence));
773       bstrncat(sequence, "id",       sizeof(sequence));
774    }
775
776    bstrncat(sequence, "_seq", sizeof(sequence));
777    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
778
779    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
780    for (i = 0; i < 10; i++) {
781       pg_result = PQexec(m_db_handle, getkeyval_query);
782       if (pg_result) {
783          break;
784       }
785       bmicrosleep(5, 0);
786    }
787    if (!pg_result) {
788       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
789       goto bail_out;
790    }
791
792    Dmsg0(500, "exec done");
793
794    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
795       Dmsg0(500, "getting value");
796       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
797       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
798    } else {
799       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
800       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
801    }
802
803 bail_out:
804    PQclear(pg_result);
805
806    return id;
807 }
808
809 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
810 {
811    int i, j;
812    int max_length;
813    int this_length;
814
815    Dmsg0(500, "sql_fetch_field starts\n");
816
817    if (!m_fields || m_fields_size < m_num_fields) {
818       if (m_fields) {
819          free(m_fields);
820          m_fields = NULL;
821       }
822       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
823       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
824       m_fields_size = m_num_fields;
825
826       for (i = 0; i < m_num_fields; i++) {
827          Dmsg1(500, "filling field %d\n", i);
828          m_fields[i].name = PQfname(m_result, i);
829          m_fields[i].type = PQftype(m_result, i);
830          m_fields[i].flags = 0;
831
832          /*
833           * For a given column, find the max length.
834           */
835          max_length = 0;
836          for (j = 0; j < m_num_rows; j++) {
837             if (PQgetisnull(m_result, j, i)) {
838                 this_length = 4;        /* "NULL" */
839             } else {
840                 this_length = cstrlen(PQgetvalue(m_result, j, i));
841             }
842          
843             if (max_length < this_length) {
844                max_length = this_length;
845             }
846          }
847          m_fields[i].max_length = max_length;
848
849          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
850                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
851       }
852    }
853
854    /*
855     * Increment field number for the next time around
856     */
857    return &m_fields[m_field_number++];
858 }
859
860 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
861 {
862    switch (field_type) {
863    case 1:
864       return true;
865    default:
866       return false;
867    }
868 }
869
870 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
871 {
872    /*
873     * TEMP: the following is taken from select OID, typname from pg_type;
874     */
875    switch (field_type) {
876    case 20:
877    case 21:
878    case 23:
879    case 700:
880    case 701:
881       return true;
882    default:
883       return false;
884    }
885 }
886
887 /*
888  * Escape strings so that PostgreSQL is happy on COPY
889  *
890  *   NOTE! len is the length of the old string. Your new
891  *         string must be long enough (max 2*old+1) to hold
892  *         the escaped output.
893  */
894 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
895 {
896    /* we have to escape \t, \n, \r, \ */
897    char c = '\0' ;
898
899    while (len > 0 && *src) {
900       switch (*src) {
901       case '\n':
902          c = 'n';
903          break;
904       case '\\':
905          c = '\\';
906          break;
907       case '\t':
908          c = 't';
909          break;
910       case '\r':
911          c = 'r';
912          break;
913       default:
914          c = '\0' ;
915       }
916
917       if (c) {
918          *dest = '\\';
919          dest++;
920          *dest = c;
921       } else {
922          *dest = *src;
923       }
924
925       len--;
926       src++;
927       dest++;
928    }
929
930    *dest = '\0';
931    return dest;
932 }
933
934 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
935 {
936    const char *query = "COPY batch FROM STDIN";
937
938    Dmsg0(500, "sql_batch_start started\n");
939
940    if (!sql_query("CREATE TEMPORARY TABLE batch ("
941                           "FileIndex int,"
942                           "JobId int,"
943                           "Path varchar,"
944                           "Name varchar,"
945                           "LStat varchar,"
946                           "Md5 varchar,"
947                           "DeltaSeq smallint)")) {
948       Dmsg0(500, "sql_batch_start failed\n");
949       return false;
950    }
951    
952    /*
953     * We are starting a new query.  reset everything.
954     */
955    m_num_rows     = -1;
956    m_row_number   = -1;
957    m_field_number = -1;
958
959    sql_free_result();
960
961    for (int i=0; i < 10; i++) {
962       m_result = PQexec(m_db_handle, query);
963       if (m_result) {
964          break;
965       }
966       bmicrosleep(5, 0);
967    }
968    if (!m_result) {
969       Dmsg1(50, "Query failed: %s\n", query);
970       goto bail_out;
971    }
972
973    m_status = PQresultStatus(m_result);
974    if (m_status == PGRES_COPY_IN) {
975       /*
976        * How many fields in the set?
977        */
978       m_num_fields = (int) PQnfields(m_result);
979       m_num_rows = 0;
980       m_status = 1;
981    } else {
982       Dmsg1(50, "Result status failed: %s\n", query);
983       goto bail_out;
984    }
985
986    Dmsg0(500, "sql_batch_start finishing\n");
987
988    return true;
989
990 bail_out:
991    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
992    m_status = 0;
993    PQclear(m_result);
994    m_result = NULL;
995    return false;
996 }
997
998 /*
999  * Set error to something to abort operation
1000  */
1001 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1002 {
1003    int res;
1004    int count=30;
1005    PGresult *pg_result;
1006
1007    Dmsg0(500, "sql_batch_end started\n");
1008
1009    do { 
1010       res = PQputCopyEnd(m_db_handle, error);
1011    } while (res == 0 && --count > 0);
1012
1013    if (res == 1) {
1014       Dmsg0(500, "ok\n");
1015       m_status = 1;
1016    }
1017    
1018    if (res <= 0) {
1019       Dmsg0(500, "we failed\n");
1020       m_status = 0;
1021       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1022       Dmsg1(500, "failure %s\n", errmsg);
1023    }
1024
1025    /* Check command status and return to normal libpq state */
1026    pg_result = PQgetResult(m_db_handle);
1027    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1028       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1029       m_status = 0;
1030    }
1031    PQclear(pg_result); 
1032
1033    Dmsg0(500, "sql_batch_end finishing\n");
1034
1035    return true;
1036 }
1037
1038 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1039 {
1040    int res;
1041    int count=30;
1042    size_t len;
1043    const char *digest;
1044    char ed1[50];
1045
1046    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1047    pgsql_copy_escape(esc_name, fname, fnl);
1048
1049    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1050    pgsql_copy_escape(esc_path, path, pnl);
1051
1052    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1053       digest = "0";
1054    } else {
1055       digest = ar->Digest;
1056    }
1057
1058    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
1059               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
1060               esc_name, ar->attr, digest, ar->DeltaSeq);
1061
1062    do { 
1063       res = PQputCopyData(m_db_handle, cmd, len);
1064    } while (res == 0 && --count > 0);
1065
1066    if (res == 1) {
1067       Dmsg0(500, "ok\n");
1068       changes++;
1069       m_status = 1;
1070    }
1071
1072    if (res <= 0) {
1073       Dmsg0(500, "we failed\n");
1074       m_status = 0;
1075       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1076       Dmsg1(500, "failure %s\n", errmsg);
1077    }
1078
1079    Dmsg0(500, "sql_batch_insert finishing\n");
1080
1081    return true;
1082 }
1083
1084 /*
1085  * Initialize database data structure. In principal this should
1086  * never have errors, or it is really fatal.
1087  */
1088 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1089                        const char *db_user, const char *db_password, 
1090                        const char *db_address, int db_port, 
1091                        const char *db_socket, bool mult_db_connections, 
1092                        bool disable_batch_insert)
1093 {
1094    B_DB_POSTGRESQL *mdb = NULL;
1095
1096    if (!db_user) {
1097       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1098       return NULL;
1099    }
1100    P(mutex);                          /* lock DB queue */
1101    if (db_list && !mult_db_connections) {
1102       /*
1103        * Look to see if DB already open
1104        */
1105       foreach_dlist(mdb, db_list) {
1106          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1107             Dmsg1(100, "DB REopen %s\n", db_name);
1108             mdb->increment_refcount();
1109             goto bail_out;
1110          }
1111       }
1112    }
1113    Dmsg0(100, "db_init_database first time\n");
1114    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1115                              db_address, db_port, db_socket, 
1116                              mult_db_connections, disable_batch_insert));
1117
1118 bail_out:
1119    V(mutex);
1120    return mdb;
1121 }
1122
1123 #endif /* HAVE_POSTGRESQL */