]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Fix seg fault in PostgreSQL driver code
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    esc_obj = get_pool_memory(PM_FNAME);
122    m_buf =  get_pool_memory(PM_FNAME);
123    m_allow_transactions = mult_db_connections;
124
125    /*
126     * Initialize the private members.
127     */
128    m_db_handle = NULL;
129    m_result = NULL;
130
131    /*
132     * Put the db in the list.
133     */
134    if (db_list == NULL) {
135       db_list = New(dlist(this, &this->m_link));
136    }
137    db_list->append(this);
138 }
139
140 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
141 {
142 }
143
144 /*
145  * Check that the database correspond to the encoding we want
146  */
147 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
148 {
149    SQL_ROW row;
150    int ret = false;
151
152    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
153       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
154       return false;
155    }
156
157    if ((row = mdb->sql_fetch_row()) == NULL) {
158       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
159       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
160    } else {
161       ret = bstrcmp(row[0], "SQL_ASCII");
162
163       if (ret) {
164          /*
165           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
166           */
167          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
168
169       } else {
170          /*
171           * Something is wrong with database encoding
172           */
173          Mmsg(mdb->errmsg, 
174               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
175               mdb->get_db_name(), row[0]);
176          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
177          Dmsg1(50, "%s", mdb->errmsg);
178       } 
179    }
180    return ret;
181 }
182
183 /*
184  * Now actually open the database.  This can generate errors,
185  *   which are returned in the errmsg
186  *
187  * DO NOT close the database or delete mdb here !!!!
188  */
189 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
190 {
191    bool retval = false;
192    int errstat;
193    char buf[10], *port;
194
195    P(mutex);
196    if (m_connected) {
197       retval = true;
198       goto bail_out;
199    }
200
201    if ((errstat=rwl_init(&m_lock)) != 0) {
202       berrno be;
203       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
204             be.bstrerror(errstat));
205       goto bail_out;
206    }
207
208    if (m_db_port) {
209       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
210       port = buf;
211    } else {
212       port = NULL;
213    }
214
215    /* If connection fails, try at 5 sec intervals for 30 seconds. */
216    for (int retry=0; retry < 6; retry++) {
217       /* connect to the database */
218       m_db_handle = PQsetdbLogin(
219            m_db_address,         /* default = localhost */
220            port,                 /* default port */
221            NULL,                 /* pg options */
222            NULL,                 /* tty, ignored */
223            m_db_name,            /* database name */
224            m_db_user,            /* login name */
225            m_db_password);       /* password */
226
227       /* If no connect, try once more in case it is a timing problem */
228       if (PQstatus(m_db_handle) == CONNECTION_OK) {
229          break;
230       }
231       bmicrosleep(5, 0);
232    }
233
234    Dmsg0(50, "pg_real_connect done\n");
235    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
236         (m_db_password == NULL) ? "(NULL)" : m_db_password);
237
238    if (PQstatus(m_db_handle) != CONNECTION_OK) {
239       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
240          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
241          m_db_name, m_db_user);
242       goto bail_out;
243    }
244
245    m_connected = true;
246    if (!check_tables_version(jcr, this)) {
247       goto bail_out;
248    }
249
250    sql_query("SET datestyle TO 'ISO, YMD'");
251    sql_query("SET cursor_tuple_fraction=1");
252    
253    /*
254     * Tell PostgreSQL we are using standard conforming strings
255     * and avoid warnings such as:
256     *  WARNING:  nonstandard use of \\ in a string literal
257     */
258    sql_query("SET standard_conforming_strings=on");
259
260    /*
261     * Check that encoding is SQL_ASCII
262     */
263    pgsql_check_database_encoding(jcr, this);
264
265    retval = true;
266
267 bail_out:
268    V(mutex);
269    return retval;
270 }
271
272 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
273 {
274    db_end_transaction(jcr);
275    P(mutex);
276    sql_free_result();
277    m_ref_count--;
278    if (m_ref_count == 0) {
279       db_list->remove(this);
280       if (m_connected && m_db_handle) {
281          PQfinish(m_db_handle);
282       }
283       rwl_destroy(&m_lock);
284       free_pool_memory(errmsg);
285       free_pool_memory(cmd);
286       free_pool_memory(cached_path);
287       free_pool_memory(fname);
288       free_pool_memory(path);
289       free_pool_memory(esc_name);
290       free_pool_memory(esc_path);
291       free_pool_memory(esc_obj);
292       free_pool_memory(m_buf);
293       if (m_db_driver) {
294          free(m_db_driver);
295       }
296       if (m_db_name) {
297          free(m_db_name);
298       }
299       if (m_db_user) {
300          free(m_db_user);
301       }
302       if (m_db_password) {
303          free(m_db_password);
304       }
305       if (m_db_address) {
306          free(m_db_address);
307       }
308       if (m_db_socket) {
309          free(m_db_socket);
310       }
311       if (esc_obj) {
312          PQfreemem(esc_obj);
313       }
314       delete this;
315       if (db_list->size() == 0) {
316          delete db_list;
317          db_list = NULL;
318       }
319    }
320    V(mutex);
321 }
322
323 void B_DB_POSTGRESQL::db_thread_cleanup(void)
324 {
325 }
326
327 /*
328  * Escape strings so that PostgreSQL is happy
329  *
330  *   NOTE! len is the length of the old string. Your new
331  *         string must be long enough (max 2*old+1) to hold
332  *         the escaped output.
333  */
334 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
335 {
336    int error;
337   
338    PQescapeStringConn(m_db_handle, snew, old, len, &error);
339    if (error) {
340       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
341       /* error on encoding, probably invalid multibyte encoding in the source string
342         see PQescapeStringConn documentation for details. */
343       Dmsg0(500, "PQescapeStringConn failed\n");
344    }
345 }
346
347 /*
348  * Escape binary so that PostgreSQL is happy
349  *
350  */
351 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
352 {
353    size_t new_len;
354    unsigned char *obj;
355
356    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
357    if (!obj) {
358       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
359    }
360
361    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
362    memcpy(esc_obj, obj, new_len);
363    esc_obj[new_len]=0;
364
365    PQfreemem(obj);
366
367    return (char *)esc_obj;
368 }
369
370 /*
371  * Unescape binary object so that PostgreSQL is happy
372  *
373  */
374 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
375                                          POOLMEM **dest, int32_t *dest_len)
376 {
377    size_t new_len;
378    unsigned char *obj;
379
380    if (!from) {
381       *dest[0] = 0;
382       *dest_len = 0;
383       return;
384    }
385
386    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
387
388    if (!obj) {
389       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
390    }
391
392    *dest_len = new_len;
393    *dest = check_pool_memory_size(*dest, new_len+1);
394    memcpy(*dest, obj, new_len);
395    (*dest)[new_len]=0;
396    
397    PQfreemem(obj);
398
399    Dmsg1(010, "obj size: %d\n", *dest_len);
400 }
401
402 /*
403  * Start a transaction. This groups inserts and makes things
404  * much more efficient. Usually started when inserting
405  * file attributes.
406  */
407 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
408 {
409    if (!jcr->attr) {
410       jcr->attr = get_pool_memory(PM_FNAME);
411    }
412    if (!jcr->ar) {
413       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
414    }
415
416    /*
417     * This is turned off because transactions break
418     * if multiple simultaneous jobs are run.
419     */
420    if (!m_allow_transactions) {
421       return;
422    }
423
424    db_lock(this);
425    /*
426     * Allow only 25,000 changes per transaction
427     */
428    if (m_transaction && changes > 25000) {
429       db_end_transaction(jcr);
430    }
431    if (!m_transaction) {
432       sql_query("BEGIN");  /* begin transaction */
433       Dmsg0(400, "Start PosgreSQL transaction\n");
434       m_transaction = true;
435    }
436    db_unlock(this);
437 }
438
439 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
440 {
441    if (jcr && jcr->cached_attribute) {
442       Dmsg0(400, "Flush last cached attribute.\n");
443       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
444          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
445       }
446       jcr->cached_attribute = false;
447    }
448
449    if (!m_allow_transactions) {
450       return;
451    }
452
453    db_lock(this);
454    if (m_transaction) {
455       sql_query("COMMIT"); /* end transaction */
456       m_transaction = false;
457       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
458    }
459    changes = 0;
460    db_unlock(this);
461 }
462
463
464 /*
465  * Submit a general SQL command (cmd), and for each row returned,
466  * the result_handler is called with the ctx.
467  */
468 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
469                                        DB_RESULT_HANDLER *result_handler, 
470                                        void *ctx)
471 {
472    SQL_ROW row;
473    bool retval = false;
474    bool in_transaction = m_transaction;
475    
476    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
477
478    /* This code handles only SELECT queries */
479    if (strncasecmp(query, "SELECT", 6) != 0) {
480       return db_sql_query(query, result_handler, ctx);
481    }
482
483    if (!result_handler) {       /* no need of big_query without handler */
484       return false;
485    }
486
487    db_lock(this);
488
489    if (!in_transaction) {       /* CURSOR needs transaction */
490       sql_query("BEGIN");
491    }
492
493    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
494
495    if (!sql_query(m_buf)) {
496       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
497       Dmsg0(50, "db_sql_query failed\n");
498       goto bail_out;
499    }
500
501    do {
502       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
503          goto bail_out;
504       }
505       while ((row = sql_fetch_row()) != NULL) {
506          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
507          if (result_handler(ctx, m_num_fields, row))
508             break;
509       }
510       PQclear(m_result);
511       m_result = NULL;
512       
513    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
514
515    sql_query("CLOSE _bac_cursor");
516
517    Dmsg0(500, "db_big_sql_query finished\n");
518    sql_free_result();
519    retval = true;
520
521 bail_out:
522    if (!in_transaction) {
523       sql_query("COMMIT");  /* end transaction */
524    }
525
526    db_unlock(this);
527    return retval;
528 }
529
530 /*
531  * Submit a general SQL command (cmd), and for each row returned,
532  * the result_handler is called with the ctx.
533  */
534 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
535 {
536    SQL_ROW row;
537    bool retval = true;
538
539    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
540
541    db_lock(this);
542    if (!sql_query(query, QF_STORE_RESULT)) {
543       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
544       Dmsg0(500, "db_sql_query failed\n");
545       retval = false;
546       goto bail_out;
547    }
548
549    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
550
551    if (result_handler != NULL) {
552       Dmsg0(500, "db_sql_query invoking handler\n");
553       while ((row = sql_fetch_row()) != NULL) {
554          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
555          if (result_handler(ctx, m_num_fields, row))
556             break;
557       }
558       sql_free_result();
559    }
560
561    Dmsg0(500, "db_sql_query finished\n");
562
563 bail_out:
564    db_unlock(this);
565    return retval;
566 }
567
568 /*
569  * Note, if this routine returns false (failure), Bacula expects
570  * that no result has been stored.
571  * This is where QUERY_DB comes with Postgresql.
572  *
573  *  Returns:  true  on success
574  *            false on failure
575  *
576  */
577 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
578 {
579    int i;
580    bool retval = false;
581
582    Dmsg1(500, "sql_query starts with '%s'\n", query);
583    /*
584     * We are starting a new query. reset everything.
585     */
586    m_num_rows     = -1;
587    m_row_number   = -1;
588    m_field_number = -1;
589
590    if (m_result) {
591       PQclear(m_result);  /* hmm, someone forgot to free?? */
592       m_result = NULL;
593    }
594
595    for (i = 0; i < 10; i++) {
596       m_result = PQexec(m_db_handle, query);
597       if (m_result) {
598          break;
599       }
600       bmicrosleep(5, 0);
601    }
602    if (!m_result) {
603       Dmsg1(50, "Query failed: %s\n", query);
604       goto bail_out;
605    }
606
607    m_status = PQresultStatus(m_result);
608    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
609       Dmsg0(500, "we have a result\n");
610
611       /*
612        * How many fields in the set?
613        */
614       m_num_fields = (int)PQnfields(m_result);
615       Dmsg1(500, "we have %d fields\n", m_num_fields);
616
617       m_num_rows = PQntuples(m_result);
618       Dmsg1(500, "we have %d rows\n", m_num_rows);
619
620       m_row_number = 0;      /* we can start to fetch something */
621       m_status = 0;          /* succeed */
622       retval = true;
623    } else {
624       Dmsg1(50, "Result status failed: %s\n", query);
625       goto bail_out;
626    }
627
628    Dmsg0(500, "sql_query finishing\n");
629    goto ok_out;
630
631 bail_out:
632    Dmsg0(500, "we failed\n");
633    PQclear(m_result);
634    m_result = NULL;
635    m_status = 1;                   /* failed */
636
637 ok_out:
638    return retval;
639 }
640
641 void B_DB_POSTGRESQL::sql_free_result(void)
642 {
643    db_lock(this);
644    if (m_result) {
645       PQclear(m_result);
646       m_result = NULL;
647    }
648    if (m_rows) {
649       free(m_rows);
650       m_rows = NULL;
651    }
652    if (m_fields) {
653       free(m_fields);
654       m_fields = NULL;
655    }
656    m_num_rows = m_num_fields = 0;
657    db_unlock(this);
658 }
659
660 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
661 {
662    int j;
663    SQL_ROW row = NULL; /* by default, return NULL */
664
665    Dmsg0(500, "sql_fetch_row start\n");
666
667    if (m_num_fields == 0) {     /* No field, no row */
668       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
669       return NULL;
670    }
671
672    if (!m_rows || m_rows_size < m_num_fields) {
673       if (m_rows) {
674          Dmsg0(500, "sql_fetch_row freeing space\n");
675          free(m_rows);
676       }
677       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
678       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
679       m_rows_size = m_num_fields;
680
681       /*
682        * Now reset the row_number now that we have the space allocated
683        */
684       m_row_number = 0;
685    }
686
687    /*
688     * If still within the result set
689     */
690    if (m_row_number >= 0 && m_row_number < m_num_rows) {
691       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
692       /*
693        * Get each value from this row
694        */
695       for (j = 0; j < m_num_fields; j++) {
696          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
697          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
698       }
699       /*
700        * Increment the row number for the next call
701        */
702       m_row_number++;
703       row = m_rows;
704    } else {
705       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
706    }
707
708    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
709
710    return row;
711 }
712
713 const char *B_DB_POSTGRESQL::sql_strerror(void)
714 {
715    return PQerrorMessage(m_db_handle);
716 }
717
718 void B_DB_POSTGRESQL::sql_data_seek(int row)
719 {
720    /*
721     * Set the row number to be returned on the next call to sql_fetch_row
722     */
723    m_row_number = row;
724 }
725
726 int B_DB_POSTGRESQL::sql_affected_rows(void)
727 {
728    return (unsigned) str_to_int32(PQcmdTuples(m_result));
729 }
730
731 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
732 {
733    int i;
734    uint64_t id = 0;
735    char sequence[NAMEDATALEN-1];
736    char getkeyval_query[NAMEDATALEN+50];
737    PGresult *pg_result;
738
739    /*
740     * First execute the insert query and then retrieve the currval.
741     */
742    if (!sql_query(query)) {
743       return 0;
744    }
745
746    m_num_rows = sql_affected_rows();
747    if (m_num_rows != 1) {
748       return 0;
749    }
750
751    changes++;
752
753    /*
754     * Obtain the current value of the sequence that
755     * provides the serial value for primary key of the table.
756     *
757     * currval is local to our session.  It is not affected by
758     * other transactions.
759     *
760     * Determine the name of the sequence.
761     * PostgreSQL automatically creates a sequence using
762     * <table>_<column>_seq.
763     * At the time of writing, all tables used this format for
764     * for their primary key: <table>id
765     * Except for basefiles which has a primary key on baseid.
766     * Therefore, we need to special case that one table.
767     *
768     * everything else can use the PostgreSQL formula.
769     */
770    if (strcasecmp(table_name, "basefiles") == 0) {
771       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
772    } else {
773       bstrncpy(sequence, table_name, sizeof(sequence));
774       bstrncat(sequence, "_",        sizeof(sequence));
775       bstrncat(sequence, table_name, sizeof(sequence));
776       bstrncat(sequence, "id",       sizeof(sequence));
777    }
778
779    bstrncat(sequence, "_seq", sizeof(sequence));
780    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
781
782    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
783    for (i = 0; i < 10; i++) {
784       pg_result = PQexec(m_db_handle, getkeyval_query);
785       if (pg_result) {
786          break;
787       }
788       bmicrosleep(5, 0);
789    }
790    if (!pg_result) {
791       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
792       goto bail_out;
793    }
794
795    Dmsg0(500, "exec done");
796
797    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
798       Dmsg0(500, "getting value");
799       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
800       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
801    } else {
802       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
803       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
804    }
805
806 bail_out:
807    PQclear(pg_result);
808
809    return id;
810 }
811
812 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
813 {
814    int i, j;
815    int max_length;
816    int this_length;
817
818    Dmsg0(500, "sql_fetch_field starts\n");
819
820    if (!m_fields || m_fields_size < m_num_fields) {
821       if (m_fields) {
822          free(m_fields);
823          m_fields = NULL;
824       }
825       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
826       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
827       m_fields_size = m_num_fields;
828
829       for (i = 0; i < m_num_fields; i++) {
830          Dmsg1(500, "filling field %d\n", i);
831          m_fields[i].name = PQfname(m_result, i);
832          m_fields[i].type = PQftype(m_result, i);
833          m_fields[i].flags = 0;
834
835          /*
836           * For a given column, find the max length.
837           */
838          max_length = 0;
839          for (j = 0; j < m_num_rows; j++) {
840             if (PQgetisnull(m_result, j, i)) {
841                 this_length = 4;        /* "NULL" */
842             } else {
843                 this_length = cstrlen(PQgetvalue(m_result, j, i));
844             }
845          
846             if (max_length < this_length) {
847                max_length = this_length;
848             }
849          }
850          m_fields[i].max_length = max_length;
851
852          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
853                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
854       }
855    }
856
857    /*
858     * Increment field number for the next time around
859     */
860    return &m_fields[m_field_number++];
861 }
862
863 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
864 {
865    switch (field_type) {
866    case 1:
867       return true;
868    default:
869       return false;
870    }
871 }
872
873 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
874 {
875    /*
876     * TEMP: the following is taken from select OID, typname from pg_type;
877     */
878    switch (field_type) {
879    case 20:
880    case 21:
881    case 23:
882    case 700:
883    case 701:
884       return true;
885    default:
886       return false;
887    }
888 }
889
890 /*
891  * Escape strings so that PostgreSQL is happy on COPY
892  *
893  *   NOTE! len is the length of the old string. Your new
894  *         string must be long enough (max 2*old+1) to hold
895  *         the escaped output.
896  */
897 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
898 {
899    /* we have to escape \t, \n, \r, \ */
900    char c = '\0' ;
901
902    while (len > 0 && *src) {
903       switch (*src) {
904       case '\n':
905          c = 'n';
906          break;
907       case '\\':
908          c = '\\';
909          break;
910       case '\t':
911          c = 't';
912          break;
913       case '\r':
914          c = 'r';
915          break;
916       default:
917          c = '\0' ;
918       }
919
920       if (c) {
921          *dest = '\\';
922          dest++;
923          *dest = c;
924       } else {
925          *dest = *src;
926       }
927
928       len--;
929       src++;
930       dest++;
931    }
932
933    *dest = '\0';
934    return dest;
935 }
936
937 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
938 {
939    const char *query = "COPY batch FROM STDIN";
940
941    Dmsg0(500, "sql_batch_start started\n");
942
943    if (!sql_query("CREATE TEMPORARY TABLE batch ("
944                           "fileindex int,"
945                           "jobid int,"
946                           "path varchar,"
947                           "name varchar,"
948                           "lstat varchar,"
949                           "md5 varchar,"
950                           "markid int)")) {
951       Dmsg0(500, "sql_batch_start failed\n");
952       return false;
953    }
954    
955    /*
956     * We are starting a new query.  reset everything.
957     */
958    m_num_rows     = -1;
959    m_row_number   = -1;
960    m_field_number = -1;
961
962    sql_free_result();
963
964    for (int i=0; i < 10; i++) {
965       m_result = PQexec(m_db_handle, query);
966       if (m_result) {
967          break;
968       }
969       bmicrosleep(5, 0);
970    }
971    if (!m_result) {
972       Dmsg1(50, "Query failed: %s\n", query);
973       goto bail_out;
974    }
975
976    m_status = PQresultStatus(m_result);
977    if (m_status == PGRES_COPY_IN) {
978       /*
979        * How many fields in the set?
980        */
981       m_num_fields = (int) PQnfields(m_result);
982       m_num_rows = 0;
983       m_status = 1;
984    } else {
985       Dmsg1(50, "Result status failed: %s\n", query);
986       goto bail_out;
987    }
988
989    Dmsg0(500, "sql_batch_start finishing\n");
990
991    return true;
992
993 bail_out:
994    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
995    m_status = 0;
996    PQclear(m_result);
997    m_result = NULL;
998    return false;
999 }
1000
1001 /*
1002  * Set error to something to abort operation
1003  */
1004 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1005 {
1006    int res;
1007    int count=30;
1008    PGresult *pg_result;
1009
1010    Dmsg0(500, "sql_batch_end started\n");
1011
1012    do { 
1013       res = PQputCopyEnd(m_db_handle, error);
1014    } while (res == 0 && --count > 0);
1015
1016    if (res == 1) {
1017       Dmsg0(500, "ok\n");
1018       m_status = 1;
1019    }
1020    
1021    if (res <= 0) {
1022       Dmsg0(500, "we failed\n");
1023       m_status = 0;
1024       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1025       Dmsg1(500, "failure %s\n", errmsg);
1026    }
1027
1028    /* Check command status and return to normal libpq state */
1029    pg_result = PQgetResult(m_db_handle);
1030    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1031       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1032       m_status = 0;
1033    }
1034    PQclear(pg_result); 
1035
1036    Dmsg0(500, "sql_batch_end finishing\n");
1037
1038    return true;
1039 }
1040
1041 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1042 {
1043    int res;
1044    int count=30;
1045    size_t len;
1046    const char *digest;
1047    char ed1[50];
1048
1049    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1050    pgsql_copy_escape(esc_name, fname, fnl);
1051
1052    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1053    pgsql_copy_escape(esc_path, path, pnl);
1054
1055    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1056       digest = "0";
1057    } else {
1058       digest = ar->Digest;
1059    }
1060
1061    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
1062               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
1063               esc_name, ar->attr, digest, ar->DeltaSeq);
1064
1065    do { 
1066       res = PQputCopyData(m_db_handle, cmd, len);
1067    } while (res == 0 && --count > 0);
1068
1069    if (res == 1) {
1070       Dmsg0(500, "ok\n");
1071       changes++;
1072       m_status = 1;
1073    }
1074
1075    if (res <= 0) {
1076       Dmsg0(500, "we failed\n");
1077       m_status = 0;
1078       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1079       Dmsg1(500, "failure %s\n", errmsg);
1080    }
1081
1082    Dmsg0(500, "sql_batch_insert finishing\n");
1083
1084    return true;
1085 }
1086
1087 /*
1088  * Initialize database data structure. In principal this should
1089  * never have errors, or it is really fatal.
1090  */
1091 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1092                        const char *db_user, const char *db_password, 
1093                        const char *db_address, int db_port, 
1094                        const char *db_socket, bool mult_db_connections, 
1095                        bool disable_batch_insert)
1096 {
1097    B_DB_POSTGRESQL *mdb = NULL;
1098
1099    if (!db_user) {
1100       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1101       return NULL;
1102    }
1103    P(mutex);                          /* lock DB queue */
1104    if (db_list && !mult_db_connections) {
1105       /*
1106        * Look to see if DB already open
1107        */
1108       foreach_dlist(mdb, db_list) {
1109          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1110             Dmsg1(100, "DB REopen %s\n", db_name);
1111             mdb->increment_refcount();
1112             goto bail_out;
1113          }
1114       }
1115    }
1116    Dmsg0(100, "db_init_database first time\n");
1117    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1118                              db_address, db_port, db_socket, 
1119                              mult_db_connections, disable_batch_insert));
1120
1121 bail_out:
1122    V(mutex);
1123    return mdb;
1124 }
1125
1126 #endif /* HAVE_POSTGRESQL */