]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Patch to add MySQL ssl access
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2003-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  * Bacula Catalog Database routines specific to PostgreSQL
22  *   These are PostgreSQL specific routines
23  *
24  *    Dan Langille, December 2003
25  *    based upon work done by Kern Sibbald, March 2000
26  *
27  * Note: at one point, this file was changed to class based by a certain 
28  *  programmer, and other than "wrapping" in a class, which is a trivial
29  *  change for a C++ programmer, nothing substantial was done, yet all the
30  *  code was recommitted under this programmer's name.  Consequently, we
31  *  undo those changes here.  Unfortunately, it is too difficult to put
32  *  back the original author's name (Dan Langille) on the parts he wrote.
33  */
34
35 #include "bacula.h"
36
37 #ifdef HAVE_POSTGRESQL
38
39 #include  "cats.h"
40 #include  "libpq-fe.h"
41 #include  "postgres_ext.h"       /* needed for NAMEDATALEN */
42 #include  "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
43 #define __BDB_POSTGRESQL_H_ 1
44 #include "bdb_postgresql.h"
45
46 #define dbglvl_dbg   DT_SQL|100
47 #define dbglvl_info  DT_SQL|50
48 #define dbglvl_err   DT_SQL|10
49
50 /* -----------------------------------------------------------------------
51  *
52  *   PostgreSQL dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /* List of open databases */
58 static dlist *db_list = NULL; 
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
61
62 BDB_POSTGRESQL::BDB_POSTGRESQL()
63 {
64    BDB_POSTGRESQL *mdb = this;
65
66    if (db_list == NULL) {
67       db_list = New(dlist(mdb, &mdb->m_link));
68    }
69    mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
70    mdb->m_db_type = SQL_TYPE_POSTGRESQL;
71    mdb->m_db_driver = bstrdup("PostgreSQL");
72
73    mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
74    mdb->errmsg[0] = 0;
75    mdb->cmd = get_pool_memory(PM_EMSG);    /* get command buffer */
76    mdb->cached_path = get_pool_memory(PM_FNAME);
77    mdb->cached_path_id = 0;
78    mdb->m_ref_count = 1;
79    mdb->fname = get_pool_memory(PM_FNAME);
80    mdb->path = get_pool_memory(PM_FNAME);
81    mdb->esc_name = get_pool_memory(PM_FNAME);
82    mdb->esc_path = get_pool_memory(PM_FNAME);
83    mdb->esc_obj = get_pool_memory(PM_FNAME);
84    mdb->m_use_fatal_jmsg = true;
85
86    /* Initialize the private members. */
87    mdb->m_db_handle = NULL;
88    mdb->m_result = NULL;
89    mdb->m_buf =  get_pool_memory(PM_FNAME);
90
91    db_list->append(this);
92 }
93
94 BDB_POSTGRESQL::~BDB_POSTGRESQL()
95 {
96 }
97
98 /*
99  * Initialize database data structure. In principal this should
100  * never have errors, or it is really fatal.
101  */
102 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
103                        const char *db_password, const char *db_address, int db_port, const char *db_socket, 
104                        const char *db_ssl_key, const char *db_ssl_cert, const char *db_ssl_ca,
105                        const char *db_ssl_capath, const char *db_ssl_cipher,
106                        bool mult_db_connections, bool disable_batch_insert) 
107 {
108    BDB_POSTGRESQL *mdb = NULL;
109
110    if (!db_user) {
111       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
112       return NULL;
113    }
114    P(mutex);                          /* lock DB queue */
115    if (db_list && !mult_db_connections) {
116       /*
117        * Look to see if DB already open
118        */
119       foreach_dlist(mdb, db_list) {
120          if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
121             Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
122             mdb->increment_refcount();
123             goto get_out;
124          }
125       }
126    }
127    Dmsg0(dbglvl_info, "db_init_database first time\n");
128    /* Create the global Bacula db context */
129    mdb = New(BDB_POSTGRESQL());
130    if (!mdb) goto get_out;
131
132    /*
133     * Initialize the parent class members.
134     */
135    mdb->m_db_name = bstrdup(db_name);
136    mdb->m_db_user = bstrdup(db_user);
137    if (db_password) {
138       mdb->m_db_password = bstrdup(db_password);
139    }
140    if (db_address) {
141       mdb->m_db_address = bstrdup(db_address);
142    }
143    if (db_socket) {
144       mdb->m_db_socket = bstrdup(db_socket);
145    } 
146    mdb->m_db_port = db_port;
147
148    if (disable_batch_insert) { 
149       mdb->m_disabled_batch_insert = true;
150       mdb->m_have_batch_insert = false;
151    } else { 
152       mdb->m_disabled_batch_insert = false;
153 #ifdef USE_BATCH_FILE_INSERT
154 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE) 
155 #ifdef HAVE_PQISTHREADSAFE 
156       mdb->m_have_batch_insert = PQisthreadsafe();
157 #else 
158       mdb->m_have_batch_insert = true;
159 #endif /* HAVE_PQISTHREADSAFE */ 
160 #else 
161       mdb->m_have_batch_insert = true;
162 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */ 
163 #else 
164       mdb->m_have_batch_insert = false;
165 #endif /* USE_BATCH_FILE_INSERT */ 
166    } 
167    mdb->m_allow_transactions = mult_db_connections;
168
169    /* At this time, when mult_db_connections == true, this is for
170     * specific console command such as bvfs or batch mode, and we don't
171     * want to share a batch mode or bvfs. In the future, we can change
172     * the creation function to add this parameter.
173     */
174    mdb->m_dedicated = mult_db_connections;
175
176 get_out:
177    V(mutex);
178    return mdb;
179
180   
181  
182 /* Check that the database corresponds to the encoding we want  */
183 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
184 {
185    SQL_ROW row;
186    int ret = false; 
187
188    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) { 
189       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
190       return false; 
191    }
192
193    if ((row = mdb->sql_fetch_row()) == NULL) { 
194       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror()); 
195       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
196    } else { 
197       ret = bstrcmp(row[0], "SQL_ASCII");
198
199       if (ret) {
200          /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
201          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'"); 
202
203       } else { 
204          /* Something is wrong with database encoding */
205          Mmsg(mdb->errmsg,
206               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
207               mdb->get_db_name(), row[0]); 
208          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
209          Dmsg1(dbglvl_err, "%s", mdb->errmsg);
210       }
211    }
212    return ret;
213 }
214
215 /*
216  * Now actually open the database.  This can generate errors,
217  *   which are returned in the errmsg
218  *
219  *  DO NOT close the database or delete mdb here !!!!
220  */
221 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
222 {
223    bool retval = false; 
224    int errstat;
225    char buf[10], *port;
226    BDB_POSTGRESQL *mdb = this;
227
228    P(mutex);
229    if (mdb->m_connected) {
230       retval = true; 
231       goto get_out;
232    }
233
234    if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
235       berrno be;
236       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
237             be.bstrerror(errstat));
238       goto get_out;
239    }
240
241    if (mdb->m_db_port) {
242       bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
243       port = buf;
244    } else {
245       port = NULL;
246    }
247
248    /* If connection fails, try at 5 sec intervals for 30 seconds. */
249    for (int retry=0; retry < 6; retry++) {
250       /* connect to the database */
251       mdb->m_db_handle = PQsetdbLogin(
252            mdb->m_db_address,         /* default = localhost */
253            port,                      /* default port */
254            NULL,                      /* pg options */
255            NULL,                      /* tty, ignored */
256            mdb->m_db_name,            /* database name */
257            mdb->m_db_user,            /* login name */
258            mdb->m_db_password);       /* password */
259
260       /* If no connect, try once more in case it is a timing problem */
261       if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
262          break;
263       }
264       bmicrosleep(5, 0);
265    }
266
267    Dmsg0(dbglvl_info, "pg_real_connect done\n");
268    Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
269         mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
270
271    if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
272       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
273          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
274          mdb->m_db_name, mdb->m_db_user);
275       goto get_out;
276    }
277
278    mdb->m_connected = true;
279    if (!bdb_check_version(jcr)) {
280       goto get_out;
281    }
282
283    sql_query("SET datestyle TO 'ISO, YMD'"); 
284    sql_query("SET cursor_tuple_fraction=1");
285
286    /* 
287     * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
288     *   WARNING:  nonstandard use of \\ in a string literal
289     */ 
290    sql_query("SET standard_conforming_strings=on"); 
291  
292    /* Check that encoding is SQL_ASCII */
293    pgsql_check_database_encoding(jcr, mdb);
294  
295    retval = true; 
296
297 get_out:
298    V(mutex);
299    return retval; 
300
301
302 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
303 {
304    BDB_POSTGRESQL *mdb = this;
305
306    if (mdb->m_connected) {
307       bdb_end_transaction(jcr);
308    } 
309    P(mutex);
310    mdb->m_ref_count--;
311    if (mdb->m_ref_count == 0) {
312       if (mdb->m_connected) {
313          sql_free_result(); 
314       } 
315       db_list->remove(mdb);
316       if (mdb->m_connected && mdb->m_db_handle) {
317          PQfinish(mdb->m_db_handle);
318       }
319       if (is_rwl_valid(&mdb->m_lock)) {
320          rwl_destroy(&mdb->m_lock);
321       } 
322       free_pool_memory(mdb->errmsg);
323       free_pool_memory(mdb->cmd);
324       free_pool_memory(mdb->cached_path);
325       free_pool_memory(mdb->fname);
326       free_pool_memory(mdb->path);
327       free_pool_memory(mdb->esc_name);
328       free_pool_memory(mdb->esc_path);
329       free_pool_memory(mdb->esc_obj);
330       free_pool_memory(mdb->m_buf);
331       if (mdb->m_db_driver) {
332          free(mdb->m_db_driver);
333       } 
334       if (mdb->m_db_name) {
335          free(mdb->m_db_name);
336       } 
337       if (mdb->m_db_user) {
338          free(mdb->m_db_user);
339       } 
340       if (mdb->m_db_password) {
341          free(mdb->m_db_password);
342       } 
343       if (mdb->m_db_address) {
344          free(mdb->m_db_address);
345       } 
346       if (mdb->m_db_socket) {
347          free(mdb->m_db_socket);
348       } 
349       delete mdb;
350       if (db_list->size() == 0) {
351          delete db_list;
352          db_list = NULL;
353       }
354    }
355    V(mutex);
356 }
357
358 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
359 {
360 }
361
362 /*
363  * Escape strings so PostgreSQL is happy
364  *
365  *  len is the length of the old string. Your new
366  *    string must be long enough (max 2*old+1) to hold
367  *    the escaped output.
368  */
369 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
370 {
371    BDB_POSTGRESQL *mdb = this;
372    int failed; 
373
374    PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
375    if (failed) {
376       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); 
377       /* failed on encoding, probably invalid multibyte encoding in the source string
378          see PQescapeStringConn documentation for details. */
379       Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
380    } 
381 }
382
383 /*
384  * Escape binary so that PostgreSQL is happy
385  *
386  */
387 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
388 {
389    size_t new_len;
390    unsigned char *obj; 
391    BDB_POSTGRESQL *mdb = this;
392
393    mdb->esc_obj[0] = 0;
394    obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
395    if (!obj) { 
396       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
397    } else {
398       mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
399       memcpy(mdb->esc_obj, obj, new_len);
400       mdb->esc_obj[new_len] = 0;
401       PQfreemem(obj);
402    }
403    return (char *)mdb->esc_obj;
404 }
405
406 /*
407  * Unescape binary object so that PostgreSQL is happy
408  *
409  */
410 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
411                        POOLMEM **dest, int32_t *dest_len)
412 {
413    size_t new_len;
414    unsigned char *obj;
415
416    if (!from) {
417       *dest[0] = 0;
418       *dest_len = 0;
419       return;
420    }
421
422    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
423
424    if (!obj) {
425       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
426    }
427
428    *dest_len = new_len;
429    *dest = check_pool_memory_size(*dest, new_len+1);
430    memcpy(*dest, obj, new_len);
431    (*dest)[new_len]=0;
432
433    PQfreemem(obj);
434
435    Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
436 }
437
438 /*
439  * Start a transaction. This groups inserts and makes things more efficient.
440  *  Usually started when inserting file attributes.
441  */
442 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
443 {
444    BDB_POSTGRESQL *mdb = this;
445
446    if (!jcr->attr) { 
447       jcr->attr = get_pool_memory(PM_FNAME); 
448    }
449    if (!jcr->ar) { 
450       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR)); 
451    }
452
453    /* 
454     * This is turned off because transactions break if
455     *  multiple simultaneous jobs are run.
456     */ 
457    if (!mdb->m_allow_transactions) {
458       return; 
459    }
460
461    bdb_lock();
462    /* Allow only 25,000 changes per transaction */
463    if (mdb->m_transaction && changes > 25000) {
464       bdb_end_transaction(jcr);
465    } 
466    if (!mdb->m_transaction) {
467       sql_query("BEGIN");             /* begin transaction */
468       Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
469       mdb->m_transaction = true;
470    } 
471    bdb_unlock();
472 }
473
474 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
475 {
476    BDB_POSTGRESQL *mdb = this;
477
478    if (jcr && jcr->cached_attribute) { 
479       Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
480       if (!bdb_create_attributes_record(jcr, jcr->ar)) {
481          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
482       }
483       jcr->cached_attribute = false; 
484    }
485
486    if (!mdb->m_allow_transactions) {
487       return; 
488    }
489
490    bdb_lock();
491    if (mdb->m_transaction) {
492       sql_query("COMMIT");            /* end transaction */
493       mdb->m_transaction = false;
494       Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
495    }
496    changes = 0; 
497    bdb_unlock();
498 }
499
500
501 /*
502  * Submit a general SQL command, and for each row returned,
503  *  the result_handler is called with the ctx.
504  */
505 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
506                                        DB_RESULT_HANDLER *result_handler,
507                                        void *ctx)
508 {
509    BDB_POSTGRESQL *mdb = this;
510    SQL_ROW row; 
511    bool retval = false; 
512    bool in_transaction = mdb->m_transaction;
513
514    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
515
516    mdb->errmsg[0] = 0;
517    /* This code handles only SELECT queries */
518    if (strncasecmp(query, "SELECT", 6) != 0) {
519       return bdb_sql_query(query, result_handler, ctx);
520    }
521
522    if (!result_handler) {       /* no need of big_query without handler */
523       return false; 
524    }
525
526    bdb_lock();
527
528    if (!in_transaction) {       /* CURSOR needs transaction */
529       sql_query("BEGIN");
530    }
531
532    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
533
534    if (!sql_query(mdb->m_buf)) {
535       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
536       Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
537       goto get_out;
538    }
539
540    do {
541       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
542          Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
543          Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
544          goto get_out;
545       }
546       while ((row = sql_fetch_row()) != NULL) {
547          Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
548          if (result_handler(ctx, mdb->m_num_fields, row))
549             break;
550       }
551       PQclear(mdb->m_result);
552       m_result = NULL;
553
554    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
555
556    sql_query("CLOSE _bac_cursor");
557
558    Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
559    sql_free_result();
560    retval = true;
561
562 get_out:
563    if (!in_transaction) {
564       sql_query("COMMIT");  /* end transaction */
565    }
566
567    bdb_unlock();
568    return retval;
569 }
570
571 /* 
572  * Submit a general SQL command, and for each row returned,
573  *  the result_handler is called with the ctx.
574  */ 
575 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
576 {
577    SQL_ROW row; 
578    bool retval = true; 
579    BDB_POSTGRESQL *mdb = this;
580
581    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
582
583    bdb_lock();
584    mdb->errmsg[0] = 0;
585    if (!sql_query(query, QF_STORE_RESULT)) { 
586       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
587       Dmsg0(dbglvl_err, "db_sql_query failed\n");
588       retval = false; 
589       goto get_out;
590    } 
591
592    Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
593
594    if (result_handler) {
595       Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
596       while ((row = sql_fetch_row())) {
597          Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
598          if (result_handler(ctx, mdb->m_num_fields, row))
599             break; 
600       } 
601       sql_free_result(); 
602    } 
603
604    Dmsg0(dbglvl_info, "db_sql_query finished\n");
605
606 get_out:
607    bdb_unlock();
608    return retval; 
609 }
610
611 /*
612  * If this routine returns false (failure), Bacula expects
613  *   that no result has been stored.
614  * This is where QueryDB calls to with Postgresql.
615  *
616  *  Returns: true  on success
617  *           false on failure
618  *
619  */
620 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
621 {
622    int i; 
623    bool retval = false; 
624    BDB_POSTGRESQL *mdb = this;
625
626    Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
627
628    /* We are starting a new query. reset everything. */
629    mdb->m_num_rows     = -1;
630    mdb->m_row_number   = -1;
631    mdb->m_field_number = -1;
632
633    if (mdb->m_result) {
634       PQclear(mdb->m_result);  /* hmm, someone forgot to free?? */
635       mdb->m_result = NULL;
636    } 
637
638    for (i = 0; i < 10; i++) { 
639       mdb->m_result = PQexec(mdb->m_db_handle, query);
640       if (mdb->m_result) {
641          break;
642       }
643       bmicrosleep(5, 0);
644    }
645    if (!mdb->m_result) {
646       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
647       goto get_out;
648    }
649
650    mdb->m_status = PQresultStatus(mdb->m_result);
651    if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
652       Dmsg0(dbglvl_dbg, "we have a result\n");
653
654       /* How many fields in the set? */
655       mdb->m_num_fields = (int)PQnfields(mdb->m_result);
656       Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
657
658       mdb->m_num_rows = PQntuples(mdb->m_result);
659       Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
660
661       mdb->m_row_number = 0;      /* we can start to fetch something */
662       mdb->m_status = 0;          /* succeed */
663       retval = true; 
664    } else {
665       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
666       goto get_out;
667    }
668
669    Dmsg0(dbglvl_info, "sql_query finishing\n");
670    goto ok_out; 
671
672 get_out:
673    Dmsg0(dbglvl_err, "we failed\n");
674    PQclear(mdb->m_result);
675    mdb->m_result = NULL;
676    mdb->m_status = 1;                   /* failed */
677  
678 ok_out: 
679    return retval; 
680 }  
681
682 void BDB_POSTGRESQL::sql_free_result(void)
683 {
684    BDB_POSTGRESQL *mdb = this;
685
686    bdb_lock();
687    if (mdb->m_result) {
688       PQclear(mdb->m_result);
689       mdb->m_result = NULL;
690    }
691    if (mdb->m_rows) {
692       free(mdb->m_rows);
693       mdb->m_rows = NULL;
694    }
695    if (mdb->m_fields) {
696       free(mdb->m_fields);
697       mdb->m_fields = NULL;
698    } 
699    mdb->m_num_rows = mdb->m_num_fields = 0;
700    bdb_unlock();
701
702
703 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
704
705    SQL_ROW row = NULL;            /* by default, return NULL */
706    BDB_POSTGRESQL *mdb = this;
707  
708    Dmsg0(dbglvl_info, "sql_fetch_row start\n");
709  
710    if (mdb->m_num_fields == 0) {     /* No field, no row */
711       Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
712       return NULL;
713    }
714
715    if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
716       if (mdb->m_rows) {
717          Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
718          free(mdb->m_rows);
719       } 
720       Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
721       mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
722       mdb->m_rows_size = mdb->m_num_fields;
723  
724       /* Now reset the row_number now that we have the space allocated */
725       mdb->m_row_number = 0;
726    }
727
728    /* If still within the result set */
729    if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
730       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
731
732       /* Get each value from this row */
733       for (int j = 0; j < mdb->m_num_fields; j++) {
734          mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
735          Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
736       } 
737       mdb->m_row_number++;  /* Increment the row number for the next call */
738       row = mdb->m_rows;
739    } else { 
740       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
741    }
742  
743    Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
744  
745    return row; 
746 }
747
748 const char *BDB_POSTGRESQL::sql_strerror(void)
749 {
750    BDB_POSTGRESQL *mdb = this;
751    return PQerrorMessage(mdb->m_db_handle);
752
753
754 void BDB_POSTGRESQL::sql_data_seek(int row)
755
756    BDB_POSTGRESQL *mdb = this;
757    /* Set the row number to be returned on the next call to sql_fetch_row */
758    mdb->m_row_number = row;
759
760
761 int BDB_POSTGRESQL::sql_affected_rows(void)
762
763    BDB_POSTGRESQL *mdb = this;
764    return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
765
766  
767 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
768
769    uint64_t id = 0; 
770    char sequence[NAMEDATALEN-1]; 
771    char getkeyval_query[NAMEDATALEN+50]; 
772    PGresult *p_result;
773    BDB_POSTGRESQL *mdb = this;
774
775    /* First execute the insert query and then retrieve the currval. */
776    if (!sql_query(query)) { 
777       return 0; 
778    } 
779
780    mdb->m_num_rows = sql_affected_rows();
781    if (mdb->m_num_rows != 1) {
782       return 0; 
783    } 
784    mdb->changes++;
785    /* 
786     *  Obtain the current value of the sequence that
787     *  provides the serial value for primary key of the table.
788     * 
789     *  currval is local to our session.  It is not affected by
790     *  other transactions.
791     * 
792     *  Determine the name of the sequence.
793     *  PostgreSQL automatically creates a sequence using
794     *   <table>_<column>_seq.
795     *  At the time of writing, all tables used this format for
796     *   for their primary key: <table>id
797     *  Except for basefiles which has a primary key on baseid.
798     *  Therefore, we need to special case that one table.
799     * 
800     *  everything else can use the PostgreSQL formula.
801     */ 
802    if (strcasecmp(table_name, "basefiles") == 0) {
803       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
804    } else {
805       bstrncpy(sequence, table_name, sizeof(sequence));
806       bstrncat(sequence, "_",        sizeof(sequence));
807       bstrncat(sequence, table_name, sizeof(sequence));
808       bstrncat(sequence, "id",       sizeof(sequence));
809    }
810
811    bstrncat(sequence, "_seq", sizeof(sequence));
812    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence); 
813
814    Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
815    for (int i = 0; i < 10; i++) { 
816       p_result = PQexec(mdb->m_db_handle, getkeyval_query);
817       if (p_result) {
818          break;
819       }
820       bmicrosleep(5, 0);
821    }
822    if (!p_result) {
823       Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
824       goto get_out;
825    }
826
827    Dmsg0(dbglvl_dbg, "exec done");
828
829    if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
830       Dmsg0(dbglvl_dbg, "getting value");
831       id = str_to_uint64(PQgetvalue(p_result, 0, 0));
832       Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
833    } else {
834       Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
835       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
836    }
837
838 get_out:
839    PQclear(p_result);
840    return id;
841
842
843 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
844
845    int max_len; 
846    int this_len; 
847    BDB_POSTGRESQL *mdb = this;
848  
849    Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
850  
851    if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
852       if (mdb->m_fields) {
853          free(mdb->m_fields);
854          mdb->m_fields = NULL;
855       } 
856       Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
857       mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
858       mdb->m_fields_size = mdb->m_num_fields;
859  
860       for (int i = 0; i < mdb->m_num_fields; i++) {
861          Dmsg1(dbglvl_dbg, "filling field %d\n", i);
862          mdb->m_fields[i].name = PQfname(mdb->m_result, i);
863          mdb->m_fields[i].type = PQftype(mdb->m_result, i);
864          mdb->m_fields[i].flags = 0;
865  
866          /* For a given column, find the max length. */
867          max_len = 0; 
868          for (int j = 0; j < mdb->m_num_rows; j++) {
869             if (PQgetisnull(mdb->m_result, j, i)) {
870                this_len = 4;         /* "NULL" */
871             } else { 
872                this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
873             } 
874
875             if (max_len < this_len) { 
876                max_len = this_len; 
877             } 
878          } 
879          mdb->m_fields[i].max_length = max_len;
880   
881          Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
882                mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
883       } 
884    } 
885  
886    /* Increment field number for the next time around */
887    return &mdb->m_fields[mdb->m_field_number++];
888 }  
889  
890 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
891 {  
892    if (field_type == 1) {
893       return true; 
894    } 
895    return false; 
896
897  
898 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
899
900     /*
901      * TEMP: the following is taken from select OID, typname from pg_type;
902      */
903     switch (field_type) {
904     case 20:
905     case 21:
906     case 23:
907     case 700:
908     case 701:
909        return true; 
910     default:
911        return false; 
912     }
913
914  
915 /* 
916  * Escape strings so PostgreSQL is happy on COPY
917  * 
918  * len is the length of the old string. Your new
919  *   string must be long enough (max 2*old+1) to hold
920  *   the escaped output.
921  */ 
922 static char *pgsql_copy_escape(char *dest, char *src, size_t len) 
923
924     /* we have to escape \t, \n, \r, \ */
925     char c = '\0' ;
926  
927     while (len > 0 && *src) {
928        switch (*src) {
929        case '\n':
930           c = 'n';
931           break;
932        case '\\':
933           c = '\\';
934           break;
935        case '\t':
936           c = 't';
937           break;
938        case '\r':
939           c = 'r';
940           break;
941        default:
942           c = '\0' ;
943        }
944
945        if (c) {
946           *dest = '\\';
947           dest++;
948           *dest = c;
949        } else {
950           *dest = *src;
951        }
952
953        len--;
954        src++;
955        dest++;
956     }
957
958     *dest = '\0';
959     return dest;
960
961  
962 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
963 {
964    BDB_POSTGRESQL *mdb = this;
965    const char *query = "COPY batch FROM STDIN";
966  
967    Dmsg0(dbglvl_info, "sql_batch_start started\n");
968  
969    if  (!sql_query("CREATE TEMPORARY TABLE batch ("
970                           "FileIndex int,"
971                           "JobId int,"
972                           "Path varchar,"
973                           "Name varchar,"
974                           "LStat varchar,"
975                           "Md5 varchar,"
976                           "DeltaSeq smallint)")) {
977       Dmsg0(dbglvl_err, "sql_batch_start failed\n");
978       return false; 
979    }
980
981    /* We are starting a new query.  reset everything. */
982    mdb->m_num_rows     = -1;
983    mdb->m_row_number   = -1;
984    mdb->m_field_number = -1;
985
986    sql_free_result(); 
987
988    for (int i=0; i < 10; i++) {
989       mdb->m_result = PQexec(mdb->m_db_handle, query);
990       if (mdb->m_result) {
991          break;
992       }
993       bmicrosleep(5, 0);
994    }
995    if (!mdb->m_result) {
996       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
997       goto get_out;
998    }
999
1000    mdb->m_status = PQresultStatus(mdb->m_result);
1001    if (mdb->m_status == PGRES_COPY_IN) {
1002       /* How many fields in the set? */
1003       mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1004       mdb->m_num_rows = 0;
1005       mdb->m_status = 1;
1006    } else {
1007       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1008       goto get_out;
1009    }
1010
1011    Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1012
1013    return true; 
1014
1015 get_out:
1016    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1017    mdb->m_status = 0;
1018    PQclear(mdb->m_result);
1019    mdb->m_result = NULL;
1020    return false; 
1021 }
1022
1023 /* 
1024  * Set error to something to abort the operation
1025  */ 
1026 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1027 {
1028    int res;
1029    int count=30;
1030    PGresult *p_result;
1031    BDB_POSTGRESQL *mdb = this;
1032
1033    Dmsg0(dbglvl_info, "sql_batch_end started\n");
1034
1035    do {
1036       res = PQputCopyEnd(mdb->m_db_handle, error);
1037    } while (res == 0 && --count > 0);
1038
1039    if (res == 1) {
1040       Dmsg0(dbglvl_dbg, "ok\n");
1041       mdb->m_status = 0;
1042    }
1043
1044    if (res <= 0) {
1045       mdb->m_status = 1;
1046       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1047       Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1048    }
1049
1050    /* Check command status and return to normal libpq state */
1051    p_result = PQgetResult(mdb->m_db_handle);
1052    if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1053       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1054       mdb->m_status = 1;
1055    }
1056
1057    /* Get some statistics to compute the best plan */
1058    sql_query("ANALYZE batch");
1059
1060    PQclear(p_result);
1061
1062    Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1063    return true; 
1064 }
1065
1066 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1067 {
1068    int res;
1069    int count=30;
1070    size_t len;
1071    const char *digest;
1072    char ed1[50];
1073    BDB_POSTGRESQL *mdb = this;
1074
1075    mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1076    pgsql_copy_escape(mdb->esc_name, fname, fnl);
1077
1078    mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1079    pgsql_copy_escape(mdb->esc_path, path, pnl);
1080
1081    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1082       digest = "0";
1083    } else {
1084       digest = ar->Digest;
1085    }
1086
1087    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1088               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1089               mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1090
1091    do {
1092       res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1093    } while (res == 0 && --count > 0);
1094
1095    if (res == 1) {
1096       Dmsg0(dbglvl_dbg, "ok\n");
1097       mdb->changes++;
1098       mdb->m_status = 1;
1099    }
1100
1101    if (res <= 0) {
1102       mdb->m_status = 0;
1103       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1104       Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1105    }
1106
1107    Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1108
1109    return true; 
1110 }
1111
1112
1113 #endif /* HAVE_POSTGRESQL */