]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Create patch that may fix bug #1298 and bug #1304, which causes
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151       if (!ret) {
152          Mmsg(mdb->errmsg, 
153               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
154               mdb->db_name, row[0]);
155          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
156          Dmsg1(50, "%s", mdb->errmsg);
157       } 
158    }
159    return ret;
160 }
161
162 /*
163  * Now actually open the database.  This can generate errors,
164  *   which are returned in the errmsg
165  *
166  * DO NOT close the database or free(mdb) here !!!!
167  */
168 int
169 db_open_database(JCR *jcr, B_DB *mdb)
170 {
171    int errstat;
172    char buf[10], *port;
173
174 #ifdef xxx                      /* require libpq >= 8.2 */
175    if (!PQisthreadsafe()) {
176       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
177            "PostgreSQL library is not thread safe. Cannot continue.\n"));
178    }
179 #endif
180    P(mutex);
181    if (mdb->connected) {
182       V(mutex);
183       return 1;
184    }
185    mdb->connected = false;
186
187    if ((errstat=rwl_init(&mdb->lock)) != 0) {
188       berrno be;
189       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190             be.bstrerror(errstat));
191       V(mutex);
192       return 0;
193    }
194
195    if (mdb->db_port) {
196       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
197       port = buf;
198    } else {
199       port = NULL;
200    }
201
202    /* If connection fails, try at 5 sec intervals for 30 seconds. */
203    for (int retry=0; retry < 6; retry++) {
204       /* connect to the database */
205       mdb->db = PQsetdbLogin(
206            mdb->db_address,           /* default = localhost */
207            port,                      /* default port */
208            NULL,                      /* pg options */
209            NULL,                      /* tty, ignored */
210            mdb->db_name,              /* database name */
211            mdb->db_user,              /* login name */
212            mdb->db_password);         /* password */
213
214       /* If no connect, try once more in case it is a timing problem */
215       if (PQstatus(mdb->db) == CONNECTION_OK) {
216          break;
217       }
218       bmicrosleep(5, 0);
219    }
220
221    Dmsg0(50, "pg_real_connect done\n");
222    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223             mdb->db_password==NULL?"(NULL)":mdb->db_password);
224
225    if (PQstatus(mdb->db) != CONNECTION_OK) {
226       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
227             "Database=%s User=%s\n"
228             "It is probably not running or your password is incorrect.\n"),
229              mdb->db_name, mdb->db_user);
230       V(mutex);
231       return 0;
232    }
233
234    mdb->connected = true;
235
236    if (!check_tables_version(jcr, mdb)) {
237       V(mutex);
238       return 0;
239    }
240
241    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
242    
243    /* tell PostgreSQL we are using standard conforming strings
244       and avoid warnings such as:
245        WARNING:  nonstandard use of \\ in a string literal
246    */
247    sql_query(mdb, "set standard_conforming_strings=on");
248
249    /* check that encoding is SQL_ASCII */
250    check_database_encoding(jcr, mdb);
251
252    V(mutex);
253    return 1;
254 }
255
256 void
257 db_close_database(JCR *jcr, B_DB *mdb)
258 {
259    if (!mdb) {
260       return;
261    }
262    db_end_transaction(jcr, mdb);
263    P(mutex);
264    sql_free_result(mdb);
265    mdb->ref_count--;
266    if (mdb->ref_count == 0) {
267       qdchain(&mdb->bq);
268       if (mdb->connected && mdb->db) {
269          sql_close(mdb);
270       }
271       rwl_destroy(&mdb->lock);
272       free_pool_memory(mdb->errmsg);
273       free_pool_memory(mdb->cmd);
274       free_pool_memory(mdb->cached_path);
275       free_pool_memory(mdb->fname);
276       free_pool_memory(mdb->path);
277       free_pool_memory(mdb->esc_name);
278       free_pool_memory(mdb->esc_path);
279       if (mdb->db_name) {
280          free(mdb->db_name);
281       }
282       if (mdb->db_user) {
283          free(mdb->db_user);
284       }
285       if (mdb->db_password) {
286          free(mdb->db_password);
287       }
288       if (mdb->db_address) {
289          free(mdb->db_address);
290       }
291       if (mdb->db_socket) {
292          free(mdb->db_socket);
293       }
294       free(mdb);
295    }
296    V(mutex);
297 }
298
299 void db_thread_cleanup()
300 { }
301
302 /*
303  * Return the next unique index (auto-increment) for
304  * the given table.  Return NULL on error.
305  *
306  * For PostgreSQL, NULL causes the auto-increment value
307  *  to be updated.
308  */
309 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
310 {
311    strcpy(index, "NULL");
312    return 1;
313 }
314
315
316 /*
317  * Escape strings so that PostgreSQL is happy
318  *
319  *   NOTE! len is the length of the old string. Your new
320  *         string must be long enough (max 2*old+1) to hold
321  *         the escaped output.
322  */
323 void
324 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
325 {
326    int error;
327   
328    PQescapeStringConn(mdb->db, snew, old, len, &error);
329    if (error) {
330       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
331       /* error on encoding, probably invalid multibyte encoding in the source string
332         see PQescapeStringConn documentation for details. */
333       Dmsg0(500, "PQescapeStringConn failed\n");
334    }
335 }
336
337 /*
338  * Submit a general SQL command (cmd), and for each row returned,
339  *  the sqlite_handler is called with the ctx.
340  */
341 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
342 {
343    SQL_ROW row;
344
345    Dmsg0(500, "db_sql_query started\n");
346
347    db_lock(mdb);
348    if (sql_query(mdb, query) != 0) {
349       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
350       db_unlock(mdb);
351       Dmsg0(500, "db_sql_query failed\n");
352       return false;
353    }
354    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
355
356    if (result_handler != NULL) {
357       Dmsg0(500, "db_sql_query invoking handler\n");
358       if ((mdb->result = sql_store_result(mdb)) != NULL) {
359          int num_fields = sql_num_fields(mdb);
360
361          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
362          while ((row = sql_fetch_row(mdb)) != NULL) {
363
364             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
365             if (result_handler(ctx, num_fields, row))
366                break;
367          }
368
369         sql_free_result(mdb);
370       }
371    }
372    db_unlock(mdb);
373
374    Dmsg0(500, "db_sql_query finished\n");
375
376    return true;
377 }
378
379
380
381 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
382 {
383    int j;
384    POSTGRESQL_ROW row = NULL; // by default, return NULL
385
386    Dmsg0(500, "my_postgresql_fetch_row start\n");
387
388    if (!mdb->row || mdb->row_size < mdb->num_fields) {
389       int num_fields = mdb->num_fields;
390       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
391
392       if (mdb->row) {
393          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
394          free(mdb->row);
395       }
396       num_fields += 20;                  /* add a bit extra */
397       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
398       mdb->row_size = num_fields;
399
400       // now reset the row_number now that we have the space allocated
401       mdb->row_number = 0;
402    }
403
404    // if still within the result set
405    if (mdb->row_number < mdb->num_rows) {
406       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
407       // get each value from this row
408       for (j = 0; j < mdb->num_fields; j++) {
409          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
410          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
411       }
412       // increment the row number for the next call
413       mdb->row_number++;
414
415       row = mdb->row;
416    } else {
417       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
418    }
419
420    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
421
422    return row;
423 }
424
425 int my_postgresql_max_length(B_DB *mdb, int field_num) {
426    //
427    // for a given column, find the max length
428    //
429    int max_length;
430    int i;
431    int this_length;
432
433    max_length = 0;
434    for (i = 0; i < mdb->num_rows; i++) {
435       if (PQgetisnull(mdb->result, i, field_num)) {
436           this_length = 4;        // "NULL"
437       } else {
438           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
439       }
440
441       if (max_length < this_length) {
442           max_length = this_length;
443       }
444    }
445
446    return max_length;
447 }
448
449 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
450 {
451    int     i;
452
453    Dmsg0(500, "my_postgresql_fetch_field starts\n");
454
455    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
456       if (mdb->fields) {
457          free(mdb->fields);
458       }
459       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
460       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
461       mdb->fields_size = mdb->num_fields;
462
463       for (i = 0; i < mdb->num_fields; i++) {
464          Dmsg1(500, "filling field %d\n", i);
465          mdb->fields[i].name           = PQfname(mdb->result, i);
466          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
467          mdb->fields[i].type       = PQftype(mdb->result, i);
468          mdb->fields[i].flags      = 0;
469
470          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
471             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
472             mdb->fields[i].flags);
473       } // end for
474    } // end if
475
476    // increment field number for the next time around
477
478    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
479    return &mdb->fields[mdb->field_number++];
480 }
481
482 void my_postgresql_data_seek(B_DB *mdb, int row)
483 {
484    // set the row number to be returned on the next call
485    // to my_postgresql_fetch_row
486    mdb->row_number = row;
487 }
488
489 void my_postgresql_field_seek(B_DB *mdb, int field)
490 {
491    mdb->field_number = field;
492 }
493
494 /*
495  * Note, if this routine returns 1 (failure), Bacula expects
496  *  that no result has been stored.
497  * This is where QUERY_DB comes with Postgresql.
498  *
499  *  Returns:  0  on success
500  *            1  on failure
501  *
502  */
503 int my_postgresql_query(B_DB *mdb, const char *query)
504 {
505    Dmsg0(500, "my_postgresql_query started\n");
506    // We are starting a new query.  reset everything.
507    mdb->num_rows     = -1;
508    mdb->row_number   = -1;
509    mdb->field_number = -1;
510
511    if (mdb->result) {
512       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
513       mdb->result = NULL;
514    }
515
516    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
517
518    for (int i=0; i < 10; i++) {
519       mdb->result = PQexec(mdb->db, query);
520       if (mdb->result) {
521          break;
522       }
523       bmicrosleep(5, 0);
524    }
525    if (!mdb->result) {
526       Dmsg1(50, "Query failed: %s\n", query);
527       goto bail_out;
528    }
529
530    mdb->status = PQresultStatus(mdb->result);
531    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
532       Dmsg1(500, "we have a result\n", query);
533
534       // how many fields in the set?
535       mdb->num_fields = (int)PQnfields(mdb->result);
536       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
537
538       mdb->num_rows = PQntuples(mdb->result);
539       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
540
541       mdb->status = 0;                  /* succeed */
542    } else {
543       Dmsg1(50, "Result status failed: %s\n", query);
544       goto bail_out;
545    }
546
547    Dmsg0(500, "my_postgresql_query finishing\n");
548    return mdb->status;
549
550 bail_out:
551    Dmsg1(500, "we failed\n", query);
552    PQclear(mdb->result);
553    mdb->result = NULL;
554    mdb->status = 1;                   /* failed */
555    return mdb->status;
556 }
557
558 void my_postgresql_free_result(B_DB *mdb)
559 {
560    
561    db_lock(mdb);
562    if (mdb->result) {
563       PQclear(mdb->result);
564       mdb->result = NULL;
565    }
566
567    if (mdb->row) {
568       free(mdb->row);
569       mdb->row = NULL;
570    }
571
572    if (mdb->fields) {
573       free(mdb->fields);
574       mdb->fields = NULL;
575    }
576    db_unlock(mdb);
577 }
578
579 int my_postgresql_currval(B_DB *mdb, const char *table_name)
580 {
581    // Obtain the current value of the sequence that
582    // provides the serial value for primary key of the table.
583
584    // currval is local to our session.  It is not affected by
585    // other transactions.
586
587    // Determine the name of the sequence.
588    // PostgreSQL automatically creates a sequence using
589    // <table>_<column>_seq.
590    // At the time of writing, all tables used this format for
591    // for their primary key: <table>id
592    // Except for basefiles which has a primary key on baseid.
593    // Therefore, we need to special case that one table.
594
595    // everything else can use the PostgreSQL formula.
596
597    char      sequence[NAMEDATALEN-1];
598    char      query   [NAMEDATALEN+50];
599    PGresult *result;
600    int       id = 0;
601
602    if (strcasecmp(table_name, "basefiles") == 0) {
603       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
604    } else {
605       bstrncpy(sequence, table_name, sizeof(sequence));
606       bstrncat(sequence, "_",        sizeof(sequence));
607       bstrncat(sequence, table_name, sizeof(sequence));
608       bstrncat(sequence, "id",       sizeof(sequence));
609    }
610
611    bstrncat(sequence, "_seq", sizeof(sequence));
612    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
613
614    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
615    for (int i=0; i < 10; i++) {
616       result = PQexec(mdb->db, query);
617       if (result) {
618          break;
619       }
620       bmicrosleep(5, 0);
621    }
622    if (!result) {
623       Dmsg1(50, "Query failed: %s\n", query);
624       goto bail_out;
625    }
626
627    Dmsg0(500, "exec done");
628
629    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
630       Dmsg0(500, "getting value");
631       id = atoi(PQgetvalue(result, 0, 0));
632       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
633    } else {
634       Dmsg1(50, "Result status failed: %s\n", query);
635       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
636    }
637
638 bail_out:
639    PQclear(result);
640
641    return id;
642 }
643
644 #ifdef HAVE_BATCH_FILE_INSERT
645
646 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
647 {
648    const char *query = "COPY batch FROM STDIN";
649
650    Dmsg0(500, "my_postgresql_batch_start started\n");
651
652    if (my_postgresql_query(mdb,
653                            "CREATE TEMPORARY TABLE batch ("
654                                "fileindex int,"
655                                "jobid int,"
656                                "path varchar,"
657                                "name varchar,"
658                                "lstat varchar,"
659                                "md5 varchar)") == 1)
660    {
661       Dmsg0(500, "my_postgresql_batch_start failed\n");
662       return 1;
663    }
664    
665    // We are starting a new query.  reset everything.
666    mdb->num_rows     = -1;
667    mdb->row_number   = -1;
668    mdb->field_number = -1;
669
670    my_postgresql_free_result(mdb);
671
672    for (int i=0; i < 10; i++) {
673       mdb->result = PQexec(mdb->db, query);
674       if (mdb->result) {
675          break;
676       }
677       bmicrosleep(5, 0);
678    }
679    if (!mdb->result) {
680       Dmsg1(50, "Query failed: %s\n", query);
681       goto bail_out;
682    }
683
684    mdb->status = PQresultStatus(mdb->result);
685    if (mdb->status == PGRES_COPY_IN) {
686       // how many fields in the set?
687       mdb->num_fields = (int) PQnfields(mdb->result);
688       mdb->num_rows   = 0;
689       mdb->status = 1;
690    } else {
691       Dmsg1(50, "Result status failed: %s\n", query);
692       goto bail_out;
693    }
694
695    Dmsg0(500, "my_postgresql_batch_start finishing\n");
696
697    return mdb->status;
698
699 bail_out:
700    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
701    mdb->status = 0;
702    PQclear(mdb->result);
703    mdb->result = NULL;
704    return mdb->status;
705 }
706
707 /* set error to something to abort operation */
708 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
709 {
710    int res;
711    int count=30;
712    Dmsg0(500, "my_postgresql_batch_end started\n");
713
714    if (!mdb) {                  /* no files ? */
715       return 0;
716    }
717
718    do { 
719       res = PQputCopyEnd(mdb->db, error);
720    } while (res == 0 && --count > 0);
721
722    if (res == 1) {
723       Dmsg0(500, "ok\n");
724       mdb->status = 1;
725    }
726    
727    if (res <= 0) {
728       Dmsg0(500, "we failed\n");
729       mdb->status = 0;
730       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
731    }
732    
733    Dmsg0(500, "my_postgresql_batch_end finishing\n");
734
735    return mdb->status;
736 }
737
738 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
739 {
740    int res;
741    int count=30;
742    size_t len;
743    const char *digest;
744    char ed1[50];
745
746    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
747    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
748
749    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
750    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
751
752    if (ar->Digest == NULL || ar->Digest[0] == 0) {
753       digest = "0";
754    } else {
755       digest = ar->Digest;
756    }
757
758    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
759               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
760               mdb->esc_name, ar->attr, digest);
761
762    do { 
763       res = PQputCopyData(mdb->db,
764                           mdb->cmd,
765                           len);
766    } while (res == 0 && --count > 0);
767
768    if (res == 1) {
769       Dmsg0(500, "ok\n");
770       mdb->changes++;
771       mdb->status = 1;
772    }
773
774    if (res <= 0) {
775       Dmsg0(500, "we failed\n");
776       mdb->status = 0;
777       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
778    }
779
780    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
781
782    return mdb->status;
783 }
784
785 #endif /* HAVE_BATCH_FILE_INSERT */
786
787 /*
788  * Escape strings so that PostgreSQL is happy on COPY
789  *
790  *   NOTE! len is the length of the old string. Your new
791  *         string must be long enough (max 2*old+1) to hold
792  *         the escaped output.
793  */
794 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
795 {
796    /* we have to escape \t, \n, \r, \ */
797    char c = '\0' ;
798
799    while (len > 0 && *src) {
800       switch (*src) {
801       case '\n':
802          c = 'n';
803          break;
804       case '\\':
805          c = '\\';
806          break;
807       case '\t':
808          c = 't';
809          break;
810       case '\r':
811          c = 'r';
812          break;
813       default:
814          c = '\0' ;
815       }
816
817       if (c) {
818          *dest = '\\';
819          dest++;
820          *dest = c;
821       } else {
822          *dest = *src;
823       }
824
825       len--;
826       src++;
827       dest++;
828    }
829
830    *dest = '\0';
831    return dest;
832 }
833
834 #ifdef HAVE_BATCH_FILE_INSERT
835 const char *my_pg_batch_lock_path_query = 
836    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
837
838
839 const char *my_pg_batch_lock_filename_query = 
840    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
841
842 const char *my_pg_batch_unlock_tables_query = "COMMIT";
843
844 const char *my_pg_batch_fill_path_query = 
845    "INSERT INTO Path (Path) "
846     "SELECT a.Path FROM "
847      "(SELECT DISTINCT Path FROM batch) AS a "
848       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
849
850
851 const char *my_pg_batch_fill_filename_query = 
852    "INSERT INTO Filename (Name) "
853     "SELECT a.Name FROM "
854      "(SELECT DISTINCT Name FROM batch) as a "
855       "WHERE NOT EXISTS "
856        "(SELECT Name FROM Filename WHERE Name = a.Name)";
857 #endif /* HAVE_BATCH_FILE_INSERT */
858
859 #endif /* HAVE_POSTGRESQL */