]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
backport code from master
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    esc_obj = get_pool_memory(PM_FNAME);
122    m_buf =  get_pool_memory(PM_FNAME);
123    m_allow_transactions = mult_db_connections;
124
125    /* At this time, when mult_db_connections == true, this is for 
126     * specific console command such as bvfs or batch mode, and we don't
127     * want to share a batch mode or bvfs. In the future, we can change
128     * the creation function to add this parameter.
129     */
130    m_dedicated = mult_db_connections; 
131
132    /*
133     * Initialize the private members.
134     */
135    m_db_handle = NULL;
136    m_result = NULL;
137
138    /*
139     * Put the db in the list.
140     */
141    if (db_list == NULL) {
142       db_list = New(dlist(this, &this->m_link));
143    }
144    db_list->append(this);
145 }
146
147 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
148 {
149 }
150
151 /*
152  * Check that the database correspond to the encoding we want
153  */
154 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
155 {
156    SQL_ROW row;
157    int ret = false;
158
159    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
160       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
161       return false;
162    }
163
164    if ((row = mdb->sql_fetch_row()) == NULL) {
165       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
166       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
167    } else {
168       ret = bstrcmp(row[0], "SQL_ASCII");
169
170       if (ret) {
171          /*
172           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
173           */
174          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
175
176       } else {
177          /*
178           * Something is wrong with database encoding
179           */
180          Mmsg(mdb->errmsg, 
181               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
182               mdb->get_db_name(), row[0]);
183          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
184          Dmsg1(50, "%s", mdb->errmsg);
185       } 
186    }
187    return ret;
188 }
189
190 /*
191  * Now actually open the database.  This can generate errors,
192  *   which are returned in the errmsg
193  *
194  * DO NOT close the database or delete mdb here !!!!
195  */
196 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
197 {
198    bool retval = false;
199    int errstat;
200    char buf[10], *port;
201
202    P(mutex);
203    if (m_connected) {
204       retval = true;
205       goto bail_out;
206    }
207
208    if ((errstat=rwl_init(&m_lock)) != 0) {
209       berrno be;
210       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
211             be.bstrerror(errstat));
212       goto bail_out;
213    }
214
215    if (m_db_port) {
216       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
217       port = buf;
218    } else {
219       port = NULL;
220    }
221
222    /* If connection fails, try at 5 sec intervals for 30 seconds. */
223    for (int retry=0; retry < 6; retry++) {
224       /* connect to the database */
225       m_db_handle = PQsetdbLogin(
226            m_db_address,         /* default = localhost */
227            port,                 /* default port */
228            NULL,                 /* pg options */
229            NULL,                 /* tty, ignored */
230            m_db_name,            /* database name */
231            m_db_user,            /* login name */
232            m_db_password);       /* password */
233
234       /* If no connect, try once more in case it is a timing problem */
235       if (PQstatus(m_db_handle) == CONNECTION_OK) {
236          break;
237       }
238       bmicrosleep(5, 0);
239    }
240
241    Dmsg0(50, "pg_real_connect done\n");
242    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
243         (m_db_password == NULL) ? "(NULL)" : m_db_password);
244
245    if (PQstatus(m_db_handle) != CONNECTION_OK) {
246       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
247          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
248          m_db_name, m_db_user);
249       goto bail_out;
250    }
251
252    m_connected = true;
253    if (!check_tables_version(jcr, this)) {
254       goto bail_out;
255    }
256
257    sql_query("SET datestyle TO 'ISO, YMD'");
258    sql_query("SET cursor_tuple_fraction=1");
259    
260    /*
261     * Tell PostgreSQL we are using standard conforming strings
262     * and avoid warnings such as:
263     *  WARNING:  nonstandard use of \\ in a string literal
264     */
265    sql_query("SET standard_conforming_strings=on");
266
267    /*
268     * Check that encoding is SQL_ASCII
269     */
270    pgsql_check_database_encoding(jcr, this);
271
272    retval = true;
273
274 bail_out:
275    V(mutex);
276    return retval;
277 }
278
279 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
280 {
281    if (m_connected) {
282       db_end_transaction(jcr);
283    }
284    P(mutex);
285    m_ref_count--;
286    if (m_ref_count == 0) {
287       if (m_connected) {
288          sql_free_result();
289       }
290       db_list->remove(this);
291       if (m_connected && m_db_handle) {
292          PQfinish(m_db_handle);
293       }
294       if (rwl_is_init(&m_lock)) {
295          rwl_destroy(&m_lock);
296       }
297       free_pool_memory(errmsg);
298       free_pool_memory(cmd);
299       free_pool_memory(cached_path);
300       free_pool_memory(fname);
301       free_pool_memory(path);
302       free_pool_memory(esc_name);
303       free_pool_memory(esc_path);
304       free_pool_memory(esc_obj);
305       free_pool_memory(m_buf);
306       if (m_db_driver) {
307          free(m_db_driver);
308       }
309       if (m_db_name) {
310          free(m_db_name);
311       }
312       if (m_db_user) {
313          free(m_db_user);
314       }
315       if (m_db_password) {
316          free(m_db_password);
317       }
318       if (m_db_address) {
319          free(m_db_address);
320       }
321       if (m_db_socket) {
322          free(m_db_socket);
323       }
324       delete this;
325       if (db_list->size() == 0) {
326          delete db_list;
327          db_list = NULL;
328       }
329    }
330    V(mutex);
331 }
332
333 void B_DB_POSTGRESQL::db_thread_cleanup(void)
334 {
335 }
336
337 /*
338  * Escape strings so that PostgreSQL is happy
339  *
340  *   NOTE! len is the length of the old string. Your new
341  *         string must be long enough (max 2*old+1) to hold
342  *         the escaped output.
343  */
344 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
345 {
346    int error;
347   
348    PQescapeStringConn(m_db_handle, snew, old, len, &error);
349    if (error) {
350       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
351       /* error on encoding, probably invalid multibyte encoding in the source string
352         see PQescapeStringConn documentation for details. */
353       Dmsg0(500, "PQescapeStringConn failed\n");
354    }
355 }
356
357 /*
358  * Escape binary so that PostgreSQL is happy
359  *
360  */
361 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
362 {
363    size_t new_len;
364    unsigned char *obj;
365
366    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
367    if (!obj) {
368       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
369    }
370
371    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
372    memcpy(esc_obj, obj, new_len);
373    esc_obj[new_len]=0;
374
375    PQfreemem(obj);
376
377    return (char *)esc_obj;
378 }
379
380 /*
381  * Unescape binary object so that PostgreSQL is happy
382  *
383  */
384 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
385                                          POOLMEM **dest, int32_t *dest_len)
386 {
387    size_t new_len;
388    unsigned char *obj;
389
390    if (!from) {
391       *dest[0] = 0;
392       *dest_len = 0;
393       return;
394    }
395
396    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
397
398    if (!obj) {
399       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
400    }
401
402    *dest_len = new_len;
403    *dest = check_pool_memory_size(*dest, new_len+1);
404    memcpy(*dest, obj, new_len);
405    (*dest)[new_len]=0;
406    
407    PQfreemem(obj);
408
409    Dmsg1(010, "obj size: %d\n", *dest_len);
410 }
411
412 /*
413  * Start a transaction. This groups inserts and makes things
414  * much more efficient. Usually started when inserting
415  * file attributes.
416  */
417 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
418 {
419    if (!jcr->attr) {
420       jcr->attr = get_pool_memory(PM_FNAME);
421    }
422    if (!jcr->ar) {
423       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
424    }
425
426    /*
427     * This is turned off because transactions break
428     * if multiple simultaneous jobs are run.
429     */
430    if (!m_allow_transactions) {
431       return;
432    }
433
434    db_lock(this);
435    /*
436     * Allow only 25,000 changes per transaction
437     */
438    if (m_transaction && changes > 25000) {
439       db_end_transaction(jcr);
440    }
441    if (!m_transaction) {
442       sql_query("BEGIN");  /* begin transaction */
443       Dmsg0(400, "Start PosgreSQL transaction\n");
444       m_transaction = true;
445    }
446    db_unlock(this);
447 }
448
449 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
450 {
451    if (jcr && jcr->cached_attribute) {
452       Dmsg0(400, "Flush last cached attribute.\n");
453       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
454          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
455       }
456       jcr->cached_attribute = false;
457    }
458
459    if (!m_allow_transactions) {
460       return;
461    }
462
463    db_lock(this);
464    if (m_transaction) {
465       sql_query("COMMIT"); /* end transaction */
466       m_transaction = false;
467       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
468    }
469    changes = 0;
470    db_unlock(this);
471 }
472
473
474 /*
475  * Submit a general SQL command (cmd), and for each row returned,
476  * the result_handler is called with the ctx.
477  */
478 bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
479                                        DB_RESULT_HANDLER *result_handler, 
480                                        void *ctx)
481 {
482    SQL_ROW row;
483    bool retval = false;
484    bool in_transaction = m_transaction;
485    
486    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
487
488    /* This code handles only SELECT queries */
489    if (strncasecmp(query, "SELECT", 6) != 0) {
490       return db_sql_query(query, result_handler, ctx);
491    }
492
493    if (!result_handler) {       /* no need of big_query without handler */
494       return false;
495    }
496
497    db_lock(this);
498
499    if (!in_transaction) {       /* CURSOR needs transaction */
500       sql_query("BEGIN");
501    }
502
503    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
504
505    if (!sql_query(m_buf)) {
506       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
507       Dmsg0(50, "db_sql_query failed\n");
508       goto bail_out;
509    }
510
511    do {
512       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
513          goto bail_out;
514       }
515       while ((row = sql_fetch_row()) != NULL) {
516          Dmsg1(500, "Fetching %d rows\n", m_num_rows);
517          if (result_handler(ctx, m_num_fields, row))
518             break;
519       }
520       PQclear(m_result);
521       m_result = NULL;
522       
523    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
524
525    sql_query("CLOSE _bac_cursor");
526
527    Dmsg0(500, "db_big_sql_query finished\n");
528    sql_free_result();
529    retval = true;
530
531 bail_out:
532    if (!in_transaction) {
533       sql_query("COMMIT");  /* end transaction */
534    }
535
536    db_unlock(this);
537    return retval;
538 }
539
540 /*
541  * Submit a general SQL command (cmd), and for each row returned,
542  * the result_handler is called with the ctx.
543  */
544 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
545 {
546    SQL_ROW row;
547    bool retval = true;
548
549    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
550
551    db_lock(this);
552    if (!sql_query(query, QF_STORE_RESULT)) {
553       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
554       Dmsg0(500, "db_sql_query failed\n");
555       retval = false;
556       goto bail_out;
557    }
558
559    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
560
561    if (result_handler != NULL) {
562       Dmsg0(500, "db_sql_query invoking handler\n");
563       while ((row = sql_fetch_row()) != NULL) {
564          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
565          if (result_handler(ctx, m_num_fields, row))
566             break;
567       }
568       sql_free_result();
569    }
570
571    Dmsg0(500, "db_sql_query finished\n");
572
573 bail_out:
574    db_unlock(this);
575    return retval;
576 }
577
578 /*
579  * Note, if this routine returns false (failure), Bacula expects
580  * that no result has been stored.
581  * This is where QUERY_DB comes with Postgresql.
582  *
583  *  Returns:  true  on success
584  *            false on failure
585  *
586  */
587 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
588 {
589    int i;
590    bool retval = false;
591
592    Dmsg1(500, "sql_query starts with '%s'\n", query);
593    /*
594     * We are starting a new query. reset everything.
595     */
596    m_num_rows     = -1;
597    m_row_number   = -1;
598    m_field_number = -1;
599
600    if (m_result) {
601       PQclear(m_result);  /* hmm, someone forgot to free?? */
602       m_result = NULL;
603    }
604
605    for (i = 0; i < 10; i++) {
606       m_result = PQexec(m_db_handle, query);
607       if (m_result) {
608          break;
609       }
610       bmicrosleep(5, 0);
611    }
612    if (!m_result) {
613       Dmsg1(50, "Query failed: %s\n", query);
614       goto bail_out;
615    }
616
617    m_status = PQresultStatus(m_result);
618    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
619       Dmsg0(500, "we have a result\n");
620
621       /*
622        * How many fields in the set?
623        */
624       m_num_fields = (int)PQnfields(m_result);
625       Dmsg1(500, "we have %d fields\n", m_num_fields);
626
627       m_num_rows = PQntuples(m_result);
628       Dmsg1(500, "we have %d rows\n", m_num_rows);
629
630       m_row_number = 0;      /* we can start to fetch something */
631       m_status = 0;          /* succeed */
632       retval = true;
633    } else {
634       Dmsg1(50, "Result status failed: %s\n", query);
635       goto bail_out;
636    }
637
638    Dmsg0(500, "sql_query finishing\n");
639    goto ok_out;
640
641 bail_out:
642    Dmsg0(500, "we failed\n");
643    PQclear(m_result);
644    m_result = NULL;
645    m_status = 1;                   /* failed */
646
647 ok_out:
648    return retval;
649 }
650
651 void B_DB_POSTGRESQL::sql_free_result(void)
652 {
653    db_lock(this);
654    if (m_result) {
655       PQclear(m_result);
656       m_result = NULL;
657    }
658    if (m_rows) {
659       free(m_rows);
660       m_rows = NULL;
661    }
662    if (m_fields) {
663       free(m_fields);
664       m_fields = NULL;
665    }
666    m_num_rows = m_num_fields = 0;
667    db_unlock(this);
668 }
669
670 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
671 {
672    int j;
673    SQL_ROW row = NULL; /* by default, return NULL */
674
675    Dmsg0(500, "sql_fetch_row start\n");
676
677    if (m_num_fields == 0) {     /* No field, no row */
678       Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
679       return NULL;
680    }
681
682    if (!m_rows || m_rows_size < m_num_fields) {
683       if (m_rows) {
684          Dmsg0(500, "sql_fetch_row freeing space\n");
685          free(m_rows);
686       }
687       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
688       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
689       m_rows_size = m_num_fields;
690
691       /*
692        * Now reset the row_number now that we have the space allocated
693        */
694       m_row_number = 0;
695    }
696
697    /*
698     * If still within the result set
699     */
700    if (m_row_number >= 0 && m_row_number < m_num_rows) {
701       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
702       /*
703        * Get each value from this row
704        */
705       for (j = 0; j < m_num_fields; j++) {
706          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
707          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
708       }
709       /*
710        * Increment the row number for the next call
711        */
712       m_row_number++;
713       row = m_rows;
714    } else {
715       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
716    }
717
718    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
719
720    return row;
721 }
722
723 const char *B_DB_POSTGRESQL::sql_strerror(void)
724 {
725    return PQerrorMessage(m_db_handle);
726 }
727
728 void B_DB_POSTGRESQL::sql_data_seek(int row)
729 {
730    /*
731     * Set the row number to be returned on the next call to sql_fetch_row
732     */
733    m_row_number = row;
734 }
735
736 int B_DB_POSTGRESQL::sql_affected_rows(void)
737 {
738    return (unsigned) str_to_int32(PQcmdTuples(m_result));
739 }
740
741 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
742 {
743    int i;
744    uint64_t id = 0;
745    char sequence[NAMEDATALEN-1];
746    char getkeyval_query[NAMEDATALEN+50];
747    PGresult *pg_result;
748
749    /*
750     * First execute the insert query and then retrieve the currval.
751     */
752    if (!sql_query(query)) {
753       return 0;
754    }
755
756    m_num_rows = sql_affected_rows();
757    if (m_num_rows != 1) {
758       return 0;
759    }
760
761    changes++;
762
763    /*
764     * Obtain the current value of the sequence that
765     * provides the serial value for primary key of the table.
766     *
767     * currval is local to our session.  It is not affected by
768     * other transactions.
769     *
770     * Determine the name of the sequence.
771     * PostgreSQL automatically creates a sequence using
772     * <table>_<column>_seq.
773     * At the time of writing, all tables used this format for
774     * for their primary key: <table>id
775     * Except for basefiles which has a primary key on baseid.
776     * Therefore, we need to special case that one table.
777     *
778     * everything else can use the PostgreSQL formula.
779     */
780    if (strcasecmp(table_name, "basefiles") == 0) {
781       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
782    } else {
783       bstrncpy(sequence, table_name, sizeof(sequence));
784       bstrncat(sequence, "_",        sizeof(sequence));
785       bstrncat(sequence, table_name, sizeof(sequence));
786       bstrncat(sequence, "id",       sizeof(sequence));
787    }
788
789    bstrncat(sequence, "_seq", sizeof(sequence));
790    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
791
792    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
793    for (i = 0; i < 10; i++) {
794       pg_result = PQexec(m_db_handle, getkeyval_query);
795       if (pg_result) {
796          break;
797       }
798       bmicrosleep(5, 0);
799    }
800    if (!pg_result) {
801       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
802       goto bail_out;
803    }
804
805    Dmsg0(500, "exec done");
806
807    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
808       Dmsg0(500, "getting value");
809       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
810       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
811    } else {
812       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
813       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
814    }
815
816 bail_out:
817    PQclear(pg_result);
818
819    return id;
820 }
821
822 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
823 {
824    int i, j;
825    int max_length;
826    int this_length;
827
828    Dmsg0(500, "sql_fetch_field starts\n");
829
830    if (!m_fields || m_fields_size < m_num_fields) {
831       if (m_fields) {
832          free(m_fields);
833          m_fields = NULL;
834       }
835       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
836       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
837       m_fields_size = m_num_fields;
838
839       for (i = 0; i < m_num_fields; i++) {
840          Dmsg1(500, "filling field %d\n", i);
841          m_fields[i].name = PQfname(m_result, i);
842          m_fields[i].type = PQftype(m_result, i);
843          m_fields[i].flags = 0;
844
845          /*
846           * For a given column, find the max length.
847           */
848          max_length = 0;
849          for (j = 0; j < m_num_rows; j++) {
850             if (PQgetisnull(m_result, j, i)) {
851                 this_length = 4;        /* "NULL" */
852             } else {
853                 this_length = cstrlen(PQgetvalue(m_result, j, i));
854             }
855          
856             if (max_length < this_length) {
857                max_length = this_length;
858             }
859          }
860          m_fields[i].max_length = max_length;
861
862          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
863                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
864       }
865    }
866
867    /*
868     * Increment field number for the next time around
869     */
870    return &m_fields[m_field_number++];
871 }
872
873 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
874 {
875    switch (field_type) {
876    case 1:
877       return true;
878    default:
879       return false;
880    }
881 }
882
883 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
884 {
885    /*
886     * TEMP: the following is taken from select OID, typname from pg_type;
887     */
888    switch (field_type) {
889    case 20:
890    case 21:
891    case 23:
892    case 700:
893    case 701:
894       return true;
895    default:
896       return false;
897    }
898 }
899
900 /*
901  * Escape strings so that PostgreSQL is happy on COPY
902  *
903  *   NOTE! len is the length of the old string. Your new
904  *         string must be long enough (max 2*old+1) to hold
905  *         the escaped output.
906  */
907 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
908 {
909    /* we have to escape \t, \n, \r, \ */
910    char c = '\0' ;
911
912    while (len > 0 && *src) {
913       switch (*src) {
914       case '\n':
915          c = 'n';
916          break;
917       case '\\':
918          c = '\\';
919          break;
920       case '\t':
921          c = 't';
922          break;
923       case '\r':
924          c = 'r';
925          break;
926       default:
927          c = '\0' ;
928       }
929
930       if (c) {
931          *dest = '\\';
932          dest++;
933          *dest = c;
934       } else {
935          *dest = *src;
936       }
937
938       len--;
939       src++;
940       dest++;
941    }
942
943    *dest = '\0';
944    return dest;
945 }
946
947 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
948 {
949    const char *query = "COPY batch FROM STDIN";
950
951    Dmsg0(500, "sql_batch_start started\n");
952
953    if (!sql_query("CREATE TEMPORARY TABLE batch ("
954                           "FileIndex int,"
955                           "JobId int,"
956                           "Path varchar,"
957                           "Name varchar,"
958                           "LStat varchar,"
959                           "Md5 varchar,"
960                           "DeltaSeq smallint)")) {
961       Dmsg0(500, "sql_batch_start failed\n");
962       return false;
963    }
964    
965    /*
966     * We are starting a new query.  reset everything.
967     */
968    m_num_rows     = -1;
969    m_row_number   = -1;
970    m_field_number = -1;
971
972    sql_free_result();
973
974    for (int i=0; i < 10; i++) {
975       m_result = PQexec(m_db_handle, query);
976       if (m_result) {
977          break;
978       }
979       bmicrosleep(5, 0);
980    }
981    if (!m_result) {
982       Dmsg1(50, "Query failed: %s\n", query);
983       goto bail_out;
984    }
985
986    m_status = PQresultStatus(m_result);
987    if (m_status == PGRES_COPY_IN) {
988       /*
989        * How many fields in the set?
990        */
991       m_num_fields = (int) PQnfields(m_result);
992       m_num_rows = 0;
993       m_status = 1;
994    } else {
995       Dmsg1(50, "Result status failed: %s\n", query);
996       goto bail_out;
997    }
998
999    Dmsg0(500, "sql_batch_start finishing\n");
1000
1001    return true;
1002
1003 bail_out:
1004    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
1005    m_status = 0;
1006    PQclear(m_result);
1007    m_result = NULL;
1008    return false;
1009 }
1010
1011 /*
1012  * Set error to something to abort operation
1013  */
1014 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1015 {
1016    int res;
1017    int count=30;
1018    PGresult *pg_result;
1019
1020    Dmsg0(500, "sql_batch_end started\n");
1021
1022    do { 
1023       res = PQputCopyEnd(m_db_handle, error);
1024    } while (res == 0 && --count > 0);
1025
1026    if (res == 1) {
1027       Dmsg0(500, "ok\n");
1028       m_status = 1;
1029    }
1030    
1031    if (res <= 0) {
1032       Dmsg0(500, "we failed\n");
1033       m_status = 0;
1034       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1035       Dmsg1(500, "failure %s\n", errmsg);
1036    }
1037
1038    /* Check command status and return to normal libpq state */
1039    pg_result = PQgetResult(m_db_handle);
1040    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
1041       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
1042       m_status = 0;
1043    }
1044    PQclear(pg_result); 
1045
1046    Dmsg0(500, "sql_batch_end finishing\n");
1047
1048    return true;
1049 }
1050
1051 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1052 {
1053    int res;
1054    int count=30;
1055    size_t len;
1056    const char *digest;
1057    char ed1[50];
1058
1059    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1060    pgsql_copy_escape(esc_name, fname, fnl);
1061
1062    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1063    pgsql_copy_escape(esc_path, path, pnl);
1064
1065    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1066       digest = "0";
1067    } else {
1068       digest = ar->Digest;
1069    }
1070
1071    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
1072               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
1073               esc_name, ar->attr, digest, ar->DeltaSeq);
1074
1075    do { 
1076       res = PQputCopyData(m_db_handle, cmd, len);
1077    } while (res == 0 && --count > 0);
1078
1079    if (res == 1) {
1080       Dmsg0(500, "ok\n");
1081       changes++;
1082       m_status = 1;
1083    }
1084
1085    if (res <= 0) {
1086       Dmsg0(500, "we failed\n");
1087       m_status = 0;
1088       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1089       Dmsg1(500, "failure %s\n", errmsg);
1090    }
1091
1092    Dmsg0(500, "sql_batch_insert finishing\n");
1093
1094    return true;
1095 }
1096
1097 /*
1098  * Initialize database data structure. In principal this should
1099  * never have errors, or it is really fatal.
1100  */
1101 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1102                        const char *db_user, const char *db_password, 
1103                        const char *db_address, int db_port, 
1104                        const char *db_socket, bool mult_db_connections, 
1105                        bool disable_batch_insert)
1106 {
1107    B_DB_POSTGRESQL *mdb = NULL;
1108
1109    if (!db_user) {
1110       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1111       return NULL;
1112    }
1113    P(mutex);                          /* lock DB queue */
1114    if (db_list && !mult_db_connections) {
1115       /*
1116        * Look to see if DB already open
1117        */
1118       foreach_dlist(mdb, db_list) {
1119          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1120             Dmsg1(100, "DB REopen %s\n", db_name);
1121             mdb->increment_refcount();
1122             goto bail_out;
1123          }
1124       }
1125    }
1126    Dmsg0(100, "db_init_database first time\n");
1127    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1128                              db_address, db_port, db_socket, 
1129                              mult_db_connections, disable_batch_insert));
1130
1131 bail_out:
1132    V(mutex);
1133    return mdb;
1134 }
1135
1136 #endif /* HAVE_POSTGRESQL */