]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Remove old bdb files + change DQUEUE to use dlist
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  */
36
37
38 /* The following is necessary so that we do not include
39  * the dummy external definition of DB.
40  */
41 #define __SQL_C                       /* indicate that this is sql.c */
42
43 #include "bacula.h"
44 #include "cats.h"
45
46 #ifdef HAVE_POSTGRESQL
47
48 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb = NULL;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (db_list == NULL) {
90       db_list = New(dlist(mdb, &mdb->link));
91    }
92    if (!mult_db_connections) {
93       /* Look to see if DB already open */
94       foreach_dlist(mdb, db_list) {
95          if (bstrcmp(mdb->db_name, db_name) &&
96              bstrcmp(mdb->db_address, db_address) &&
97              mdb->db_port == db_port) {
98             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
99             mdb->ref_count++;
100             V(mutex);
101             return mdb;                  /* already open */
102          }
103       }
104    }
105    Dmsg0(100, "db_open first time\n");
106    mdb = (B_DB *)malloc(sizeof(B_DB));
107    memset(mdb, 0, sizeof(B_DB));
108    mdb->db_name = bstrdup(db_name);
109    mdb->db_user = bstrdup(db_user);
110    if (db_password) {
111       mdb->db_password = bstrdup(db_password);
112    }
113    if (db_address) {
114       mdb->db_address  = bstrdup(db_address);
115    }
116    if (db_socket) {
117       mdb->db_socket   = bstrdup(db_socket);
118    }
119    mdb->db_port        = db_port;
120    mdb->have_insert_id = TRUE;
121    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
122    *mdb->errmsg        = 0;
123    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
124    mdb->cached_path    = get_pool_memory(PM_FNAME);
125    mdb->cached_path_id = 0;
126    mdb->ref_count      = 1;
127    mdb->fname          = get_pool_memory(PM_FNAME);
128    mdb->path           = get_pool_memory(PM_FNAME);
129    mdb->esc_name       = get_pool_memory(PM_FNAME);
130    mdb->esc_path      = get_pool_memory(PM_FNAME);
131    mdb->allow_transactions = mult_db_connections;
132    db_list->append(mdb);                   /* put db in list */
133    V(mutex);
134    return mdb;
135 }
136
137 /* Check that the database correspond to the encoding we want */
138 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
139 {
140    SQL_ROW row;
141    int ret=false;
142
143    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
144       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
145       return false;
146    }
147
148    if ((row = sql_fetch_row(mdb)) == NULL) {
149       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
150       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
151    } else {
152       ret = bstrcmp(row[0], "SQL_ASCII");
153
154       if (ret) {
155          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
156          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
157
158       } else {                  /* something is wrong with database encoding */
159          Mmsg(mdb->errmsg, 
160               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
161               mdb->db_name, row[0]);
162          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
163          Dmsg1(50, "%s", mdb->errmsg);
164       } 
165    }
166    return ret;
167 }
168
169 /*
170  * Now actually open the database.  This can generate errors,
171  *   which are returned in the errmsg
172  *
173  * DO NOT close the database or free(mdb) here !!!!
174  */
175 int
176 db_open_database(JCR *jcr, B_DB *mdb)
177 {
178    int errstat;
179    char buf[10], *port;
180
181    P(mutex);
182    if (mdb->connected) {
183       V(mutex);
184       return 1;
185    }
186    mdb->connected = false;
187
188    if ((errstat=rwl_init(&mdb->lock)) != 0) {
189       berrno be;
190       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
191             be.bstrerror(errstat));
192       V(mutex);
193       return 0;
194    }
195
196    if (mdb->db_port) {
197       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
198       port = buf;
199    } else {
200       port = NULL;
201    }
202
203    /* If connection fails, try at 5 sec intervals for 30 seconds. */
204    for (int retry=0; retry < 6; retry++) {
205       /* connect to the database */
206       mdb->db = PQsetdbLogin(
207            mdb->db_address,           /* default = localhost */
208            port,                      /* default port */
209            NULL,                      /* pg options */
210            NULL,                      /* tty, ignored */
211            mdb->db_name,              /* database name */
212            mdb->db_user,              /* login name */
213            mdb->db_password);         /* password */
214
215       /* If no connect, try once more in case it is a timing problem */
216       if (PQstatus(mdb->db) == CONNECTION_OK) {
217          break;
218       }
219       bmicrosleep(5, 0);
220    }
221
222    Dmsg0(50, "pg_real_connect done\n");
223    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
224             mdb->db_password==NULL?"(NULL)":mdb->db_password);
225
226    if (PQstatus(mdb->db) != CONNECTION_OK) {
227       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
228          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
229          mdb->db_name, mdb->db_user);
230       V(mutex);
231       return 0;
232    }
233
234    mdb->connected = true;
235
236    if (!check_tables_version(jcr, mdb)) {
237       V(mutex);
238       return 0;
239    }
240
241    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
242    
243    /* tell PostgreSQL we are using standard conforming strings
244       and avoid warnings such as:
245        WARNING:  nonstandard use of \\ in a string literal
246    */
247    sql_query(mdb, "set standard_conforming_strings=on");
248
249    /* check that encoding is SQL_ASCII */
250    check_database_encoding(jcr, mdb);
251
252    V(mutex);
253    return 1;
254 }
255
256 void
257 db_close_database(JCR *jcr, B_DB *mdb)
258 {
259    if (!mdb) {
260       return;
261    }
262    db_end_transaction(jcr, mdb);
263    P(mutex);
264    sql_free_result(mdb);
265    mdb->ref_count--;
266    if (mdb->ref_count == 0) {
267       db_list->remove(mdb);
268       if (mdb->connected && mdb->db) {
269          sql_close(mdb);
270       }
271       rwl_destroy(&mdb->lock);
272       free_pool_memory(mdb->errmsg);
273       free_pool_memory(mdb->cmd);
274       free_pool_memory(mdb->cached_path);
275       free_pool_memory(mdb->fname);
276       free_pool_memory(mdb->path);
277       free_pool_memory(mdb->esc_name);
278       free_pool_memory(mdb->esc_path);
279       if (mdb->db_name) {
280          free(mdb->db_name);
281       }
282       if (mdb->db_user) {
283          free(mdb->db_user);
284       }
285       if (mdb->db_password) {
286          free(mdb->db_password);
287       }
288       if (mdb->db_address) {
289          free(mdb->db_address);
290       }
291       if (mdb->db_socket) {
292          free(mdb->db_socket);
293       }
294       free(mdb);
295    }
296    V(mutex);
297 }
298
299 void db_check_backend_thread_safe()
300 {
301 #ifdef HAVE_BATCH_FILE_INSERT
302 # ifdef HAVE_PQISTHREADSAFE 
303    if (!PQisthreadsafe()) {
304       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
305                           "when using BatchMode.\n"));
306    }
307 # endif
308 #endif
309 }
310
311 void db_thread_cleanup()
312 { }
313
314 /*
315  * Return the next unique index (auto-increment) for
316  * the given table.  Return NULL on error.
317  *
318  * For PostgreSQL, NULL causes the auto-increment value
319  *  to be updated.
320  */
321 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
322 {
323    strcpy(index, "NULL");
324    return 1;
325 }
326
327
328 /*
329  * Escape strings so that PostgreSQL is happy
330  *
331  *   NOTE! len is the length of the old string. Your new
332  *         string must be long enough (max 2*old+1) to hold
333  *         the escaped output.
334  */
335 void
336 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
337 {
338    int error;
339   
340    PQescapeStringConn(mdb->db, snew, old, len, &error);
341    if (error) {
342       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
343       /* error on encoding, probably invalid multibyte encoding in the source string
344         see PQescapeStringConn documentation for details. */
345       Dmsg0(500, "PQescapeStringConn failed\n");
346    }
347 }
348
349 /*
350  * Submit a general SQL command (cmd), and for each row returned,
351  *  the sqlite_handler is called with the ctx.
352  */
353 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
354 {
355    SQL_ROW row;
356
357    Dmsg0(500, "db_sql_query started\n");
358
359    db_lock(mdb);
360    if (sql_query(mdb, query) != 0) {
361       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
362       db_unlock(mdb);
363       Dmsg0(500, "db_sql_query failed\n");
364       return false;
365    }
366    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
367
368    if (result_handler != NULL) {
369       Dmsg0(500, "db_sql_query invoking handler\n");
370       if ((mdb->result = sql_store_result(mdb)) != NULL) {
371          int num_fields = sql_num_fields(mdb);
372
373          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
374          while ((row = sql_fetch_row(mdb)) != NULL) {
375
376             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
377             if (result_handler(ctx, num_fields, row))
378                break;
379          }
380
381         sql_free_result(mdb);
382       }
383    }
384    db_unlock(mdb);
385
386    Dmsg0(500, "db_sql_query finished\n");
387
388    return true;
389 }
390
391
392
393 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
394 {
395    int j;
396    POSTGRESQL_ROW row = NULL; // by default, return NULL
397
398    Dmsg0(500, "my_postgresql_fetch_row start\n");
399
400    if (!mdb->row || mdb->row_size < mdb->num_fields) {
401       int num_fields = mdb->num_fields;
402       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
403
404       if (mdb->row) {
405          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
406          free(mdb->row);
407       }
408       num_fields += 20;                  /* add a bit extra */
409       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
410       mdb->row_size = num_fields;
411
412       // now reset the row_number now that we have the space allocated
413       mdb->row_number = 0;
414    }
415
416    // if still within the result set
417    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
418       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
419       // get each value from this row
420       for (j = 0; j < mdb->num_fields; j++) {
421          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
422          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
423       }
424       // increment the row number for the next call
425       mdb->row_number++;
426
427       row = mdb->row;
428    } else {
429       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
430    }
431
432    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
433
434    return row;
435 }
436
437 int my_postgresql_max_length(B_DB *mdb, int field_num) {
438    //
439    // for a given column, find the max length
440    //
441    int max_length;
442    int i;
443    int this_length;
444
445    max_length = 0;
446    for (i = 0; i < mdb->num_rows; i++) {
447       if (PQgetisnull(mdb->result, i, field_num)) {
448           this_length = 4;        // "NULL"
449       } else {
450           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
451       }
452
453       if (max_length < this_length) {
454           max_length = this_length;
455       }
456    }
457
458    return max_length;
459 }
460
461 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
462 {
463    int     i;
464
465    Dmsg0(500, "my_postgresql_fetch_field starts\n");
466
467    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
468       if (mdb->fields) {
469          free(mdb->fields);
470       }
471       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
472       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
473       mdb->fields_size = mdb->num_fields;
474
475       for (i = 0; i < mdb->num_fields; i++) {
476          Dmsg1(500, "filling field %d\n", i);
477          mdb->fields[i].name           = PQfname(mdb->result, i);
478          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
479          mdb->fields[i].type       = PQftype(mdb->result, i);
480          mdb->fields[i].flags      = 0;
481
482          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
483             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
484             mdb->fields[i].flags);
485       } // end for
486    } // end if
487
488    // increment field number for the next time around
489
490    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
491    return &mdb->fields[mdb->field_number++];
492 }
493
494 void my_postgresql_data_seek(B_DB *mdb, int row)
495 {
496    // set the row number to be returned on the next call
497    // to my_postgresql_fetch_row
498    mdb->row_number = row;
499 }
500
501 void my_postgresql_field_seek(B_DB *mdb, int field)
502 {
503    mdb->field_number = field;
504 }
505
506 /*
507  * Note, if this routine returns 1 (failure), Bacula expects
508  *  that no result has been stored.
509  * This is where QUERY_DB comes with Postgresql.
510  *
511  *  Returns:  0  on success
512  *            1  on failure
513  *
514  */
515 int my_postgresql_query(B_DB *mdb, const char *query)
516 {
517    Dmsg0(500, "my_postgresql_query started\n");
518    // We are starting a new query.  reset everything.
519    mdb->num_rows     = -1;
520    mdb->row_number   = -1;
521    mdb->field_number = -1;
522
523    if (mdb->result) {
524       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
525       mdb->result = NULL;
526    }
527
528    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
529
530    for (int i=0; i < 10; i++) {
531       mdb->result = PQexec(mdb->db, query);
532       if (mdb->result) {
533          break;
534       }
535       bmicrosleep(5, 0);
536    }
537    if (!mdb->result) {
538       Dmsg1(50, "Query failed: %s\n", query);
539       goto bail_out;
540    }
541
542    mdb->status = PQresultStatus(mdb->result);
543    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
544       Dmsg1(500, "we have a result\n", query);
545
546       // how many fields in the set?
547       mdb->num_fields = (int)PQnfields(mdb->result);
548       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
549
550       mdb->num_rows = PQntuples(mdb->result);
551       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
552
553       mdb->row_number = 0;      /* we can start to fetch something */
554       mdb->status = 0;          /* succeed */
555    } else {
556       Dmsg1(50, "Result status failed: %s\n", query);
557       goto bail_out;
558    }
559
560    Dmsg0(500, "my_postgresql_query finishing\n");
561    return mdb->status;
562
563 bail_out:
564    Dmsg1(500, "we failed\n", query);
565    PQclear(mdb->result);
566    mdb->result = NULL;
567    mdb->status = 1;                   /* failed */
568    return mdb->status;
569 }
570
571 void my_postgresql_free_result(B_DB *mdb)
572 {
573    
574    db_lock(mdb);
575    if (mdb->result) {
576       PQclear(mdb->result);
577       mdb->result = NULL;
578    }
579
580    if (mdb->row) {
581       free(mdb->row);
582       mdb->row = NULL;
583    }
584
585    if (mdb->fields) {
586       free(mdb->fields);
587       mdb->fields = NULL;
588    }
589    db_unlock(mdb);
590 }
591
592 int my_postgresql_currval(B_DB *mdb, const char *table_name)
593 {
594    // Obtain the current value of the sequence that
595    // provides the serial value for primary key of the table.
596
597    // currval is local to our session.  It is not affected by
598    // other transactions.
599
600    // Determine the name of the sequence.
601    // PostgreSQL automatically creates a sequence using
602    // <table>_<column>_seq.
603    // At the time of writing, all tables used this format for
604    // for their primary key: <table>id
605    // Except for basefiles which has a primary key on baseid.
606    // Therefore, we need to special case that one table.
607
608    // everything else can use the PostgreSQL formula.
609
610    char      sequence[NAMEDATALEN-1];
611    char      query   [NAMEDATALEN+50];
612    PGresult *result;
613    int       id = 0;
614
615    if (strcasecmp(table_name, "basefiles") == 0) {
616       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
617    } else {
618       bstrncpy(sequence, table_name, sizeof(sequence));
619       bstrncat(sequence, "_",        sizeof(sequence));
620       bstrncat(sequence, table_name, sizeof(sequence));
621       bstrncat(sequence, "id",       sizeof(sequence));
622    }
623
624    bstrncat(sequence, "_seq", sizeof(sequence));
625    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
626
627    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
628    for (int i=0; i < 10; i++) {
629       result = PQexec(mdb->db, query);
630       if (result) {
631          break;
632       }
633       bmicrosleep(5, 0);
634    }
635    if (!result) {
636       Dmsg1(50, "Query failed: %s\n", query);
637       goto bail_out;
638    }
639
640    Dmsg0(500, "exec done");
641
642    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
643       Dmsg0(500, "getting value");
644       id = atoi(PQgetvalue(result, 0, 0));
645       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
646    } else {
647       Dmsg1(50, "Result status failed: %s\n", query);
648       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
649    }
650
651 bail_out:
652    PQclear(result);
653
654    return id;
655 }
656
657 #ifdef HAVE_BATCH_FILE_INSERT
658
659 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
660 {
661    const char *query = "COPY batch FROM STDIN";
662
663    Dmsg0(500, "my_postgresql_batch_start started\n");
664
665    if (my_postgresql_query(mdb,
666                            "CREATE TEMPORARY TABLE batch ("
667                                "fileindex int,"
668                                "jobid int,"
669                                "path varchar,"
670                                "name varchar,"
671                                "lstat varchar,"
672                                "md5 varchar)") == 1)
673    {
674       Dmsg0(500, "my_postgresql_batch_start failed\n");
675       return 1;
676    }
677    
678    // We are starting a new query.  reset everything.
679    mdb->num_rows     = -1;
680    mdb->row_number   = -1;
681    mdb->field_number = -1;
682
683    my_postgresql_free_result(mdb);
684
685    for (int i=0; i < 10; i++) {
686       mdb->result = PQexec(mdb->db, query);
687       if (mdb->result) {
688          break;
689       }
690       bmicrosleep(5, 0);
691    }
692    if (!mdb->result) {
693       Dmsg1(50, "Query failed: %s\n", query);
694       goto bail_out;
695    }
696
697    mdb->status = PQresultStatus(mdb->result);
698    if (mdb->status == PGRES_COPY_IN) {
699       // how many fields in the set?
700       mdb->num_fields = (int) PQnfields(mdb->result);
701       mdb->num_rows   = 0;
702       mdb->status = 1;
703    } else {
704       Dmsg1(50, "Result status failed: %s\n", query);
705       goto bail_out;
706    }
707
708    Dmsg0(500, "my_postgresql_batch_start finishing\n");
709
710    return mdb->status;
711
712 bail_out:
713    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
714    mdb->status = 0;
715    PQclear(mdb->result);
716    mdb->result = NULL;
717    return mdb->status;
718 }
719
720 /* set error to something to abort operation */
721 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
722 {
723    int res;
724    int count=30;
725    PGresult *result;
726    Dmsg0(500, "my_postgresql_batch_end started\n");
727
728    if (!mdb) {                  /* no files ? */
729       return 0;
730    }
731
732    do { 
733       res = PQputCopyEnd(mdb->db, error);
734    } while (res == 0 && --count > 0);
735
736    if (res == 1) {
737       Dmsg0(500, "ok\n");
738       mdb->status = 1;
739    }
740    
741    if (res <= 0) {
742       Dmsg0(500, "we failed\n");
743       mdb->status = 0;
744       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
745    }
746
747    /* Check command status and return to normal libpq state */
748    result = PQgetResult(mdb->db);
749    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
750       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
751       mdb->status = 0;
752    }
753    PQclear(result); 
754
755    Dmsg0(500, "my_postgresql_batch_end finishing\n");
756
757    return mdb->status;
758 }
759
760 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
761 {
762    int res;
763    int count=30;
764    size_t len;
765    const char *digest;
766    char ed1[50];
767
768    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
769    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
770
771    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
772    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
773
774    if (ar->Digest == NULL || ar->Digest[0] == 0) {
775       digest = "0";
776    } else {
777       digest = ar->Digest;
778    }
779
780    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
781               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
782               mdb->esc_name, ar->attr, digest);
783
784    do { 
785       res = PQputCopyData(mdb->db,
786                           mdb->cmd,
787                           len);
788    } while (res == 0 && --count > 0);
789
790    if (res == 1) {
791       Dmsg0(500, "ok\n");
792       mdb->changes++;
793       mdb->status = 1;
794    }
795
796    if (res <= 0) {
797       Dmsg0(500, "we failed\n");
798       mdb->status = 0;
799       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
800    }
801
802    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
803
804    return mdb->status;
805 }
806
807 #endif /* HAVE_BATCH_FILE_INSERT */
808
809 /*
810  * Escape strings so that PostgreSQL is happy on COPY
811  *
812  *   NOTE! len is the length of the old string. Your new
813  *         string must be long enough (max 2*old+1) to hold
814  *         the escaped output.
815  */
816 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
817 {
818    /* we have to escape \t, \n, \r, \ */
819    char c = '\0' ;
820
821    while (len > 0 && *src) {
822       switch (*src) {
823       case '\n':
824          c = 'n';
825          break;
826       case '\\':
827          c = '\\';
828          break;
829       case '\t':
830          c = 't';
831          break;
832       case '\r':
833          c = 'r';
834          break;
835       default:
836          c = '\0' ;
837       }
838
839       if (c) {
840          *dest = '\\';
841          dest++;
842          *dest = c;
843       } else {
844          *dest = *src;
845       }
846
847       len--;
848       src++;
849       dest++;
850    }
851
852    *dest = '\0';
853    return dest;
854 }
855
856 #ifdef HAVE_BATCH_FILE_INSERT
857 const char *my_pg_batch_lock_path_query = 
858    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
859
860
861 const char *my_pg_batch_lock_filename_query = 
862    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
863
864 const char *my_pg_batch_unlock_tables_query = "COMMIT";
865
866 const char *my_pg_batch_fill_path_query = 
867    "INSERT INTO Path (Path) "
868     "SELECT a.Path FROM "
869      "(SELECT DISTINCT Path FROM batch) AS a "
870       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
871
872
873 const char *my_pg_batch_fill_filename_query = 
874    "INSERT INTO Filename (Name) "
875     "SELECT a.Name FROM "
876      "(SELECT DISTINCT Name FROM batch) as a "
877       "WHERE NOT EXISTS "
878        "(SELECT Name FROM Filename WHERE Name = a.Name)";
879 #endif /* HAVE_BATCH_FILE_INSERT */
880
881 #endif /* HAVE_POSTGRESQL */