]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Tweak fix typo in comment
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Catalog Database routines specific to PostgreSQL
21  *   These are PostgreSQL specific routines
22  *
23  *    Dan Langille, December 2003
24  *    based upon work done by Kern Sibbald, March 2000
25  *
26  * Note: at one point, this file was changed to class based by a certain 
27  *  programmer, and other than "wrapping" in a class, which is a trivial
28  *  change for a C++ programmer, nothing substantial was done, yet all the
29  *  code was recommitted under this programmer's name.  Consequently, we
30  *  undo those changes here.  Unfortunately, it is too difficult to put
31  *  back the original author's name (Dan Langille) on the parts he wrote.
32  */
33
34 #include "bacula.h"
35
36 #ifdef HAVE_POSTGRESQL
37
38 #include  "cats.h"
39 #include  "libpq-fe.h"
40 #include  "postgres_ext.h"       /* needed for NAMEDATALEN */
41 #include  "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
42 #define __BDB_POSTGRESQL_H_ 1
43 #include  "bdb_postgresql.h"
44
45 #define dbglvl_dbg   DT_SQL|100
46 #define dbglvl_info  DT_SQL|50
47 #define dbglvl_err   DT_SQL|10
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /* List of open databases */
57 static dlist *db_list = NULL; 
58
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
60
61 BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
62 {
63    BDB_POSTGRESQL *mdb = this;
64
65    if (db_list == NULL) {
66       db_list = New(dlist(mdb, &mdb->m_link));
67    }
68    mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
69    mdb->m_db_type = SQL_TYPE_POSTGRESQL;
70    mdb->m_db_driver = bstrdup("PostgreSQL");
71
72    mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
73    mdb->errmsg[0] = 0;
74    mdb->cmd = get_pool_memory(PM_EMSG);    /* get command buffer */
75    mdb->cached_path = get_pool_memory(PM_FNAME);
76    mdb->cached_path_id = 0;
77    mdb->m_ref_count = 1;
78    mdb->fname = get_pool_memory(PM_FNAME);
79    mdb->path = get_pool_memory(PM_FNAME);
80    mdb->esc_name = get_pool_memory(PM_FNAME);
81    mdb->esc_path = get_pool_memory(PM_FNAME);
82    mdb->esc_obj = get_pool_memory(PM_FNAME);
83    mdb->m_use_fatal_jmsg = true;
84
85    /* Initialize the private members. */
86    mdb->m_db_handle = NULL;
87    mdb->m_result = NULL;
88    mdb->m_buf =  get_pool_memory(PM_FNAME);
89
90    db_list->append(this);
91 }
92
93 BDB_POSTGRESQL::~BDB_POSTGRESQL()
94 {
95 }
96
97 /*
98  * Initialize database data structure. In principal this should
99  * never have errors, or it is really fatal.
100  */
101 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
102                        const char *db_password, const char *db_address, int db_port, const char *db_socket, 
103                        const char *db_ssl_mode, const char *db_ssl_key, const char *db_ssl_cert,
104                        const char *db_ssl_ca, const char *db_ssl_capath, const char *db_ssl_cipher,
105                        bool mult_db_connections, bool disable_batch_insert) 
106 {
107    BDB_POSTGRESQL *mdb = NULL;
108
109    if (!db_user) {
110       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
111       return NULL;
112    }
113    P(mutex);                          /* lock DB queue */
114    if (db_list && !mult_db_connections) {
115       /*
116        * Look to see if DB already open
117        */
118       foreach_dlist(mdb, db_list) {
119          if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
120             Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
121             mdb->increment_refcount();
122             goto get_out;
123          }
124       }
125    }
126    Dmsg0(dbglvl_info, "db_init_database first time\n");
127    /* Create the global Bacula db context */
128    mdb = New(BDB_POSTGRESQL());
129    if (!mdb) goto get_out;
130
131    /* Initialize the parent class members. */
132    mdb->m_db_name = bstrdup(db_name);
133    mdb->m_db_user = bstrdup(db_user);
134    if (db_password) {
135       mdb->m_db_password = bstrdup(db_password);
136    }
137    if (db_address) {
138       mdb->m_db_address = bstrdup(db_address);
139    }
140    if (db_socket) {
141       mdb->m_db_socket = bstrdup(db_socket);
142    }
143    if (db_ssl_mode) {
144       mdb->m_db_ssl_mode = bstrdup(db_ssl_mode);
145    } else {
146       mdb->m_db_ssl_mode = bstrdup("prefer");
147    }
148    if (db_ssl_key) {
149       mdb->m_db_ssl_key = bstrdup(db_ssl_key);
150    }
151    if (db_ssl_cert) {
152       mdb->m_db_ssl_cert = bstrdup(db_ssl_cert);
153    }
154    if (db_ssl_ca) {
155       mdb->m_db_ssl_ca = bstrdup(db_ssl_ca);
156    }
157    mdb->m_db_port = db_port;
158
159    if (disable_batch_insert) { 
160       mdb->m_disabled_batch_insert = true;
161       mdb->m_have_batch_insert = false;
162    } else { 
163       mdb->m_disabled_batch_insert = false;
164 #ifdef USE_BATCH_FILE_INSERT
165 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE) 
166 #ifdef HAVE_PQISTHREADSAFE 
167       mdb->m_have_batch_insert = PQisthreadsafe();
168 #else 
169       mdb->m_have_batch_insert = true;
170 #endif /* HAVE_PQISTHREADSAFE */ 
171 #else 
172       mdb->m_have_batch_insert = true;
173 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */ 
174 #else 
175       mdb->m_have_batch_insert = false;
176 #endif /* USE_BATCH_FILE_INSERT */ 
177    } 
178    mdb->m_allow_transactions = mult_db_connections;
179  
180    /* At this time, when mult_db_connections == true, this is for
181     * specific console command such as bvfs or batch mode, and we don't
182     * want to share a batch mode or bvfs. In the future, we can change
183     * the creation function to add this parameter.
184     */
185    mdb->m_dedicated = mult_db_connections;
186
187 get_out:
188    V(mutex);
189    return mdb;
190
191   
192  
193 /* Check that the database corresponds to the encoding we want  */
194 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
195 {
196    SQL_ROW row;
197    int ret = false; 
198
199    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) { 
200       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
201       return false; 
202    }
203
204    if ((row = mdb->sql_fetch_row()) == NULL) { 
205       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror()); 
206       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
207    } else { 
208       ret = bstrcmp(row[0], "SQL_ASCII");
209
210       if (ret) {
211          /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
212          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'"); 
213
214       } else { 
215          /* Something is wrong with database encoding */
216          Mmsg(mdb->errmsg,
217               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
218               mdb->get_db_name(), row[0]); 
219          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
220          Dmsg1(dbglvl_err, "%s", mdb->errmsg);
221       }
222    }
223    return ret;
224 }
225
226 /*
227  * Now actually open the database.  This can generate errors,
228  *   which are returned in the errmsg
229  *
230  *  DO NOT close the database or delete mdb here !!!!
231  */
232 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
233 {
234    bool retval = false; 
235    int errstat;
236    char buf[10], *port;
237    BDB_POSTGRESQL *mdb = this;
238
239    P(mutex);
240    if (mdb->m_connected) {
241       retval = true; 
242       goto get_out;
243    }
244
245    if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
246       berrno be;
247       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
248             be.bstrerror(errstat));
249       goto get_out;
250    }
251
252    if (mdb->m_db_port) {
253       bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
254       port = buf;
255    } else {
256       port = NULL;
257    }
258
259    /* Tells libpq that the SSL library has already been initialized */
260    PQinitSSL(0);
261
262    /* If connection fails, try at 5 sec intervals for 30 seconds. */
263    for (int retry=0; retry < 6; retry++) {
264       /* connect to the database */
265       const char *keywords[10] = {"host", "port",
266                                   "dbname", "user",
267                                   "password", "sslmode",
268                                   "sslkey", "sslcert",
269                                   "sslrootcert", NULL };
270       const char *values[10] = {mdb->m_db_address, /* default localhost */
271                                 port, /* default port */
272                                 mdb->m_db_name,
273                                 mdb->m_db_user,
274                                 mdb->m_db_password,
275                                 mdb->m_db_ssl_mode,
276                                 mdb->m_db_ssl_key,
277                                 mdb->m_db_ssl_cert,
278                                 mdb->m_db_ssl_ca,
279                                 NULL };
280       mdb->m_db_handle = PQconnectdbParams(keywords,
281                                            values, 0);
282
283       /* If no connect, try once more in case it is a timing problem */
284       if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
285          break;
286       }
287       bmicrosleep(5, 0);
288    }
289
290    Dmsg0(dbglvl_info, "pg_real_connect done\n");
291    Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
292         mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
293
294 #ifdef HAVE_OPENSSL
295    #define USE_OPENSSL 1
296    SSL *ssl;
297    if (PQgetssl(mdb->m_db_handle) != NULL) {
298       Dmsg0(dbglvl_info, "SSL in use\n");
299       ssl = (SSL *)PQgetssl(mdb->m_db_handle);
300       Dmsg2(dbglvl_info, "Version:%s Cipher:%s\n", SSL_get_version(ssl), SSL_get_cipher(ssl)); 
301    } else {
302       Dmsg0(dbglvl_info, "SSL not in use\n");
303    }
304 #endif
305
306    if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
307       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
308          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
309          mdb->m_db_name, mdb->m_db_user);
310       goto get_out;
311    }
312
313    mdb->m_connected = true;
314    if (!bdb_check_version(jcr)) {
315       goto get_out;
316    }
317
318    sql_query("SET datestyle TO 'ISO, YMD'"); 
319    sql_query("SET cursor_tuple_fraction=1");
320
321    /* 
322     * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
323     *   WARNING:  nonstandard use of \\ in a string literal
324     */ 
325    sql_query("SET standard_conforming_strings=on"); 
326  
327    /* Check that encoding is SQL_ASCII */
328    pgsql_check_database_encoding(jcr, mdb);
329  
330    retval = true; 
331
332 get_out:
333    V(mutex);
334    return retval; 
335
336
337 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
338 {
339    BDB_POSTGRESQL *mdb = this;
340
341    if (mdb->m_connected) {
342       bdb_end_transaction(jcr);
343    } 
344    P(mutex);
345    mdb->m_ref_count--;
346    if (mdb->m_ref_count == 0) {
347       if (mdb->m_connected) {
348          sql_free_result(); 
349       } 
350       db_list->remove(mdb);
351       if (mdb->m_connected && mdb->m_db_handle) {
352          PQfinish(mdb->m_db_handle);
353       } 
354       if (is_rwl_valid(&mdb->m_lock)) {
355          rwl_destroy(&mdb->m_lock);
356       } 
357       free_pool_memory(mdb->errmsg);
358       free_pool_memory(mdb->cmd);
359       free_pool_memory(mdb->cached_path);
360       free_pool_memory(mdb->fname);
361       free_pool_memory(mdb->path);
362       free_pool_memory(mdb->esc_name);
363       free_pool_memory(mdb->esc_path);
364       free_pool_memory(mdb->esc_obj);
365       free_pool_memory(mdb->m_buf);
366       if (mdb->m_db_driver) {
367          free(mdb->m_db_driver);
368       } 
369       if (mdb->m_db_name) {
370          free(mdb->m_db_name);
371       } 
372       if (mdb->m_db_user) {
373          free(mdb->m_db_user);
374       } 
375       if (mdb->m_db_password) {
376          free(mdb->m_db_password);
377       } 
378       if (mdb->m_db_address) {
379          free(mdb->m_db_address);
380       } 
381       if (mdb->m_db_socket) {
382          free(mdb->m_db_socket);
383       }
384       if (mdb->m_db_ssl_mode) {
385          free(mdb->m_db_ssl_mode);
386       }
387       if (mdb->m_db_ssl_key) {
388          free(mdb->m_db_ssl_key);
389       }
390       if (mdb->m_db_ssl_cert) {
391          free(mdb->m_db_ssl_cert);
392       }
393       if (mdb->m_db_ssl_ca) {
394          free(mdb->m_db_ssl_ca);
395       }
396       delete mdb;
397       if (db_list->size() == 0) {
398          delete db_list;
399          db_list = NULL;
400       }
401    }
402    V(mutex);
403 }
404
405 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
406 {
407 }
408
409 /*
410  * Escape strings so PostgreSQL is happy
411  *
412  *  len is the length of the old string. Your new
413  *    string must be long enough (max 2*old+1) to hold
414  *    the escaped output.
415  */
416 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
417 {
418    BDB_POSTGRESQL *mdb = this;
419    int failed; 
420
421    PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
422    if (failed) {
423       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); 
424       /* failed on encoding, probably invalid multibyte encoding in the source string
425          see PQescapeStringConn documentation for details. */
426       Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
427    } 
428 }
429
430 /*
431  * Escape binary so that PostgreSQL is happy
432  *
433  */
434 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
435 {
436    size_t new_len;
437    unsigned char *obj; 
438    BDB_POSTGRESQL *mdb = this;
439
440    mdb->esc_obj[0] = 0;
441    obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
442    if (!obj) { 
443       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
444    } else {
445       mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
446       memcpy(mdb->esc_obj, obj, new_len);
447       mdb->esc_obj[new_len] = 0;
448       PQfreemem(obj);
449    }
450    return (char *)mdb->esc_obj;
451 }
452
453 /*
454  * Unescape binary object so that PostgreSQL is happy
455  *
456  */
457 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
458                        POOLMEM **dest, int32_t *dest_len)
459 {
460    size_t new_len;
461    unsigned char *obj;
462
463    if (!from) {
464       *dest[0] = 0;
465       *dest_len = 0;
466       return;
467    }
468
469    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
470
471    if (!obj) {
472       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
473    }
474
475    *dest_len = new_len;
476    *dest = check_pool_memory_size(*dest, new_len+1);
477    memcpy(*dest, obj, new_len);
478    (*dest)[new_len]=0;
479
480    PQfreemem(obj);
481
482    Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
483 }
484
485 /*
486  * Start a transaction. This groups inserts and makes things more efficient.
487  *  Usually started when inserting file attributes.
488  */
489 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
490 {
491    BDB_POSTGRESQL *mdb = this;
492
493    if (!jcr->attr) { 
494       jcr->attr = get_pool_memory(PM_FNAME); 
495    }
496    if (!jcr->ar) { 
497       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR)); 
498       memset(jcr->ar, 0, sizeof(ATTR_DBR));
499    }
500
501    /* 
502     * This is turned off because transactions break if
503     *  multiple simultaneous jobs are run.
504     */ 
505    if (!mdb->m_allow_transactions) {
506       return; 
507    }
508
509    bdb_lock();
510    /* Allow only 25,000 changes per transaction */
511    if (mdb->m_transaction && changes > 25000) {
512       bdb_end_transaction(jcr);
513    } 
514    if (!mdb->m_transaction) {
515       sql_query("BEGIN");             /* begin transaction */
516       Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
517       mdb->m_transaction = true;
518    } 
519    bdb_unlock();
520 }
521
522 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
523 {
524    BDB_POSTGRESQL *mdb = this;
525
526    if (jcr && jcr->cached_attribute) { 
527       Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
528       if (!bdb_create_attributes_record(jcr, jcr->ar)) {
529          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
530       }
531       jcr->cached_attribute = false; 
532    }
533
534    if (!mdb->m_allow_transactions) {
535       return; 
536    }
537
538    bdb_lock();
539    if (mdb->m_transaction) {
540       sql_query("COMMIT");            /* end transaction */
541       mdb->m_transaction = false;
542       Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
543    }
544    changes = 0; 
545    bdb_unlock();
546 }
547
548
549 /*
550  * Submit a general SQL command, and for each row returned,
551  *  the result_handler is called with the ctx.
552  */
553 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
554                                        DB_RESULT_HANDLER *result_handler,
555                                        void *ctx)
556 {
557    BDB_POSTGRESQL *mdb = this;
558    SQL_ROW row; 
559    bool retval = false; 
560    bool in_transaction = mdb->m_transaction;
561
562    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
563
564    mdb->errmsg[0] = 0;
565    /* This code handles only SELECT queries */
566    if (strncasecmp(query, "SELECT", 6) != 0) {
567       return bdb_sql_query(query, result_handler, ctx);
568    }
569
570    if (!result_handler) {       /* no need of big_query without handler */
571       return false; 
572    }
573
574    bdb_lock();
575
576    if (!in_transaction) {       /* CURSOR needs transaction */
577       sql_query("BEGIN");
578    }
579
580    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
581
582    if (!sql_query(mdb->m_buf)) {
583       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
584       Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
585       goto get_out;
586    }
587
588    do {
589       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
590          Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
591          Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
592          goto get_out;
593       }
594       while ((row = sql_fetch_row()) != NULL) {
595          Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
596          if (result_handler(ctx, mdb->m_num_fields, row))
597             break;
598       }
599       PQclear(mdb->m_result);
600       m_result = NULL;
601
602    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
603
604    sql_query("CLOSE _bac_cursor");
605
606    Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
607    sql_free_result();
608    retval = true;
609
610 get_out:
611    if (!in_transaction) {
612       sql_query("COMMIT");  /* end transaction */
613    }
614
615    bdb_unlock();
616    return retval;
617 }
618
619 /* 
620  * Submit a general SQL command, and for each row returned,
621  *  the result_handler is called with the ctx.
622  */ 
623 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
624 {
625    SQL_ROW row; 
626    bool retval = true; 
627    BDB_POSTGRESQL *mdb = this;
628
629    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
630
631    bdb_lock();
632    mdb->errmsg[0] = 0;
633    if (!sql_query(query, QF_STORE_RESULT)) { 
634       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
635       Dmsg0(dbglvl_err, "db_sql_query failed\n");
636       retval = false; 
637       goto get_out;
638    } 
639
640    Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
641
642    if (result_handler) {
643       Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
644       while ((row = sql_fetch_row())) {
645          Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
646          if (result_handler(ctx, mdb->m_num_fields, row))
647             break; 
648       } 
649       sql_free_result(); 
650    } 
651
652    Dmsg0(dbglvl_info, "db_sql_query finished\n");
653
654 get_out:
655    bdb_unlock();
656    return retval; 
657 }
658
659 /*
660  * If this routine returns false (failure), Bacula expects
661  *   that no result has been stored.
662  * This is where QueryDB calls to with Postgresql.
663  *
664  *  Returns: true  on success
665  *           false on failure
666  *
667  */
668 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
669 {
670    int i; 
671    bool retval = false; 
672    BDB_POSTGRESQL *mdb = this;
673
674    Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
675
676    /* We are starting a new query. reset everything. */
677    mdb->m_num_rows     = -1;
678    mdb->m_row_number   = -1;
679    mdb->m_field_number = -1;
680
681    if (mdb->m_result) {
682       PQclear(mdb->m_result);  /* hmm, someone forgot to free?? */
683       mdb->m_result = NULL;
684    } 
685
686    for (i = 0; i < 10; i++) { 
687       mdb->m_result = PQexec(mdb->m_db_handle, query);
688       if (mdb->m_result) {
689          break;
690       }
691       bmicrosleep(5, 0);
692    }
693    if (!mdb->m_result) {
694       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
695       goto get_out;
696    }
697
698    mdb->m_status = PQresultStatus(mdb->m_result);
699    if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
700       Dmsg0(dbglvl_dbg, "we have a result\n");
701
702       /* How many fields in the set? */
703       mdb->m_num_fields = (int)PQnfields(mdb->m_result);
704       Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
705
706       mdb->m_num_rows = PQntuples(mdb->m_result);
707       Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
708
709       mdb->m_row_number = 0;      /* we can start to fetch something */
710       mdb->m_status = 0;          /* succeed */
711       retval = true; 
712    } else {
713       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
714       goto get_out;
715    }
716
717    Dmsg0(dbglvl_info, "sql_query finishing\n");
718    goto ok_out; 
719
720 get_out:
721    Dmsg0(dbglvl_err, "we failed\n");
722    PQclear(mdb->m_result);
723    mdb->m_result = NULL;
724    mdb->m_status = 1;                   /* failed */
725  
726 ok_out: 
727    return retval; 
728 }  
729  
730 void BDB_POSTGRESQL::sql_free_result(void)
731 {
732    BDB_POSTGRESQL *mdb = this;
733
734    bdb_lock();
735    if (mdb->m_result) {
736       PQclear(mdb->m_result);
737       mdb->m_result = NULL;
738    }
739    if (mdb->m_rows) {
740       free(mdb->m_rows);
741       mdb->m_rows = NULL;
742    }
743    if (mdb->m_fields) {
744       free(mdb->m_fields);
745       mdb->m_fields = NULL;
746    } 
747    mdb->m_num_rows = mdb->m_num_fields = 0;
748    bdb_unlock();
749
750
751 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
752
753    SQL_ROW row = NULL;            /* by default, return NULL */
754    BDB_POSTGRESQL *mdb = this;
755  
756    Dmsg0(dbglvl_info, "sql_fetch_row start\n");
757  
758    if (mdb->m_num_fields == 0) {     /* No field, no row */
759       Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
760       return NULL;
761    }
762
763    if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
764       if (mdb->m_rows) {
765          Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
766          free(mdb->m_rows);
767       } 
768       Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
769       mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
770       mdb->m_rows_size = mdb->m_num_fields;
771  
772       /* Now reset the row_number now that we have the space allocated */
773       mdb->m_row_number = 0;
774    }
775
776    /* If still within the result set */
777    if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
778       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
779
780       /* Get each value from this row */
781       for (int j = 0; j < mdb->m_num_fields; j++) {
782          mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
783          Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
784       } 
785       mdb->m_row_number++;  /* Increment the row number for the next call */
786       row = mdb->m_rows;
787    } else { 
788       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
789    }
790  
791    Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
792  
793    return row; 
794
795
796 const char *BDB_POSTGRESQL::sql_strerror(void)
797 {   
798    BDB_POSTGRESQL *mdb = this;
799    return PQerrorMessage(mdb->m_db_handle);
800
801
802 void BDB_POSTGRESQL::sql_data_seek(int row)
803
804    BDB_POSTGRESQL *mdb = this;
805    /* Set the row number to be returned on the next call to sql_fetch_row */
806    mdb->m_row_number = row;
807
808
809 int BDB_POSTGRESQL::sql_affected_rows(void)
810
811    BDB_POSTGRESQL *mdb = this;
812    return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
813
814  
815 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
816
817    uint64_t id = 0; 
818    char sequence[NAMEDATALEN-1]; 
819    char getkeyval_query[NAMEDATALEN+50]; 
820    PGresult *p_result;
821    BDB_POSTGRESQL *mdb = this;
822
823    /* First execute the insert query and then retrieve the currval. */
824    if (!sql_query(query)) { 
825       return 0; 
826    } 
827
828    mdb->m_num_rows = sql_affected_rows();
829    if (mdb->m_num_rows != 1) {
830       return 0; 
831    } 
832    mdb->changes++;
833    /* 
834     *  Obtain the current value of the sequence that
835     *  provides the serial value for primary key of the table.
836     * 
837     *  currval is local to our session.  It is not affected by
838     *  other transactions.
839     * 
840     *  Determine the name of the sequence.
841     *  PostgreSQL automatically creates a sequence using
842     *   <table>_<column>_seq.
843     *  At the time of writing, all tables used this format for
844     *   for their primary key: <table>id
845     *  Except for basefiles which has a primary key on baseid.
846     *  Therefore, we need to special case that one table.
847     * 
848     *  everything else can use the PostgreSQL formula.
849     */ 
850    if (strcasecmp(table_name, "basefiles") == 0) {
851       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
852    } else {
853       bstrncpy(sequence, table_name, sizeof(sequence));
854       bstrncat(sequence, "_",        sizeof(sequence));
855       bstrncat(sequence, table_name, sizeof(sequence));
856       bstrncat(sequence, "id",       sizeof(sequence));
857    }
858
859    bstrncat(sequence, "_seq", sizeof(sequence));
860    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence); 
861
862    Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
863    for (int i = 0; i < 10; i++) { 
864       p_result = PQexec(mdb->m_db_handle, getkeyval_query);
865       if (p_result) {
866          break;
867       }
868       bmicrosleep(5, 0);
869    }
870    if (!p_result) {
871       Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
872       goto get_out;
873    }
874
875    Dmsg0(dbglvl_dbg, "exec done");
876
877    if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
878       Dmsg0(dbglvl_dbg, "getting value");
879       id = str_to_uint64(PQgetvalue(p_result, 0, 0));
880       Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
881    } else {
882       Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
883       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
884    }
885
886 get_out:
887    PQclear(p_result);
888    return id;
889
890
891 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
892
893    int max_len; 
894    int this_len; 
895    BDB_POSTGRESQL *mdb = this;
896  
897    Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
898  
899    if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
900       if (mdb->m_fields) {
901          free(mdb->m_fields);
902          mdb->m_fields = NULL;
903       } 
904       Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
905       mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
906       mdb->m_fields_size = mdb->m_num_fields;
907  
908       for (int i = 0; i < mdb->m_num_fields; i++) {
909          Dmsg1(dbglvl_dbg, "filling field %d\n", i);
910          mdb->m_fields[i].name = PQfname(mdb->m_result, i);
911          mdb->m_fields[i].type = PQftype(mdb->m_result, i);
912          mdb->m_fields[i].flags = 0;
913  
914          /* For a given column, find the max length. */
915          max_len = 0; 
916          for (int j = 0; j < mdb->m_num_rows; j++) {
917             if (PQgetisnull(mdb->m_result, j, i)) {
918                this_len = 4;         /* "NULL" */
919             } else { 
920                this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
921             } 
922
923             if (max_len < this_len) { 
924                max_len = this_len; 
925             } 
926          } 
927          mdb->m_fields[i].max_length = max_len;
928   
929          Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
930                mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
931       } 
932    } 
933  
934    /* Increment field number for the next time around */
935    return &mdb->m_fields[mdb->m_field_number++];
936 }  
937  
938 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
939 {  
940    if (field_type == 1) {
941       return true; 
942    } 
943    return false; 
944
945  
946 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
947
948     /*
949      * TEMP: the following is taken from select OID, typname from pg_type;
950      */
951     switch (field_type) {
952     case 20:
953     case 21:
954     case 23:
955     case 700:
956     case 701:
957        return true; 
958     default:
959        return false; 
960     }
961
962  
963 /* 
964  * Escape strings so PostgreSQL is happy on COPY
965  * 
966  * len is the length of the old string. Your new
967  *   string must be long enough (max 2*old+1) to hold
968  *   the escaped output.
969  */ 
970 static char *pgsql_copy_escape(char *dest, char *src, size_t len) 
971
972     /* we have to escape \t, \n, \r, \ */
973     char c = '\0' ;
974  
975     while (len > 0 && *src) {
976        switch (*src) {
977        case '\n':
978           c = 'n';
979           break;
980        case '\\':
981           c = '\\';
982           break;
983        case '\t':
984           c = 't';
985           break;
986        case '\r':
987           c = 'r';
988           break;
989        default:
990           c = '\0' ;
991        }
992
993        if (c) {
994           *dest = '\\';
995           dest++;
996           *dest = c;
997        } else {
998           *dest = *src;
999        }
1000
1001        len--;
1002        src++;
1003        dest++;
1004     }
1005
1006     *dest = '\0';
1007     return dest;
1008
1009  
1010 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
1011 {
1012    BDB_POSTGRESQL *mdb = this;
1013    const char *query = "COPY batch FROM STDIN";
1014  
1015    Dmsg0(dbglvl_info, "sql_batch_start started\n");
1016  
1017    if  (!sql_query("CREATE TEMPORARY TABLE batch ("
1018                           "FileIndex int,"
1019                           "JobId int,"
1020                           "Path varchar,"
1021                           "Name varchar,"
1022                           "LStat varchar,"
1023                           "Md5 varchar,"
1024                           "DeltaSeq smallint)")) {
1025       Dmsg0(dbglvl_err, "sql_batch_start failed\n");
1026       return false; 
1027    }
1028
1029    /* We are starting a new query.  reset everything. */
1030    mdb->m_num_rows     = -1;
1031    mdb->m_row_number   = -1;
1032    mdb->m_field_number = -1;
1033
1034    sql_free_result(); 
1035
1036    for (int i=0; i < 10; i++) {
1037       mdb->m_result = PQexec(mdb->m_db_handle, query);
1038       if (mdb->m_result) {
1039          break;
1040       }
1041       bmicrosleep(5, 0);
1042    }
1043    if (!mdb->m_result) {
1044       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
1045       goto get_out;
1046    }
1047
1048    mdb->m_status = PQresultStatus(mdb->m_result);
1049    if (mdb->m_status == PGRES_COPY_IN) {
1050       /* How many fields in the set? */
1051       mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1052       mdb->m_num_rows = 0;
1053       mdb->m_status = 1;
1054    } else {
1055       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1056       goto get_out;
1057    }
1058
1059    Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1060
1061    return true; 
1062
1063 get_out:
1064    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1065    mdb->m_status = 0;
1066    PQclear(mdb->m_result);
1067    mdb->m_result = NULL;
1068    return false; 
1069 }
1070
1071 /* 
1072  * Set error to something to abort the operation
1073  */ 
1074 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1075 {
1076    int res;
1077    int count=30;
1078    PGresult *p_result;
1079    BDB_POSTGRESQL *mdb = this;
1080
1081    Dmsg0(dbglvl_info, "sql_batch_end started\n");
1082
1083    do {
1084       res = PQputCopyEnd(mdb->m_db_handle, error);
1085    } while (res == 0 && --count > 0);
1086
1087    if (res == 1) {
1088       Dmsg0(dbglvl_dbg, "ok\n");
1089       mdb->m_status = 0;
1090    }
1091
1092    if (res <= 0) {
1093       mdb->m_status = 1;
1094       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1095       Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1096    }
1097
1098    /* Check command status and return to normal libpq state */
1099    p_result = PQgetResult(mdb->m_db_handle);
1100    if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1101       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1102       mdb->m_status = 1;
1103    }
1104
1105    /* Get some statistics to compute the best plan */
1106    sql_query("ANALYZE batch");
1107
1108    PQclear(p_result);
1109
1110    Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1111    return true; 
1112 }
1113
1114 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1115 {
1116    int res;
1117    int count=30;
1118    size_t len;
1119    const char *digest;
1120    char ed1[50];
1121    BDB_POSTGRESQL *mdb = this;
1122
1123    mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1124    pgsql_copy_escape(mdb->esc_name, fname, fnl);
1125
1126    mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1127    pgsql_copy_escape(mdb->esc_path, path, pnl);
1128
1129    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1130       digest = "0";
1131    } else {
1132       digest = ar->Digest;
1133    }
1134
1135    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1136               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1137               mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1138
1139    do {
1140       res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1141    } while (res == 0 && --count > 0);
1142
1143    if (res == 1) {
1144       Dmsg0(dbglvl_dbg, "ok\n");
1145       mdb->changes++;
1146       mdb->m_status = 1;
1147    }
1148
1149    if (res <= 0) {
1150       mdb->m_status = 0;
1151       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1152       Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1153    }
1154
1155    Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1156
1157    return true; 
1158 }
1159
1160
1161 #endif /* HAVE_POSTGRESQL */