]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Update patches
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151       if (!ret) {
152          Mmsg(mdb->errmsg, 
153               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
154               mdb->db_name, row[0]);
155          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
156       } 
157    }
158    return ret;
159 }
160
161 /*
162  * Now actually open the database.  This can generate errors,
163  *   which are returned in the errmsg
164  *
165  * DO NOT close the database or free(mdb) here !!!!
166  */
167 int
168 db_open_database(JCR *jcr, B_DB *mdb)
169 {
170    int errstat;
171    char buf[10], *port;
172
173 #ifdef xxx                      /* require libpq >= 8.2 */
174    if (!PQisthreadsafe()) {
175       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
176            "PostgreSQL library is not thread safe. Cannot continue.\n"));
177    }
178 #endif
179    P(mutex);
180    if (mdb->connected) {
181       V(mutex);
182       return 1;
183    }
184    mdb->connected = false;
185
186    if ((errstat=rwl_init(&mdb->lock)) != 0) {
187       berrno be;
188       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
189             be.bstrerror(errstat));
190       V(mutex);
191       return 0;
192    }
193
194    if (mdb->db_port) {
195       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
196       port = buf;
197    } else {
198       port = NULL;
199    }
200
201    /* If connection fails, try at 5 sec intervals for 30 seconds. */
202    for (int retry=0; retry < 6; retry++) {
203       /* connect to the database */
204       mdb->db = PQsetdbLogin(
205            mdb->db_address,           /* default = localhost */
206            port,                      /* default port */
207            NULL,                      /* pg options */
208            NULL,                      /* tty, ignored */
209            mdb->db_name,              /* database name */
210            mdb->db_user,              /* login name */
211            mdb->db_password);         /* password */
212
213       /* If no connect, try once more in case it is a timing problem */
214       if (PQstatus(mdb->db) == CONNECTION_OK) {
215          break;
216       }
217       bmicrosleep(5, 0);
218    }
219
220    Dmsg0(50, "pg_real_connect done\n");
221    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
222             mdb->db_password==NULL?"(NULL)":mdb->db_password);
223
224    if (PQstatus(mdb->db) != CONNECTION_OK) {
225       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
226             "Database=%s User=%s\n"
227             "It is probably not running or your password is incorrect.\n"),
228              mdb->db_name, mdb->db_user);
229       V(mutex);
230       return 0;
231    }
232
233    mdb->connected = true;
234
235    if (!check_tables_version(jcr, mdb)) {
236       V(mutex);
237       return 0;
238    }
239
240    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
241    
242    /* tell PostgreSQL we are using standard conforming strings
243       and avoid warnings such as:
244        WARNING:  nonstandard use of \\ in a string literal
245    */
246    sql_query(mdb, "set standard_conforming_strings=on");
247
248    /* check that encoding is SQL_ASCII */
249    check_database_encoding(jcr, mdb);
250
251    V(mutex);
252    return 1;
253 }
254
255 void
256 db_close_database(JCR *jcr, B_DB *mdb)
257 {
258    if (!mdb) {
259       return;
260    }
261    db_end_transaction(jcr, mdb);
262    P(mutex);
263    sql_free_result(mdb);
264    mdb->ref_count--;
265    if (mdb->ref_count == 0) {
266       qdchain(&mdb->bq);
267       if (mdb->connected && mdb->db) {
268          sql_close(mdb);
269       }
270       rwl_destroy(&mdb->lock);
271       free_pool_memory(mdb->errmsg);
272       free_pool_memory(mdb->cmd);
273       free_pool_memory(mdb->cached_path);
274       free_pool_memory(mdb->fname);
275       free_pool_memory(mdb->path);
276       free_pool_memory(mdb->esc_name);
277       free_pool_memory(mdb->esc_path);
278       if (mdb->db_name) {
279          free(mdb->db_name);
280       }
281       if (mdb->db_user) {
282          free(mdb->db_user);
283       }
284       if (mdb->db_password) {
285          free(mdb->db_password);
286       }
287       if (mdb->db_address) {
288          free(mdb->db_address);
289       }
290       if (mdb->db_socket) {
291          free(mdb->db_socket);
292       }
293       free(mdb);
294    }
295    V(mutex);
296 }
297
298 void db_thread_cleanup()
299 { }
300
301 /*
302  * Return the next unique index (auto-increment) for
303  * the given table.  Return NULL on error.
304  *
305  * For PostgreSQL, NULL causes the auto-increment value
306  *  to be updated.
307  */
308 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
309 {
310    strcpy(index, "NULL");
311    return 1;
312 }
313
314
315 /*
316  * Escape strings so that PostgreSQL is happy
317  *
318  *   NOTE! len is the length of the old string. Your new
319  *         string must be long enough (max 2*old+1) to hold
320  *         the escaped output.
321  */
322 void
323 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
324 {
325    int error;
326   
327    PQescapeStringConn(mdb->db, snew, old, len, &error);
328    if (error) {
329       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
330       /* error on encoding, probably invalid multibyte encoding in the source string
331         see PQescapeStringConn documentation for details. */
332       Dmsg0(500, "PQescapeStringConn failed\n");
333    }
334 }
335
336 /*
337  * Submit a general SQL command (cmd), and for each row returned,
338  *  the sqlite_handler is called with the ctx.
339  */
340 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
341 {
342    SQL_ROW row;
343
344    Dmsg0(500, "db_sql_query started\n");
345
346    db_lock(mdb);
347    if (sql_query(mdb, query) != 0) {
348       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
349       db_unlock(mdb);
350       Dmsg0(500, "db_sql_query failed\n");
351       return false;
352    }
353    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
354
355    if (result_handler != NULL) {
356       Dmsg0(500, "db_sql_query invoking handler\n");
357       if ((mdb->result = sql_store_result(mdb)) != NULL) {
358          int num_fields = sql_num_fields(mdb);
359
360          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
361          while ((row = sql_fetch_row(mdb)) != NULL) {
362
363             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
364             if (result_handler(ctx, num_fields, row))
365                break;
366          }
367
368         sql_free_result(mdb);
369       }
370    }
371    db_unlock(mdb);
372
373    Dmsg0(500, "db_sql_query finished\n");
374
375    return true;
376 }
377
378
379
380 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
381 {
382    int j;
383    POSTGRESQL_ROW row = NULL; // by default, return NULL
384
385    Dmsg0(500, "my_postgresql_fetch_row start\n");
386
387    if (!mdb->row || mdb->row_size < mdb->num_fields) {
388       int num_fields = mdb->num_fields;
389       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
390
391       if (mdb->row) {
392          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
393          free(mdb->row);
394       }
395       num_fields += 20;                  /* add a bit extra */
396       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
397       mdb->row_size = num_fields;
398
399       // now reset the row_number now that we have the space allocated
400       mdb->row_number = 0;
401    }
402
403    // if still within the result set
404    if (mdb->row_number < mdb->num_rows) {
405       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
406       // get each value from this row
407       for (j = 0; j < mdb->num_fields; j++) {
408          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
409          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
410       }
411       // increment the row number for the next call
412       mdb->row_number++;
413
414       row = mdb->row;
415    } else {
416       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
417    }
418
419    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
420
421    return row;
422 }
423
424 int my_postgresql_max_length(B_DB *mdb, int field_num) {
425    //
426    // for a given column, find the max length
427    //
428    int max_length;
429    int i;
430    int this_length;
431
432    max_length = 0;
433    for (i = 0; i < mdb->num_rows; i++) {
434       if (PQgetisnull(mdb->result, i, field_num)) {
435           this_length = 4;        // "NULL"
436       } else {
437           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
438       }
439
440       if (max_length < this_length) {
441           max_length = this_length;
442       }
443    }
444
445    return max_length;
446 }
447
448 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
449 {
450    int     i;
451
452    Dmsg0(500, "my_postgresql_fetch_field starts\n");
453
454    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
455       if (mdb->fields) {
456          free(mdb->fields);
457       }
458       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
459       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
460       mdb->fields_size = mdb->num_fields;
461
462       for (i = 0; i < mdb->num_fields; i++) {
463          Dmsg1(500, "filling field %d\n", i);
464          mdb->fields[i].name           = PQfname(mdb->result, i);
465          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
466          mdb->fields[i].type       = PQftype(mdb->result, i);
467          mdb->fields[i].flags      = 0;
468
469          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
470             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
471             mdb->fields[i].flags);
472       } // end for
473    } // end if
474
475    // increment field number for the next time around
476
477    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
478    return &mdb->fields[mdb->field_number++];
479 }
480
481 void my_postgresql_data_seek(B_DB *mdb, int row)
482 {
483    // set the row number to be returned on the next call
484    // to my_postgresql_fetch_row
485    mdb->row_number = row;
486 }
487
488 void my_postgresql_field_seek(B_DB *mdb, int field)
489 {
490    mdb->field_number = field;
491 }
492
493 /*
494  * Note, if this routine returns 1 (failure), Bacula expects
495  *  that no result has been stored.
496  * This is where QUERY_DB comes with Postgresql.
497  *
498  *  Returns:  0  on success
499  *            1  on failure
500  *
501  */
502 int my_postgresql_query(B_DB *mdb, const char *query)
503 {
504    Dmsg0(500, "my_postgresql_query started\n");
505    // We are starting a new query.  reset everything.
506    mdb->num_rows     = -1;
507    mdb->row_number   = -1;
508    mdb->field_number = -1;
509
510    if (mdb->result) {
511       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
512       mdb->result = NULL;
513    }
514
515    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
516
517    for (int i=0; i < 10; i++) {
518       mdb->result = PQexec(mdb->db, query);
519       if (mdb->result) {
520          break;
521       }
522       bmicrosleep(5, 0);
523    }
524    if (!mdb->result) {
525       Dmsg1(50, "Query failed: %s\n", query);
526       goto bail_out;
527    }
528
529    mdb->status = PQresultStatus(mdb->result);
530    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
531       Dmsg1(500, "we have a result\n", query);
532
533       // how many fields in the set?
534       mdb->num_fields = (int)PQnfields(mdb->result);
535       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
536
537       mdb->num_rows = PQntuples(mdb->result);
538       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
539
540       mdb->status = 0;                  /* succeed */
541    } else {
542       Dmsg1(50, "Result status failed: %s\n", query);
543       goto bail_out;
544    }
545
546    Dmsg0(500, "my_postgresql_query finishing\n");
547    return mdb->status;
548
549 bail_out:
550    Dmsg1(500, "we failed\n", query);
551    PQclear(mdb->result);
552    mdb->result = NULL;
553    mdb->status = 1;                   /* failed */
554    return mdb->status;
555 }
556
557 void my_postgresql_free_result(B_DB *mdb)
558 {
559    
560    db_lock(mdb);
561    if (mdb->result) {
562       PQclear(mdb->result);
563       mdb->result = NULL;
564    }
565
566    if (mdb->row) {
567       free(mdb->row);
568       mdb->row = NULL;
569    }
570
571    if (mdb->fields) {
572       free(mdb->fields);
573       mdb->fields = NULL;
574    }
575    db_unlock(mdb);
576 }
577
578 int my_postgresql_currval(B_DB *mdb, const char *table_name)
579 {
580    // Obtain the current value of the sequence that
581    // provides the serial value for primary key of the table.
582
583    // currval is local to our session.  It is not affected by
584    // other transactions.
585
586    // Determine the name of the sequence.
587    // PostgreSQL automatically creates a sequence using
588    // <table>_<column>_seq.
589    // At the time of writing, all tables used this format for
590    // for their primary key: <table>id
591    // Except for basefiles which has a primary key on baseid.
592    // Therefore, we need to special case that one table.
593
594    // everything else can use the PostgreSQL formula.
595
596    char      sequence[NAMEDATALEN-1];
597    char      query   [NAMEDATALEN+50];
598    PGresult *result;
599    int       id = 0;
600
601    if (strcasecmp(table_name, "basefiles") == 0) {
602       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
603    } else {
604       bstrncpy(sequence, table_name, sizeof(sequence));
605       bstrncat(sequence, "_",        sizeof(sequence));
606       bstrncat(sequence, table_name, sizeof(sequence));
607       bstrncat(sequence, "id",       sizeof(sequence));
608    }
609
610    bstrncat(sequence, "_seq", sizeof(sequence));
611    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
612
613    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
614    for (int i=0; i < 10; i++) {
615       result = PQexec(mdb->db, query);
616       if (result) {
617          break;
618       }
619       bmicrosleep(5, 0);
620    }
621    if (!result) {
622       Dmsg1(50, "Query failed: %s\n", query);
623       goto bail_out;
624    }
625
626    Dmsg0(500, "exec done");
627
628    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
629       Dmsg0(500, "getting value");
630       id = atoi(PQgetvalue(result, 0, 0));
631       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
632    } else {
633       Dmsg1(50, "Result status failed: %s\n", query);
634       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
635    }
636
637 bail_out:
638    PQclear(result);
639
640    return id;
641 }
642
643 #ifdef HAVE_BATCH_FILE_INSERT
644
645 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
646 {
647    const char *query = "COPY batch FROM STDIN";
648
649    Dmsg0(500, "my_postgresql_batch_start started\n");
650
651    if (my_postgresql_query(mdb,
652                            "CREATE TEMPORARY TABLE batch ("
653                                "fileindex int,"
654                                "jobid int,"
655                                "path varchar,"
656                                "name varchar,"
657                                "lstat varchar,"
658                                "md5 varchar)") == 1)
659    {
660       Dmsg0(500, "my_postgresql_batch_start failed\n");
661       return 1;
662    }
663    
664    // We are starting a new query.  reset everything.
665    mdb->num_rows     = -1;
666    mdb->row_number   = -1;
667    mdb->field_number = -1;
668
669    my_postgresql_free_result(mdb);
670
671    for (int i=0; i < 10; i++) {
672       mdb->result = PQexec(mdb->db, query);
673       if (mdb->result) {
674          break;
675       }
676       bmicrosleep(5, 0);
677    }
678    if (!mdb->result) {
679       Dmsg1(50, "Query failed: %s\n", query);
680       goto bail_out;
681    }
682
683    mdb->status = PQresultStatus(mdb->result);
684    if (mdb->status == PGRES_COPY_IN) {
685       // how many fields in the set?
686       mdb->num_fields = (int) PQnfields(mdb->result);
687       mdb->num_rows   = 0;
688       mdb->status = 1;
689    } else {
690       Dmsg1(50, "Result status failed: %s\n", query);
691       goto bail_out;
692    }
693
694    Dmsg0(500, "my_postgresql_batch_start finishing\n");
695
696    return mdb->status;
697
698 bail_out:
699    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
700    mdb->status = 0;
701    PQclear(mdb->result);
702    mdb->result = NULL;
703    return mdb->status;
704 }
705
706 /* set error to something to abort operation */
707 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
708 {
709    int res;
710    int count=30;
711    Dmsg0(500, "my_postgresql_batch_end started\n");
712
713    if (!mdb) {                  /* no files ? */
714       return 0;
715    }
716
717    do { 
718       res = PQputCopyEnd(mdb->db, error);
719    } while (res == 0 && --count > 0);
720
721    if (res == 1) {
722       Dmsg0(500, "ok\n");
723       mdb->status = 1;
724    }
725    
726    if (res <= 0) {
727       Dmsg0(500, "we failed\n");
728       mdb->status = 0;
729       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
730    }
731    
732    Dmsg0(500, "my_postgresql_batch_end finishing\n");
733
734    return mdb->status;
735 }
736
737 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
738 {
739    int res;
740    int count=30;
741    size_t len;
742    const char *digest;
743    char ed1[50];
744
745    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
746    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
747
748    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
749    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
750
751    if (ar->Digest == NULL || ar->Digest[0] == 0) {
752       digest = "0";
753    } else {
754       digest = ar->Digest;
755    }
756
757    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
758               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
759               mdb->esc_name, ar->attr, digest);
760
761    do { 
762       res = PQputCopyData(mdb->db,
763                           mdb->cmd,
764                           len);
765    } while (res == 0 && --count > 0);
766
767    if (res == 1) {
768       Dmsg0(500, "ok\n");
769       mdb->changes++;
770       mdb->status = 1;
771    }
772
773    if (res <= 0) {
774       Dmsg0(500, "we failed\n");
775       mdb->status = 0;
776       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
777    }
778
779    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
780
781    return mdb->status;
782 }
783
784 #endif /* HAVE_BATCH_FILE_INSERT */
785
786 /*
787  * Escape strings so that PostgreSQL is happy on COPY
788  *
789  *   NOTE! len is the length of the old string. Your new
790  *         string must be long enough (max 2*old+1) to hold
791  *         the escaped output.
792  */
793 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
794 {
795    /* we have to escape \t, \n, \r, \ */
796    char c = '\0' ;
797
798    while (len > 0 && *src) {
799       switch (*src) {
800       case '\n':
801          c = 'n';
802          break;
803       case '\\':
804          c = '\\';
805          break;
806       case '\t':
807          c = 't';
808          break;
809       case '\r':
810          c = 'r';
811          break;
812       default:
813          c = '\0' ;
814       }
815
816       if (c) {
817          *dest = '\\';
818          dest++;
819          *dest = c;
820       } else {
821          *dest = *src;
822       }
823
824       len--;
825       src++;
826       dest++;
827    }
828
829    *dest = '\0';
830    return dest;
831 }
832
833 #ifdef HAVE_BATCH_FILE_INSERT
834 const char *my_pg_batch_lock_path_query = 
835    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
836
837
838 const char *my_pg_batch_lock_filename_query = 
839    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
840
841 const char *my_pg_batch_unlock_tables_query = "COMMIT";
842
843 const char *my_pg_batch_fill_path_query = 
844    "INSERT INTO Path (Path) "
845     "SELECT a.Path FROM "
846      "(SELECT DISTINCT Path FROM batch) AS a "
847       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
848
849
850 const char *my_pg_batch_fill_filename_query = 
851    "INSERT INTO Filename (Name) "
852     "SELECT a.Name FROM "
853      "(SELECT DISTINCT Name FROM batch) as a "
854       "WHERE NOT EXISTS "
855        "(SELECT Name FROM Filename WHERE Name = a.Name)";
856 #endif /* HAVE_BATCH_FILE_INSERT */
857
858 #endif /* HAVE_POSTGRESQL */