]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2  * Bacula Catalog Database routines specific to PostgreSQL
3  *   These are PostgreSQL specific routines
4  *
5  *    Dan Langille, December 2003
6  *    based upon work done by Kern Sibbald, March 2000
7  *
8  *    Version $Id$
9  */
10 /*
11    Copyright (C) 2003-2006 Kern Sibbald
12
13    This program is free software; you can redistribute it and/or
14    modify it under the terms of the GNU General Public License
15    version 2 as amended with additional clauses defined in the
16    file LICENSE in the main source directory.
17
18    This program is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
21    the file LICENSE for additional details.
22
23  */
24
25
26 /* The following is necessary so that we do not include
27  * the dummy external definition of DB.
28  */
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats.h"
33
34 #ifdef HAVE_POSTGRESQL
35
36 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
37
38 /* -----------------------------------------------------------------------
39  *
40  *   PostgreSQL dependent defines and subroutines
41  *
42  * -----------------------------------------------------------------------
43  */
44
45 /* List of open databases */
46 static BQUEUE db_list = {&db_list, &db_list};
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49
50 /*
51  * Initialize database data structure. In principal this should
52  * never have errors, or it is really fatal.
53  */
54 B_DB *
55 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
56                  const char *db_address, int db_port, const char *db_socket,
57                  int mult_db_connections)
58 {
59    B_DB *mdb;
60
61    if (!db_user) {
62       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
63       return NULL;
64    }
65    P(mutex);                          /* lock DB queue */
66    if (!mult_db_connections) {
67       /* Look to see if DB already open */
68       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
69          if (strcmp(mdb->db_name, db_name) == 0) {
70             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
71             mdb->ref_count++;
72             V(mutex);
73             return mdb;                  /* already open */
74          }
75       }
76    }
77    Dmsg0(100, "db_open first time\n");
78    mdb = (B_DB *)malloc(sizeof(B_DB));
79    memset(mdb, 0, sizeof(B_DB));
80    mdb->db_name = bstrdup(db_name);
81    mdb->db_user = bstrdup(db_user);
82    if (db_password) {
83       mdb->db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       mdb->db_address  = bstrdup(db_address);
87    }
88    if (db_socket) {
89       mdb->db_socket   = bstrdup(db_socket);
90    }
91    mdb->db_port        = db_port;
92    mdb->have_insert_id = TRUE;
93    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
94    *mdb->errmsg        = 0;
95    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
96    mdb->cached_path    = get_pool_memory(PM_FNAME);
97    mdb->cached_path_id = 0;
98    mdb->ref_count      = 1;
99    mdb->fname          = get_pool_memory(PM_FNAME);
100    mdb->path           = get_pool_memory(PM_FNAME);
101    mdb->esc_name       = get_pool_memory(PM_FNAME);
102    mdb->allow_transactions = mult_db_connections;
103    qinsert(&db_list, &mdb->bq);            /* put db in list */
104    V(mutex);
105    return mdb;
106 }
107
108 /*
109  * Now actually open the database.  This can generate errors,
110  *   which are returned in the errmsg
111  *
112  * DO NOT close the database or free(mdb) here !!!!
113  */
114 int
115 db_open_database(JCR *jcr, B_DB *mdb)
116 {
117    int errstat;
118    char buf[10], *port;
119
120    P(mutex);
121    if (mdb->connected) {
122       V(mutex);
123       return 1;
124    }
125    mdb->connected = false;
126
127    if ((errstat=rwl_init(&mdb->lock)) != 0) {
128       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
129             strerror(errstat));
130       V(mutex);
131       return 0;
132    }
133
134    if (mdb->db_port) {
135       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
136       port = buf;
137    } else {
138       port = NULL;
139    }
140
141    /* If connection fails, try at 5 sec intervals for 30 seconds. */
142    for (int retry=0; retry < 6; retry++) {
143       /* connect to the database */
144       mdb->db = PQsetdbLogin(
145            mdb->db_address,           /* default = localhost */
146            port,                      /* default port */
147            NULL,                      /* pg options */
148            NULL,                      /* tty, ignored */
149            mdb->db_name,              /* database name */
150            mdb->db_user,              /* login name */
151            mdb->db_password);         /* password */
152
153       /* If no connect, try once more in case it is a timing problem */
154       if (PQstatus(mdb->db) == CONNECTION_OK) {
155          break;
156       }
157       bmicrosleep(5, 0);
158    }
159
160    Dmsg0(50, "pg_real_connect done\n");
161    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
162             mdb->db_password==NULL?"(NULL)":mdb->db_password);
163
164    if (PQstatus(mdb->db) != CONNECTION_OK) {
165       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
166             "Database=%s User=%s\n"
167             "It is probably not running or your password is incorrect.\n"),
168              mdb->db_name, mdb->db_user);
169       V(mutex);
170       return 0;
171    }
172
173    if (!check_tables_version(jcr, mdb)) {
174       V(mutex);
175       return 0;
176    }
177
178    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
179
180    mdb->connected = true;
181    V(mutex);
182    return 1;
183 }
184
185 void
186 db_close_database(JCR *jcr, B_DB *mdb)
187 {
188    if (!mdb) {
189       return;
190    }
191    db_end_transaction(jcr, mdb);
192    P(mutex);
193    mdb->ref_count--;
194    if (mdb->ref_count == 0) {
195       qdchain(&mdb->bq);
196       if (mdb->connected && mdb->db) {
197          sql_close(mdb);
198       }
199       rwl_destroy(&mdb->lock);
200       free_pool_memory(mdb->errmsg);
201       free_pool_memory(mdb->cmd);
202       free_pool_memory(mdb->cached_path);
203       free_pool_memory(mdb->fname);
204       free_pool_memory(mdb->path);
205       free_pool_memory(mdb->esc_name);
206       if (mdb->db_name) {
207          free(mdb->db_name);
208       }
209       if (mdb->db_user) {
210          free(mdb->db_user);
211       }
212       if (mdb->db_password) {
213          free(mdb->db_password);
214       }
215       if (mdb->db_address) {
216          free(mdb->db_address);
217       }
218       if (mdb->db_socket) {
219          free(mdb->db_socket);
220       }
221       my_postgresql_free_result(mdb);
222       free(mdb);
223    }
224    V(mutex);
225 }
226
227 /*
228  * Return the next unique index (auto-increment) for
229  * the given table.  Return NULL on error.
230  *
231  * For PostgreSQL, NULL causes the auto-increment value
232  *  to be updated.
233  */
234 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
235 {
236    strcpy(index, "NULL");
237    return 1;
238 }
239
240
241 /*
242  * Escape strings so that PostgreSQL is happy
243  *
244  *   NOTE! len is the length of the old string. Your new
245  *         string must be long enough (max 2*old+1) to hold
246  *         the escaped output.
247  */
248 void
249 db_escape_string(char *snew, char *old, int len)
250 {
251    PQescapeString(snew, old, len);
252 }
253
254 /*
255  * Submit a general SQL command (cmd), and for each row returned,
256  *  the sqlite_handler is called with the ctx.
257  */
258 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
259 {
260    SQL_ROW row;
261
262    Dmsg0(500, "db_sql_query started\n");
263
264    db_lock(mdb);
265    if (sql_query(mdb, query) != 0) {
266       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
267       db_unlock(mdb);
268       Dmsg0(500, "db_sql_query failed\n");
269       return 0;
270    }
271    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
272
273    if (result_handler != NULL) {
274       Dmsg0(500, "db_sql_query invoking handler\n");
275       if ((mdb->result = sql_store_result(mdb)) != NULL) {
276          int num_fields = sql_num_fields(mdb);
277
278          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
279          while ((row = sql_fetch_row(mdb)) != NULL) {
280
281             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
282             if (result_handler(ctx, num_fields, row))
283                break;
284          }
285
286         sql_free_result(mdb);
287       }
288    }
289    db_unlock(mdb);
290
291    Dmsg0(500, "db_sql_query finished\n");
292
293    return 1;
294 }
295
296
297
298 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
299 {
300    int j;
301    POSTGRESQL_ROW row = NULL; // by default, return NULL
302
303    Dmsg0(500, "my_postgresql_fetch_row start\n");
304
305    if (mdb->row_number == -1 || mdb->row == NULL) {
306       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
307
308       if (mdb->row != NULL) {
309          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
310          free(mdb->row);
311          mdb->row = NULL;
312       }
313
314       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
315
316       // now reset the row_number now that we have the space allocated
317       mdb->row_number = 0;
318    }
319
320    // if still within the result set
321    if (mdb->row_number < mdb->num_rows) {
322       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
323       // get each value from this row
324       for (j = 0; j < mdb->num_fields; j++) {
325          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
326          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
327       }
328       // increment the row number for the next call
329       mdb->row_number++;
330
331       row = mdb->row;
332    } else {
333       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
334    }
335
336    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
337
338    return row;
339 }
340
341 int my_postgresql_max_length(B_DB *mdb, int field_num) {
342    //
343    // for a given column, find the max length
344    //
345    int max_length;
346    int i;
347    int this_length;
348
349    max_length = 0;
350    for (i = 0; i < mdb->num_rows; i++) {
351       if (PQgetisnull(mdb->result, i, field_num)) {
352           this_length = 4;        // "NULL"
353       } else {
354           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
355       }
356
357       if (max_length < this_length) {
358           max_length = this_length;
359       }
360    }
361
362    return max_length;
363 }
364
365 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
366 {
367    int     i;
368
369    Dmsg0(500, "my_postgresql_fetch_field starts\n");
370    if (mdb->fields == NULL) {
371       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
372       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
373
374       for (i = 0; i < mdb->num_fields; i++) {
375          Dmsg1(500, "filling field %d\n", i);
376          mdb->fields[i].name           = PQfname(mdb->result, i);
377          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
378          mdb->fields[i].type       = PQftype(mdb->result, i);
379          mdb->fields[i].flags      = 0;
380
381          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
382             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
383             mdb->fields[i].flags);
384       } // end for
385    } // end if
386
387    // increment field number for the next time around
388
389    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
390    return &mdb->fields[mdb->field_number++];
391 }
392
393 void my_postgresql_data_seek(B_DB *mdb, int row)
394 {
395    // set the row number to be returned on the next call
396    // to my_postgresql_fetch_row
397    mdb->row_number = row;
398 }
399
400 void my_postgresql_field_seek(B_DB *mdb, int field)
401 {
402    mdb->field_number = field;
403 }
404
405 /*
406  * Note, if this routine returns 1 (failure), Bacula expects
407  *  that no result has been stored.
408  */
409 int my_postgresql_query(B_DB *mdb, const char *query) {
410    Dmsg0(500, "my_postgresql_query started\n");
411    // We are starting a new query.  reset everything.
412    mdb->num_rows     = -1;
413    mdb->row_number   = -1;
414    mdb->field_number = -1;
415
416    if (mdb->result != NULL) {
417       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
418    }
419
420    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
421    mdb->result = PQexec(mdb->db, query);
422    mdb->status = PQresultStatus(mdb->result);
423    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
424       Dmsg1(500, "we have a result\n", query);
425
426       // how many fields in the set?
427       mdb->num_fields = (int) PQnfields(mdb->result);
428       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
429
430       mdb->num_rows   = PQntuples(mdb->result);
431       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
432
433       mdb->status = 0;
434    } else {
435       Dmsg1(500, "we failed\n", query);
436       mdb->status = 1;
437    }
438
439    Dmsg0(500, "my_postgresql_query finishing\n");
440
441    return mdb->status;
442 }
443
444 void my_postgresql_free_result (B_DB *mdb)
445 {
446    if (mdb->result) {
447       PQclear(mdb->result);
448       mdb->result = NULL;
449    }
450
451    if (mdb->row) {
452       free(mdb->row);
453       mdb->row = NULL;
454    }
455
456    if (mdb->fields) {
457       free(mdb->fields);
458       mdb->fields = NULL;
459    }
460 }
461
462 int my_postgresql_currval(B_DB *mdb, char *table_name)
463 {
464    // Obtain the current value of the sequence that
465    // provides the serial value for primary key of the table.
466
467    // currval is local to our session.  It is not affected by
468    // other transactions.
469
470    // Determine the name of the sequence.
471    // PostgreSQL automatically creates a sequence using
472    // <table>_<column>_seq.
473    // At the time of writing, all tables used this format for
474    // for their primary key: <table>id
475    // Except for basefiles which has a primary key on baseid.
476    // Therefore, we need to special case that one table.
477
478    // everything else can use the PostgreSQL formula.
479
480    char      sequence[NAMEDATALEN-1];
481    char      query   [NAMEDATALEN+50];
482    PGresult *result;
483    int       id = 0;
484
485    if (strcasecmp(table_name, "basefiles") == 0) {
486       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
487    } else {
488       bstrncpy(sequence, table_name, sizeof(sequence));
489       bstrncat(sequence, "_",        sizeof(sequence));
490       bstrncat(sequence, table_name, sizeof(sequence));
491       bstrncat(sequence, "id",       sizeof(sequence));
492    }
493
494    bstrncat(sequence, "_seq", sizeof(sequence));
495    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
496
497 // Mmsg(query, "SELECT currval('%s')", sequence);
498    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
499    result = PQexec(mdb->db, query);
500
501    Dmsg0(500, "exec done");
502
503    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
504       Dmsg0(500, "getting value");
505       id = atoi(PQgetvalue(result, 0, 0));
506       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
507    } else {
508       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
509    }
510
511    PQclear(result);
512
513    return id;
514 }
515
516
517 #endif /* HAVE_POSTGRESQL */