]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Free db_list when not used
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  */
36
37
38 /* The following is necessary so that we do not include
39  * the dummy external definition of DB.
40  */
41 #define __SQL_C                       /* indicate that this is sql.c */
42
43 #include "bacula.h"
44 #include "cats.h"
45
46 #ifdef HAVE_POSTGRESQL
47
48 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb = NULL;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (db_list == NULL) {
90       db_list = New(dlist(mdb, &mdb->link));
91    }
92    if (!mult_db_connections) {
93       /* Look to see if DB already open */
94       foreach_dlist(mdb, db_list) {
95          if (bstrcmp(mdb->db_name, db_name) &&
96              bstrcmp(mdb->db_address, db_address) &&
97              mdb->db_port == db_port) {
98             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
99             mdb->ref_count++;
100             V(mutex);
101             return mdb;                  /* already open */
102          }
103       }
104    }
105    Dmsg0(100, "db_open first time\n");
106    mdb = (B_DB *)malloc(sizeof(B_DB));
107    memset(mdb, 0, sizeof(B_DB));
108    mdb->db_name = bstrdup(db_name);
109    mdb->db_user = bstrdup(db_user);
110    if (db_password) {
111       mdb->db_password = bstrdup(db_password);
112    }
113    if (db_address) {
114       mdb->db_address  = bstrdup(db_address);
115    }
116    if (db_socket) {
117       mdb->db_socket   = bstrdup(db_socket);
118    }
119    mdb->db_port        = db_port;
120    mdb->have_insert_id = TRUE;
121    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
122    *mdb->errmsg        = 0;
123    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
124    mdb->cached_path    = get_pool_memory(PM_FNAME);
125    mdb->cached_path_id = 0;
126    mdb->ref_count      = 1;
127    mdb->fname          = get_pool_memory(PM_FNAME);
128    mdb->path           = get_pool_memory(PM_FNAME);
129    mdb->esc_name       = get_pool_memory(PM_FNAME);
130    mdb->esc_path      = get_pool_memory(PM_FNAME);
131    mdb->allow_transactions = mult_db_connections;
132    db_list->append(mdb);                   /* put db in list */
133    V(mutex);
134    return mdb;
135 }
136
137 /* Check that the database correspond to the encoding we want */
138 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
139 {
140    SQL_ROW row;
141    int ret=false;
142
143    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
144       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
145       return false;
146    }
147
148    if ((row = sql_fetch_row(mdb)) == NULL) {
149       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
150       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
151    } else {
152       ret = bstrcmp(row[0], "SQL_ASCII");
153
154       if (ret) {
155          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
156          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
157
158       } else {                  /* something is wrong with database encoding */
159          Mmsg(mdb->errmsg, 
160               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
161               mdb->db_name, row[0]);
162          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
163          Dmsg1(50, "%s", mdb->errmsg);
164       } 
165    }
166    return ret;
167 }
168
169 /*
170  * Now actually open the database.  This can generate errors,
171  *   which are returned in the errmsg
172  *
173  * DO NOT close the database or free(mdb) here !!!!
174  */
175 int
176 db_open_database(JCR *jcr, B_DB *mdb)
177 {
178    int errstat;
179    char buf[10], *port;
180
181    P(mutex);
182    if (mdb->connected) {
183       V(mutex);
184       return 1;
185    }
186    mdb->connected = false;
187
188    if ((errstat=rwl_init(&mdb->lock)) != 0) {
189       berrno be;
190       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
191             be.bstrerror(errstat));
192       V(mutex);
193       return 0;
194    }
195
196    if (mdb->db_port) {
197       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
198       port = buf;
199    } else {
200       port = NULL;
201    }
202
203    /* If connection fails, try at 5 sec intervals for 30 seconds. */
204    for (int retry=0; retry < 6; retry++) {
205       /* connect to the database */
206       mdb->db = PQsetdbLogin(
207            mdb->db_address,           /* default = localhost */
208            port,                      /* default port */
209            NULL,                      /* pg options */
210            NULL,                      /* tty, ignored */
211            mdb->db_name,              /* database name */
212            mdb->db_user,              /* login name */
213            mdb->db_password);         /* password */
214
215       /* If no connect, try once more in case it is a timing problem */
216       if (PQstatus(mdb->db) == CONNECTION_OK) {
217          break;
218       }
219       bmicrosleep(5, 0);
220    }
221
222    Dmsg0(50, "pg_real_connect done\n");
223    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
224             mdb->db_password==NULL?"(NULL)":mdb->db_password);
225
226    if (PQstatus(mdb->db) != CONNECTION_OK) {
227       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
228          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
229          mdb->db_name, mdb->db_user);
230       V(mutex);
231       return 0;
232    }
233
234    mdb->connected = true;
235
236    if (!check_tables_version(jcr, mdb)) {
237       V(mutex);
238       return 0;
239    }
240
241    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
242    
243    /* tell PostgreSQL we are using standard conforming strings
244       and avoid warnings such as:
245        WARNING:  nonstandard use of \\ in a string literal
246    */
247    sql_query(mdb, "set standard_conforming_strings=on");
248
249    /* check that encoding is SQL_ASCII */
250    check_database_encoding(jcr, mdb);
251
252    V(mutex);
253    return 1;
254 }
255
256 void
257 db_close_database(JCR *jcr, B_DB *mdb)
258 {
259    if (!mdb) {
260       return;
261    }
262    db_end_transaction(jcr, mdb);
263    P(mutex);
264    sql_free_result(mdb);
265    mdb->ref_count--;
266    if (mdb->ref_count == 0) {
267       db_list->remove(mdb);
268       if (mdb->connected && mdb->db) {
269          sql_close(mdb);
270       }
271       rwl_destroy(&mdb->lock);
272       free_pool_memory(mdb->errmsg);
273       free_pool_memory(mdb->cmd);
274       free_pool_memory(mdb->cached_path);
275       free_pool_memory(mdb->fname);
276       free_pool_memory(mdb->path);
277       free_pool_memory(mdb->esc_name);
278       free_pool_memory(mdb->esc_path);
279       if (mdb->db_name) {
280          free(mdb->db_name);
281       }
282       if (mdb->db_user) {
283          free(mdb->db_user);
284       }
285       if (mdb->db_password) {
286          free(mdb->db_password);
287       }
288       if (mdb->db_address) {
289          free(mdb->db_address);
290       }
291       if (mdb->db_socket) {
292          free(mdb->db_socket);
293       }
294       free(mdb);
295       if (db_list->size() == 0) {
296          delete db_list;
297          db_list = NULL;
298       }
299    }
300    V(mutex);
301 }
302
303 void db_check_backend_thread_safe()
304 {
305 #ifdef HAVE_BATCH_FILE_INSERT
306 # ifdef HAVE_PQISTHREADSAFE 
307    if (!PQisthreadsafe()) {
308       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
309                           "when using BatchMode.\n"));
310    }
311 # endif
312 #endif
313 }
314
315 void db_thread_cleanup()
316 { }
317
318 /*
319  * Return the next unique index (auto-increment) for
320  * the given table.  Return NULL on error.
321  *
322  * For PostgreSQL, NULL causes the auto-increment value
323  *  to be updated.
324  */
325 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
326 {
327    strcpy(index, "NULL");
328    return 1;
329 }
330
331
332 /*
333  * Escape strings so that PostgreSQL is happy
334  *
335  *   NOTE! len is the length of the old string. Your new
336  *         string must be long enough (max 2*old+1) to hold
337  *         the escaped output.
338  */
339 void
340 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
341 {
342    int error;
343   
344    PQescapeStringConn(mdb->db, snew, old, len, &error);
345    if (error) {
346       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
347       /* error on encoding, probably invalid multibyte encoding in the source string
348         see PQescapeStringConn documentation for details. */
349       Dmsg0(500, "PQescapeStringConn failed\n");
350    }
351 }
352
353 /*
354  * Submit a general SQL command (cmd), and for each row returned,
355  *  the sqlite_handler is called with the ctx.
356  */
357 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
358 {
359    SQL_ROW row;
360
361    Dmsg0(500, "db_sql_query started\n");
362
363    db_lock(mdb);
364    if (sql_query(mdb, query) != 0) {
365       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
366       db_unlock(mdb);
367       Dmsg0(500, "db_sql_query failed\n");
368       return false;
369    }
370    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
371
372    if (result_handler != NULL) {
373       Dmsg0(500, "db_sql_query invoking handler\n");
374       if ((mdb->result = sql_store_result(mdb)) != NULL) {
375          int num_fields = sql_num_fields(mdb);
376
377          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
378          while ((row = sql_fetch_row(mdb)) != NULL) {
379
380             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
381             if (result_handler(ctx, num_fields, row))
382                break;
383          }
384
385         sql_free_result(mdb);
386       }
387    }
388    db_unlock(mdb);
389
390    Dmsg0(500, "db_sql_query finished\n");
391
392    return true;
393 }
394
395
396
397 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
398 {
399    int j;
400    POSTGRESQL_ROW row = NULL; // by default, return NULL
401
402    Dmsg0(500, "my_postgresql_fetch_row start\n");
403
404    if (!mdb->row || mdb->row_size < mdb->num_fields) {
405       int num_fields = mdb->num_fields;
406       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
407
408       if (mdb->row) {
409          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
410          free(mdb->row);
411       }
412       num_fields += 20;                  /* add a bit extra */
413       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
414       mdb->row_size = num_fields;
415
416       // now reset the row_number now that we have the space allocated
417       mdb->row_number = 0;
418    }
419
420    // if still within the result set
421    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
422       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
423       // get each value from this row
424       for (j = 0; j < mdb->num_fields; j++) {
425          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
426          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
427       }
428       // increment the row number for the next call
429       mdb->row_number++;
430
431       row = mdb->row;
432    } else {
433       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
434    }
435
436    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
437
438    return row;
439 }
440
441 int my_postgresql_max_length(B_DB *mdb, int field_num) {
442    //
443    // for a given column, find the max length
444    //
445    int max_length;
446    int i;
447    int this_length;
448
449    max_length = 0;
450    for (i = 0; i < mdb->num_rows; i++) {
451       if (PQgetisnull(mdb->result, i, field_num)) {
452           this_length = 4;        // "NULL"
453       } else {
454           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
455       }
456
457       if (max_length < this_length) {
458           max_length = this_length;
459       }
460    }
461
462    return max_length;
463 }
464
465 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
466 {
467    int     i;
468
469    Dmsg0(500, "my_postgresql_fetch_field starts\n");
470
471    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
472       if (mdb->fields) {
473          free(mdb->fields);
474       }
475       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
476       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
477       mdb->fields_size = mdb->num_fields;
478
479       for (i = 0; i < mdb->num_fields; i++) {
480          Dmsg1(500, "filling field %d\n", i);
481          mdb->fields[i].name           = PQfname(mdb->result, i);
482          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
483          mdb->fields[i].type       = PQftype(mdb->result, i);
484          mdb->fields[i].flags      = 0;
485
486          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
487             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
488             mdb->fields[i].flags);
489       } // end for
490    } // end if
491
492    // increment field number for the next time around
493
494    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
495    return &mdb->fields[mdb->field_number++];
496 }
497
498 void my_postgresql_data_seek(B_DB *mdb, int row)
499 {
500    // set the row number to be returned on the next call
501    // to my_postgresql_fetch_row
502    mdb->row_number = row;
503 }
504
505 void my_postgresql_field_seek(B_DB *mdb, int field)
506 {
507    mdb->field_number = field;
508 }
509
510 /*
511  * Note, if this routine returns 1 (failure), Bacula expects
512  *  that no result has been stored.
513  * This is where QUERY_DB comes with Postgresql.
514  *
515  *  Returns:  0  on success
516  *            1  on failure
517  *
518  */
519 int my_postgresql_query(B_DB *mdb, const char *query)
520 {
521    Dmsg0(500, "my_postgresql_query started\n");
522    // We are starting a new query.  reset everything.
523    mdb->num_rows     = -1;
524    mdb->row_number   = -1;
525    mdb->field_number = -1;
526
527    if (mdb->result) {
528       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
529       mdb->result = NULL;
530    }
531
532    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
533
534    for (int i=0; i < 10; i++) {
535       mdb->result = PQexec(mdb->db, query);
536       if (mdb->result) {
537          break;
538       }
539       bmicrosleep(5, 0);
540    }
541    if (!mdb->result) {
542       Dmsg1(50, "Query failed: %s\n", query);
543       goto bail_out;
544    }
545
546    mdb->status = PQresultStatus(mdb->result);
547    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
548       Dmsg1(500, "we have a result\n", query);
549
550       // how many fields in the set?
551       mdb->num_fields = (int)PQnfields(mdb->result);
552       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
553
554       mdb->num_rows = PQntuples(mdb->result);
555       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
556
557       mdb->row_number = 0;      /* we can start to fetch something */
558       mdb->status = 0;          /* succeed */
559    } else {
560       Dmsg1(50, "Result status failed: %s\n", query);
561       goto bail_out;
562    }
563
564    Dmsg0(500, "my_postgresql_query finishing\n");
565    return mdb->status;
566
567 bail_out:
568    Dmsg1(500, "we failed\n", query);
569    PQclear(mdb->result);
570    mdb->result = NULL;
571    mdb->status = 1;                   /* failed */
572    return mdb->status;
573 }
574
575 void my_postgresql_free_result(B_DB *mdb)
576 {
577    
578    db_lock(mdb);
579    if (mdb->result) {
580       PQclear(mdb->result);
581       mdb->result = NULL;
582    }
583
584    if (mdb->row) {
585       free(mdb->row);
586       mdb->row = NULL;
587    }
588
589    if (mdb->fields) {
590       free(mdb->fields);
591       mdb->fields = NULL;
592    }
593    db_unlock(mdb);
594 }
595
596 int my_postgresql_currval(B_DB *mdb, const char *table_name)
597 {
598    // Obtain the current value of the sequence that
599    // provides the serial value for primary key of the table.
600
601    // currval is local to our session.  It is not affected by
602    // other transactions.
603
604    // Determine the name of the sequence.
605    // PostgreSQL automatically creates a sequence using
606    // <table>_<column>_seq.
607    // At the time of writing, all tables used this format for
608    // for their primary key: <table>id
609    // Except for basefiles which has a primary key on baseid.
610    // Therefore, we need to special case that one table.
611
612    // everything else can use the PostgreSQL formula.
613
614    char      sequence[NAMEDATALEN-1];
615    char      query   [NAMEDATALEN+50];
616    PGresult *result;
617    int       id = 0;
618
619    if (strcasecmp(table_name, "basefiles") == 0) {
620       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
621    } else {
622       bstrncpy(sequence, table_name, sizeof(sequence));
623       bstrncat(sequence, "_",        sizeof(sequence));
624       bstrncat(sequence, table_name, sizeof(sequence));
625       bstrncat(sequence, "id",       sizeof(sequence));
626    }
627
628    bstrncat(sequence, "_seq", sizeof(sequence));
629    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
630
631    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
632    for (int i=0; i < 10; i++) {
633       result = PQexec(mdb->db, query);
634       if (result) {
635          break;
636       }
637       bmicrosleep(5, 0);
638    }
639    if (!result) {
640       Dmsg1(50, "Query failed: %s\n", query);
641       goto bail_out;
642    }
643
644    Dmsg0(500, "exec done");
645
646    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
647       Dmsg0(500, "getting value");
648       id = atoi(PQgetvalue(result, 0, 0));
649       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
650    } else {
651       Dmsg1(50, "Result status failed: %s\n", query);
652       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
653    }
654
655 bail_out:
656    PQclear(result);
657
658    return id;
659 }
660
661 #ifdef HAVE_BATCH_FILE_INSERT
662
663 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
664 {
665    const char *query = "COPY batch FROM STDIN";
666
667    Dmsg0(500, "my_postgresql_batch_start started\n");
668
669    if (my_postgresql_query(mdb,
670                            "CREATE TEMPORARY TABLE batch ("
671                                "fileindex int,"
672                                "jobid int,"
673                                "path varchar,"
674                                "name varchar,"
675                                "lstat varchar,"
676                                "md5 varchar)") == 1)
677    {
678       Dmsg0(500, "my_postgresql_batch_start failed\n");
679       return 1;
680    }
681    
682    // We are starting a new query.  reset everything.
683    mdb->num_rows     = -1;
684    mdb->row_number   = -1;
685    mdb->field_number = -1;
686
687    my_postgresql_free_result(mdb);
688
689    for (int i=0; i < 10; i++) {
690       mdb->result = PQexec(mdb->db, query);
691       if (mdb->result) {
692          break;
693       }
694       bmicrosleep(5, 0);
695    }
696    if (!mdb->result) {
697       Dmsg1(50, "Query failed: %s\n", query);
698       goto bail_out;
699    }
700
701    mdb->status = PQresultStatus(mdb->result);
702    if (mdb->status == PGRES_COPY_IN) {
703       // how many fields in the set?
704       mdb->num_fields = (int) PQnfields(mdb->result);
705       mdb->num_rows   = 0;
706       mdb->status = 1;
707    } else {
708       Dmsg1(50, "Result status failed: %s\n", query);
709       goto bail_out;
710    }
711
712    Dmsg0(500, "my_postgresql_batch_start finishing\n");
713
714    return mdb->status;
715
716 bail_out:
717    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
718    mdb->status = 0;
719    PQclear(mdb->result);
720    mdb->result = NULL;
721    return mdb->status;
722 }
723
724 /* set error to something to abort operation */
725 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
726 {
727    int res;
728    int count=30;
729    PGresult *result;
730    Dmsg0(500, "my_postgresql_batch_end started\n");
731
732    if (!mdb) {                  /* no files ? */
733       return 0;
734    }
735
736    do { 
737       res = PQputCopyEnd(mdb->db, error);
738    } while (res == 0 && --count > 0);
739
740    if (res == 1) {
741       Dmsg0(500, "ok\n");
742       mdb->status = 1;
743    }
744    
745    if (res <= 0) {
746       Dmsg0(500, "we failed\n");
747       mdb->status = 0;
748       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
749    }
750
751    /* Check command status and return to normal libpq state */
752    result = PQgetResult(mdb->db);
753    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
754       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
755       mdb->status = 0;
756    }
757    PQclear(result); 
758
759    Dmsg0(500, "my_postgresql_batch_end finishing\n");
760
761    return mdb->status;
762 }
763
764 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
765 {
766    int res;
767    int count=30;
768    size_t len;
769    const char *digest;
770    char ed1[50];
771
772    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
773    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
774
775    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
776    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
777
778    if (ar->Digest == NULL || ar->Digest[0] == 0) {
779       digest = "0";
780    } else {
781       digest = ar->Digest;
782    }
783
784    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
785               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
786               mdb->esc_name, ar->attr, digest);
787
788    do { 
789       res = PQputCopyData(mdb->db,
790                           mdb->cmd,
791                           len);
792    } while (res == 0 && --count > 0);
793
794    if (res == 1) {
795       Dmsg0(500, "ok\n");
796       mdb->changes++;
797       mdb->status = 1;
798    }
799
800    if (res <= 0) {
801       Dmsg0(500, "we failed\n");
802       mdb->status = 0;
803       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
804    }
805
806    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
807
808    return mdb->status;
809 }
810
811 #endif /* HAVE_BATCH_FILE_INSERT */
812
813 /*
814  * Escape strings so that PostgreSQL is happy on COPY
815  *
816  *   NOTE! len is the length of the old string. Your new
817  *         string must be long enough (max 2*old+1) to hold
818  *         the escaped output.
819  */
820 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
821 {
822    /* we have to escape \t, \n, \r, \ */
823    char c = '\0' ;
824
825    while (len > 0 && *src) {
826       switch (*src) {
827       case '\n':
828          c = 'n';
829          break;
830       case '\\':
831          c = '\\';
832          break;
833       case '\t':
834          c = 't';
835          break;
836       case '\r':
837          c = 'r';
838          break;
839       default:
840          c = '\0' ;
841       }
842
843       if (c) {
844          *dest = '\\';
845          dest++;
846          *dest = c;
847       } else {
848          *dest = *src;
849       }
850
851       len--;
852       src++;
853       dest++;
854    }
855
856    *dest = '\0';
857    return dest;
858 }
859
860 #ifdef HAVE_BATCH_FILE_INSERT
861 const char *my_pg_batch_lock_path_query = 
862    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
863
864
865 const char *my_pg_batch_lock_filename_query = 
866    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
867
868 const char *my_pg_batch_unlock_tables_query = "COMMIT";
869
870 const char *my_pg_batch_fill_path_query = 
871    "INSERT INTO Path (Path) "
872     "SELECT a.Path FROM "
873      "(SELECT DISTINCT Path FROM batch) AS a "
874       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
875
876
877 const char *my_pg_batch_fill_filename_query = 
878    "INSERT INTO Filename (Name) "
879     "SELECT a.Name FROM "
880      "(SELECT DISTINCT Name FROM batch) as a "
881       "WHERE NOT EXISTS "
882        "(SELECT Name FROM Filename WHERE Name = a.Name)";
883 #endif /* HAVE_BATCH_FILE_INSERT */
884
885 #endif /* HAVE_POSTGRESQL */