]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
dabc4b827bf8c446635956b5f95615f75124c4f4
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Catalog Database routines specific to PostgreSQL
21  *   These are PostgreSQL specific routines
22  *
23  *    Dan Langille, December 2003
24  *    based upon work done by Kern Sibbald, March 2000
25  *
26  * Note: at one point, this file was changed to class based by a certain 
27  *  programmer, and other than "wrapping" in a class, which is a trivial
28  *  change for a C++ programmer, nothing substantial was done, yet all the
29  *  code was recommitted under this programmer's name.  Consequently, we
30  *  undo those changes here.  Unfortunately, it is too difficult to put
31  *  back the original author's name (Dan Langille) on the parts he wrote.
32  */
33
34 #include "bacula.h"
35
36 #ifdef HAVE_POSTGRESQL
37
38 #include  "cats.h"
39 #include  "libpq-fe.h"
40 #include  "postgres_ext.h"       /* needed for NAMEDATALEN */
41 #include  "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
42 #define __BDB_POSTGRESQL_H_ 1
43 #include  "bdb_postgresql.h"
44
45 #define dbglvl_dbg   DT_SQL|100
46 #define dbglvl_info  DT_SQL|50
47 #define dbglvl_err   DT_SQL|10
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /* List of open databases */
57 static dlist *db_list = NULL; 
58
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
60
61 BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
62 {
63    BDB_POSTGRESQL *mdb = this;
64
65    if (db_list == NULL) {
66       db_list = New(dlist(mdb, &mdb->m_link));
67    }
68    mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
69    mdb->m_db_type = SQL_TYPE_POSTGRESQL;
70    mdb->m_db_driver = bstrdup("PostgreSQL");
71
72    mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
73    mdb->errmsg[0] = 0;
74    mdb->cmd = get_pool_memory(PM_EMSG);    /* get command buffer */
75    mdb->cached_path = get_pool_memory(PM_FNAME);
76    mdb->cached_path_id = 0;
77    mdb->m_ref_count = 1;
78    mdb->fname = get_pool_memory(PM_FNAME);
79    mdb->path = get_pool_memory(PM_FNAME);
80    mdb->esc_name = get_pool_memory(PM_FNAME);
81    mdb->esc_path = get_pool_memory(PM_FNAME);
82    mdb->esc_obj = get_pool_memory(PM_FNAME);
83    mdb->m_use_fatal_jmsg = true;
84
85    /* Initialize the private members. */
86    mdb->m_db_handle = NULL;
87    mdb->m_result = NULL;
88    mdb->m_buf =  get_pool_memory(PM_FNAME);
89
90    db_list->append(this);
91 }
92
93 BDB_POSTGRESQL::~BDB_POSTGRESQL()
94 {
95 }
96
97 /*
98  * Initialize database data structure. In principal this should
99  * never have errors, or it is really fatal.
100  */
101 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
102                        const char *db_password, const char *db_address, int db_port, const char *db_socket, 
103                        const char *db_ssl_key, const char *db_ssl_cert, const char *db_ssl_ca,
104                        const char *db_ssl_capath, const char *db_ssl_cipher,
105                        bool mult_db_connections, bool disable_batch_insert) 
106 {
107    BDB_POSTGRESQL *mdb = NULL;
108
109    if (!db_user) {
110       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
111       return NULL;
112    }
113    P(mutex);                          /* lock DB queue */
114    if (db_list && !mult_db_connections) {
115       /*
116        * Look to see if DB already open
117        */
118       foreach_dlist(mdb, db_list) {
119          if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
120             Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
121             mdb->increment_refcount();
122             goto get_out;
123          }
124       }
125    }
126    Dmsg0(dbglvl_info, "db_init_database first time\n");
127    /* Create the global Bacula db context */
128    mdb = New(BDB_POSTGRESQL());
129    if (!mdb) goto get_out;
130
131    /* Initialize the parent class members. */
132    mdb->m_db_name = bstrdup(db_name);
133    mdb->m_db_user = bstrdup(db_user);
134    if (db_password) {
135       mdb->m_db_password = bstrdup(db_password);
136    }
137    if (db_address) {
138       mdb->m_db_address = bstrdup(db_address);
139    }
140    if (db_socket) {
141       mdb->m_db_socket = bstrdup(db_socket);
142    } 
143    mdb->m_db_port = db_port;
144
145    if (disable_batch_insert) { 
146       mdb->m_disabled_batch_insert = true;
147       mdb->m_have_batch_insert = false;
148    } else { 
149       mdb->m_disabled_batch_insert = false;
150 #ifdef USE_BATCH_FILE_INSERT
151 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE) 
152 #ifdef HAVE_PQISTHREADSAFE 
153       mdb->m_have_batch_insert = PQisthreadsafe();
154 #else 
155       mdb->m_have_batch_insert = true;
156 #endif /* HAVE_PQISTHREADSAFE */ 
157 #else 
158       mdb->m_have_batch_insert = true;
159 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */ 
160 #else 
161       mdb->m_have_batch_insert = false;
162 #endif /* USE_BATCH_FILE_INSERT */ 
163    } 
164    mdb->m_allow_transactions = mult_db_connections;
165  
166    /* At this time, when mult_db_connections == true, this is for
167     * specific console command such as bvfs or batch mode, and we don't
168     * want to share a batch mode or bvfs. In the future, we can change
169     * the creation function to add this parameter.
170     */
171    mdb->m_dedicated = mult_db_connections;
172
173 get_out:
174    V(mutex);
175    return mdb;
176
177   
178  
179 /* Check that the database corresponds to the encoding we want  */
180 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
181 {
182    SQL_ROW row;
183    int ret = false; 
184
185    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) { 
186       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
187       return false; 
188    }
189
190    if ((row = mdb->sql_fetch_row()) == NULL) { 
191       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror()); 
192       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
193    } else { 
194       ret = bstrcmp(row[0], "SQL_ASCII");
195
196       if (ret) {
197          /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
198          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'"); 
199
200       } else { 
201          /* Something is wrong with database encoding */
202          Mmsg(mdb->errmsg,
203               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
204               mdb->get_db_name(), row[0]); 
205          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
206          Dmsg1(dbglvl_err, "%s", mdb->errmsg);
207       }
208    }
209    return ret;
210 }
211
212 /*
213  * Now actually open the database.  This can generate errors,
214  *   which are returned in the errmsg
215  *
216  *  DO NOT close the database or delete mdb here !!!!
217  */
218 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
219 {
220    bool retval = false; 
221    int errstat;
222    char buf[10], *port;
223    BDB_POSTGRESQL *mdb = this;
224
225    P(mutex);
226    if (mdb->m_connected) {
227       retval = true; 
228       goto get_out;
229    }
230
231    if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
232       berrno be;
233       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
234             be.bstrerror(errstat));
235       goto get_out;
236    }
237
238    if (mdb->m_db_port) {
239       bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
240       port = buf;
241    } else {
242       port = NULL;
243    }
244
245    /* If connection fails, try at 5 sec intervals for 30 seconds. */
246    for (int retry=0; retry < 6; retry++) {
247       /* connect to the database */
248       mdb->m_db_handle = PQsetdbLogin(
249            mdb->m_db_address,         /* default = localhost */
250            port,                      /* default port */
251            NULL,                      /* pg options */
252            NULL,                      /* tty, ignored */
253            mdb->m_db_name,            /* database name */
254            mdb->m_db_user,            /* login name */
255            mdb->m_db_password);       /* password */
256
257       /* If no connect, try once more in case it is a timing problem */
258       if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
259          break;
260       }
261       bmicrosleep(5, 0);
262    }
263
264    Dmsg0(dbglvl_info, "pg_real_connect done\n");
265    Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
266         mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
267
268    if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
269       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
270          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
271          mdb->m_db_name, mdb->m_db_user);
272       goto get_out;
273    }
274
275    mdb->m_connected = true;
276    if (!bdb_check_version(jcr)) {
277       goto get_out;
278    }
279
280    sql_query("SET datestyle TO 'ISO, YMD'"); 
281    sql_query("SET cursor_tuple_fraction=1");
282
283    /* 
284     * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
285     *   WARNING:  nonstandard use of \\ in a string literal
286     */ 
287    sql_query("SET standard_conforming_strings=on"); 
288  
289    /* Check that encoding is SQL_ASCII */
290    pgsql_check_database_encoding(jcr, mdb);
291  
292    retval = true; 
293
294 get_out:
295    V(mutex);
296    return retval; 
297
298
299 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
300 {
301    BDB_POSTGRESQL *mdb = this;
302
303    if (mdb->m_connected) {
304       bdb_end_transaction(jcr);
305    } 
306    P(mutex);
307    mdb->m_ref_count--;
308    if (mdb->m_ref_count == 0) {
309       if (mdb->m_connected) {
310          sql_free_result(); 
311       } 
312       db_list->remove(mdb);
313       if (mdb->m_connected && mdb->m_db_handle) {
314          PQfinish(mdb->m_db_handle);
315       } 
316       if (is_rwl_valid(&mdb->m_lock)) {
317          rwl_destroy(&mdb->m_lock);
318       } 
319       free_pool_memory(mdb->errmsg);
320       free_pool_memory(mdb->cmd);
321       free_pool_memory(mdb->cached_path);
322       free_pool_memory(mdb->fname);
323       free_pool_memory(mdb->path);
324       free_pool_memory(mdb->esc_name);
325       free_pool_memory(mdb->esc_path);
326       free_pool_memory(mdb->esc_obj);
327       free_pool_memory(mdb->m_buf);
328       if (mdb->m_db_driver) {
329          free(mdb->m_db_driver);
330       } 
331       if (mdb->m_db_name) {
332          free(mdb->m_db_name);
333       } 
334       if (mdb->m_db_user) {
335          free(mdb->m_db_user);
336       } 
337       if (mdb->m_db_password) {
338          free(mdb->m_db_password);
339       } 
340       if (mdb->m_db_address) {
341          free(mdb->m_db_address);
342       } 
343       if (mdb->m_db_socket) {
344          free(mdb->m_db_socket);
345       } 
346       delete mdb;
347       if (db_list->size() == 0) {
348          delete db_list;
349          db_list = NULL;
350       }
351    }
352    V(mutex);
353 }
354
355 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
356 {
357 }
358
359 /*
360  * Escape strings so PostgreSQL is happy
361  *
362  *  len is the length of the old string. Your new
363  *    string must be long enough (max 2*old+1) to hold
364  *    the escaped output.
365  */
366 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
367 {
368    BDB_POSTGRESQL *mdb = this;
369    int failed; 
370
371    PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
372    if (failed) {
373       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); 
374       /* failed on encoding, probably invalid multibyte encoding in the source string
375          see PQescapeStringConn documentation for details. */
376       Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
377    } 
378 }
379
380 /*
381  * Escape binary so that PostgreSQL is happy
382  *
383  */
384 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
385 {
386    size_t new_len;
387    unsigned char *obj; 
388    BDB_POSTGRESQL *mdb = this;
389
390    mdb->esc_obj[0] = 0;
391    obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
392    if (!obj) { 
393       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
394    } else {
395       mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
396       memcpy(mdb->esc_obj, obj, new_len);
397       mdb->esc_obj[new_len] = 0;
398       PQfreemem(obj);
399    }
400    return (char *)mdb->esc_obj;
401 }
402
403 /*
404  * Unescape binary object so that PostgreSQL is happy
405  *
406  */
407 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
408                        POOLMEM **dest, int32_t *dest_len)
409 {
410    size_t new_len;
411    unsigned char *obj;
412
413    if (!from) {
414       *dest[0] = 0;
415       *dest_len = 0;
416       return;
417    }
418
419    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
420
421    if (!obj) {
422       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
423    }
424
425    *dest_len = new_len;
426    *dest = check_pool_memory_size(*dest, new_len+1);
427    memcpy(*dest, obj, new_len);
428    (*dest)[new_len]=0;
429
430    PQfreemem(obj);
431
432    Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
433 }
434
435 /*
436  * Start a transaction. This groups inserts and makes things more efficient.
437  *  Usually started when inserting file attributes.
438  */
439 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
440 {
441    BDB_POSTGRESQL *mdb = this;
442
443    if (!jcr->attr) { 
444       jcr->attr = get_pool_memory(PM_FNAME); 
445    }
446    if (!jcr->ar) { 
447       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR)); 
448       memset(jcr->ar, 0, sizeof(ATTR_DBR));
449    }
450
451    /* 
452     * This is turned off because transactions break if
453     *  multiple simultaneous jobs are run.
454     */ 
455    if (!mdb->m_allow_transactions) {
456       return; 
457    }
458
459    bdb_lock();
460    /* Allow only 25,000 changes per transaction */
461    if (mdb->m_transaction && changes > 25000) {
462       bdb_end_transaction(jcr);
463    } 
464    if (!mdb->m_transaction) {
465       sql_query("BEGIN");             /* begin transaction */
466       Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
467       mdb->m_transaction = true;
468    } 
469    bdb_unlock();
470 }
471
472 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
473 {
474    BDB_POSTGRESQL *mdb = this;
475
476    if (jcr && jcr->cached_attribute) { 
477       Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
478       if (!bdb_create_attributes_record(jcr, jcr->ar)) {
479          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
480       }
481       jcr->cached_attribute = false; 
482    }
483
484    if (!mdb->m_allow_transactions) {
485       return; 
486    }
487
488    bdb_lock();
489    if (mdb->m_transaction) {
490       sql_query("COMMIT");            /* end transaction */
491       mdb->m_transaction = false;
492       Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
493    }
494    changes = 0; 
495    bdb_unlock();
496 }
497
498
499 /*
500  * Submit a general SQL command, and for each row returned,
501  *  the result_handler is called with the ctx.
502  */
503 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
504                                        DB_RESULT_HANDLER *result_handler,
505                                        void *ctx)
506 {
507    BDB_POSTGRESQL *mdb = this;
508    SQL_ROW row; 
509    bool retval = false; 
510    bool in_transaction = mdb->m_transaction;
511
512    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
513
514    mdb->errmsg[0] = 0;
515    /* This code handles only SELECT queries */
516    if (strncasecmp(query, "SELECT", 6) != 0) {
517       return bdb_sql_query(query, result_handler, ctx);
518    }
519
520    if (!result_handler) {       /* no need of big_query without handler */
521       return false; 
522    }
523
524    bdb_lock();
525
526    if (!in_transaction) {       /* CURSOR needs transaction */
527       sql_query("BEGIN");
528    }
529
530    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
531
532    if (!sql_query(mdb->m_buf)) {
533       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
534       Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
535       goto get_out;
536    }
537
538    do {
539       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
540          Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
541          Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
542          goto get_out;
543       }
544       while ((row = sql_fetch_row()) != NULL) {
545          Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
546          if (result_handler(ctx, mdb->m_num_fields, row))
547             break;
548       }
549       PQclear(mdb->m_result);
550       m_result = NULL;
551
552    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
553
554    sql_query("CLOSE _bac_cursor");
555
556    Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
557    sql_free_result();
558    retval = true;
559
560 get_out:
561    if (!in_transaction) {
562       sql_query("COMMIT");  /* end transaction */
563    }
564
565    bdb_unlock();
566    return retval;
567 }
568
569 /* 
570  * Submit a general SQL command, and for each row returned,
571  *  the result_handler is called with the ctx.
572  */ 
573 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
574 {
575    SQL_ROW row; 
576    bool retval = true; 
577    BDB_POSTGRESQL *mdb = this;
578
579    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
580
581    bdb_lock();
582    mdb->errmsg[0] = 0;
583    if (!sql_query(query, QF_STORE_RESULT)) { 
584       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
585       Dmsg0(dbglvl_err, "db_sql_query failed\n");
586       retval = false; 
587       goto get_out;
588    } 
589
590    Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
591
592    if (result_handler) {
593       Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
594       while ((row = sql_fetch_row())) {
595          Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
596          if (result_handler(ctx, mdb->m_num_fields, row))
597             break; 
598       } 
599       sql_free_result(); 
600    } 
601
602    Dmsg0(dbglvl_info, "db_sql_query finished\n");
603
604 get_out:
605    bdb_unlock();
606    return retval; 
607 }
608
609 /*
610  * If this routine returns false (failure), Bacula expects
611  *   that no result has been stored.
612  * This is where QueryDB calls to with Postgresql.
613  *
614  *  Returns: true  on success
615  *           false on failure
616  *
617  */
618 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
619 {
620    int i; 
621    bool retval = false; 
622    BDB_POSTGRESQL *mdb = this;
623
624    Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
625
626    /* We are starting a new query. reset everything. */
627    mdb->m_num_rows     = -1;
628    mdb->m_row_number   = -1;
629    mdb->m_field_number = -1;
630
631    if (mdb->m_result) {
632       PQclear(mdb->m_result);  /* hmm, someone forgot to free?? */
633       mdb->m_result = NULL;
634    } 
635
636    for (i = 0; i < 10; i++) { 
637       mdb->m_result = PQexec(mdb->m_db_handle, query);
638       if (mdb->m_result) {
639          break;
640       }
641       bmicrosleep(5, 0);
642    }
643    if (!mdb->m_result) {
644       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
645       goto get_out;
646    }
647
648    mdb->m_status = PQresultStatus(mdb->m_result);
649    if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
650       Dmsg0(dbglvl_dbg, "we have a result\n");
651
652       /* How many fields in the set? */
653       mdb->m_num_fields = (int)PQnfields(mdb->m_result);
654       Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
655
656       mdb->m_num_rows = PQntuples(mdb->m_result);
657       Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
658
659       mdb->m_row_number = 0;      /* we can start to fetch something */
660       mdb->m_status = 0;          /* succeed */
661       retval = true; 
662    } else {
663       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
664       goto get_out;
665    }
666
667    Dmsg0(dbglvl_info, "sql_query finishing\n");
668    goto ok_out; 
669
670 get_out:
671    Dmsg0(dbglvl_err, "we failed\n");
672    PQclear(mdb->m_result);
673    mdb->m_result = NULL;
674    mdb->m_status = 1;                   /* failed */
675  
676 ok_out: 
677    return retval; 
678 }  
679  
680 void BDB_POSTGRESQL::sql_free_result(void)
681 {
682    BDB_POSTGRESQL *mdb = this;
683
684    bdb_lock();
685    if (mdb->m_result) {
686       PQclear(mdb->m_result);
687       mdb->m_result = NULL;
688    }
689    if (mdb->m_rows) {
690       free(mdb->m_rows);
691       mdb->m_rows = NULL;
692    }
693    if (mdb->m_fields) {
694       free(mdb->m_fields);
695       mdb->m_fields = NULL;
696    } 
697    mdb->m_num_rows = mdb->m_num_fields = 0;
698    bdb_unlock();
699
700
701 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
702
703    SQL_ROW row = NULL;            /* by default, return NULL */
704    BDB_POSTGRESQL *mdb = this;
705  
706    Dmsg0(dbglvl_info, "sql_fetch_row start\n");
707  
708    if (mdb->m_num_fields == 0) {     /* No field, no row */
709       Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
710       return NULL;
711    }
712
713    if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
714       if (mdb->m_rows) {
715          Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
716          free(mdb->m_rows);
717       } 
718       Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
719       mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
720       mdb->m_rows_size = mdb->m_num_fields;
721  
722       /* Now reset the row_number now that we have the space allocated */
723       mdb->m_row_number = 0;
724    }
725
726    /* If still within the result set */
727    if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
728       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
729
730       /* Get each value from this row */
731       for (int j = 0; j < mdb->m_num_fields; j++) {
732          mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
733          Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
734       } 
735       mdb->m_row_number++;  /* Increment the row number for the next call */
736       row = mdb->m_rows;
737    } else { 
738       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
739    }
740  
741    Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
742  
743    return row; 
744
745
746 const char *BDB_POSTGRESQL::sql_strerror(void)
747 {   
748    BDB_POSTGRESQL *mdb = this;
749    return PQerrorMessage(mdb->m_db_handle);
750
751
752 void BDB_POSTGRESQL::sql_data_seek(int row)
753
754    BDB_POSTGRESQL *mdb = this;
755    /* Set the row number to be returned on the next call to sql_fetch_row */
756    mdb->m_row_number = row;
757
758
759 int BDB_POSTGRESQL::sql_affected_rows(void)
760
761    BDB_POSTGRESQL *mdb = this;
762    return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
763
764  
765 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
766
767    uint64_t id = 0; 
768    char sequence[NAMEDATALEN-1]; 
769    char getkeyval_query[NAMEDATALEN+50]; 
770    PGresult *p_result;
771    BDB_POSTGRESQL *mdb = this;
772
773    /* First execute the insert query and then retrieve the currval. */
774    if (!sql_query(query)) { 
775       return 0; 
776    } 
777
778    mdb->m_num_rows = sql_affected_rows();
779    if (mdb->m_num_rows != 1) {
780       return 0; 
781    } 
782    mdb->changes++;
783    /* 
784     *  Obtain the current value of the sequence that
785     *  provides the serial value for primary key of the table.
786     * 
787     *  currval is local to our session.  It is not affected by
788     *  other transactions.
789     * 
790     *  Determine the name of the sequence.
791     *  PostgreSQL automatically creates a sequence using
792     *   <table>_<column>_seq.
793     *  At the time of writing, all tables used this format for
794     *   for their primary key: <table>id
795     *  Except for basefiles which has a primary key on baseid.
796     *  Therefore, we need to special case that one table.
797     * 
798     *  everything else can use the PostgreSQL formula.
799     */ 
800    if (strcasecmp(table_name, "basefiles") == 0) {
801       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
802    } else {
803       bstrncpy(sequence, table_name, sizeof(sequence));
804       bstrncat(sequence, "_",        sizeof(sequence));
805       bstrncat(sequence, table_name, sizeof(sequence));
806       bstrncat(sequence, "id",       sizeof(sequence));
807    }
808
809    bstrncat(sequence, "_seq", sizeof(sequence));
810    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence); 
811
812    Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
813    for (int i = 0; i < 10; i++) { 
814       p_result = PQexec(mdb->m_db_handle, getkeyval_query);
815       if (p_result) {
816          break;
817       }
818       bmicrosleep(5, 0);
819    }
820    if (!p_result) {
821       Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
822       goto get_out;
823    }
824
825    Dmsg0(dbglvl_dbg, "exec done");
826
827    if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
828       Dmsg0(dbglvl_dbg, "getting value");
829       id = str_to_uint64(PQgetvalue(p_result, 0, 0));
830       Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
831    } else {
832       Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
833       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
834    }
835
836 get_out:
837    PQclear(p_result);
838    return id;
839
840
841 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
842
843    int max_len; 
844    int this_len; 
845    BDB_POSTGRESQL *mdb = this;
846  
847    Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
848  
849    if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
850       if (mdb->m_fields) {
851          free(mdb->m_fields);
852          mdb->m_fields = NULL;
853       } 
854       Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
855       mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
856       mdb->m_fields_size = mdb->m_num_fields;
857  
858       for (int i = 0; i < mdb->m_num_fields; i++) {
859          Dmsg1(dbglvl_dbg, "filling field %d\n", i);
860          mdb->m_fields[i].name = PQfname(mdb->m_result, i);
861          mdb->m_fields[i].type = PQftype(mdb->m_result, i);
862          mdb->m_fields[i].flags = 0;
863  
864          /* For a given column, find the max length. */
865          max_len = 0; 
866          for (int j = 0; j < mdb->m_num_rows; j++) {
867             if (PQgetisnull(mdb->m_result, j, i)) {
868                this_len = 4;         /* "NULL" */
869             } else { 
870                this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
871             } 
872
873             if (max_len < this_len) { 
874                max_len = this_len; 
875             } 
876          } 
877          mdb->m_fields[i].max_length = max_len;
878   
879          Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
880                mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
881       } 
882    } 
883  
884    /* Increment field number for the next time around */
885    return &mdb->m_fields[mdb->m_field_number++];
886 }  
887  
888 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
889 {  
890    if (field_type == 1) {
891       return true; 
892    } 
893    return false; 
894
895  
896 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
897
898     /*
899      * TEMP: the following is taken from select OID, typname from pg_type;
900      */
901     switch (field_type) {
902     case 20:
903     case 21:
904     case 23:
905     case 700:
906     case 701:
907        return true; 
908     default:
909        return false; 
910     }
911
912  
913 /* 
914  * Escape strings so PostgreSQL is happy on COPY
915  * 
916  * len is the length of the old string. Your new
917  *   string must be long enough (max 2*old+1) to hold
918  *   the escaped output.
919  */ 
920 static char *pgsql_copy_escape(char *dest, char *src, size_t len) 
921
922     /* we have to escape \t, \n, \r, \ */
923     char c = '\0' ;
924  
925     while (len > 0 && *src) {
926        switch (*src) {
927        case '\n':
928           c = 'n';
929           break;
930        case '\\':
931           c = '\\';
932           break;
933        case '\t':
934           c = 't';
935           break;
936        case '\r':
937           c = 'r';
938           break;
939        default:
940           c = '\0' ;
941        }
942
943        if (c) {
944           *dest = '\\';
945           dest++;
946           *dest = c;
947        } else {
948           *dest = *src;
949        }
950
951        len--;
952        src++;
953        dest++;
954     }
955
956     *dest = '\0';
957     return dest;
958
959  
960 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
961 {
962    BDB_POSTGRESQL *mdb = this;
963    const char *query = "COPY batch FROM STDIN";
964  
965    Dmsg0(dbglvl_info, "sql_batch_start started\n");
966  
967    if  (!sql_query("CREATE TEMPORARY TABLE batch ("
968                           "FileIndex int,"
969                           "JobId int,"
970                           "Path varchar,"
971                           "Name varchar,"
972                           "LStat varchar,"
973                           "Md5 varchar,"
974                           "DeltaSeq smallint)")) {
975       Dmsg0(dbglvl_err, "sql_batch_start failed\n");
976       return false; 
977    }
978
979    /* We are starting a new query.  reset everything. */
980    mdb->m_num_rows     = -1;
981    mdb->m_row_number   = -1;
982    mdb->m_field_number = -1;
983
984    sql_free_result(); 
985
986    for (int i=0; i < 10; i++) {
987       mdb->m_result = PQexec(mdb->m_db_handle, query);
988       if (mdb->m_result) {
989          break;
990       }
991       bmicrosleep(5, 0);
992    }
993    if (!mdb->m_result) {
994       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
995       goto get_out;
996    }
997
998    mdb->m_status = PQresultStatus(mdb->m_result);
999    if (mdb->m_status == PGRES_COPY_IN) {
1000       /* How many fields in the set? */
1001       mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1002       mdb->m_num_rows = 0;
1003       mdb->m_status = 1;
1004    } else {
1005       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1006       goto get_out;
1007    }
1008
1009    Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1010
1011    return true; 
1012
1013 get_out:
1014    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1015    mdb->m_status = 0;
1016    PQclear(mdb->m_result);
1017    mdb->m_result = NULL;
1018    return false; 
1019 }
1020
1021 /* 
1022  * Set error to something to abort the operation
1023  */ 
1024 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1025 {
1026    int res;
1027    int count=30;
1028    PGresult *p_result;
1029    BDB_POSTGRESQL *mdb = this;
1030
1031    Dmsg0(dbglvl_info, "sql_batch_end started\n");
1032
1033    do {
1034       res = PQputCopyEnd(mdb->m_db_handle, error);
1035    } while (res == 0 && --count > 0);
1036
1037    if (res == 1) {
1038       Dmsg0(dbglvl_dbg, "ok\n");
1039       mdb->m_status = 0;
1040    }
1041
1042    if (res <= 0) {
1043       mdb->m_status = 1;
1044       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1045       Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1046    }
1047
1048    /* Check command status and return to normal libpq state */
1049    p_result = PQgetResult(mdb->m_db_handle);
1050    if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1051       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1052       mdb->m_status = 1;
1053    }
1054
1055    /* Get some statistics to compute the best plan */
1056    sql_query("ANALYZE batch");
1057
1058    PQclear(p_result);
1059
1060    Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1061    return true; 
1062 }
1063
1064 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1065 {
1066    int res;
1067    int count=30;
1068    size_t len;
1069    const char *digest;
1070    char ed1[50];
1071    BDB_POSTGRESQL *mdb = this;
1072
1073    mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1074    pgsql_copy_escape(mdb->esc_name, fname, fnl);
1075
1076    mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1077    pgsql_copy_escape(mdb->esc_path, path, pnl);
1078
1079    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1080       digest = "0";
1081    } else {
1082       digest = ar->Digest;
1083    }
1084
1085    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1086               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1087               mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1088
1089    do {
1090       res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1091    } while (res == 0 && --count > 0);
1092
1093    if (res == 1) {
1094       Dmsg0(dbglvl_dbg, "ok\n");
1095       mdb->changes++;
1096       mdb->m_status = 1;
1097    }
1098
1099    if (res <= 0) {
1100       mdb->m_status = 0;
1101       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1102       Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1103    }
1104
1105    Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1106
1107    return true; 
1108 }
1109
1110
1111 #endif /* HAVE_POSTGRESQL */