]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Drop have_insert_id in mdb as its always true for all backends.
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  */
36
37
38 /* The following is necessary so that we do not include
39  * the dummy external definition of DB.
40  */
41 #define __SQL_C                       /* indicate that this is sql.c */
42
43 #include "bacula.h"
44 #include "cats.h"
45
46 #ifdef HAVE_POSTGRESQL
47
48 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb = NULL;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (db_list == NULL) {
90       db_list = New(dlist(mdb, &mdb->link));
91    }
92    if (!mult_db_connections) {
93       /* Look to see if DB already open */
94       foreach_dlist(mdb, db_list) {
95          if (bstrcmp(mdb->db_name, db_name) &&
96              bstrcmp(mdb->db_address, db_address) &&
97              mdb->db_port == db_port) {
98             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
99             mdb->ref_count++;
100             V(mutex);
101             return mdb;                  /* already open */
102          }
103       }
104    }
105    Dmsg0(100, "db_open first time\n");
106    mdb = (B_DB *)malloc(sizeof(B_DB));
107    memset(mdb, 0, sizeof(B_DB));
108    mdb->db_name = bstrdup(db_name);
109    mdb->db_user = bstrdup(db_user);
110    if (db_password) {
111       mdb->db_password = bstrdup(db_password);
112    }
113    if (db_address) {
114       mdb->db_address  = bstrdup(db_address);
115    }
116    if (db_socket) {
117       mdb->db_socket   = bstrdup(db_socket);
118    }
119    mdb->db_port        = db_port;
120    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
121    *mdb->errmsg        = 0;
122    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
123    mdb->cached_path    = get_pool_memory(PM_FNAME);
124    mdb->cached_path_id = 0;
125    mdb->ref_count      = 1;
126    mdb->fname          = get_pool_memory(PM_FNAME);
127    mdb->path           = get_pool_memory(PM_FNAME);
128    mdb->esc_name       = get_pool_memory(PM_FNAME);
129    mdb->esc_path      = get_pool_memory(PM_FNAME);
130    mdb->allow_transactions = mult_db_connections;
131    db_list->append(mdb);                   /* put db in list */
132    V(mutex);
133    return mdb;
134 }
135
136 /* Check that the database correspond to the encoding we want */
137 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
138 {
139    SQL_ROW row;
140    int ret=false;
141
142    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
143       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
144       return false;
145    }
146
147    if ((row = sql_fetch_row(mdb)) == NULL) {
148       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
149       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
150    } else {
151       ret = bstrcmp(row[0], "SQL_ASCII");
152
153       if (ret) {
154          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
155          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
156
157       } else {                  /* something is wrong with database encoding */
158          Mmsg(mdb->errmsg, 
159               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
160               mdb->db_name, row[0]);
161          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
162          Dmsg1(50, "%s", mdb->errmsg);
163       } 
164    }
165    return ret;
166 }
167
168 /*
169  * Now actually open the database.  This can generate errors,
170  *   which are returned in the errmsg
171  *
172  * DO NOT close the database or free(mdb) here !!!!
173  */
174 int
175 db_open_database(JCR *jcr, B_DB *mdb)
176 {
177    int errstat;
178    char buf[10], *port;
179
180    P(mutex);
181    if (mdb->connected) {
182       V(mutex);
183       return 1;
184    }
185    mdb->connected = false;
186
187    if ((errstat=rwl_init(&mdb->lock)) != 0) {
188       berrno be;
189       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190             be.bstrerror(errstat));
191       V(mutex);
192       return 0;
193    }
194
195    if (mdb->db_port) {
196       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
197       port = buf;
198    } else {
199       port = NULL;
200    }
201
202    /* If connection fails, try at 5 sec intervals for 30 seconds. */
203    for (int retry=0; retry < 6; retry++) {
204       /* connect to the database */
205       mdb->db = PQsetdbLogin(
206            mdb->db_address,           /* default = localhost */
207            port,                      /* default port */
208            NULL,                      /* pg options */
209            NULL,                      /* tty, ignored */
210            mdb->db_name,              /* database name */
211            mdb->db_user,              /* login name */
212            mdb->db_password);         /* password */
213
214       /* If no connect, try once more in case it is a timing problem */
215       if (PQstatus(mdb->db) == CONNECTION_OK) {
216          break;
217       }
218       bmicrosleep(5, 0);
219    }
220
221    Dmsg0(50, "pg_real_connect done\n");
222    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223             mdb->db_password==NULL?"(NULL)":mdb->db_password);
224
225    if (PQstatus(mdb->db) != CONNECTION_OK) {
226       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
227          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
228          mdb->db_name, mdb->db_user);
229       V(mutex);
230       return 0;
231    }
232
233    mdb->connected = true;
234
235    if (!check_tables_version(jcr, mdb)) {
236       V(mutex);
237       return 0;
238    }
239
240    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
241    
242    /* tell PostgreSQL we are using standard conforming strings
243       and avoid warnings such as:
244        WARNING:  nonstandard use of \\ in a string literal
245    */
246    sql_query(mdb, "set standard_conforming_strings=on");
247
248    /* check that encoding is SQL_ASCII */
249    check_database_encoding(jcr, mdb);
250
251    V(mutex);
252    return 1;
253 }
254
255 void
256 db_close_database(JCR *jcr, B_DB *mdb)
257 {
258    if (!mdb) {
259       return;
260    }
261    db_end_transaction(jcr, mdb);
262    P(mutex);
263    sql_free_result(mdb);
264    mdb->ref_count--;
265    if (mdb->ref_count == 0) {
266       db_list->remove(mdb);
267       if (mdb->connected && mdb->db) {
268          sql_close(mdb);
269       }
270       rwl_destroy(&mdb->lock);
271       free_pool_memory(mdb->errmsg);
272       free_pool_memory(mdb->cmd);
273       free_pool_memory(mdb->cached_path);
274       free_pool_memory(mdb->fname);
275       free_pool_memory(mdb->path);
276       free_pool_memory(mdb->esc_name);
277       free_pool_memory(mdb->esc_path);
278       if (mdb->db_name) {
279          free(mdb->db_name);
280       }
281       if (mdb->db_user) {
282          free(mdb->db_user);
283       }
284       if (mdb->db_password) {
285          free(mdb->db_password);
286       }
287       if (mdb->db_address) {
288          free(mdb->db_address);
289       }
290       if (mdb->db_socket) {
291          free(mdb->db_socket);
292       }
293       free(mdb);
294       if (db_list->size() == 0) {
295          delete db_list;
296          db_list = NULL;
297       }
298    }
299    V(mutex);
300 }
301
302 void db_check_backend_thread_safe()
303 {
304 #ifdef HAVE_BATCH_FILE_INSERT
305 # ifdef HAVE_PQISTHREADSAFE 
306    if (!PQisthreadsafe()) {
307       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
308                           "when using BatchMode.\n"));
309    }
310 # endif
311 #endif
312 }
313
314 void db_thread_cleanup()
315 { }
316
317 /*
318  * Return the next unique index (auto-increment) for
319  * the given table.  Return NULL on error.
320  *
321  * For PostgreSQL, NULL causes the auto-increment value
322  *  to be updated.
323  */
324 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
325 {
326    strcpy(index, "NULL");
327    return 1;
328 }
329
330
331 /*
332  * Escape strings so that PostgreSQL is happy
333  *
334  *   NOTE! len is the length of the old string. Your new
335  *         string must be long enough (max 2*old+1) to hold
336  *         the escaped output.
337  */
338 void
339 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
340 {
341    int error;
342   
343    PQescapeStringConn(mdb->db, snew, old, len, &error);
344    if (error) {
345       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
346       /* error on encoding, probably invalid multibyte encoding in the source string
347         see PQescapeStringConn documentation for details. */
348       Dmsg0(500, "PQescapeStringConn failed\n");
349    }
350 }
351
352 /*
353  * Submit a general SQL command (cmd), and for each row returned,
354  *  the sqlite_handler is called with the ctx.
355  */
356 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
357 {
358    SQL_ROW row;
359
360    Dmsg0(500, "db_sql_query started\n");
361
362    db_lock(mdb);
363    if (sql_query(mdb, query) != 0) {
364       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
365       db_unlock(mdb);
366       Dmsg0(500, "db_sql_query failed\n");
367       return false;
368    }
369    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
370
371    if (result_handler != NULL) {
372       Dmsg0(500, "db_sql_query invoking handler\n");
373       if ((mdb->result = sql_store_result(mdb)) != NULL) {
374          int num_fields = sql_num_fields(mdb);
375
376          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
377          while ((row = sql_fetch_row(mdb)) != NULL) {
378
379             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
380             if (result_handler(ctx, num_fields, row))
381                break;
382          }
383
384         sql_free_result(mdb);
385       }
386    }
387    db_unlock(mdb);
388
389    Dmsg0(500, "db_sql_query finished\n");
390
391    return true;
392 }
393
394
395
396 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
397 {
398    int j;
399    POSTGRESQL_ROW row = NULL; // by default, return NULL
400
401    Dmsg0(500, "my_postgresql_fetch_row start\n");
402
403    if (!mdb->row || mdb->row_size < mdb->num_fields) {
404       int num_fields = mdb->num_fields;
405       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
406
407       if (mdb->row) {
408          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
409          free(mdb->row);
410       }
411       num_fields += 20;                  /* add a bit extra */
412       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
413       mdb->row_size = num_fields;
414
415       // now reset the row_number now that we have the space allocated
416       mdb->row_number = 0;
417    }
418
419    // if still within the result set
420    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
421       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
422       // get each value from this row
423       for (j = 0; j < mdb->num_fields; j++) {
424          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
425          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
426       }
427       // increment the row number for the next call
428       mdb->row_number++;
429
430       row = mdb->row;
431    } else {
432       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
433    }
434
435    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
436
437    return row;
438 }
439
440 int my_postgresql_max_length(B_DB *mdb, int field_num) {
441    //
442    // for a given column, find the max length
443    //
444    int max_length;
445    int i;
446    int this_length;
447
448    max_length = 0;
449    for (i = 0; i < mdb->num_rows; i++) {
450       if (PQgetisnull(mdb->result, i, field_num)) {
451           this_length = 4;        // "NULL"
452       } else {
453           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
454       }
455
456       if (max_length < this_length) {
457           max_length = this_length;
458       }
459    }
460
461    return max_length;
462 }
463
464 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
465 {
466    int     i;
467
468    Dmsg0(500, "my_postgresql_fetch_field starts\n");
469
470    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
471       if (mdb->fields) {
472          free(mdb->fields);
473       }
474       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
475       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
476       mdb->fields_size = mdb->num_fields;
477
478       for (i = 0; i < mdb->num_fields; i++) {
479          Dmsg1(500, "filling field %d\n", i);
480          mdb->fields[i].name           = PQfname(mdb->result, i);
481          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
482          mdb->fields[i].type       = PQftype(mdb->result, i);
483          mdb->fields[i].flags      = 0;
484
485          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
486             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
487             mdb->fields[i].flags);
488       } // end for
489    } // end if
490
491    // increment field number for the next time around
492
493    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
494    return &mdb->fields[mdb->field_number++];
495 }
496
497 void my_postgresql_data_seek(B_DB *mdb, int row)
498 {
499    // set the row number to be returned on the next call
500    // to my_postgresql_fetch_row
501    mdb->row_number = row;
502 }
503
504 void my_postgresql_field_seek(B_DB *mdb, int field)
505 {
506    mdb->field_number = field;
507 }
508
509 /*
510  * Note, if this routine returns 1 (failure), Bacula expects
511  *  that no result has been stored.
512  * This is where QUERY_DB comes with Postgresql.
513  *
514  *  Returns:  0  on success
515  *            1  on failure
516  *
517  */
518 int my_postgresql_query(B_DB *mdb, const char *query)
519 {
520    Dmsg0(500, "my_postgresql_query started\n");
521    // We are starting a new query.  reset everything.
522    mdb->num_rows     = -1;
523    mdb->row_number   = -1;
524    mdb->field_number = -1;
525
526    if (mdb->result) {
527       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
528       mdb->result = NULL;
529    }
530
531    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
532
533    for (int i=0; i < 10; i++) {
534       mdb->result = PQexec(mdb->db, query);
535       if (mdb->result) {
536          break;
537       }
538       bmicrosleep(5, 0);
539    }
540    if (!mdb->result) {
541       Dmsg1(50, "Query failed: %s\n", query);
542       goto bail_out;
543    }
544
545    mdb->status = PQresultStatus(mdb->result);
546    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
547       Dmsg1(500, "we have a result\n", query);
548
549       // how many fields in the set?
550       mdb->num_fields = (int)PQnfields(mdb->result);
551       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
552
553       mdb->num_rows = PQntuples(mdb->result);
554       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
555
556       mdb->row_number = 0;      /* we can start to fetch something */
557       mdb->status = 0;          /* succeed */
558    } else {
559       Dmsg1(50, "Result status failed: %s\n", query);
560       goto bail_out;
561    }
562
563    Dmsg0(500, "my_postgresql_query finishing\n");
564    return mdb->status;
565
566 bail_out:
567    Dmsg1(500, "we failed\n", query);
568    PQclear(mdb->result);
569    mdb->result = NULL;
570    mdb->status = 1;                   /* failed */
571    return mdb->status;
572 }
573
574 void my_postgresql_free_result(B_DB *mdb)
575 {
576    
577    db_lock(mdb);
578    if (mdb->result) {
579       PQclear(mdb->result);
580       mdb->result = NULL;
581    }
582
583    if (mdb->row) {
584       free(mdb->row);
585       mdb->row = NULL;
586    }
587
588    if (mdb->fields) {
589       free(mdb->fields);
590       mdb->fields = NULL;
591    }
592    db_unlock(mdb);
593 }
594
595 int my_postgresql_currval(B_DB *mdb, const char *table_name)
596 {
597    // Obtain the current value of the sequence that
598    // provides the serial value for primary key of the table.
599
600    // currval is local to our session.  It is not affected by
601    // other transactions.
602
603    // Determine the name of the sequence.
604    // PostgreSQL automatically creates a sequence using
605    // <table>_<column>_seq.
606    // At the time of writing, all tables used this format for
607    // for their primary key: <table>id
608    // Except for basefiles which has a primary key on baseid.
609    // Therefore, we need to special case that one table.
610
611    // everything else can use the PostgreSQL formula.
612
613    char      sequence[NAMEDATALEN-1];
614    char      query   [NAMEDATALEN+50];
615    PGresult *result;
616    int       id = 0;
617
618    if (strcasecmp(table_name, "basefiles") == 0) {
619       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
620    } else {
621       bstrncpy(sequence, table_name, sizeof(sequence));
622       bstrncat(sequence, "_",        sizeof(sequence));
623       bstrncat(sequence, table_name, sizeof(sequence));
624       bstrncat(sequence, "id",       sizeof(sequence));
625    }
626
627    bstrncat(sequence, "_seq", sizeof(sequence));
628    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
629
630    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
631    for (int i=0; i < 10; i++) {
632       result = PQexec(mdb->db, query);
633       if (result) {
634          break;
635       }
636       bmicrosleep(5, 0);
637    }
638    if (!result) {
639       Dmsg1(50, "Query failed: %s\n", query);
640       goto bail_out;
641    }
642
643    Dmsg0(500, "exec done");
644
645    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
646       Dmsg0(500, "getting value");
647       id = atoi(PQgetvalue(result, 0, 0));
648       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
649    } else {
650       Dmsg1(50, "Result status failed: %s\n", query);
651       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
652    }
653
654 bail_out:
655    PQclear(result);
656
657    return id;
658 }
659
660 #ifdef HAVE_BATCH_FILE_INSERT
661
662 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
663 {
664    const char *query = "COPY batch FROM STDIN";
665
666    Dmsg0(500, "my_postgresql_batch_start started\n");
667
668    if (my_postgresql_query(mdb,
669                            "CREATE TEMPORARY TABLE batch ("
670                                "fileindex int,"
671                                "jobid int,"
672                                "path varchar,"
673                                "name varchar,"
674                                "lstat varchar,"
675                                "md5 varchar)") == 1)
676    {
677       Dmsg0(500, "my_postgresql_batch_start failed\n");
678       return 1;
679    }
680    
681    // We are starting a new query.  reset everything.
682    mdb->num_rows     = -1;
683    mdb->row_number   = -1;
684    mdb->field_number = -1;
685
686    my_postgresql_free_result(mdb);
687
688    for (int i=0; i < 10; i++) {
689       mdb->result = PQexec(mdb->db, query);
690       if (mdb->result) {
691          break;
692       }
693       bmicrosleep(5, 0);
694    }
695    if (!mdb->result) {
696       Dmsg1(50, "Query failed: %s\n", query);
697       goto bail_out;
698    }
699
700    mdb->status = PQresultStatus(mdb->result);
701    if (mdb->status == PGRES_COPY_IN) {
702       // how many fields in the set?
703       mdb->num_fields = (int) PQnfields(mdb->result);
704       mdb->num_rows   = 0;
705       mdb->status = 1;
706    } else {
707       Dmsg1(50, "Result status failed: %s\n", query);
708       goto bail_out;
709    }
710
711    Dmsg0(500, "my_postgresql_batch_start finishing\n");
712
713    return mdb->status;
714
715 bail_out:
716    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
717    mdb->status = 0;
718    PQclear(mdb->result);
719    mdb->result = NULL;
720    return mdb->status;
721 }
722
723 /* set error to something to abort operation */
724 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
725 {
726    int res;
727    int count=30;
728    PGresult *result;
729    Dmsg0(500, "my_postgresql_batch_end started\n");
730
731    if (!mdb) {                  /* no files ? */
732       return 0;
733    }
734
735    do { 
736       res = PQputCopyEnd(mdb->db, error);
737    } while (res == 0 && --count > 0);
738
739    if (res == 1) {
740       Dmsg0(500, "ok\n");
741       mdb->status = 1;
742    }
743    
744    if (res <= 0) {
745       Dmsg0(500, "we failed\n");
746       mdb->status = 0;
747       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
748    }
749
750    /* Check command status and return to normal libpq state */
751    result = PQgetResult(mdb->db);
752    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
753       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
754       mdb->status = 0;
755    }
756    PQclear(result); 
757
758    Dmsg0(500, "my_postgresql_batch_end finishing\n");
759
760    return mdb->status;
761 }
762
763 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
764 {
765    int res;
766    int count=30;
767    size_t len;
768    const char *digest;
769    char ed1[50];
770
771    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
772    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
773
774    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
775    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
776
777    if (ar->Digest == NULL || ar->Digest[0] == 0) {
778       digest = "0";
779    } else {
780       digest = ar->Digest;
781    }
782
783    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
784               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
785               mdb->esc_name, ar->attr, digest);
786
787    do { 
788       res = PQputCopyData(mdb->db,
789                           mdb->cmd,
790                           len);
791    } while (res == 0 && --count > 0);
792
793    if (res == 1) {
794       Dmsg0(500, "ok\n");
795       mdb->changes++;
796       mdb->status = 1;
797    }
798
799    if (res <= 0) {
800       Dmsg0(500, "we failed\n");
801       mdb->status = 0;
802       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
803    }
804
805    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
806
807    return mdb->status;
808 }
809
810 #endif /* HAVE_BATCH_FILE_INSERT */
811
812 /*
813  * Escape strings so that PostgreSQL is happy on COPY
814  *
815  *   NOTE! len is the length of the old string. Your new
816  *         string must be long enough (max 2*old+1) to hold
817  *         the escaped output.
818  */
819 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
820 {
821    /* we have to escape \t, \n, \r, \ */
822    char c = '\0' ;
823
824    while (len > 0 && *src) {
825       switch (*src) {
826       case '\n':
827          c = 'n';
828          break;
829       case '\\':
830          c = '\\';
831          break;
832       case '\t':
833          c = 't';
834          break;
835       case '\r':
836          c = 'r';
837          break;
838       default:
839          c = '\0' ;
840       }
841
842       if (c) {
843          *dest = '\\';
844          dest++;
845          *dest = c;
846       } else {
847          *dest = *src;
848       }
849
850       len--;
851       src++;
852       dest++;
853    }
854
855    *dest = '\0';
856    return dest;
857 }
858
859 #ifdef HAVE_BATCH_FILE_INSERT
860 const char *my_pg_batch_lock_path_query = 
861    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
862
863
864 const char *my_pg_batch_lock_filename_query = 
865    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
866
867 const char *my_pg_batch_unlock_tables_query = "COMMIT";
868
869 const char *my_pg_batch_fill_path_query = 
870    "INSERT INTO Path (Path) "
871     "SELECT a.Path FROM "
872      "(SELECT DISTINCT Path FROM batch) AS a "
873       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
874
875
876 const char *my_pg_batch_fill_filename_query = 
877    "INSERT INTO Filename (Name) "
878     "SELECT a.Name FROM "
879      "(SELECT DISTINCT Name FROM batch) as a "
880       "WHERE NOT EXISTS "
881        "(SELECT Name FROM Filename WHERE Name = a.Name)";
882 #endif /* HAVE_BATCH_FILE_INSERT */
883
884 #endif /* HAVE_POSTGRESQL */