]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
- Apply patch from Christopher Hull
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2  * Bacula Catalog Database routines specific to PostgreSQL
3  *   These are PostgreSQL specific routines
4  *
5  *    Dan Langille, December 2003
6  *    based upon work done by Kern Sibbald, March 2000
7  *
8  *    Version $Id$
9  */
10 /*
11    Copyright (C) 2003-2006 Kern Sibbald
12
13    This program is free software; you can redistribute it and/or
14    modify it under the terms of the GNU General Public License
15    version 2 as amended with additional clauses defined in the
16    file LICENSE in the main source directory.
17
18    This program is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
21    the file LICENSE for additional details.
22
23  */
24
25
26 /* The following is necessary so that we do not include
27  * the dummy external definition of DB.
28  */
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats.h"
33
34 #ifdef HAVE_POSTGRESQL
35
36 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
37
38 /* -----------------------------------------------------------------------
39  *
40  *   PostgreSQL dependent defines and subroutines
41  *
42  * -----------------------------------------------------------------------
43  */
44
45 /* List of open databases */
46 static BQUEUE db_list = {&db_list, &db_list};
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49
50 /*
51  * Initialize database data structure. In principal this should
52  * never have errors, or it is really fatal.
53  */
54 B_DB *
55 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
56                  const char *db_address, int db_port, const char *db_socket,
57                  int mult_db_connections)
58 {
59    B_DB *mdb;
60
61    if (!db_user) {
62       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
63       return NULL;
64    }
65    P(mutex);                          /* lock DB queue */
66    if (!mult_db_connections) {
67       /* Look to see if DB already open */
68       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
69          if (bstrcmp(mdb->db_name, db_name) &&
70              bstrcmp(mdb->db_address, db_address) &&
71              mdb->db_port == db_port) {
72             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
73             mdb->ref_count++;
74             V(mutex);
75             return mdb;                  /* already open */
76          }
77       }
78    }
79    Dmsg0(100, "db_open first time\n");
80    mdb = (B_DB *)malloc(sizeof(B_DB));
81    memset(mdb, 0, sizeof(B_DB));
82    mdb->db_name = bstrdup(db_name);
83    mdb->db_user = bstrdup(db_user);
84    if (db_password) {
85       mdb->db_password = bstrdup(db_password);
86    }
87    if (db_address) {
88       mdb->db_address  = bstrdup(db_address);
89    }
90    if (db_socket) {
91       mdb->db_socket   = bstrdup(db_socket);
92    }
93    mdb->db_port        = db_port;
94    mdb->have_insert_id = TRUE;
95    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
96    *mdb->errmsg        = 0;
97    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
98    mdb->cached_path    = get_pool_memory(PM_FNAME);
99    mdb->cached_path_id = 0;
100    mdb->ref_count      = 1;
101    mdb->fname          = get_pool_memory(PM_FNAME);
102    mdb->path           = get_pool_memory(PM_FNAME);
103    mdb->esc_name       = get_pool_memory(PM_FNAME);
104    mdb->allow_transactions = mult_db_connections;
105    qinsert(&db_list, &mdb->bq);            /* put db in list */
106    V(mutex);
107    return mdb;
108 }
109
110 /*
111  * Now actually open the database.  This can generate errors,
112  *   which are returned in the errmsg
113  *
114  * DO NOT close the database or free(mdb) here !!!!
115  */
116 int
117 db_open_database(JCR *jcr, B_DB *mdb)
118 {
119    int errstat;
120    char buf[10], *port;
121
122    P(mutex);
123    if (mdb->connected) {
124       V(mutex);
125       return 1;
126    }
127    mdb->connected = false;
128
129    if ((errstat=rwl_init(&mdb->lock)) != 0) {
130       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
131             strerror(errstat));
132       V(mutex);
133       return 0;
134    }
135
136    if (mdb->db_port) {
137       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
138       port = buf;
139    } else {
140       port = NULL;
141    }
142
143    /* If connection fails, try at 5 sec intervals for 30 seconds. */
144    for (int retry=0; retry < 6; retry++) {
145       /* connect to the database */
146       mdb->db = PQsetdbLogin(
147            mdb->db_address,           /* default = localhost */
148            port,                      /* default port */
149            NULL,                      /* pg options */
150            NULL,                      /* tty, ignored */
151            mdb->db_name,              /* database name */
152            mdb->db_user,              /* login name */
153            mdb->db_password);         /* password */
154
155       /* If no connect, try once more in case it is a timing problem */
156       if (PQstatus(mdb->db) == CONNECTION_OK) {
157          break;
158       }
159       bmicrosleep(5, 0);
160    }
161
162    Dmsg0(50, "pg_real_connect done\n");
163    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
164             mdb->db_password==NULL?"(NULL)":mdb->db_password);
165
166    if (PQstatus(mdb->db) != CONNECTION_OK) {
167       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
168             "Database=%s User=%s\n"
169             "It is probably not running or your password is incorrect.\n"),
170              mdb->db_name, mdb->db_user);
171       V(mutex);
172       return 0;
173    }
174
175    if (!check_tables_version(jcr, mdb)) {
176       V(mutex);
177       return 0;
178    }
179
180    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
181
182    mdb->connected = true;
183    V(mutex);
184    return 1;
185 }
186
187 void
188 db_close_database(JCR *jcr, B_DB *mdb)
189 {
190    if (!mdb) {
191       return;
192    }
193    db_end_transaction(jcr, mdb);
194    P(mutex);
195    mdb->ref_count--;
196    if (mdb->ref_count == 0) {
197       qdchain(&mdb->bq);
198       if (mdb->connected && mdb->db) {
199          sql_close(mdb);
200       }
201       rwl_destroy(&mdb->lock);
202       free_pool_memory(mdb->errmsg);
203       free_pool_memory(mdb->cmd);
204       free_pool_memory(mdb->cached_path);
205       free_pool_memory(mdb->fname);
206       free_pool_memory(mdb->path);
207       free_pool_memory(mdb->esc_name);
208       if (mdb->db_name) {
209          free(mdb->db_name);
210       }
211       if (mdb->db_user) {
212          free(mdb->db_user);
213       }
214       if (mdb->db_password) {
215          free(mdb->db_password);
216       }
217       if (mdb->db_address) {
218          free(mdb->db_address);
219       }
220       if (mdb->db_socket) {
221          free(mdb->db_socket);
222       }
223       my_postgresql_free_result(mdb);
224       free(mdb);
225    }
226    V(mutex);
227 }
228
229 /*
230  * Return the next unique index (auto-increment) for
231  * the given table.  Return NULL on error.
232  *
233  * For PostgreSQL, NULL causes the auto-increment value
234  *  to be updated.
235  */
236 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
237 {
238    strcpy(index, "NULL");
239    return 1;
240 }
241
242
243 /*
244  * Escape strings so that PostgreSQL is happy
245  *
246  *   NOTE! len is the length of the old string. Your new
247  *         string must be long enough (max 2*old+1) to hold
248  *         the escaped output.
249  */
250 void
251 db_escape_string(char *snew, char *old, int len)
252 {
253    PQescapeString(snew, old, len);
254 }
255
256 /*
257  * Submit a general SQL command (cmd), and for each row returned,
258  *  the sqlite_handler is called with the ctx.
259  */
260 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
261 {
262    SQL_ROW row;
263
264    Dmsg0(500, "db_sql_query started\n");
265
266    db_lock(mdb);
267    if (sql_query(mdb, query) != 0) {
268       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
269       db_unlock(mdb);
270       Dmsg0(500, "db_sql_query failed\n");
271       return 0;
272    }
273    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
274
275    if (result_handler != NULL) {
276       Dmsg0(500, "db_sql_query invoking handler\n");
277       if ((mdb->result = sql_store_result(mdb)) != NULL) {
278          int num_fields = sql_num_fields(mdb);
279
280          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
281          while ((row = sql_fetch_row(mdb)) != NULL) {
282
283             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
284             if (result_handler(ctx, num_fields, row))
285                break;
286          }
287
288         sql_free_result(mdb);
289       }
290    }
291    db_unlock(mdb);
292
293    Dmsg0(500, "db_sql_query finished\n");
294
295    return 1;
296 }
297
298
299
300 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
301 {
302    int j;
303    POSTGRESQL_ROW row = NULL; // by default, return NULL
304
305    Dmsg0(500, "my_postgresql_fetch_row start\n");
306
307    if (mdb->row_number == -1 || mdb->row == NULL) {
308       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
309
310       if (mdb->row != NULL) {
311          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
312          free(mdb->row);
313          mdb->row = NULL;
314       }
315
316       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
317
318       // now reset the row_number now that we have the space allocated
319       mdb->row_number = 0;
320    }
321
322    // if still within the result set
323    if (mdb->row_number < mdb->num_rows) {
324       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
325       // get each value from this row
326       for (j = 0; j < mdb->num_fields; j++) {
327          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
328          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
329       }
330       // increment the row number for the next call
331       mdb->row_number++;
332
333       row = mdb->row;
334    } else {
335       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
336    }
337
338    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
339
340    return row;
341 }
342
343 int my_postgresql_max_length(B_DB *mdb, int field_num) {
344    //
345    // for a given column, find the max length
346    //
347    int max_length;
348    int i;
349    int this_length;
350
351    max_length = 0;
352    for (i = 0; i < mdb->num_rows; i++) {
353       if (PQgetisnull(mdb->result, i, field_num)) {
354           this_length = 4;        // "NULL"
355       } else {
356           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
357       }
358
359       if (max_length < this_length) {
360           max_length = this_length;
361       }
362    }
363
364    return max_length;
365 }
366
367 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
368 {
369    int     i;
370
371    Dmsg0(500, "my_postgresql_fetch_field starts\n");
372    if (mdb->fields == NULL) {
373       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
374       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
375
376       for (i = 0; i < mdb->num_fields; i++) {
377          Dmsg1(500, "filling field %d\n", i);
378          mdb->fields[i].name           = PQfname(mdb->result, i);
379          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
380          mdb->fields[i].type       = PQftype(mdb->result, i);
381          mdb->fields[i].flags      = 0;
382
383          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
384             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
385             mdb->fields[i].flags);
386       } // end for
387    } // end if
388
389    // increment field number for the next time around
390
391    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
392    return &mdb->fields[mdb->field_number++];
393 }
394
395 void my_postgresql_data_seek(B_DB *mdb, int row)
396 {
397    // set the row number to be returned on the next call
398    // to my_postgresql_fetch_row
399    mdb->row_number = row;
400 }
401
402 void my_postgresql_field_seek(B_DB *mdb, int field)
403 {
404    mdb->field_number = field;
405 }
406
407 /*
408  * Note, if this routine returns 1 (failure), Bacula expects
409  *  that no result has been stored.
410  */
411 int my_postgresql_query(B_DB *mdb, const char *query) {
412    Dmsg0(500, "my_postgresql_query started\n");
413    // We are starting a new query.  reset everything.
414    mdb->num_rows     = -1;
415    mdb->row_number   = -1;
416    mdb->field_number = -1;
417
418    if (mdb->result != NULL) {
419       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
420    }
421
422    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
423    mdb->result = PQexec(mdb->db, query);
424    mdb->status = PQresultStatus(mdb->result);
425    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
426       Dmsg1(500, "we have a result\n", query);
427
428       // how many fields in the set?
429       mdb->num_fields = (int) PQnfields(mdb->result);
430       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
431
432       mdb->num_rows   = PQntuples(mdb->result);
433       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
434
435       mdb->status = 0;
436    } else {
437       Dmsg1(500, "we failed\n", query);
438       mdb->status = 1;
439    }
440
441    Dmsg0(500, "my_postgresql_query finishing\n");
442
443    return mdb->status;
444 }
445
446 void my_postgresql_free_result (B_DB *mdb)
447 {
448    if (mdb->result) {
449       PQclear(mdb->result);
450       mdb->result = NULL;
451    }
452
453    if (mdb->row) {
454       free(mdb->row);
455       mdb->row = NULL;
456    }
457
458    if (mdb->fields) {
459       free(mdb->fields);
460       mdb->fields = NULL;
461    }
462 }
463
464 int my_postgresql_currval(B_DB *mdb, char *table_name)
465 {
466    // Obtain the current value of the sequence that
467    // provides the serial value for primary key of the table.
468
469    // currval is local to our session.  It is not affected by
470    // other transactions.
471
472    // Determine the name of the sequence.
473    // PostgreSQL automatically creates a sequence using
474    // <table>_<column>_seq.
475    // At the time of writing, all tables used this format for
476    // for their primary key: <table>id
477    // Except for basefiles which has a primary key on baseid.
478    // Therefore, we need to special case that one table.
479
480    // everything else can use the PostgreSQL formula.
481
482    char      sequence[NAMEDATALEN-1];
483    char      query   [NAMEDATALEN+50];
484    PGresult *result;
485    int       id = 0;
486
487    if (strcasecmp(table_name, "basefiles") == 0) {
488       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
489    } else {
490       bstrncpy(sequence, table_name, sizeof(sequence));
491       bstrncat(sequence, "_",        sizeof(sequence));
492       bstrncat(sequence, table_name, sizeof(sequence));
493       bstrncat(sequence, "id",       sizeof(sequence));
494    }
495
496    bstrncat(sequence, "_seq", sizeof(sequence));
497    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
498
499 // Mmsg(query, "SELECT currval('%s')", sequence);
500    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
501    result = PQexec(mdb->db, query);
502
503    Dmsg0(500, "exec done");
504
505    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
506       Dmsg0(500, "getting value");
507       id = atoi(PQgetvalue(result, 0, 0));
508       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
509    } else {
510       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
511    }
512
513    PQclear(result);
514
515    return id;
516 }
517
518
519 #endif /* HAVE_POSTGRESQL */