]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Backport of class based catalog backends into Branch-5.1.
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
36  */
37
38 #include "bacula.h"
39
40 #ifdef HAVE_POSTGRESQL
41
42 #include "cats.h"
43 #include "bdb_priv.h"
44 #include "libpq-fe.h"
45 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /*
57  * List of open databases
58  */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64                                  const char *db_driver,
65                                  const char *db_name,
66                                  const char *db_user,
67                                  const char *db_password,
68                                  const char *db_address,
69                                  int db_port, 
70                                  const char *db_socket,
71                                  bool mult_db_connections,
72                                  bool disable_batch_insert)
73 {
74    /*
75     * Initialize the parent class members.
76     */
77    m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78    m_db_type = SQL_TYPE_POSTGRESQL;
79    m_db_driver = bstrdup("PostgreSQL");
80    m_db_name = bstrdup(db_name);
81    m_db_user = bstrdup(db_user);
82    if (db_password) {
83       m_db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       m_db_address = bstrdup(db_address);
87    }
88    if (db_socket) {
89       m_db_socket = bstrdup(db_socket);
90    }
91    m_db_port = db_port;
92    if (disable_batch_insert) {
93       m_disabled_batch_insert = true;
94       m_have_batch_insert = false;
95    } else {
96       m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
100       m_have_batch_insert = PQisthreadsafe();
101 #else
102       m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
104 #else
105       m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
107 #else
108       m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
110    }
111    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
112    *errmsg = 0;
113    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114    cached_path = get_pool_memory(PM_FNAME);
115    cached_path_id = 0;
116    m_ref_count = 1;
117    fname = get_pool_memory(PM_FNAME);
118    path = get_pool_memory(PM_FNAME);
119    esc_name = get_pool_memory(PM_FNAME);
120    esc_path = get_pool_memory(PM_FNAME);
121    m_allow_transactions = mult_db_connections;
122
123    /*
124     * Initialize the private members.
125     */
126    m_db_handle = NULL;
127    m_result = NULL;
128
129    /*
130     * Put the db in the list.
131     */
132    if (db_list == NULL) {
133       db_list = New(dlist(this, &this->m_link));
134    }
135    db_list->append(this);
136 }
137
138 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
139 {
140 }
141
142 /*
143  * Check that the database correspond to the encoding we want
144  */
145 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
146 {
147    SQL_ROW row;
148    int ret = false;
149
150    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
151       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
152       return false;
153    }
154
155    if ((row = mdb->sql_fetch_row()) == NULL) {
156       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
157       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
158    } else {
159       ret = bstrcmp(row[0], "SQL_ASCII");
160
161       if (ret) {
162          /*
163           * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
164           */
165          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
166
167       } else {
168          /*
169           * Something is wrong with database encoding
170           */
171          Mmsg(mdb->errmsg, 
172               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
173               mdb->get_db_name(), row[0]);
174          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
175          Dmsg1(50, "%s", mdb->errmsg);
176       } 
177    }
178    return ret;
179 }
180
181 /*
182  * Now actually open the database.  This can generate errors,
183  *   which are returned in the errmsg
184  *
185  * DO NOT close the database or delete mdb here !!!!
186  */
187 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
188 {
189    bool retval = false;
190    int errstat;
191    char buf[10], *port;
192
193    P(mutex);
194    if (m_connected) {
195       retval = true;
196       goto bail_out;
197    }
198
199    if ((errstat=rwl_init(&m_lock)) != 0) {
200       berrno be;
201       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
202             be.bstrerror(errstat));
203       goto bail_out;
204    }
205
206    if (m_db_port) {
207       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
208       port = buf;
209    } else {
210       port = NULL;
211    }
212
213    /* If connection fails, try at 5 sec intervals for 30 seconds. */
214    for (int retry=0; retry < 6; retry++) {
215       /* connect to the database */
216       m_db_handle = PQsetdbLogin(
217            m_db_address,         /* default = localhost */
218            port,                 /* default port */
219            NULL,                 /* pg options */
220            NULL,                 /* tty, ignored */
221            m_db_name,            /* database name */
222            m_db_user,            /* login name */
223            m_db_password);       /* password */
224
225       /* If no connect, try once more in case it is a timing problem */
226       if (PQstatus(m_db_handle) == CONNECTION_OK) {
227          break;
228       }
229       bmicrosleep(5, 0);
230    }
231
232    Dmsg0(50, "pg_real_connect done\n");
233    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
234         (m_db_password == NULL) ? "(NULL)" : m_db_password);
235
236    if (PQstatus(m_db_handle) != CONNECTION_OK) {
237       Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
238          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
239          m_db_name, m_db_user);
240       goto bail_out;
241    }
242
243    m_connected = true;
244    if (!check_tables_version(jcr, this)) {
245       goto bail_out;
246    }
247
248    sql_query("SET datestyle TO 'ISO, YMD'");
249    
250    /*
251     * Tell PostgreSQL we are using standard conforming strings
252     * and avoid warnings such as:
253     *  WARNING:  nonstandard use of \\ in a string literal
254     */
255    sql_query("SET standard_conforming_strings=on");
256
257    /*
258     * Check that encoding is SQL_ASCII
259     */
260    pgsql_check_database_encoding(jcr, this);
261
262    retval = true;
263
264 bail_out:
265    V(mutex);
266    return retval;
267 }
268
269 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
270 {
271    db_end_transaction(jcr);
272    P(mutex);
273    sql_free_result();
274    m_ref_count--;
275    if (m_ref_count == 0) {
276       db_list->remove(this);
277       if (m_connected && m_db_handle) {
278          PQfinish(m_db_handle);
279       }
280       rwl_destroy(&m_lock);
281       free_pool_memory(errmsg);
282       free_pool_memory(cmd);
283       free_pool_memory(cached_path);
284       free_pool_memory(fname);
285       free_pool_memory(path);
286       free_pool_memory(esc_name);
287       free_pool_memory(esc_path);
288       if (m_db_driver) {
289          free(m_db_driver);
290       }
291       if (m_db_name) {
292          free(m_db_name);
293       }
294       if (m_db_user) {
295          free(m_db_user);
296       }
297       if (m_db_password) {
298          free(m_db_password);
299       }
300       if (m_db_address) {
301          free(m_db_address);
302       }
303       if (m_db_socket) {
304          free(m_db_socket);
305       }
306       if (esc_obj) {
307          PQfreemem(esc_obj);
308       }
309       delete this;
310       if (db_list->size() == 0) {
311          delete db_list;
312          db_list = NULL;
313       }
314    }
315    V(mutex);
316 }
317
318 void B_DB_POSTGRESQL::db_thread_cleanup(void)
319 {
320 }
321
322 /*
323  * Escape strings so that PostgreSQL is happy
324  *
325  *   NOTE! len is the length of the old string. Your new
326  *         string must be long enough (max 2*old+1) to hold
327  *         the escaped output.
328  */
329 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
330 {
331    int error;
332   
333    PQescapeStringConn(m_db_handle, snew, old, len, &error);
334    if (error) {
335       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
336       /* error on encoding, probably invalid multibyte encoding in the source string
337         see PQescapeStringConn documentation for details. */
338       Dmsg0(500, "PQescapeStringConn failed\n");
339    }
340 }
341
342 /*
343  * Escape binary so that PostgreSQL is happy
344  *
345  */
346 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
347 {
348    size_t new_len;
349    unsigned char *obj;
350
351    obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
352    if (!obj) {
353       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
354    }
355
356    esc_obj = check_pool_memory_size(esc_obj, new_len+1);
357    memcpy(esc_obj, obj, new_len);
358    esc_obj[new_len]=0;
359
360    PQfreemem(obj);
361
362    return (char *)esc_obj;
363 }
364
365 /*
366  * Unescape binary object so that PostgreSQL is happy
367  *
368  */
369 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
370                                          POOLMEM **dest, int32_t *dest_len)
371 {
372    size_t new_len;
373    unsigned char *obj;
374
375    if (!from) {
376       *dest[0] = 0;
377       *dest_len = 0;
378       return;
379    }
380
381    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
382
383    if (!obj) {
384       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
385    }
386
387    *dest_len = new_len;
388    *dest = check_pool_memory_size(*dest, new_len+1);
389    memcpy(*dest, obj, new_len);
390    (*dest)[new_len]=0;
391    
392    PQfreemem(obj);
393
394    Dmsg1(010, "obj size: %d\n", *dest_len);
395 }
396
397 /*
398  * Start a transaction. This groups inserts and makes things
399  * much more efficient. Usually started when inserting
400  * file attributes.
401  */
402 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
403 {
404    if (!jcr->attr) {
405       jcr->attr = get_pool_memory(PM_FNAME);
406    }
407    if (!jcr->ar) {
408       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
409    }
410
411    /*
412     * This is turned off because transactions break
413     * if multiple simultaneous jobs are run.
414     */
415    if (!m_allow_transactions) {
416       return;
417    }
418
419    db_lock(this);
420    /*
421     * Allow only 25,000 changes per transaction
422     */
423    if (m_transaction && changes > 25000) {
424       db_end_transaction(jcr);
425    }
426    if (!m_transaction) {
427       sql_query("BEGIN");  /* begin transaction */
428       Dmsg0(400, "Start PosgreSQL transaction\n");
429       m_transaction = true;
430    }
431    db_unlock(this);
432 }
433
434 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
435 {
436    if (jcr && jcr->cached_attribute) {
437       Dmsg0(400, "Flush last cached attribute.\n");
438       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
439          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
440       }
441       jcr->cached_attribute = false;
442    }
443
444    if (!m_allow_transactions) {
445       return;
446    }
447
448    db_lock(this);
449    if (m_transaction) {
450       sql_query("COMMIT"); /* end transaction */
451       m_transaction = false;
452       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
453    }
454    changes = 0;
455    db_unlock(this);
456 }
457
458 /*
459  * Submit a general SQL command (cmd), and for each row returned,
460  * the result_handler is called with the ctx.
461  */
462 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
463 {
464    SQL_ROW row;
465    bool retval = true;
466
467    Dmsg1(500, "db_sql_query starts with '%s'\n", query);
468
469    db_lock(this);
470    if (!sql_query(query, QF_STORE_RESULT)) {
471       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
472       Dmsg0(500, "db_sql_query failed\n");
473       retval = false;
474       goto bail_out;
475    }
476
477    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
478
479    if (result_handler != NULL) {
480       Dmsg0(500, "db_sql_query invoking handler\n");
481       while ((row = sql_fetch_row()) != NULL) {
482          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
483          if (result_handler(ctx, m_num_fields, row))
484             break;
485       }
486       sql_free_result();
487    }
488
489    Dmsg0(500, "db_sql_query finished\n");
490
491 bail_out:
492    db_unlock(this);
493    return retval;
494 }
495
496 /*
497  * Note, if this routine returns false (failure), Bacula expects
498  * that no result has been stored.
499  * This is where QUERY_DB comes with Postgresql.
500  *
501  *  Returns:  true  on success
502  *            false on failure
503  *
504  */
505 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
506 {
507    int i;
508    bool retval = false;
509
510    Dmsg1(500, "sql_query starts with '%s'\n", query);
511    /*
512     * We are starting a new query. reset everything.
513     */
514    m_num_rows     = -1;
515    m_row_number   = -1;
516    m_field_number = -1;
517
518    if (m_result) {
519       PQclear(m_result);  /* hmm, someone forgot to free?? */
520       m_result = NULL;
521    }
522
523    for (i = 0; i < 10; i++) {
524       m_result = PQexec(m_db_handle, query);
525       if (m_result) {
526          break;
527       }
528       bmicrosleep(5, 0);
529    }
530    if (!m_result) {
531       Dmsg1(50, "Query failed: %s\n", query);
532       goto bail_out;
533    }
534
535    m_status = PQresultStatus(m_result);
536    if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
537       Dmsg0(500, "we have a result\n");
538
539       /*
540        * How many fields in the set?
541        */
542       m_num_fields = (int)PQnfields(m_result);
543       Dmsg1(500, "we have %d fields\n", m_num_fields);
544
545       m_num_rows = PQntuples(m_result);
546       Dmsg1(500, "we have %d rows\n", m_num_rows);
547
548       m_row_number = 0;      /* we can start to fetch something */
549       m_status = 0;          /* succeed */
550       retval = true;
551    } else {
552       Dmsg1(50, "Result status failed: %s\n", query);
553       goto bail_out;
554    }
555
556    Dmsg0(500, "sql_query finishing\n");
557    goto ok_out;
558
559 bail_out:
560    Dmsg0(500, "we failed\n");
561    PQclear(m_result);
562    m_result = NULL;
563    m_status = 1;                   /* failed */
564
565 ok_out:
566    return retval;
567 }
568
569 void B_DB_POSTGRESQL::sql_free_result(void)
570 {
571    db_lock(this);
572    if (m_result) {
573       PQclear(m_result);
574       m_result = NULL;
575    }
576    if (m_rows) {
577       free(m_rows);
578       m_rows = NULL;
579    }
580    if (m_fields) {
581       free(m_fields);
582       m_fields = NULL;
583    }
584    m_num_rows = m_num_fields = 0;
585    db_unlock(this);
586 }
587
588 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
589 {
590    int j;
591    SQL_ROW row = NULL; /* by default, return NULL */
592
593    Dmsg0(500, "sql_fetch_row start\n");
594
595    if (!m_rows || m_rows_size < m_num_fields) {
596       if (m_rows) {
597          Dmsg0(500, "sql_fetch_row freeing space\n");
598          free(m_rows);
599       }
600       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
601       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
602       m_rows_size = m_num_fields;
603
604       /*
605        * Now reset the row_number now that we have the space allocated
606        */
607       m_row_number = 0;
608    }
609
610    /*
611     * If still within the result set
612     */
613    if (m_row_number >= 0 && m_row_number < m_num_rows) {
614       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
615       /*
616        * Get each value from this row
617        */
618       for (j = 0; j < m_num_fields; j++) {
619          m_rows[j] = PQgetvalue(m_result, m_row_number, j);
620          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
621       }
622       /*
623        * Increment the row number for the next call
624        */
625       m_row_number++;
626       row = m_rows;
627    } else {
628       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
629    }
630
631    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
632
633    return row;
634 }
635
636 const char *B_DB_POSTGRESQL::sql_strerror(void)
637 {
638    return PQerrorMessage(m_db_handle);
639 }
640
641 void B_DB_POSTGRESQL::sql_data_seek(int row)
642 {
643    /*
644     * Set the row number to be returned on the next call to sql_fetch_row
645     */
646    m_row_number = row;
647 }
648
649 int B_DB_POSTGRESQL::sql_affected_rows(void)
650 {
651    return (unsigned) str_to_int32(PQcmdTuples(m_result));
652 }
653
654 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
655 {
656    int i;
657    uint64_t id = 0;
658    char sequence[NAMEDATALEN-1];
659    char getkeyval_query[NAMEDATALEN+50];
660    PGresult *pg_result;
661
662    /*
663     * First execute the insert query and then retrieve the currval.
664     */
665    if (!sql_query(query)) {
666       return 0;
667    }
668
669    m_num_rows = sql_affected_rows();
670    if (m_num_rows != 1) {
671       return 0;
672    }
673
674    changes++;
675
676    /*
677     * Obtain the current value of the sequence that
678     * provides the serial value for primary key of the table.
679     *
680     * currval is local to our session.  It is not affected by
681     * other transactions.
682     *
683     * Determine the name of the sequence.
684     * PostgreSQL automatically creates a sequence using
685     * <table>_<column>_seq.
686     * At the time of writing, all tables used this format for
687     * for their primary key: <table>id
688     * Except for basefiles which has a primary key on baseid.
689     * Therefore, we need to special case that one table.
690     *
691     * everything else can use the PostgreSQL formula.
692     */
693    if (strcasecmp(table_name, "basefiles") == 0) {
694       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
695    } else {
696       bstrncpy(sequence, table_name, sizeof(sequence));
697       bstrncat(sequence, "_",        sizeof(sequence));
698       bstrncat(sequence, table_name, sizeof(sequence));
699       bstrncat(sequence, "id",       sizeof(sequence));
700    }
701
702    bstrncat(sequence, "_seq", sizeof(sequence));
703    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
704
705    Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
706    for (i = 0; i < 10; i++) {
707       pg_result = PQexec(m_db_handle, getkeyval_query);
708       if (pg_result) {
709          break;
710       }
711       bmicrosleep(5, 0);
712    }
713    if (!pg_result) {
714       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
715       goto bail_out;
716    }
717
718    Dmsg0(500, "exec done");
719
720    if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
721       Dmsg0(500, "getting value");
722       id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
723       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
724    } else {
725       Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
726       Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
727    }
728
729 bail_out:
730    PQclear(pg_result);
731
732    return id;
733 }
734
735 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
736 {
737    int i, j;
738    int max_length;
739    int this_length;
740
741    Dmsg0(500, "sql_fetch_field starts\n");
742
743    if (!m_fields || m_fields_size < m_num_fields) {
744       if (m_fields) {
745          free(m_fields);
746          m_fields = NULL;
747       }
748       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
749       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
750       m_fields_size = m_num_fields;
751
752       for (i = 0; i < m_num_fields; i++) {
753          Dmsg1(500, "filling field %d\n", i);
754          m_fields[i].name = PQfname(m_result, i);
755          m_fields[i].type = PQftype(m_result, i);
756          m_fields[i].flags = 0;
757
758          /*
759           * For a given column, find the max length.
760           */
761          max_length = 0;
762          for (j = 0; j < m_num_rows; j++) {
763             if (PQgetisnull(m_result, j, i)) {
764                 this_length = 4;        /* "NULL" */
765             } else {
766                 this_length = cstrlen(PQgetvalue(m_result, j, i));
767             }
768          
769             if (max_length < this_length) {
770                max_length = this_length;
771             }
772          }
773          m_fields[i].max_length = max_length;
774
775          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
776                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
777       }
778    }
779
780    /*
781     * Increment field number for the next time around
782     */
783    return &m_fields[m_field_number++];
784 }
785
786 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
787 {
788    switch (field_type) {
789    case 1:
790       return true;
791    default:
792       return false;
793    }
794 }
795
796 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
797 {
798    /*
799     * TEMP: the following is taken from select OID, typname from pg_type;
800     */
801    switch (field_type) {
802    case 20:
803    case 21:
804    case 23:
805    case 700:
806    case 701:
807       return true;
808    default:
809       return false;
810    }
811 }
812
813 /*
814  * Escape strings so that PostgreSQL is happy on COPY
815  *
816  *   NOTE! len is the length of the old string. Your new
817  *         string must be long enough (max 2*old+1) to hold
818  *         the escaped output.
819  */
820 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
821 {
822    /* we have to escape \t, \n, \r, \ */
823    char c = '\0' ;
824
825    while (len > 0 && *src) {
826       switch (*src) {
827       case '\n':
828          c = 'n';
829          break;
830       case '\\':
831          c = '\\';
832          break;
833       case '\t':
834          c = 't';
835          break;
836       case '\r':
837          c = 'r';
838          break;
839       default:
840          c = '\0' ;
841       }
842
843       if (c) {
844          *dest = '\\';
845          dest++;
846          *dest = c;
847       } else {
848          *dest = *src;
849       }
850
851       len--;
852       src++;
853       dest++;
854    }
855
856    *dest = '\0';
857    return dest;
858 }
859
860 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
861 {
862    const char *query = "COPY batch FROM STDIN";
863
864    Dmsg0(500, "sql_batch_start started\n");
865
866    if (!sql_query("CREATE TEMPORARY TABLE batch ("
867                           "fileindex int,"
868                           "jobid int,"
869                           "path varchar,"
870                           "name varchar,"
871                           "lstat varchar,"
872                           "md5 varchar,"
873                           "markid int)")) {
874       Dmsg0(500, "sql_batch_start failed\n");
875       return false;
876    }
877    
878    /*
879     * We are starting a new query.  reset everything.
880     */
881    m_num_rows     = -1;
882    m_row_number   = -1;
883    m_field_number = -1;
884
885    sql_free_result();
886
887    for (int i=0; i < 10; i++) {
888       m_result = PQexec(m_db_handle, query);
889       if (m_result) {
890          break;
891       }
892       bmicrosleep(5, 0);
893    }
894    if (!m_result) {
895       Dmsg1(50, "Query failed: %s\n", query);
896       goto bail_out;
897    }
898
899    m_status = PQresultStatus(m_result);
900    if (m_status == PGRES_COPY_IN) {
901       /*
902        * How many fields in the set?
903        */
904       m_num_fields = (int) PQnfields(m_result);
905       m_num_rows = 0;
906       m_status = 1;
907    } else {
908       Dmsg1(50, "Result status failed: %s\n", query);
909       goto bail_out;
910    }
911
912    Dmsg0(500, "sql_batch_start finishing\n");
913
914    return true;
915
916 bail_out:
917    Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
918    m_status = 0;
919    PQclear(m_result);
920    m_result = NULL;
921    return false;
922 }
923
924 /*
925  * Set error to something to abort operation
926  */
927 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
928 {
929    int res;
930    int count=30;
931    PGresult *pg_result;
932
933    Dmsg0(500, "sql_batch_end started\n");
934
935    do { 
936       res = PQputCopyEnd(m_db_handle, error);
937    } while (res == 0 && --count > 0);
938
939    if (res == 1) {
940       Dmsg0(500, "ok\n");
941       m_status = 1;
942    }
943    
944    if (res <= 0) {
945       Dmsg0(500, "we failed\n");
946       m_status = 0;
947       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
948       Dmsg1(500, "failure %s\n", errmsg);
949    }
950
951    /* Check command status and return to normal libpq state */
952    pg_result = PQgetResult(m_db_handle);
953    if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
954       Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
955       m_status = 0;
956    }
957    PQclear(pg_result); 
958
959    Dmsg0(500, "sql_batch_end finishing\n");
960
961    return true;
962 }
963
964 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
965 {
966    int res;
967    int count=30;
968    size_t len;
969    const char *digest;
970    char ed1[50];
971
972    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
973    pgsql_copy_escape(esc_name, fname, fnl);
974
975    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
976    pgsql_copy_escape(esc_path, path, pnl);
977
978    if (ar->Digest == NULL || ar->Digest[0] == 0) {
979       digest = "0";
980    } else {
981       digest = ar->Digest;
982    }
983
984    len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
985               ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
986               esc_name, ar->attr, digest, ar->DeltaSeq);
987
988    do { 
989       res = PQputCopyData(m_db_handle, cmd, len);
990    } while (res == 0 && --count > 0);
991
992    if (res == 1) {
993       Dmsg0(500, "ok\n");
994       changes++;
995       m_status = 1;
996    }
997
998    if (res <= 0) {
999       Dmsg0(500, "we failed\n");
1000       m_status = 0;
1001       Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1002       Dmsg1(500, "failure %s\n", errmsg);
1003    }
1004
1005    Dmsg0(500, "sql_batch_insert finishing\n");
1006
1007    return true;
1008 }
1009
1010 /*
1011  * Initialize database data structure. In principal this should
1012  * never have errors, or it is really fatal.
1013  */
1014 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
1015                        const char *db_user, const char *db_password, 
1016                        const char *db_address, int db_port, 
1017                        const char *db_socket, bool mult_db_connections, 
1018                        bool disable_batch_insert)
1019 {
1020    B_DB_POSTGRESQL *mdb = NULL;
1021
1022    if (!db_user) {
1023       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1024       return NULL;
1025    }
1026    P(mutex);                          /* lock DB queue */
1027    if (db_list && !mult_db_connections) {
1028       /*
1029        * Look to see if DB already open
1030        */
1031       foreach_dlist(mdb, db_list) {
1032          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1033             Dmsg1(100, "DB REopen %s\n", db_name);
1034             mdb->increment_refcount();
1035             goto bail_out;
1036          }
1037       }
1038    }
1039    Dmsg0(100, "db_init_database first time\n");
1040    mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
1041                              db_address, db_port, db_socket, 
1042                              mult_db_connections, disable_batch_insert));
1043
1044 bail_out:
1045    V(mutex);
1046    return mdb;
1047 }
1048
1049 #endif /* HAVE_POSTGRESQL */