]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Ensure that bvfs SQL link is not shared
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    esc_obj = get_pool_memory(PM_FNAME);
122    m_buf =  get_pool_memory(PM_FNAME);
123    m_allow_transactions = mult_db_connections;
124
125    /* At this time, when mult_db_connections == true, this is for 
126     * specific console command such as bvfs or batch mode, and we don't
127     * want to share a batch mode or bvfs. In the future, we can change
128     * the creation function to add this parameter.
129     */
130    m_dedicated = mult_db_connections; 
131
132    /*
133     * Initialize the private members.
134     */
135    m_db_handle = NULL;
136    m_result = NULL;
137
138    /*
139     * Put the db in the list.
140     */
141    if (db_list == NULL) {
142       db_list = New(dlist(this, &this->m_link));
143    }
144    db_list->append(this);
145 }
146
147 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
148 {
149 }
150
151 /*
152  * Check that the database correspond to the encoding we want
153  */
154 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
155 {
156    SQL_ROW row;
157    int ret = false;
158
159    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
160       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
161       return false;
162    }
163
164    if ((row = mdb->sql_fetch_row()) == NULL) {
165       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
166       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
167    } else {
168       ret = bstrcmp(row[0], "SQL_ASCII");
169
170       if (ret) {
171          /*
172           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
173           */
174          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
175
176       } else {
177          /*
178           * Something is wrong with database encoding
179           */
180          Mmsg(mdb->errmsg, 
181               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
182               mdb->get_db_name(), row[0]);
183          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
184          Dmsg1(50, "%s", mdb->errmsg);
185       } 
186    }
187    return ret;
188 }
189
190 /*
191  * Now actually open the database.  This can generate errors,
192  *   which are returned in the errmsg
193  *
194  * DO NOT close the database or delete mdb here !!!!
195  */
196 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
197 {
198    bool retval = false;
199    int errstat;
200    char buf[10], *port;
201
202    P(mutex);
203    if (m_connected) {
204       retval = true;
205       goto bail_out;
206    }
207
208    if ((errstat=rwl_init(&m_lock)) != 0) {
209       berrno be;
210       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
211             be.bstrerror(errstat));
212       goto bail_out;
213    }
214
215    if (m_db_port) {
216       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
217       port = buf;
218    } else {
219       port = NULL;
220    }
221
222    /* If connection fails, try at 5 sec intervals for 30 seconds. */
223    for (int retry=0; retry < 6; retry++) {
224       /* connect to the database */
225       m_db_handle = PQsetdbLogin(
226            m_db_address,         /* default = localhost */
227            port,                 /* default port */
228            NULL,                 /* pg options */
229            NULL,                 /* tty, ignored */
230            m_db_name,            /* database name */
231            m_db_user,            /* login name */
232            m_db_password);       /* password */
233
234       /* If no connect, try once more in case it is a timing problem */
235       if (PQstatus(m_db_handle) == CONNECTION_OK) {
236          break;
237       }
238       bmicrosleep(5, 0);
239    }
240
241    Dmsg0(50, "pg_real_connect done\n");
242    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
243         (m_db_password == NULL) ? "(NULL)" : m_db_password);
244
245    if (PQstatus(m_db_handle) != CONNECTION_OK) {
246       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
247          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
248          m_db_name, m_db_user);
249       goto bail_out;
250    }
251
252    m_connected = true;
253    if (!check_tables_version(jcr, this)) {
254       goto bail_out;
255    }
256
257    sql_query("SET datestyle TO 'ISO, YMD'");
258    sql_query("SET cursor_tuple_fraction=1");
259    
260    /*
261     * Tell PostgreSQL we are using standard conforming strings
262     * and avoid warnings such as:
263     *  WARNING:  nonstandard use of \\ in a string literal
264     */
265    sql_query("SET standard_conforming_strings=on");
266
267    /*
268     * Check that encoding is SQL_ASCII
269     */
270    pgsql_check_database_encoding(jcr, this);
271
272    retval = true;
273
274 bail_out:
275    V(mutex);
276    return retval;
277 }
278
279 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
280 {
281    db_end_transaction(jcr);
282    P(mutex);
283    m_ref_count--;
284    if (m_ref_count == 0) {
285       sql_free_result();
286       db_list->remove(this);
287       if (m_connected && m_db_handle) {
288          PQfinish(m_db_handle);
289       }
290       rwl_destroy(&m_lock);
291       free_pool_memory(errmsg);
292       free_pool_memory(cmd);
293       free_pool_memory(cached_path);
294       free_pool_memory(fname);
295       free_pool_memory(path);
296       free_pool_memory(esc_name);
297       free_pool_memory(esc_path);
298       free_pool_memory(esc_obj);
299       free_pool_memory(m_buf);
300       if (m_db_driver) {
301          free(m_db_driver);
302       }
303       if (m_db_name) {
304          free(m_db_name);
305       }
306       if (m_db_user) {
307          free(m_db_user);
308       }
309       if (m_db_password) {
310          free(m_db_password);
311       }
312       if (m_db_address) {
313          free(m_db_address);
314       }
315       if (m_db_socket) {
316          free(m_db_socket);
317       }
318       delete this;
319       if (db_list->size() == 0) {
320          delete db_list;
321          db_list = NULL;
322       }
323    }
324    V(mutex);
325 }
326
327 void B_DB_POSTGRESQL::db_thread_cleanup(void)
328 {
329 }
330
331 /*
332  * Escape strings so that PostgreSQL is happy
333  *
334  *   NOTE! len is the length of the old string. Your new
335  *         string must be long enough (max 2*old+1) to hold
336  *         the escaped output.
337  */
338 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
339 {
340    int error;
341   
342    PQescapeStringConn(m_db_handle, snew, old, len, &error);
343    if (error) {
344       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
345       /* error on encoding, probably invalid multibyte encoding in the source string
346         see PQescapeStringConn documentation for details. */
347       Dmsg0(500, "PQescapeStringConn failed\n");
348    }
349 }
350
351 /*
352  * Escape binary so that PostgreSQL is happy
353  *
354  */
355 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
356 {
357    size_t new_len;
358    unsigned char *obj;
359
360    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
361    if (!obj) {
362       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
363    }
364
365    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
366    memcpy(esc_obj, obj, new_len);
367    esc_obj[new_len]=0;
368
369    PQfreemem(obj);
370
371    return (char *)esc_obj;
372 }
373
374 /*
375  * Unescape binary object so that PostgreSQL is happy
376  *
377  */
378 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
379                                          POOLMEM **dest, int32_t *dest_len)
380 {
381    size_t new_len;
382    unsigned char *obj;
383
384    if (!from) {
385       *dest[0] = 0;
386       *dest_len = 0;
387       return;
388    }
389
390    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
391
392    if (!obj) {
393       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
394    }
395
396    *dest_len = new_len;
397    *dest = check_pool_memory_size(*dest, new_len+1);
398    memcpy(*dest, obj, new_len);
399    (*dest)[new_len]=0;
400    
401    PQfreemem(obj);
402
403    Dmsg1(010, "obj size: %d\n", *dest_len);
404 }
405
406 /*
407  * Start a transaction. This groups inserts and makes things
408  * much more efficient. Usually started when inserting
409  * file attributes.
410  */
411 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
412 {
413    if (!jcr->attr) {
414       jcr->attr = get_pool_memory(PM_FNAME);
415    }
416    if (!jcr->ar) {
417       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
418    }
419
420    /*
421     * This is turned off because transactions break
422     * if multiple simultaneous jobs are run.
423     */
424    if (!m_allow_transactions) {
425       return;
426    }
427
428    db_lock(this);
429    /*
430     * Allow only 25,000 changes per transaction
431     */
432    if (m_transaction && changes > 25000) {
433       db_end_transaction(jcr);
434    }
435    if (!m_transaction) {
436       sql_query("BEGIN");  /* begin transaction */
437       Dmsg0(400, "Start PosgreSQL transaction\n");
438       m_transaction = true;
439    }
440    db_unlock(this);
441 }
442
443 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
444 {
445    if (jcr && jcr->cached_attribute) {
446       Dmsg0(400, "Flush last cached attribute.\n");
447       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
448          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
449       }
450       jcr->cached_attribute = false;
451    }
452
453    if (!m_allow_transactions) {
454       return;
455    }
456
457    db_lock(this);
458    if (m_transaction) {
459       sql_query("COMMIT"); /* end transaction */
460       m_transaction = false;
461       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
462    }
463    changes = 0;
464    db_unlock(this);
465 }
466
467
468 /*
469  * Submit a general SQL command (cmd), and for each row returned,
470  * the result_handler is called with the ctx.
471  */
472 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
473                                        DB_RESULT_HANDLER *result_handler, 
474                                        void *ctx)
475 {
476    SQL_ROW row;
477    bool retval = false;
478    bool in_transaction = m_transaction;
479    
480    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
481
482    /* This code handles only SELECT queries */
483    if (strncasecmp(query, "SELECT", 6) != 0) {
484       return db_sql_query(query, result_handler, ctx);
485    }
486
487    if (!result_handler) {       /* no need of big_query without handler */
488       return false;
489    }
490
491    db_lock(this);
492
493    if (!in_transaction) {       /* CURSOR needs transaction */
494       sql_query("BEGIN");
495    }
496
497    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
498
499    if (!sql_query(m_buf)) {
500       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
501       Dmsg0(50, "db_sql_query failed\n");
502       goto bail_out;
503    }
504
505    do {
506       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
507          goto bail_out;
508       }
509       while ((row = sql_fetch_row()) != NULL) {
510          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
511          if (result_handler(ctx, m_num_fields, row))
512             break;
513       }
514       PQclear(m_result);
515       m_result = NULL;
516       
517    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
518
519    sql_query("CLOSE _bac_cursor");
520
521    Dmsg0(500, "db_big_sql_query finished\n");
522    sql_free_result();
523    retval = true;
524
525 bail_out:
526    if (!in_transaction) {
527       sql_query("COMMIT");  /* end transaction */
528    }
529
530    db_unlock(this);
531    return retval;
532 }
533
534 /*
535  * Submit a general SQL command (cmd), and for each row returned,
536  * the result_handler is called with the ctx.
537  */
538 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
539 {
540    SQL_ROW row;
541    bool retval = true;
542
543    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
544
545    db_lock(this);
546    if (!sql_query(query, QF_STORE_RESULT)) {
547       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
548       Dmsg0(500, "db_sql_query failed\n");
549       retval = false;
550       goto bail_out;
551    }
552
553    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
554
555    if (result_handler != NULL) {
556       Dmsg0(500, "db_sql_query invoking handler\n");
557       while ((row = sql_fetch_row()) != NULL) {
558          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
559          if (result_handler(ctx, m_num_fields, row))
560             break;
561       }
562       sql_free_result();
563    }
564
565    Dmsg0(500, "db_sql_query finished\n");
566
567 bail_out:
568    db_unlock(this);
569    return retval;
570 }
571
572 /*
573  * Note, if this routine returns false (failure), Bacula expects
574  * that no result has been stored.
575  * This is where QUERY_DB comes with Postgresql.
576  *
577  *  Returns:  true  on success
578  *            false on failure
579  *
580  */
581 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
582 {
583    int i;
584    bool retval = false;
585
586    Dmsg1(500, "sql_query starts with '%s'\n", query);
587    /*
588     * We are starting a new query. reset everything.
589     */
590    m_num_rows     = -1;
591    m_row_number   = -1;
592    m_field_number = -1;
593
594    if (m_result) {
595       PQclear(m_result);  /* hmm, someone forgot to free?? */
596       m_result = NULL;
597    }
598
599    for (i = 0; i < 10; i++) {
600       m_result = PQexec(m_db_handle, query);
601       if (m_result) {
602          break;
603       }
604       bmicrosleep(5, 0);
605    }
606    if (!m_result) {
607       Dmsg1(50, "Query failed: %s\n", query);
608       goto bail_out;
609    }
610
611    m_status = PQresultStatus(m_result);
612    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
613       Dmsg0(500, "we have a result\n");
614
615       /*
616        * How many fields in the set?
617        */
618       m_num_fields = (int)PQnfields(m_result);
619       Dmsg1(500, "we have %d fields\n", m_num_fields);
620
621       m_num_rows = PQntuples(m_result);
622       Dmsg1(500, "we have %d rows\n", m_num_rows);
623
624       m_row_number = 0;      /* we can start to fetch something */
625       m_status = 0;          /* succeed */
626       retval = true;
627    } else {
628       Dmsg1(50, "Result status failed: %s\n", query);
629       goto bail_out;
630    }
631
632    Dmsg0(500, "sql_query finishing\n");
633    goto ok_out;
634
635 bail_out:
636    Dmsg0(500, "we failed\n");
637    PQclear(m_result);
638    m_result = NULL;
639    m_status = 1;                   /* failed */
640
641 ok_out:
642    return retval;
643 }
644
645 void B_DB_POSTGRESQL::sql_free_result(void)
646 {
647    db_lock(this);
648    if (m_result) {
649       PQclear(m_result);
650       m_result = NULL;
651    }
652    if (m_rows) {
653       free(m_rows);
654       m_rows = NULL;
655    }
656    if (m_fields) {
657       free(m_fields);
658       m_fields = NULL;
659    }
660    m_num_rows = m_num_fields = 0;
661    db_unlock(this);
662 }
663
664 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
665 {
666    int j;
667    SQL_ROW row = NULL; /* by default, return NULL */
668
669    Dmsg0(500, "sql_fetch_row start\n");
670
671    if (m_num_fields == 0) {     /* No field, no row */
672       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
673       return NULL;
674    }
675
676    if (!m_rows || m_rows_size < m_num_fields) {
677       if (m_rows) {
678          Dmsg0(500, "sql_fetch_row freeing space\n");
679          free(m_rows);
680       }
681       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
682       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
683       m_rows_size = m_num_fields;
684
685       /*
686        * Now reset the row_number now that we have the space allocated
687        */
688       m_row_number = 0;
689    }
690
691    /*
692     * If still within the result set
693     */
694    if (m_row_number >= 0 && m_row_number < m_num_rows) {
695       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
696       /*
697        * Get each value from this row
698        */
699       for (j = 0; j < m_num_fields; j++) {
700          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
701          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
702       }
703       /*
704        * Increment the row number for the next call
705        */
706       m_row_number++;
707       row = m_rows;
708    } else {
709       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
710    }
711
712    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
713
714    return row;
715 }
716
717 const char *B_DB_POSTGRESQL::sql_strerror(void)
718 {
719    return PQerrorMessage(m_db_handle);
720 }
721
722 void B_DB_POSTGRESQL::sql_data_seek(int row)
723 {
724    /*
725     * Set the row number to be returned on the next call to sql_fetch_row
726     */
727    m_row_number = row;
728 }
729
730 int B_DB_POSTGRESQL::sql_affected_rows(void)
731 {
732    return (unsigned) str_to_int32(PQcmdTuples(m_result));
733 }
734
735 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
736 {
737    int i;
738    uint64_t id = 0;
739    char sequence[NAMEDATALEN-1];
740    char getkeyval_query[NAMEDATALEN+50];
741    PGresult *pg_result;
742
743    /*
744     * First execute the insert query and then retrieve the currval.
745     */
746    if (!sql_query(query)) {
747       return 0;
748    }
749
750    m_num_rows = sql_affected_rows();
751    if (m_num_rows != 1) {
752       return 0;
753    }
754
755    changes++;
756
757    /*
758     * Obtain the current value of the sequence that
759     * provides the serial value for primary key of the table.
760     *
761     * currval is local to our session.  It is not affected by
762     * other transactions.
763     *
764     * Determine the name of the sequence.
765     * PostgreSQL automatically creates a sequence using
766     * <table>_<column>_seq.
767     * At the time of writing, all tables used this format for
768     * for their primary key: <table>id
769     * Except for basefiles which has a primary key on baseid.
770     * Therefore, we need to special case that one table.
771     *
772     * everything else can use the PostgreSQL formula.
773     */
774    if (strcasecmp(table_name, "basefiles") == 0) {
775       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
776    } else {
777       bstrncpy(sequence, table_name, sizeof(sequence));
778       bstrncat(sequence, "_",        sizeof(sequence));
779       bstrncat(sequence, table_name, sizeof(sequence));
780       bstrncat(sequence, "id",       sizeof(sequence));
781    }
782
783    bstrncat(sequence, "_seq", sizeof(sequence));
784    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
785
786    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
787    for (i = 0; i < 10; i++) {
788       pg_result = PQexec(m_db_handle, getkeyval_query);
789       if (pg_result) {
790          break;
791       }
792       bmicrosleep(5, 0);
793    }
794    if (!pg_result) {
795       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
796       goto bail_out;
797    }
798
799    Dmsg0(500, "exec done");
800
801    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
802       Dmsg0(500, "getting value");
803       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
804       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
805    } else {
806       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
807       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
808    }
809
810 bail_out:
811    PQclear(pg_result);
812
813    return id;
814 }
815
816 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
817 {
818    int i, j;
819    int max_length;
820    int this_length;
821
822    Dmsg0(500, "sql_fetch_field starts\n");
823
824    if (!m_fields || m_fields_size < m_num_fields) {
825       if (m_fields) {
826          free(m_fields);
827          m_fields = NULL;
828       }
829       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
830       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
831       m_fields_size = m_num_fields;
832
833       for (i = 0; i < m_num_fields; i++) {
834          Dmsg1(500, "filling field %d\n", i);
835          m_fields[i].name = PQfname(m_result, i);
836          m_fields[i].type = PQftype(m_result, i);
837          m_fields[i].flags = 0;
838
839          /*
840           * For a given column, find the max length.
841           */
842          max_length = 0;
843          for (j = 0; j < m_num_rows; j++) {
844             if (PQgetisnull(m_result, j, i)) {
845                 this_length = 4;        /* "NULL" */
846             } else {
847                 this_length = cstrlen(PQgetvalue(m_result, j, i));
848             }
849          
850             if (max_length < this_length) {
851                max_length = this_length;
852             }
853          }
854          m_fields[i].max_length = max_length;
855
856          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
857                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
858       }
859    }
860
861    /*
862     * Increment field number for the next time around
863     */
864    return &m_fields[m_field_number++];
865 }
866
867 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
868 {
869    switch (field_type) {
870    case 1:
871       return true;
872    default:
873       return false;
874    }
875 }
876
877 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
878 {
879    /*
880     * TEMP: the following is taken from select OID, typname from pg_type;
881     */
882    switch (field_type) {
883    case 20:
884    case 21:
885    case 23:
886    case 700:
887    case 701:
888       return true;
889    default:
890       return false;
891    }
892 }
893
894 /*
895  * Escape strings so that PostgreSQL is happy on COPY
896  *
897  *   NOTE! len is the length of the old string. Your new
898  *         string must be long enough (max 2*old+1) to hold
899  *         the escaped output.
900  */
901 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
902 {
903    /* we have to escape \t, \n, \r, \ */
904    char c = '\0' ;
905
906    while (len > 0 && *src) {
907       switch (*src) {
908       case '\n':
909          c = 'n';
910          break;
911       case '\\':
912          c = '\\';
913          break;
914       case '\t':
915          c = 't';
916          break;
917       case '\r':
918          c = 'r';
919          break;
920       default:
921          c = '\0' ;
922       }
923
924       if (c) {
925          *dest = '\\';
926          dest++;
927          *dest = c;
928       } else {
929          *dest = *src;
930       }
931
932       len--;
933       src++;
934       dest++;
935    }
936
937    *dest = '\0';
938    return dest;
939 }
940
941 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
942 {
943    const char *query = "COPY batch FROM STDIN";
944
945    Dmsg0(500, "sql_batch_start started\n");
946
947    if (!sql_query("CREATE TEMPORARY TABLE batch ("
948                           "FileIndex int,"
949                           "JobId int,"
950                           "Path varchar,"
951                           "Name varchar,"
952                           "LStat varchar,"
953                           "Md5 varchar,"
954                           "DeltaSeq smallint)")) {
955       Dmsg0(500, "sql_batch_start failed\n");
956       return false;
957    }
958    
959    /*
960     * We are starting a new query.  reset everything.
961     */
962    m_num_rows     = -1;
963    m_row_number   = -1;
964    m_field_number = -1;
965
966    sql_free_result();
967
968    for (int i=0; i < 10; i++) {
969       m_result = PQexec(m_db_handle, query);
970       if (m_result) {
971          break;
972       }
973       bmicrosleep(5, 0);
974    }
975    if (!m_result) {
976       Dmsg1(50, "Query failed: %s\n", query);
977       goto bail_out;
978    }
979
980    m_status = PQresultStatus(m_result);
981    if (m_status == PGRES_COPY_IN) {
982       /*
983        * How many fields in the set?
984        */
985       m_num_fields = (int) PQnfields(m_result);
986       m_num_rows = 0;
987       m_status = 1;
988    } else {
989       Dmsg1(50, "Result status failed: %s\n", query);
990       goto bail_out;
991    }
992
993    Dmsg0(500, "sql_batch_start finishing\n");
994
995    return true;
996
997 bail_out:
998    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
999    m_status = 0;
1000    PQclear(m_result);
1001    m_result = NULL;
1002    return false;
1003 }
1004
1005 /*
1006  * Set error to something to abort operation
1007  */
1008 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1009 {
1010    int res;
1011    int count=30;
1012    PGresult *pg_result;
1013
1014    Dmsg0(500, "sql_batch_end started\n");
1015
1016    do { 
1017       res = PQputCopyEnd(m_db_handle, error);
1018    } while (res == 0 && --count > 0);
1019
1020    if (res == 1) {
1021       Dmsg0(500, "ok\n");
1022       m_status = 1;
1023    }
1024    
1025    if (res <= 0) {
1026       Dmsg0(500, "we failed\n");
1027       m_status = 0;
1028       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1029       Dmsg1(500, "failure %s\n", errmsg);
1030    }
1031
1032    /* Check command status and return to normal libpq state */
1033    pg_result = PQgetResult(m_db_handle);
1034    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1035       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1036       m_status = 0;
1037    }
1038    PQclear(pg_result); 
1039
1040    Dmsg0(500, "sql_batch_end finishing\n");
1041
1042    return true;
1043 }
1044
1045 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1046 {
1047    int res;
1048    int count=30;
1049    size_t len;
1050    const char *digest;
1051    char ed1[50];
1052
1053    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1054    pgsql_copy_escape(esc_name, fname, fnl);
1055
1056    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1057    pgsql_copy_escape(esc_path, path, pnl);
1058
1059    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1060       digest = "0";
1061    } else {
1062       digest = ar->Digest;
1063    }
1064
1065    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
1066               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
1067               esc_name, ar->attr, digest, ar->DeltaSeq);
1068
1069    do { 
1070       res = PQputCopyData(m_db_handle, cmd, len);
1071    } while (res == 0 && --count > 0);
1072
1073    if (res == 1) {
1074       Dmsg0(500, "ok\n");
1075       changes++;
1076       m_status = 1;
1077    }
1078
1079    if (res <= 0) {
1080       Dmsg0(500, "we failed\n");
1081       m_status = 0;
1082       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1083       Dmsg1(500, "failure %s\n", errmsg);
1084    }
1085
1086    Dmsg0(500, "sql_batch_insert finishing\n");
1087
1088    return true;
1089 }
1090
1091 /*
1092  * Initialize database data structure. In principal this should
1093  * never have errors, or it is really fatal.
1094  */
1095 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1096                        const char *db_user, const char *db_password, 
1097                        const char *db_address, int db_port, 
1098                        const char *db_socket, bool mult_db_connections, 
1099                        bool disable_batch_insert)
1100 {
1101    B_DB_POSTGRESQL *mdb = NULL;
1102
1103    if (!db_user) {
1104       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1105       return NULL;
1106    }
1107    P(mutex);                          /* lock DB queue */
1108    if (db_list && !mult_db_connections) {
1109       /*
1110        * Look to see if DB already open
1111        */
1112       foreach_dlist(mdb, db_list) {
1113          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1114             Dmsg1(100, "DB REopen %s\n", db_name);
1115             mdb->increment_refcount();
1116             goto bail_out;
1117          }
1118       }
1119    }
1120    Dmsg0(100, "db_init_database first time\n");
1121    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1122                              db_address, db_port, db_socket, 
1123                              mult_db_connections, disable_batch_insert));
1124
1125 bail_out:
1126    V(mutex);
1127    return mdb;
1128 }
1129
1130 #endif /* HAVE_POSTGRESQL */