]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Change copyright as per agreement with FSFE
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2016 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Catalog Database routines specific to PostgreSQL
21  *   These are PostgreSQL specific routines
22  *
23  *    Dan Langille, December 2003
24  *    based upon work done by Kern Sibbald, March 2000
25  *
26  * Note: at one point, this file was changed to class based by a certain 
27  *  programmer, and other than "wrapping" in a class, which is a trivial
28  *  change for a C++ programmer, nothing substantial was done, yet all the
29  *  code was recommitted under this programmer's name.  Consequently, we
30  *  undo those changes here.  Unfortunately, it is too difficult to put
31  *  back the original author's name (Dan Langille) on the parts he wrote.
32  */
33
34 #include "bacula.h"
35
36 #ifdef HAVE_POSTGRESQL
37
38 #include  "cats.h"
39 #include  "libpq-fe.h"
40 #include  "postgres_ext.h"       /* needed for NAMEDATALEN */
41 #include  "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
42 #define __BDB_POSTGRESQL_H_ 1
43 #include  "bdb_postgresql.h"
44
45 #define dbglvl_dbg   DT_SQL|100
46 #define dbglvl_info  DT_SQL|50
47 #define dbglvl_err   DT_SQL|10
48
49 /* -----------------------------------------------------------------------
50  *
51  *   PostgreSQL dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /* List of open databases */
57 static dlist *db_list = NULL; 
58
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
60
61 BDB_POSTGRESQL::BDB_POSTGRESQL()
62 {
63    BDB_POSTGRESQL *mdb = this;
64
65    if (db_list == NULL) {
66       db_list = New(dlist(mdb, &mdb->m_link));
67    }
68    mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
69    mdb->m_db_type = SQL_TYPE_POSTGRESQL;
70    mdb->m_db_driver = bstrdup("PostgreSQL");
71
72    mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
73    mdb->errmsg[0] = 0;
74    mdb->cmd = get_pool_memory(PM_EMSG);    /* get command buffer */
75    mdb->cached_path = get_pool_memory(PM_FNAME);
76    mdb->cached_path_id = 0;
77    mdb->m_ref_count = 1;
78    mdb->fname = get_pool_memory(PM_FNAME);
79    mdb->path = get_pool_memory(PM_FNAME);
80    mdb->esc_name = get_pool_memory(PM_FNAME);
81    mdb->esc_path = get_pool_memory(PM_FNAME);
82    mdb->esc_obj = get_pool_memory(PM_FNAME);
83    mdb->m_use_fatal_jmsg = true;
84
85    /* Initialize the private members. */
86    mdb->m_db_handle = NULL;
87    mdb->m_result = NULL;
88    mdb->m_buf =  get_pool_memory(PM_FNAME);
89
90    db_list->append(this);
91 }
92
93 BDB_POSTGRESQL::~BDB_POSTGRESQL()
94 {
95 }
96
97 /*
98  * Initialize database data structure. In principal this should
99  * never have errors, or it is really fatal.
100  */
101 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
102                        const char *db_password, const char *db_address, int db_port, const char *db_socket, 
103                        const char *db_ssl_key, const char *db_ssl_cert, const char *db_ssl_ca,
104                        const char *db_ssl_capath, const char *db_ssl_cipher,
105                        bool mult_db_connections, bool disable_batch_insert) 
106 {
107    BDB_POSTGRESQL *mdb = NULL;
108
109    if (!db_user) {
110       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
111       return NULL;
112    }
113    P(mutex);                          /* lock DB queue */
114    if (db_list && !mult_db_connections) {
115       /*
116        * Look to see if DB already open
117        */
118       foreach_dlist(mdb, db_list) {
119          if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
120             Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
121             mdb->increment_refcount();
122             goto get_out;
123          }
124       }
125    }
126    Dmsg0(dbglvl_info, "db_init_database first time\n");
127    /* Create the global Bacula db context */
128    mdb = New(BDB_POSTGRESQL());
129    if (!mdb) goto get_out;
130
131    /* Initialize the parent class members. */
132    mdb->m_db_name = bstrdup(db_name);
133    mdb->m_db_user = bstrdup(db_user);
134    if (db_password) {
135       mdb->m_db_password = bstrdup(db_password);
136    }
137    if (db_address) {
138       mdb->m_db_address = bstrdup(db_address);
139    }
140    if (db_socket) {
141       mdb->m_db_socket = bstrdup(db_socket);
142    } 
143    mdb->m_db_port = db_port;
144
145    if (disable_batch_insert) { 
146       mdb->m_disabled_batch_insert = true;
147       mdb->m_have_batch_insert = false;
148    } else { 
149       mdb->m_disabled_batch_insert = false;
150 #ifdef USE_BATCH_FILE_INSERT
151 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE) 
152 #ifdef HAVE_PQISTHREADSAFE 
153       mdb->m_have_batch_insert = PQisthreadsafe();
154 #else 
155       mdb->m_have_batch_insert = true;
156 #endif /* HAVE_PQISTHREADSAFE */ 
157 #else 
158       mdb->m_have_batch_insert = true;
159 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */ 
160 #else 
161       mdb->m_have_batch_insert = false;
162 #endif /* USE_BATCH_FILE_INSERT */ 
163    } 
164    mdb->m_allow_transactions = mult_db_connections;
165  
166    /* At this time, when mult_db_connections == true, this is for
167     * specific console command such as bvfs or batch mode, and we don't
168     * want to share a batch mode or bvfs. In the future, we can change
169     * the creation function to add this parameter.
170     */
171    mdb->m_dedicated = mult_db_connections;
172
173 get_out:
174    V(mutex);
175    return mdb;
176
177   
178  
179 /* Check that the database corresponds to the encoding we want  */
180 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
181 {
182    SQL_ROW row;
183    int ret = false; 
184
185    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) { 
186       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
187       return false; 
188    }
189
190    if ((row = mdb->sql_fetch_row()) == NULL) { 
191       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror()); 
192       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
193    } else { 
194       ret = bstrcmp(row[0], "SQL_ASCII");
195
196       if (ret) {
197          /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
198          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'"); 
199
200       } else { 
201          /* Something is wrong with database encoding */
202          Mmsg(mdb->errmsg,
203               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
204               mdb->get_db_name(), row[0]); 
205          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
206          Dmsg1(dbglvl_err, "%s", mdb->errmsg);
207       }
208    }
209    return ret;
210 }
211
212 /*
213  * Now actually open the database.  This can generate errors,
214  *   which are returned in the errmsg
215  *
216  *  DO NOT close the database or delete mdb here !!!!
217  */
218 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
219 {
220    bool retval = false; 
221    int errstat;
222    char buf[10], *port;
223    BDB_POSTGRESQL *mdb = this;
224
225    P(mutex);
226    if (mdb->m_connected) {
227       retval = true; 
228       goto get_out;
229    }
230
231    if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
232       berrno be;
233       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
234             be.bstrerror(errstat));
235       goto get_out;
236    }
237
238    if (mdb->m_db_port) {
239       bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
240       port = buf;
241    } else {
242       port = NULL;
243    }
244
245    /* If connection fails, try at 5 sec intervals for 30 seconds. */
246    for (int retry=0; retry < 6; retry++) {
247       /* connect to the database */
248       mdb->m_db_handle = PQsetdbLogin(
249            mdb->m_db_address,         /* default = localhost */
250            port,                      /* default port */
251            NULL,                      /* pg options */
252            NULL,                      /* tty, ignored */
253            mdb->m_db_name,            /* database name */
254            mdb->m_db_user,            /* login name */
255            mdb->m_db_password);       /* password */
256
257       /* If no connect, try once more in case it is a timing problem */
258       if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
259          break;
260       }
261       bmicrosleep(5, 0);
262    }
263
264    Dmsg0(dbglvl_info, "pg_real_connect done\n");
265    Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
266         mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
267
268    if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
269       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
270          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
271          mdb->m_db_name, mdb->m_db_user);
272       goto get_out;
273    }
274
275    mdb->m_connected = true;
276    if (!bdb_check_version(jcr)) {
277       goto get_out;
278    }
279
280    sql_query("SET datestyle TO 'ISO, YMD'"); 
281    sql_query("SET cursor_tuple_fraction=1");
282
283    /* 
284     * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
285     *   WARNING:  nonstandard use of \\ in a string literal
286     */ 
287    sql_query("SET standard_conforming_strings=on"); 
288  
289    /* Check that encoding is SQL_ASCII */
290    pgsql_check_database_encoding(jcr, mdb);
291  
292    retval = true; 
293
294 get_out:
295    V(mutex);
296    return retval; 
297
298
299 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
300 {
301    BDB_POSTGRESQL *mdb = this;
302
303    if (mdb->m_connected) {
304       bdb_end_transaction(jcr);
305    } 
306    P(mutex);
307    mdb->m_ref_count--;
308    if (mdb->m_ref_count == 0) {
309       if (mdb->m_connected) {
310          sql_free_result(); 
311       } 
312       db_list->remove(mdb);
313       if (mdb->m_connected && mdb->m_db_handle) {
314          PQfinish(mdb->m_db_handle);
315       } 
316       if (is_rwl_valid(&mdb->m_lock)) {
317          rwl_destroy(&mdb->m_lock);
318       } 
319       free_pool_memory(mdb->errmsg);
320       free_pool_memory(mdb->cmd);
321       free_pool_memory(mdb->cached_path);
322       free_pool_memory(mdb->fname);
323       free_pool_memory(mdb->path);
324       free_pool_memory(mdb->esc_name);
325       free_pool_memory(mdb->esc_path);
326       free_pool_memory(mdb->esc_obj);
327       free_pool_memory(mdb->m_buf);
328       if (mdb->m_db_driver) {
329          free(mdb->m_db_driver);
330       } 
331       if (mdb->m_db_name) {
332          free(mdb->m_db_name);
333       } 
334       if (mdb->m_db_user) {
335          free(mdb->m_db_user);
336       } 
337       if (mdb->m_db_password) {
338          free(mdb->m_db_password);
339       } 
340       if (mdb->m_db_address) {
341          free(mdb->m_db_address);
342       } 
343       if (mdb->m_db_socket) {
344          free(mdb->m_db_socket);
345       } 
346       delete mdb;
347       if (db_list->size() == 0) {
348          delete db_list;
349          db_list = NULL;
350       }
351    }
352    V(mutex);
353 }
354
355 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
356 {
357 }
358
359 /*
360  * Escape strings so PostgreSQL is happy
361  *
362  *  len is the length of the old string. Your new
363  *    string must be long enough (max 2*old+1) to hold
364  *    the escaped output.
365  */
366 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
367 {
368    BDB_POSTGRESQL *mdb = this;
369    int failed; 
370
371    PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
372    if (failed) {
373       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); 
374       /* failed on encoding, probably invalid multibyte encoding in the source string
375          see PQescapeStringConn documentation for details. */
376       Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
377    } 
378 }
379
380 /*
381  * Escape binary so that PostgreSQL is happy
382  *
383  */
384 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
385 {
386    size_t new_len;
387    unsigned char *obj; 
388    BDB_POSTGRESQL *mdb = this;
389
390    mdb->esc_obj[0] = 0;
391    obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
392    if (!obj) { 
393       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
394    } else {
395       mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
396       memcpy(mdb->esc_obj, obj, new_len);
397       mdb->esc_obj[new_len] = 0;
398       PQfreemem(obj);
399    }
400    return (char *)mdb->esc_obj;
401 }
402
403 /*
404  * Unescape binary object so that PostgreSQL is happy
405  *
406  */
407 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
408                        POOLMEM **dest, int32_t *dest_len)
409 {
410    size_t new_len;
411    unsigned char *obj;
412
413    if (!from) {
414       *dest[0] = 0;
415       *dest_len = 0;
416       return;
417    }
418
419    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
420
421    if (!obj) {
422       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
423    }
424
425    *dest_len = new_len;
426    *dest = check_pool_memory_size(*dest, new_len+1);
427    memcpy(*dest, obj, new_len);
428    (*dest)[new_len]=0;
429
430    PQfreemem(obj);
431
432    Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
433 }
434
435 /*
436  * Start a transaction. This groups inserts and makes things more efficient.
437  *  Usually started when inserting file attributes.
438  */
439 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
440 {
441    BDB_POSTGRESQL *mdb = this;
442
443    if (!jcr->attr) { 
444       jcr->attr = get_pool_memory(PM_FNAME); 
445    }
446    if (!jcr->ar) { 
447       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR)); 
448    }
449
450    /* 
451     * This is turned off because transactions break if
452     *  multiple simultaneous jobs are run.
453     */ 
454    if (!mdb->m_allow_transactions) {
455       return; 
456    }
457
458    bdb_lock();
459    /* Allow only 25,000 changes per transaction */
460    if (mdb->m_transaction && changes > 25000) {
461       bdb_end_transaction(jcr);
462    } 
463    if (!mdb->m_transaction) {
464       sql_query("BEGIN");             /* begin transaction */
465       Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
466       mdb->m_transaction = true;
467    } 
468    bdb_unlock();
469 }
470
471 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
472 {
473    BDB_POSTGRESQL *mdb = this;
474
475    if (jcr && jcr->cached_attribute) { 
476       Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
477       if (!bdb_create_attributes_record(jcr, jcr->ar)) {
478          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
479       }
480       jcr->cached_attribute = false; 
481    }
482
483    if (!mdb->m_allow_transactions) {
484       return; 
485    }
486
487    bdb_lock();
488    if (mdb->m_transaction) {
489       sql_query("COMMIT");            /* end transaction */
490       mdb->m_transaction = false;
491       Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
492    }
493    changes = 0; 
494    bdb_unlock();
495 }
496
497
498 /*
499  * Submit a general SQL command, and for each row returned,
500  *  the result_handler is called with the ctx.
501  */
502 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
503                                        DB_RESULT_HANDLER *result_handler,
504                                        void *ctx)
505 {
506    BDB_POSTGRESQL *mdb = this;
507    SQL_ROW row; 
508    bool retval = false; 
509    bool in_transaction = mdb->m_transaction;
510
511    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
512
513    mdb->errmsg[0] = 0;
514    /* This code handles only SELECT queries */
515    if (strncasecmp(query, "SELECT", 6) != 0) {
516       return bdb_sql_query(query, result_handler, ctx);
517    }
518
519    if (!result_handler) {       /* no need of big_query without handler */
520       return false; 
521    }
522
523    bdb_lock();
524
525    if (!in_transaction) {       /* CURSOR needs transaction */
526       sql_query("BEGIN");
527    }
528
529    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
530
531    if (!sql_query(mdb->m_buf)) {
532       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
533       Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
534       goto get_out;
535    }
536
537    do {
538       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
539          Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
540          Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
541          goto get_out;
542       }
543       while ((row = sql_fetch_row()) != NULL) {
544          Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
545          if (result_handler(ctx, mdb->m_num_fields, row))
546             break;
547       }
548       PQclear(mdb->m_result);
549       m_result = NULL;
550
551    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
552
553    sql_query("CLOSE _bac_cursor");
554
555    Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
556    sql_free_result();
557    retval = true;
558
559 get_out:
560    if (!in_transaction) {
561       sql_query("COMMIT");  /* end transaction */
562    }
563
564    bdb_unlock();
565    return retval;
566 }
567
568 /* 
569  * Submit a general SQL command, and for each row returned,
570  *  the result_handler is called with the ctx.
571  */ 
572 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
573 {
574    SQL_ROW row; 
575    bool retval = true; 
576    BDB_POSTGRESQL *mdb = this;
577
578    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
579
580    bdb_lock();
581    mdb->errmsg[0] = 0;
582    if (!sql_query(query, QF_STORE_RESULT)) { 
583       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
584       Dmsg0(dbglvl_err, "db_sql_query failed\n");
585       retval = false; 
586       goto get_out;
587    } 
588
589    Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
590
591    if (result_handler) {
592       Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
593       while ((row = sql_fetch_row())) {
594          Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
595          if (result_handler(ctx, mdb->m_num_fields, row))
596             break; 
597       } 
598       sql_free_result(); 
599    } 
600
601    Dmsg0(dbglvl_info, "db_sql_query finished\n");
602
603 get_out:
604    bdb_unlock();
605    return retval; 
606 }
607
608 /*
609  * If this routine returns false (failure), Bacula expects
610  *   that no result has been stored.
611  * This is where QueryDB calls to with Postgresql.
612  *
613  *  Returns: true  on success
614  *           false on failure
615  *
616  */
617 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
618 {
619    int i; 
620    bool retval = false; 
621    BDB_POSTGRESQL *mdb = this;
622
623    Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
624
625    /* We are starting a new query. reset everything. */
626    mdb->m_num_rows     = -1;
627    mdb->m_row_number   = -1;
628    mdb->m_field_number = -1;
629
630    if (mdb->m_result) {
631       PQclear(mdb->m_result);  /* hmm, someone forgot to free?? */
632       mdb->m_result = NULL;
633    } 
634
635    for (i = 0; i < 10; i++) { 
636       mdb->m_result = PQexec(mdb->m_db_handle, query);
637       if (mdb->m_result) {
638          break;
639       }
640       bmicrosleep(5, 0);
641    }
642    if (!mdb->m_result) {
643       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
644       goto get_out;
645    }
646
647    mdb->m_status = PQresultStatus(mdb->m_result);
648    if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
649       Dmsg0(dbglvl_dbg, "we have a result\n");
650
651       /* How many fields in the set? */
652       mdb->m_num_fields = (int)PQnfields(mdb->m_result);
653       Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
654
655       mdb->m_num_rows = PQntuples(mdb->m_result);
656       Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
657
658       mdb->m_row_number = 0;      /* we can start to fetch something */
659       mdb->m_status = 0;          /* succeed */
660       retval = true; 
661    } else {
662       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
663       goto get_out;
664    }
665
666    Dmsg0(dbglvl_info, "sql_query finishing\n");
667    goto ok_out; 
668
669 get_out:
670    Dmsg0(dbglvl_err, "we failed\n");
671    PQclear(mdb->m_result);
672    mdb->m_result = NULL;
673    mdb->m_status = 1;                   /* failed */
674  
675 ok_out: 
676    return retval; 
677 }  
678  
679 void BDB_POSTGRESQL::sql_free_result(void)
680 {
681    BDB_POSTGRESQL *mdb = this;
682
683    bdb_lock();
684    if (mdb->m_result) {
685       PQclear(mdb->m_result);
686       mdb->m_result = NULL;
687    }
688    if (mdb->m_rows) {
689       free(mdb->m_rows);
690       mdb->m_rows = NULL;
691    }
692    if (mdb->m_fields) {
693       free(mdb->m_fields);
694       mdb->m_fields = NULL;
695    } 
696    mdb->m_num_rows = mdb->m_num_fields = 0;
697    bdb_unlock();
698
699
700 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
701
702    SQL_ROW row = NULL;            /* by default, return NULL */
703    BDB_POSTGRESQL *mdb = this;
704  
705    Dmsg0(dbglvl_info, "sql_fetch_row start\n");
706  
707    if (mdb->m_num_fields == 0) {     /* No field, no row */
708       Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
709       return NULL;
710    }
711
712    if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
713       if (mdb->m_rows) {
714          Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
715          free(mdb->m_rows);
716       } 
717       Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
718       mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
719       mdb->m_rows_size = mdb->m_num_fields;
720  
721       /* Now reset the row_number now that we have the space allocated */
722       mdb->m_row_number = 0;
723    }
724
725    /* If still within the result set */
726    if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
727       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
728
729       /* Get each value from this row */
730       for (int j = 0; j < mdb->m_num_fields; j++) {
731          mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
732          Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
733       } 
734       mdb->m_row_number++;  /* Increment the row number for the next call */
735       row = mdb->m_rows;
736    } else { 
737       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
738    }
739  
740    Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
741  
742    return row; 
743
744
745 const char *BDB_POSTGRESQL::sql_strerror(void)
746 {   
747    BDB_POSTGRESQL *mdb = this;
748    return PQerrorMessage(mdb->m_db_handle);
749
750
751 void BDB_POSTGRESQL::sql_data_seek(int row)
752
753    BDB_POSTGRESQL *mdb = this;
754    /* Set the row number to be returned on the next call to sql_fetch_row */
755    mdb->m_row_number = row;
756
757
758 int BDB_POSTGRESQL::sql_affected_rows(void)
759
760    BDB_POSTGRESQL *mdb = this;
761    return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
762
763  
764 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
765
766    uint64_t id = 0; 
767    char sequence[NAMEDATALEN-1]; 
768    char getkeyval_query[NAMEDATALEN+50]; 
769    PGresult *p_result;
770    BDB_POSTGRESQL *mdb = this;
771
772    /* First execute the insert query and then retrieve the currval. */
773    if (!sql_query(query)) { 
774       return 0; 
775    } 
776
777    mdb->m_num_rows = sql_affected_rows();
778    if (mdb->m_num_rows != 1) {
779       return 0; 
780    } 
781    mdb->changes++;
782    /* 
783     *  Obtain the current value of the sequence that
784     *  provides the serial value for primary key of the table.
785     * 
786     *  currval is local to our session.  It is not affected by
787     *  other transactions.
788     * 
789     *  Determine the name of the sequence.
790     *  PostgreSQL automatically creates a sequence using
791     *   <table>_<column>_seq.
792     *  At the time of writing, all tables used this format for
793     *   for their primary key: <table>id
794     *  Except for basefiles which has a primary key on baseid.
795     *  Therefore, we need to special case that one table.
796     * 
797     *  everything else can use the PostgreSQL formula.
798     */ 
799    if (strcasecmp(table_name, "basefiles") == 0) {
800       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
801    } else {
802       bstrncpy(sequence, table_name, sizeof(sequence));
803       bstrncat(sequence, "_",        sizeof(sequence));
804       bstrncat(sequence, table_name, sizeof(sequence));
805       bstrncat(sequence, "id",       sizeof(sequence));
806    }
807
808    bstrncat(sequence, "_seq", sizeof(sequence));
809    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence); 
810
811    Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
812    for (int i = 0; i < 10; i++) { 
813       p_result = PQexec(mdb->m_db_handle, getkeyval_query);
814       if (p_result) {
815          break;
816       }
817       bmicrosleep(5, 0);
818    }
819    if (!p_result) {
820       Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
821       goto get_out;
822    }
823
824    Dmsg0(dbglvl_dbg, "exec done");
825
826    if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
827       Dmsg0(dbglvl_dbg, "getting value");
828       id = str_to_uint64(PQgetvalue(p_result, 0, 0));
829       Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
830    } else {
831       Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
832       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
833    }
834
835 get_out:
836    PQclear(p_result);
837    return id;
838
839
840 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
841
842    int max_len; 
843    int this_len; 
844    BDB_POSTGRESQL *mdb = this;
845  
846    Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
847  
848    if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
849       if (mdb->m_fields) {
850          free(mdb->m_fields);
851          mdb->m_fields = NULL;
852       } 
853       Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
854       mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
855       mdb->m_fields_size = mdb->m_num_fields;
856  
857       for (int i = 0; i < mdb->m_num_fields; i++) {
858          Dmsg1(dbglvl_dbg, "filling field %d\n", i);
859          mdb->m_fields[i].name = PQfname(mdb->m_result, i);
860          mdb->m_fields[i].type = PQftype(mdb->m_result, i);
861          mdb->m_fields[i].flags = 0;
862  
863          /* For a given column, find the max length. */
864          max_len = 0; 
865          for (int j = 0; j < mdb->m_num_rows; j++) {
866             if (PQgetisnull(mdb->m_result, j, i)) {
867                this_len = 4;         /* "NULL" */
868             } else { 
869                this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
870             } 
871
872             if (max_len < this_len) { 
873                max_len = this_len; 
874             } 
875          } 
876          mdb->m_fields[i].max_length = max_len;
877   
878          Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
879                mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
880       } 
881    } 
882  
883    /* Increment field number for the next time around */
884    return &mdb->m_fields[mdb->m_field_number++];
885 }  
886  
887 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
888 {  
889    if (field_type == 1) {
890       return true; 
891    } 
892    return false; 
893
894  
895 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
896
897     /*
898      * TEMP: the following is taken from select OID, typname from pg_type;
899      */
900     switch (field_type) {
901     case 20:
902     case 21:
903     case 23:
904     case 700:
905     case 701:
906        return true; 
907     default:
908        return false; 
909     }
910
911  
912 /* 
913  * Escape strings so PostgreSQL is happy on COPY
914  * 
915  * len is the length of the old string. Your new
916  *   string must be long enough (max 2*old+1) to hold
917  *   the escaped output.
918  */ 
919 static char *pgsql_copy_escape(char *dest, char *src, size_t len) 
920
921     /* we have to escape \t, \n, \r, \ */
922     char c = '\0' ;
923  
924     while (len > 0 && *src) {
925        switch (*src) {
926        case '\n':
927           c = 'n';
928           break;
929        case '\\':
930           c = '\\';
931           break;
932        case '\t':
933           c = 't';
934           break;
935        case '\r':
936           c = 'r';
937           break;
938        default:
939           c = '\0' ;
940        }
941
942        if (c) {
943           *dest = '\\';
944           dest++;
945           *dest = c;
946        } else {
947           *dest = *src;
948        }
949
950        len--;
951        src++;
952        dest++;
953     }
954
955     *dest = '\0';
956     return dest;
957
958  
959 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
960 {
961    BDB_POSTGRESQL *mdb = this;
962    const char *query = "COPY batch FROM STDIN";
963  
964    Dmsg0(dbglvl_info, "sql_batch_start started\n");
965  
966    if  (!sql_query("CREATE TEMPORARY TABLE batch ("
967                           "FileIndex int,"
968                           "JobId int,"
969                           "Path varchar,"
970                           "Name varchar,"
971                           "LStat varchar,"
972                           "Md5 varchar,"
973                           "DeltaSeq smallint)")) {
974       Dmsg0(dbglvl_err, "sql_batch_start failed\n");
975       return false; 
976    }
977
978    /* We are starting a new query.  reset everything. */
979    mdb->m_num_rows     = -1;
980    mdb->m_row_number   = -1;
981    mdb->m_field_number = -1;
982
983    sql_free_result(); 
984
985    for (int i=0; i < 10; i++) {
986       mdb->m_result = PQexec(mdb->m_db_handle, query);
987       if (mdb->m_result) {
988          break;
989       }
990       bmicrosleep(5, 0);
991    }
992    if (!mdb->m_result) {
993       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
994       goto get_out;
995    }
996
997    mdb->m_status = PQresultStatus(mdb->m_result);
998    if (mdb->m_status == PGRES_COPY_IN) {
999       /* How many fields in the set? */
1000       mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1001       mdb->m_num_rows = 0;
1002       mdb->m_status = 1;
1003    } else {
1004       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1005       goto get_out;
1006    }
1007
1008    Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1009
1010    return true; 
1011
1012 get_out:
1013    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1014    mdb->m_status = 0;
1015    PQclear(mdb->m_result);
1016    mdb->m_result = NULL;
1017    return false; 
1018 }
1019
1020 /* 
1021  * Set error to something to abort the operation
1022  */ 
1023 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1024 {
1025    int res;
1026    int count=30;
1027    PGresult *p_result;
1028    BDB_POSTGRESQL *mdb = this;
1029
1030    Dmsg0(dbglvl_info, "sql_batch_end started\n");
1031
1032    do {
1033       res = PQputCopyEnd(mdb->m_db_handle, error);
1034    } while (res == 0 && --count > 0);
1035
1036    if (res == 1) {
1037       Dmsg0(dbglvl_dbg, "ok\n");
1038       mdb->m_status = 0;
1039    }
1040
1041    if (res <= 0) {
1042       mdb->m_status = 1;
1043       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1044       Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1045    }
1046
1047    /* Check command status and return to normal libpq state */
1048    p_result = PQgetResult(mdb->m_db_handle);
1049    if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1050       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1051       mdb->m_status = 1;
1052    }
1053
1054    /* Get some statistics to compute the best plan */
1055    sql_query("ANALYZE batch");
1056
1057    PQclear(p_result);
1058
1059    Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1060    return true; 
1061 }
1062
1063 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1064 {
1065    int res;
1066    int count=30;
1067    size_t len;
1068    const char *digest;
1069    char ed1[50];
1070    BDB_POSTGRESQL *mdb = this;
1071
1072    mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1073    pgsql_copy_escape(mdb->esc_name, fname, fnl);
1074
1075    mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1076    pgsql_copy_escape(mdb->esc_path, path, pnl);
1077
1078    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1079       digest = "0";
1080    } else {
1081       digest = ar->Digest;
1082    }
1083
1084    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1085               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1086               mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1087
1088    do {
1089       res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1090    } while (res == 0 && --count > 0);
1091
1092    if (res == 1) {
1093       Dmsg0(dbglvl_dbg, "ok\n");
1094       mdb->changes++;
1095       mdb->m_status = 1;
1096    }
1097
1098    if (res <= 0) {
1099       mdb->m_status = 0;
1100       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1101       Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1102    }
1103
1104    Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1105
1106    return true; 
1107 }
1108
1109
1110 #endif /* HAVE_POSTGRESQL */