]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Add new include to cats/postgresql.c suggested by Marc Cousins so
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /*
136  * Now actually open the database.  This can generate errors,
137  *   which are returned in the errmsg
138  *
139  * DO NOT close the database or free(mdb) here !!!!
140  */
141 int
142 db_open_database(JCR *jcr, B_DB *mdb)
143 {
144    int errstat;
145    char buf[10], *port;
146
147 #ifdef xxx
148    if (!PQisthreadsafe()) {
149       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
150            "PostgreSQL library is not thread safe. Connot continue.\n"));
151    }
152 #endif
153    P(mutex);
154    if (mdb->connected) {
155       V(mutex);
156       return 1;
157    }
158    mdb->connected = false;
159
160    if ((errstat=rwl_init(&mdb->lock)) != 0) {
161       berrno be;
162       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
163             be.bstrerror(errstat));
164       V(mutex);
165       return 0;
166    }
167
168    if (mdb->db_port) {
169       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
170       port = buf;
171    } else {
172       port = NULL;
173    }
174
175    /* If connection fails, try at 5 sec intervals for 30 seconds. */
176    for (int retry=0; retry < 6; retry++) {
177       /* connect to the database */
178       mdb->db = PQsetdbLogin(
179            mdb->db_address,           /* default = localhost */
180            port,                      /* default port */
181            NULL,                      /* pg options */
182            NULL,                      /* tty, ignored */
183            mdb->db_name,              /* database name */
184            mdb->db_user,              /* login name */
185            mdb->db_password);         /* password */
186
187       /* If no connect, try once more in case it is a timing problem */
188       if (PQstatus(mdb->db) == CONNECTION_OK) {
189          break;
190       }
191       bmicrosleep(5, 0);
192    }
193
194    Dmsg0(50, "pg_real_connect done\n");
195    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
196             mdb->db_password==NULL?"(NULL)":mdb->db_password);
197
198    if (PQstatus(mdb->db) != CONNECTION_OK) {
199       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
200             "Database=%s User=%s\n"
201             "It is probably not running or your password is incorrect.\n"),
202              mdb->db_name, mdb->db_user);
203       V(mutex);
204       return 0;
205    }
206
207    mdb->connected = true;
208
209    if (!check_tables_version(jcr, mdb)) {
210       V(mutex);
211       return 0;
212    }
213
214    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
215    
216    /* tell PostgreSQL we are using standard conforming strings
217       and avoid warnings such as:
218        WARNING:  nonstandard use of \\ in a string literal
219    */
220    sql_query(mdb, "set standard_conforming_strings=on");
221
222    V(mutex);
223    return 1;
224 }
225
226 void
227 db_close_database(JCR *jcr, B_DB *mdb)
228 {
229    if (!mdb) {
230       return;
231    }
232    db_end_transaction(jcr, mdb);
233    P(mutex);
234    sql_free_result(mdb);
235    mdb->ref_count--;
236    if (mdb->ref_count == 0) {
237       qdchain(&mdb->bq);
238       if (mdb->connected && mdb->db) {
239          sql_close(mdb);
240       }
241       rwl_destroy(&mdb->lock);
242       free_pool_memory(mdb->errmsg);
243       free_pool_memory(mdb->cmd);
244       free_pool_memory(mdb->cached_path);
245       free_pool_memory(mdb->fname);
246       free_pool_memory(mdb->path);
247       free_pool_memory(mdb->esc_name);
248       free_pool_memory(mdb->esc_path);
249       if (mdb->db_name) {
250          free(mdb->db_name);
251       }
252       if (mdb->db_user) {
253          free(mdb->db_user);
254       }
255       if (mdb->db_password) {
256          free(mdb->db_password);
257       }
258       if (mdb->db_address) {
259          free(mdb->db_address);
260       }
261       if (mdb->db_socket) {
262          free(mdb->db_socket);
263       }
264       free(mdb);
265    }
266    V(mutex);
267 }
268
269 void db_thread_cleanup()
270 { }
271
272 /*
273  * Return the next unique index (auto-increment) for
274  * the given table.  Return NULL on error.
275  *
276  * For PostgreSQL, NULL causes the auto-increment value
277  *  to be updated.
278  */
279 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
280 {
281    strcpy(index, "NULL");
282    return 1;
283 }
284
285
286 /*
287  * Escape strings so that PostgreSQL is happy
288  *
289  *   NOTE! len is the length of the old string. Your new
290  *         string must be long enough (max 2*old+1) to hold
291  *         the escaped output.
292  */
293 void
294 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
295 {
296    int error;
297   
298    PQescapeStringConn(mdb->db, snew, old, len, &error);
299    if (error) {
300       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
301       /* error on encoding, probably invalid multibyte encoding in the source string
302         see PQescapeStringConn documentation for details. */
303       Dmsg0(500, "PQescapeStringConn failed\n");
304    }
305 }
306
307 /*
308  * Submit a general SQL command (cmd), and for each row returned,
309  *  the sqlite_handler is called with the ctx.
310  */
311 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
312 {
313    SQL_ROW row;
314
315    Dmsg0(500, "db_sql_query started\n");
316
317    db_lock(mdb);
318    if (sql_query(mdb, query) != 0) {
319       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
320       db_unlock(mdb);
321       Dmsg0(500, "db_sql_query failed\n");
322       return false;
323    }
324    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
325
326    if (result_handler != NULL) {
327       Dmsg0(500, "db_sql_query invoking handler\n");
328       if ((mdb->result = sql_store_result(mdb)) != NULL) {
329          int num_fields = sql_num_fields(mdb);
330
331          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
332          while ((row = sql_fetch_row(mdb)) != NULL) {
333
334             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
335             if (result_handler(ctx, num_fields, row))
336                break;
337          }
338
339         sql_free_result(mdb);
340       }
341    }
342    db_unlock(mdb);
343
344    Dmsg0(500, "db_sql_query finished\n");
345
346    return true;
347 }
348
349
350
351 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
352 {
353    int j;
354    POSTGRESQL_ROW row = NULL; // by default, return NULL
355
356    Dmsg0(500, "my_postgresql_fetch_row start\n");
357
358    if (!mdb->row || mdb->row_size < mdb->num_fields) {
359       int num_fields = mdb->num_fields;
360       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
361
362       if (mdb->row) {
363          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
364          free(mdb->row);
365       }
366       num_fields += 20;                  /* add a bit extra */
367       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
368       mdb->row_size = num_fields;
369
370       // now reset the row_number now that we have the space allocated
371       mdb->row_number = 0;
372    }
373
374    // if still within the result set
375    if (mdb->row_number < mdb->num_rows) {
376       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
377       // get each value from this row
378       for (j = 0; j < mdb->num_fields; j++) {
379          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
380          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
381       }
382       // increment the row number for the next call
383       mdb->row_number++;
384
385       row = mdb->row;
386    } else {
387       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
388    }
389
390    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
391
392    return row;
393 }
394
395 int my_postgresql_max_length(B_DB *mdb, int field_num) {
396    //
397    // for a given column, find the max length
398    //
399    int max_length;
400    int i;
401    int this_length;
402
403    max_length = 0;
404    for (i = 0; i < mdb->num_rows; i++) {
405       if (PQgetisnull(mdb->result, i, field_num)) {
406           this_length = 4;        // "NULL"
407       } else {
408           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
409       }
410
411       if (max_length < this_length) {
412           max_length = this_length;
413       }
414    }
415
416    return max_length;
417 }
418
419 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
420 {
421    int     i;
422
423    Dmsg0(500, "my_postgresql_fetch_field starts\n");
424
425    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
426       if (mdb->fields) {
427          free(mdb->fields);
428       }
429       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
430       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
431       mdb->fields_size = mdb->num_fields;
432
433       for (i = 0; i < mdb->num_fields; i++) {
434          Dmsg1(500, "filling field %d\n", i);
435          mdb->fields[i].name           = PQfname(mdb->result, i);
436          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
437          mdb->fields[i].type       = PQftype(mdb->result, i);
438          mdb->fields[i].flags      = 0;
439
440          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
441             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
442             mdb->fields[i].flags);
443       } // end for
444    } // end if
445
446    // increment field number for the next time around
447
448    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
449    return &mdb->fields[mdb->field_number++];
450 }
451
452 void my_postgresql_data_seek(B_DB *mdb, int row)
453 {
454    // set the row number to be returned on the next call
455    // to my_postgresql_fetch_row
456    mdb->row_number = row;
457 }
458
459 void my_postgresql_field_seek(B_DB *mdb, int field)
460 {
461    mdb->field_number = field;
462 }
463
464 /*
465  * Note, if this routine returns 1 (failure), Bacula expects
466  *  that no result has been stored.
467  * This is where QUERY_DB comes with Postgresql.
468  *
469  *  Returns:  0  on success
470  *            1  on failure
471  *
472  */
473 int my_postgresql_query(B_DB *mdb, const char *query)
474 {
475    Dmsg0(500, "my_postgresql_query started\n");
476    // We are starting a new query.  reset everything.
477    mdb->num_rows     = -1;
478    mdb->row_number   = -1;
479    mdb->field_number = -1;
480
481    if (mdb->result) {
482       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
483       mdb->result = NULL;
484    }
485
486    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
487
488    for (int i=0; i < 10; i++) {
489       mdb->result = PQexec(mdb->db, query);
490       if (mdb->result) {
491          break;
492       }
493       bmicrosleep(5, 0);
494    }
495    if (!mdb->result) {
496       Dmsg1(50, "Query failed: %s\n", query);
497       goto bail_out;
498    }
499
500    mdb->status = PQresultStatus(mdb->result);
501    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
502       Dmsg1(500, "we have a result\n", query);
503
504       // how many fields in the set?
505       mdb->num_fields = (int)PQnfields(mdb->result);
506       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
507
508       mdb->num_rows = PQntuples(mdb->result);
509       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
510
511       mdb->status = 0;                  /* succeed */
512    } else {
513       Dmsg1(50, "Result status failed: %s\n", query);
514       goto bail_out;
515    }
516
517    Dmsg0(500, "my_postgresql_query finishing\n");
518    return mdb->status;
519
520 bail_out:
521    Dmsg1(500, "we failed\n", query);
522    PQclear(mdb->result);
523    mdb->result = NULL;
524    mdb->status = 1;                   /* failed */
525    return mdb->status;
526 }
527
528 void my_postgresql_free_result(B_DB *mdb)
529 {
530    
531    db_lock(mdb);
532    if (mdb->result) {
533       PQclear(mdb->result);
534       mdb->result = NULL;
535    }
536
537    if (mdb->row) {
538       free(mdb->row);
539       mdb->row = NULL;
540    }
541
542    if (mdb->fields) {
543       free(mdb->fields);
544       mdb->fields = NULL;
545    }
546    db_unlock(mdb);
547 }
548
549 int my_postgresql_currval(B_DB *mdb, char *table_name)
550 {
551    // Obtain the current value of the sequence that
552    // provides the serial value for primary key of the table.
553
554    // currval is local to our session.  It is not affected by
555    // other transactions.
556
557    // Determine the name of the sequence.
558    // PostgreSQL automatically creates a sequence using
559    // <table>_<column>_seq.
560    // At the time of writing, all tables used this format for
561    // for their primary key: <table>id
562    // Except for basefiles which has a primary key on baseid.
563    // Therefore, we need to special case that one table.
564
565    // everything else can use the PostgreSQL formula.
566
567    char      sequence[NAMEDATALEN-1];
568    char      query   [NAMEDATALEN+50];
569    PGresult *result;
570    int       id = 0;
571
572    if (strcasecmp(table_name, "basefiles") == 0) {
573       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
574    } else {
575       bstrncpy(sequence, table_name, sizeof(sequence));
576       bstrncat(sequence, "_",        sizeof(sequence));
577       bstrncat(sequence, table_name, sizeof(sequence));
578       bstrncat(sequence, "id",       sizeof(sequence));
579    }
580
581    bstrncat(sequence, "_seq", sizeof(sequence));
582    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
583
584    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
585    for (int i=0; i < 10; i++) {
586       result = PQexec(mdb->db, query);
587       if (result) {
588          break;
589       }
590       bmicrosleep(5, 0);
591    }
592    if (!result) {
593       Dmsg1(50, "Query failed: %s\n", query);
594       goto bail_out;
595    }
596
597    Dmsg0(500, "exec done");
598
599    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
600       Dmsg0(500, "getting value");
601       id = atoi(PQgetvalue(result, 0, 0));
602       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
603    } else {
604       Dmsg1(50, "Result status failed: %s\n", query);
605       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
606    }
607
608 bail_out:
609    PQclear(result);
610
611    return id;
612 }
613
614 #ifdef HAVE_BATCH_FILE_INSERT
615
616 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
617 {
618    char *query = "COPY batch FROM STDIN";
619
620    Dmsg0(500, "my_postgresql_batch_start started\n");
621
622    if (my_postgresql_query(mdb,
623                            "CREATE TEMPORARY TABLE batch ("
624                                "fileindex int,"
625                                "jobid int,"
626                                "path varchar,"
627                                "name varchar,"
628                                "lstat varchar,"
629                                "md5 varchar)") == 1)
630    {
631       Dmsg0(500, "my_postgresql_batch_start failed\n");
632       return 1;
633    }
634    
635    // We are starting a new query.  reset everything.
636    mdb->num_rows     = -1;
637    mdb->row_number   = -1;
638    mdb->field_number = -1;
639
640    my_postgresql_free_result(mdb);
641
642    for (int i=0; i < 10; i++) {
643       mdb->result = PQexec(mdb->db, query);
644       if (mdb->result) {
645          break;
646       }
647       bmicrosleep(5, 0);
648    }
649    if (!mdb->result) {
650       Dmsg1(50, "Query failed: %s\n", query);
651       goto bail_out;
652    }
653
654    mdb->status = PQresultStatus(mdb->result);
655    if (mdb->status == PGRES_COPY_IN) {
656       // how many fields in the set?
657       mdb->num_fields = (int) PQnfields(mdb->result);
658       mdb->num_rows   = 0;
659       mdb->status = 1;
660    } else {
661       Dmsg1(50, "Result status failed: %s\n", query);
662       goto bail_out;
663    }
664
665    Dmsg0(500, "my_postgresql_batch_start finishing\n");
666
667    return mdb->status;
668
669 bail_out:
670    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
671    mdb->status = 0;
672    PQclear(mdb->result);
673    mdb->result = NULL;
674    return mdb->status;
675 }
676
677 /* set error to something to abort operation */
678 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
679 {
680    int res;
681    int count=30;
682    Dmsg0(500, "my_postgresql_batch_end started\n");
683
684    if (!mdb) {                  /* no files ? */
685       return 0;
686    }
687
688    do { 
689       res = PQputCopyEnd(mdb->db, error);
690    } while (res == 0 && --count > 0);
691
692    if (res == 1) {
693       Dmsg0(500, "ok\n");
694       mdb->status = 1;
695    }
696    
697    if (res <= 0) {
698       Dmsg0(500, "we failed\n");
699       mdb->status = 0;
700       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
701    }
702    
703    Dmsg0(500, "my_postgresql_batch_end finishing\n");
704
705    return mdb->status;
706 }
707
708 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
709 {
710    int res;
711    int count=30;
712    size_t len;
713    char *digest;
714    char ed1[50];
715
716    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
717    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
718
719    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
720    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
721
722    if (ar->Digest == NULL || ar->Digest[0] == 0) {
723       digest = "0";
724    } else {
725       digest = ar->Digest;
726    }
727
728    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
729               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
730               mdb->esc_name, ar->attr, digest);
731
732    do { 
733       res = PQputCopyData(mdb->db,
734                           mdb->cmd,
735                           len);
736    } while (res == 0 && --count > 0);
737
738    if (res == 1) {
739       Dmsg0(500, "ok\n");
740       mdb->changes++;
741       mdb->status = 1;
742    }
743
744    if (res <= 0) {
745       Dmsg0(500, "we failed\n");
746       mdb->status = 0;
747       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
748    }
749
750    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
751
752    return mdb->status;
753 }
754
755 #endif /* HAVE_BATCH_FILE_INSERT */
756
757 /*
758  * Escape strings so that PostgreSQL is happy on COPY
759  *
760  *   NOTE! len is the length of the old string. Your new
761  *         string must be long enough (max 2*old+1) to hold
762  *         the escaped output.
763  */
764 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
765 {
766    /* we have to escape \t, \n, \r, \ */
767    char c = '\0' ;
768
769    while (len > 0 && *src) {
770       switch (*src) {
771       case '\n':
772          c = 'n';
773          break;
774       case '\\':
775          c = '\\';
776          break;
777       case '\t':
778          c = 't';
779          break;
780       case '\r':
781          c = 'r';
782          break;
783       default:
784          c = '\0' ;
785       }
786
787       if (c) {
788          *dest = '\\';
789          dest++;
790          *dest = c;
791       } else {
792          *dest = *src;
793       }
794
795       len--;
796       src++;
797       dest++;
798    }
799
800    *dest = '\0';
801    return dest;
802 }
803
804 #ifdef HAVE_BATCH_FILE_INSERT
805 const char *my_pg_batch_lock_path_query = 
806    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
807
808
809 const char *my_pg_batch_lock_filename_query = 
810    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
811
812 const char *my_pg_batch_unlock_tables_query = "COMMIT";
813
814 const char *my_pg_batch_fill_path_query = 
815    "INSERT INTO Path (Path) "
816     "SELECT a.Path FROM "
817      "(SELECT DISTINCT Path FROM batch) AS a "
818       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
819
820
821 const char *my_pg_batch_fill_filename_query = 
822    "INSERT INTO Filename (Name) "
823     "SELECT a.Name FROM "
824      "(SELECT DISTINCT Name FROM batch) as a "
825       "WHERE NOT EXISTS "
826        "(SELECT Name FROM Filename WHERE Name = a.Name)";
827 #endif /* HAVE_BATCH_FILE_INSERT */
828
829 #endif /* HAVE_POSTGRESQL */