]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Fix RestoreObject for PostgreSQL
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  */
36
37
38 /* The following is necessary so that we do not include
39  * the dummy external definition of DB.
40  */
41 #define __SQL_C                       /* indicate that this is sql.c */
42
43 #include "bacula.h"
44 #include "cats.h"
45
46 #ifdef HAVE_POSTGRESQL
47
48 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb = NULL;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (db_list == NULL) {
90       db_list = New(dlist(mdb, &mdb->link));
91    }
92    if (!mult_db_connections) {
93       /* Look to see if DB already open */
94       foreach_dlist(mdb, db_list) {
95          if (bstrcmp(mdb->db_name, db_name) &&
96              bstrcmp(mdb->db_address, db_address) &&
97              mdb->db_port == db_port) {
98             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
99             mdb->ref_count++;
100             V(mutex);
101             return mdb;                  /* already open */
102          }
103       }
104    }
105    Dmsg0(100, "db_open first time\n");
106    mdb = (B_DB *)malloc(sizeof(B_DB));
107    memset(mdb, 0, sizeof(B_DB));
108    mdb->db_name = bstrdup(db_name);
109    mdb->db_user = bstrdup(db_user);
110    if (db_password) {
111       mdb->db_password = bstrdup(db_password);
112    }
113    if (db_address) {
114       mdb->db_address  = bstrdup(db_address);
115    }
116    if (db_socket) {
117       mdb->db_socket   = bstrdup(db_socket);
118    }
119    mdb->db_port        = db_port;
120    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
121    *mdb->errmsg        = 0;
122    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
123    mdb->cached_path    = get_pool_memory(PM_FNAME);
124    mdb->cached_path_id = 0;
125    mdb->ref_count      = 1;
126    mdb->fname          = get_pool_memory(PM_FNAME);
127    mdb->path           = get_pool_memory(PM_FNAME);
128    mdb->esc_name       = get_pool_memory(PM_FNAME);
129    mdb->esc_path       = get_pool_memory(PM_FNAME);
130    mdb->allow_transactions = mult_db_connections;
131    db_list->append(mdb);                   /* put db in list */
132    V(mutex);
133    return mdb;
134 }
135
136 /* Check that the database correspond to the encoding we want */
137 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
138 {
139    SQL_ROW row;
140    int ret=false;
141
142    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
143       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
144       return false;
145    }
146
147    if ((row = sql_fetch_row(mdb)) == NULL) {
148       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
149       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
150    } else {
151       ret = bstrcmp(row[0], "SQL_ASCII");
152
153       if (ret) {
154          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
155          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
156
157       } else {                  /* something is wrong with database encoding */
158          Mmsg(mdb->errmsg, 
159               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
160               mdb->db_name, row[0]);
161          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
162          Dmsg1(50, "%s", mdb->errmsg);
163       } 
164    }
165    return ret;
166 }
167
168 /*
169  * Now actually open the database.  This can generate errors,
170  *   which are returned in the errmsg
171  *
172  * DO NOT close the database or free(mdb) here !!!!
173  */
174 int
175 db_open_database(JCR *jcr, B_DB *mdb)
176 {
177    int errstat;
178    char buf[10], *port;
179
180    P(mutex);
181    if (mdb->connected) {
182       V(mutex);
183       return 1;
184    }
185    mdb->connected = false;
186
187    if ((errstat=rwl_init(&mdb->lock)) != 0) {
188       berrno be;
189       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190             be.bstrerror(errstat));
191       V(mutex);
192       return 0;
193    }
194
195    if (mdb->db_port) {
196       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
197       port = buf;
198    } else {
199       port = NULL;
200    }
201
202    /* If connection fails, try at 5 sec intervals for 30 seconds. */
203    for (int retry=0; retry < 6; retry++) {
204       /* connect to the database */
205       mdb->db = PQsetdbLogin(
206            mdb->db_address,           /* default = localhost */
207            port,                      /* default port */
208            NULL,                      /* pg options */
209            NULL,                      /* tty, ignored */
210            mdb->db_name,              /* database name */
211            mdb->db_user,              /* login name */
212            mdb->db_password);         /* password */
213
214       /* If no connect, try once more in case it is a timing problem */
215       if (PQstatus(mdb->db) == CONNECTION_OK) {
216          break;
217       }
218       bmicrosleep(5, 0);
219    }
220
221    Dmsg0(50, "pg_real_connect done\n");
222    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223             mdb->db_password==NULL?"(NULL)":mdb->db_password);
224
225    if (PQstatus(mdb->db) != CONNECTION_OK) {
226       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
227          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
228          mdb->db_name, mdb->db_user);
229       V(mutex);
230       return 0;
231    }
232
233    mdb->connected = true;
234
235    if (!check_tables_version(jcr, mdb)) {
236       V(mutex);
237       return 0;
238    }
239
240    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
241    
242    /* tell PostgreSQL we are using standard conforming strings
243       and avoid warnings such as:
244        WARNING:  nonstandard use of \\ in a string literal
245    */
246    sql_query(mdb, "set standard_conforming_strings=on");
247
248    /* check that encoding is SQL_ASCII */
249    check_database_encoding(jcr, mdb);
250
251    V(mutex);
252    return 1;
253 }
254
255 void
256 db_close_database(JCR *jcr, B_DB *mdb)
257 {
258    if (!mdb) {
259       return;
260    }
261    db_end_transaction(jcr, mdb);
262    P(mutex);
263    sql_free_result(mdb);
264    mdb->ref_count--;
265    if (mdb->ref_count == 0) {
266       db_list->remove(mdb);
267       if (mdb->connected && mdb->db) {
268          sql_close(mdb);
269       }
270       rwl_destroy(&mdb->lock);
271       free_pool_memory(mdb->errmsg);
272       free_pool_memory(mdb->cmd);
273       free_pool_memory(mdb->cached_path);
274       free_pool_memory(mdb->fname);
275       free_pool_memory(mdb->path);
276       free_pool_memory(mdb->esc_name);
277       free_pool_memory(mdb->esc_path);
278       if (mdb->db_name) {
279          free(mdb->db_name);
280       }
281       if (mdb->db_user) {
282          free(mdb->db_user);
283       }
284       if (mdb->db_password) {
285          free(mdb->db_password);
286       }
287       if (mdb->db_address) {
288          free(mdb->db_address);
289       }
290       if (mdb->db_socket) {
291          free(mdb->db_socket);
292       }
293       if (mdb->esc_obj) {
294          PQfreemem(mdb->esc_obj);
295       }
296       free(mdb);
297       if (db_list->size() == 0) {
298          delete db_list;
299          db_list = NULL;
300       }
301    }
302    V(mutex);
303 }
304
305 void db_check_backend_thread_safe()
306 {
307 #ifdef HAVE_BATCH_FILE_INSERT
308 # ifdef HAVE_PQISTHREADSAFE 
309    if (!PQisthreadsafe()) {
310       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
311                           "when using BatchMode.\n"));
312    }
313 # endif
314 #endif
315 }
316
317 void db_thread_cleanup()
318 { }
319
320 /*
321  * Return the next unique index (auto-increment) for
322  * the given table.  Return NULL on error.
323  *
324  * For PostgreSQL, NULL causes the auto-increment value
325  *  to be updated.
326  */
327 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
328 {
329    strcpy(index, "NULL");
330    return 1;
331 }
332
333 /*
334  * Escape binary so that PostgreSQL is happy
335  *
336  */
337 char *
338 db_escape_object(JCR *jcr, B_DB *mdb, char *old, int len)
339 {
340    size_t new_len;
341    if (mdb->esc_obj) {
342       PQfreemem(mdb->esc_obj);
343    }
344
345    mdb->esc_obj = PQescapeByteaConn(mdb->db, (unsigned const char *)old,
346                                     len, &new_len);
347
348    if (!mdb->esc_obj) {
349       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
350    }
351
352    return (char *)mdb->esc_obj;
353 }
354
355 /*
356  * Unescape binary object so that PostgreSQL is happy
357  *
358  */
359 void
360 db_unescape_object(JCR *jcr, B_DB *mdb, 
361                    char *from, int32_t expected_len,
362                    POOLMEM **dest, int32_t *dest_len)
363 {
364    size_t new_len;
365    unsigned char *obj;
366
367    if (!from) {
368       *dest[0] = 0;
369       *dest_len = 0;
370       return;
371    }
372
373    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
374
375    if (!obj) {
376       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
377    }
378
379    *dest_len = new_len;
380    *dest = check_pool_memory_size(*dest, new_len+1);
381    memcpy(*dest, obj, new_len);
382    (*dest)[new_len]=0;
383    
384    PQfreemem(obj);
385
386    Dmsg1(000, "obj size: %d\n", *dest_len);
387 }
388
389 /*
390  * Escape strings so that PostgreSQL is happy
391  *
392  *   NOTE! len is the length of the old string. Your new
393  *         string must be long enough (max 2*old+1) to hold
394  *         the escaped output.
395  */
396 void
397 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
398 {
399    int error;
400   
401    PQescapeStringConn(mdb->db, snew, old, len, &error);
402    if (error) {
403       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
404       /* error on encoding, probably invalid multibyte encoding in the source string
405         see PQescapeStringConn documentation for details. */
406       Dmsg0(500, "PQescapeStringConn failed\n");
407    }
408 }
409
410 /*
411  * Submit a general SQL command (cmd), and for each row returned,
412  *  the sqlite_handler is called with the ctx.
413  */
414 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
415 {
416    SQL_ROW row;
417
418    Dmsg0(500, "db_sql_query started\n");
419
420    db_lock(mdb);
421    if (sql_query(mdb, query) != 0) {
422       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
423       db_unlock(mdb);
424       Dmsg0(500, "db_sql_query failed\n");
425       return false;
426    }
427    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
428
429    if (result_handler != NULL) {
430       Dmsg0(500, "db_sql_query invoking handler\n");
431       if ((mdb->result = sql_store_result(mdb)) != NULL) {
432          int num_fields = sql_num_fields(mdb);
433
434          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
435          while ((row = sql_fetch_row(mdb)) != NULL) {
436
437             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
438             if (result_handler(ctx, num_fields, row))
439                break;
440          }
441
442         sql_free_result(mdb);
443       }
444    }
445    db_unlock(mdb);
446
447    Dmsg0(500, "db_sql_query finished\n");
448
449    return true;
450 }
451
452
453
454 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
455 {
456    int j;
457    POSTGRESQL_ROW row = NULL; // by default, return NULL
458
459    Dmsg0(500, "my_postgresql_fetch_row start\n");
460
461    if (!mdb->row || mdb->row_size < mdb->num_fields) {
462       int num_fields = mdb->num_fields;
463       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
464
465       if (mdb->row) {
466          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
467          free(mdb->row);
468       }
469       num_fields += 20;                  /* add a bit extra */
470       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
471       mdb->row_size = num_fields;
472
473       // now reset the row_number now that we have the space allocated
474       mdb->row_number = 0;
475    }
476
477    // if still within the result set
478    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
479       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
480       // get each value from this row
481       for (j = 0; j < mdb->num_fields; j++) {
482          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
483          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
484       }
485       // increment the row number for the next call
486       mdb->row_number++;
487
488       row = mdb->row;
489    } else {
490       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
491    }
492
493    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
494
495    return row;
496 }
497
498 int my_postgresql_max_length(B_DB *mdb, int field_num) {
499    //
500    // for a given column, find the max length
501    //
502    int max_length;
503    int i;
504    int this_length;
505
506    max_length = 0;
507    for (i = 0; i < mdb->num_rows; i++) {
508       if (PQgetisnull(mdb->result, i, field_num)) {
509           this_length = 4;        // "NULL"
510       } else {
511           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
512       }
513
514       if (max_length < this_length) {
515           max_length = this_length;
516       }
517    }
518
519    return max_length;
520 }
521
522 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
523 {
524    int     i;
525
526    Dmsg0(500, "my_postgresql_fetch_field starts\n");
527
528    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
529       if (mdb->fields) {
530          free(mdb->fields);
531       }
532       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
533       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
534       mdb->fields_size = mdb->num_fields;
535
536       for (i = 0; i < mdb->num_fields; i++) {
537          Dmsg1(500, "filling field %d\n", i);
538          mdb->fields[i].name           = PQfname(mdb->result, i);
539          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
540          mdb->fields[i].type       = PQftype(mdb->result, i);
541          mdb->fields[i].flags      = 0;
542
543          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
544             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
545             mdb->fields[i].flags);
546       } // end for
547    } // end if
548
549    // increment field number for the next time around
550
551    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
552    return &mdb->fields[mdb->field_number++];
553 }
554
555 void my_postgresql_data_seek(B_DB *mdb, int row)
556 {
557    // set the row number to be returned on the next call
558    // to my_postgresql_fetch_row
559    mdb->row_number = row;
560 }
561
562 void my_postgresql_field_seek(B_DB *mdb, int field)
563 {
564    mdb->field_number = field;
565 }
566
567 /*
568  * Note, if this routine returns 1 (failure), Bacula expects
569  *  that no result has been stored.
570  * This is where QUERY_DB comes with Postgresql.
571  *
572  *  Returns:  0  on success
573  *            1  on failure
574  *
575  */
576 int my_postgresql_query(B_DB *mdb, const char *query)
577 {
578    Dmsg0(500, "my_postgresql_query started\n");
579    // We are starting a new query.  reset everything.
580    mdb->num_rows     = -1;
581    mdb->row_number   = -1;
582    mdb->field_number = -1;
583
584    if (mdb->result) {
585       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
586       mdb->result = NULL;
587    }
588
589    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
590
591    for (int i=0; i < 10; i++) {
592       mdb->result = PQexec(mdb->db, query);
593       if (mdb->result) {
594          break;
595       }
596       bmicrosleep(5, 0);
597    }
598    if (!mdb->result) {
599       Dmsg1(50, "Query failed: %s\n", query);
600       goto bail_out;
601    }
602
603    mdb->status = PQresultStatus(mdb->result);
604    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
605       Dmsg1(500, "we have a result\n", query);
606
607       // how many fields in the set?
608       mdb->num_fields = (int)PQnfields(mdb->result);
609       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
610
611       mdb->num_rows = PQntuples(mdb->result);
612       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
613
614       mdb->row_number = 0;      /* we can start to fetch something */
615       mdb->status = 0;          /* succeed */
616    } else {
617       Dmsg1(50, "Result status failed: %s\n", query);
618       goto bail_out;
619    }
620
621    Dmsg0(500, "my_postgresql_query finishing\n");
622    return mdb->status;
623
624 bail_out:
625    Dmsg1(500, "we failed\n", query);
626    PQclear(mdb->result);
627    mdb->result = NULL;
628    mdb->status = 1;                   /* failed */
629    return mdb->status;
630 }
631
632 void my_postgresql_free_result(B_DB *mdb)
633 {
634    
635    db_lock(mdb);
636    if (mdb->result) {
637       PQclear(mdb->result);
638       mdb->result = NULL;
639    }
640
641    if (mdb->row) {
642       free(mdb->row);
643       mdb->row = NULL;
644    }
645
646    if (mdb->fields) {
647       free(mdb->fields);
648       mdb->fields = NULL;
649    }
650    db_unlock(mdb);
651 }
652
653 static int my_postgresql_currval(B_DB *mdb, const char *table_name)
654 {
655    // Obtain the current value of the sequence that
656    // provides the serial value for primary key of the table.
657
658    // currval is local to our session.  It is not affected by
659    // other transactions.
660
661    // Determine the name of the sequence.
662    // PostgreSQL automatically creates a sequence using
663    // <table>_<column>_seq.
664    // At the time of writing, all tables used this format for
665    // for their primary key: <table>id
666    // Except for basefiles which has a primary key on baseid.
667    // Therefore, we need to special case that one table.
668
669    // everything else can use the PostgreSQL formula.
670
671    char      sequence[NAMEDATALEN-1];
672    char      query   [NAMEDATALEN+50];
673    PGresult *result;
674    int       id = 0;
675
676    if (strcasecmp(table_name, "basefiles") == 0) {
677       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
678    } else {
679       bstrncpy(sequence, table_name, sizeof(sequence));
680       bstrncat(sequence, "_",        sizeof(sequence));
681       bstrncat(sequence, table_name, sizeof(sequence));
682       bstrncat(sequence, "id",       sizeof(sequence));
683    }
684
685    bstrncat(sequence, "_seq", sizeof(sequence));
686    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
687
688    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
689    for (int i=0; i < 10; i++) {
690       result = PQexec(mdb->db, query);
691       if (result) {
692          break;
693       }
694       bmicrosleep(5, 0);
695    }
696    if (!result) {
697       Dmsg1(50, "Query failed: %s\n", query);
698       goto bail_out;
699    }
700
701    Dmsg0(500, "exec done");
702
703    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
704       Dmsg0(500, "getting value");
705       id = atoi(PQgetvalue(result, 0, 0));
706       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
707    } else {
708       Dmsg1(50, "Result status failed: %s\n", query);
709       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
710    }
711
712 bail_out:
713    PQclear(result);
714    
715    return id;
716 }
717
718 int my_postgresql_insert_autokey_record(B_DB *mdb, const char *query, const char *table_name)
719 {
720    /*
721     * First execute the insert query and then retrieve the currval.
722     */
723    if (my_postgresql_query(mdb, query)) {
724       return 0;
725    }
726
727    mdb->num_rows = sql_affected_rows(mdb);
728    if (mdb->num_rows != 1) {
729       return 0;
730    }
731
732    mdb->changes++;
733
734    return my_postgresql_currval(mdb, table_name);
735 }
736
737 #ifdef HAVE_BATCH_FILE_INSERT
738
739 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
740 {
741    const char *query = "COPY batch FROM STDIN";
742
743    Dmsg0(500, "my_postgresql_batch_start started\n");
744
745    if (my_postgresql_query(mdb,
746                            "CREATE TEMPORARY TABLE batch ("
747                                "fileindex int,"
748                                "jobid int,"
749                                "path varchar,"
750                                "name varchar,"
751                                "lstat varchar,"
752                                "md5 varchar)") == 1)
753    {
754       Dmsg0(500, "my_postgresql_batch_start failed\n");
755       return 1;
756    }
757    
758    // We are starting a new query.  reset everything.
759    mdb->num_rows     = -1;
760    mdb->row_number   = -1;
761    mdb->field_number = -1;
762
763    my_postgresql_free_result(mdb);
764
765    for (int i=0; i < 10; i++) {
766       mdb->result = PQexec(mdb->db, query);
767       if (mdb->result) {
768          break;
769       }
770       bmicrosleep(5, 0);
771    }
772    if (!mdb->result) {
773       Dmsg1(50, "Query failed: %s\n", query);
774       goto bail_out;
775    }
776
777    mdb->status = PQresultStatus(mdb->result);
778    if (mdb->status == PGRES_COPY_IN) {
779       // how many fields in the set?
780       mdb->num_fields = (int) PQnfields(mdb->result);
781       mdb->num_rows   = 0;
782       mdb->status = 1;
783    } else {
784       Dmsg1(50, "Result status failed: %s\n", query);
785       goto bail_out;
786    }
787
788    Dmsg0(500, "my_postgresql_batch_start finishing\n");
789
790    return mdb->status;
791
792 bail_out:
793    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
794    mdb->status = 0;
795    PQclear(mdb->result);
796    mdb->result = NULL;
797    return mdb->status;
798 }
799
800 /* set error to something to abort operation */
801 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
802 {
803    int res;
804    int count=30;
805    PGresult *result;
806    Dmsg0(500, "my_postgresql_batch_end started\n");
807
808    if (!mdb) {                  /* no files ? */
809       return 0;
810    }
811
812    do { 
813       res = PQputCopyEnd(mdb->db, error);
814    } while (res == 0 && --count > 0);
815
816    if (res == 1) {
817       Dmsg0(500, "ok\n");
818       mdb->status = 1;
819    }
820    
821    if (res <= 0) {
822       Dmsg0(500, "we failed\n");
823       mdb->status = 0;
824       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
825    }
826
827    /* Check command status and return to normal libpq state */
828    result = PQgetResult(mdb->db);
829    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
830       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
831       mdb->status = 0;
832    }
833    PQclear(result); 
834
835    Dmsg0(500, "my_postgresql_batch_end finishing\n");
836
837    return mdb->status;
838 }
839
840 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
841 {
842    int res;
843    int count=30;
844    size_t len;
845    const char *digest;
846    char ed1[50];
847
848    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
849    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
850
851    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
852    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
853
854    if (ar->Digest == NULL || ar->Digest[0] == 0) {
855       digest = "0";
856    } else {
857       digest = ar->Digest;
858    }
859
860    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
861               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
862               mdb->esc_name, ar->attr, digest);
863
864    do { 
865       res = PQputCopyData(mdb->db,
866                           mdb->cmd,
867                           len);
868    } while (res == 0 && --count > 0);
869
870    if (res == 1) {
871       Dmsg0(500, "ok\n");
872       mdb->changes++;
873       mdb->status = 1;
874    }
875
876    if (res <= 0) {
877       Dmsg0(500, "we failed\n");
878       mdb->status = 0;
879       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
880    }
881
882    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
883
884    return mdb->status;
885 }
886
887 #endif /* HAVE_BATCH_FILE_INSERT */
888
889 /*
890  * Escape strings so that PostgreSQL is happy on COPY
891  *
892  *   NOTE! len is the length of the old string. Your new
893  *         string must be long enough (max 2*old+1) to hold
894  *         the escaped output.
895  */
896 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
897 {
898    /* we have to escape \t, \n, \r, \ */
899    char c = '\0' ;
900
901    while (len > 0 && *src) {
902       switch (*src) {
903       case '\n':
904          c = 'n';
905          break;
906       case '\\':
907          c = '\\';
908          break;
909       case '\t':
910          c = 't';
911          break;
912       case '\r':
913          c = 'r';
914          break;
915       default:
916          c = '\0' ;
917       }
918
919       if (c) {
920          *dest = '\\';
921          dest++;
922          *dest = c;
923       } else {
924          *dest = *src;
925       }
926
927       len--;
928       src++;
929       dest++;
930    }
931
932    *dest = '\0';
933    return dest;
934 }
935
936 #ifdef HAVE_BATCH_FILE_INSERT
937 const char *my_pg_batch_lock_path_query = 
938    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
939
940
941 const char *my_pg_batch_lock_filename_query = 
942    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
943
944 const char *my_pg_batch_unlock_tables_query = "COMMIT";
945
946 const char *my_pg_batch_fill_path_query = 
947    "INSERT INTO Path (Path) "
948     "SELECT a.Path FROM "
949      "(SELECT DISTINCT Path FROM batch) AS a "
950       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
951
952
953 const char *my_pg_batch_fill_filename_query = 
954    "INSERT INTO Filename (Name) "
955     "SELECT a.Name FROM "
956      "(SELECT DISTINCT Name FROM batch) as a "
957       "WHERE NOT EXISTS "
958        "(SELECT Name FROM Filename WHERE Name = a.Name)";
959 #endif /* HAVE_BATCH_FILE_INSERT */
960
961 #endif /* HAVE_POSTGRESQL */