]> git.sur5r.net Git - bacula/bacula/blob - bacula/patches/testing/batch-insert.patch
ebl add #define in readme
[bacula/bacula] / bacula / patches / testing / batch-insert.patch
1 diff -Naur cvs/src/cats/cats.h my/src/cats/cats.h
2 --- cvs/src/cats/cats.h 2006-12-06 15:11:53.000000000 +0100
3 +++ my/src/cats/cats.h  2006-12-14 21:11:40.000000000 +0100
4 @@ -141,6 +141,7 @@
5     POOLMEM *fname;                    /* Filename only */
6     POOLMEM *path;                     /* Path only */
7     POOLMEM *esc_name;                 /* Escaped file/path name */
8 +   POOLMEM *esc_name2;                /* Escaped file/path name */
9     int fnl;                           /* file name length */
10     int pnl;                           /* path name length */
11  };
12 @@ -170,8 +171,14 @@
13  #define sql_fetch_field(x)    my_sqlite_fetch_field(x)
14  #define sql_num_fields(x)     ((x)->ncolumn)
15  #define SQL_ROW               char**
16 -
17 -
18 +#define sql_batch_start(x)    db_batch_start(x)
19 +#define sql_batch_end(x,y)    db_batch_end(x,y)
20 +#define sql_batch_insert(x,y) db_batch_insert(x,y)
21 +#define sql_batch_lock_path_query       my_sqlite_batch_lock_query
22 +#define sql_batch_lock_filename_query   my_sqlite_batch_lock_query
23 +#define sql_batch_unlock_tables_query   my_sqlite_batch_unlock_query
24 +#define sql_batch_fill_filename_query   my_sqlite_batch_fill_filename_query
25 +#define sql_batch_fill_path_query       my_sqlite_batch_fill_path_query    
26  
27  /* In cats/sqlite.c */
28  void       my_sqlite_free_table(B_DB *mdb);
29 @@ -179,6 +186,10 @@
30  int        my_sqlite_query(B_DB *mdb, const char *cmd);
31  void       my_sqlite_field_seek(B_DB *mdb, int field);
32  SQL_FIELD *my_sqlite_fetch_field(B_DB *mdb);
33 +extern char* my_sqlite_batch_lock_query;
34 +extern char* my_sqlite_batch_unlock_query;
35 +extern char* my_sqlite_batch_fill_filename_query;
36 +extern char* my_sqlite_batch_fill_path_query;
37  
38  
39  #else
40 @@ -249,6 +260,7 @@
41     POOLMEM *fname;                    /* Filename only */
42     POOLMEM *path;                     /* Path only */
43     POOLMEM *esc_name;                 /* Escaped file/path name */
44 +   POOLMEM *esc_name2;                /* Escaped file/path name */
45     int fnl;                           /* file name length */
46     int pnl;                           /* path name length */
47  };
48 @@ -289,8 +301,14 @@
49  #define sql_fetch_field(x)    my_sqlite_fetch_field(x)
50  #define sql_num_fields(x)     ((x)->ncolumn)
51  #define SQL_ROW               char**
52 -
53 -
54 +#define sql_batch_start(x)    db_batch_start(x)
55 +#define sql_batch_end(x,y)    db_batch_end(x,y)
56 +#define sql_batch_insert(x,y) db_batch_insert(x,y)
57 +#define sql_batch_lock_path_query       my_sqlite_batch_lock_query
58 +#define sql_batch_lock_filename_query   my_sqlite_batch_lock_query
59 +#define sql_batch_unlock_tables_query   my_sqlite_batch_unlock_query
60 +#define sql_batch_fill_filename_query   my_sqlite_batch_fill_filename_query
61 +#define sql_batch_fill_path_query       my_sqlite_batch_fill_path_query
62  
63  /* In cats/sqlite.c */
64  void       my_sqlite_free_table(B_DB *mdb);
65 @@ -298,6 +316,10 @@
66  int        my_sqlite_query(B_DB *mdb, const char *cmd);
67  void       my_sqlite_field_seek(B_DB *mdb, int field);
68  SQL_FIELD *my_sqlite_fetch_field(B_DB *mdb);
69 +extern char* my_sqlite_batch_lock_query;
70 +extern char* my_sqlite_batch_unlock_query;
71 +extern char* my_sqlite_batch_fill_filename_query;
72 +extern char* my_sqlite_batch_fill_path_query;
73  
74  
75  #else
76 @@ -341,6 +363,7 @@
77     POOLMEM *fname;                    /* Filename only */
78     POOLMEM *path;                     /* Path only */
79     POOLMEM *esc_name;                 /* Escaped file/path name */
80 +   POOLMEM *esc_name2;                /* Escaped file/path name */
81     int fnl;                           /* file name length */
82     int pnl;                           /* path name length */
83  };
84 @@ -362,9 +385,25 @@
85  #define sql_field_seek(x, y)  mysql_field_seek((x)->result, (y))
86  #define sql_fetch_field(x)    mysql_fetch_field((x)->result)
87  #define sql_num_fields(x)     (int)mysql_num_fields((x)->result)
88 +#define sql_batch_start(x)    db_batch_start(x)
89 +#define sql_batch_end(x,y)    db_batch_end(x,y)
90 +#define sql_batch_insert(x,y) db_batch_insert(x,y)
91 +#define sql_batch_lock_path_query       my_mysql_batch_lock_path_query
92 +#define sql_batch_lock_filename_query   my_mysql_batch_lock_filename_query
93 +#define sql_batch_unlock_tables_query   my_mysql_batch_unlock_tables_query
94 +#define sql_batch_fill_filename_query   my_mysql_batch_fill_filename_query
95 +#define sql_batch_fill_path_query       my_mysql_batch_fill_path_query
96  #define SQL_ROW               MYSQL_ROW
97  #define SQL_FIELD             MYSQL_FIELD
98  
99 +
100 +int my_mysql_batch_start(B_DB *mdb);
101 +extern char* my_mysql_batch_lock_path_query;
102 +extern char* my_mysql_batch_lock_filename_query;
103 +extern char* my_mysql_batch_unlock_tables_query;
104 +extern char* my_mysql_batch_fill_filename_query;
105 +extern char* my_mysql_batch_fill_path_query;
106 +
107  #else
108  
109  #ifdef HAVE_POSTGRESQL
110 @@ -425,6 +464,7 @@
111     POOLMEM *fname;                /* Filename only */
112     POOLMEM *path;                 /* Path only */
113     POOLMEM *esc_name;             /* Escaped file/path name */
114 +   POOLMEM *esc_name2;            /* Escaped file/path name */
115     int fnl;                       /* file name length */
116     int pnl;                       /* path name length */
117  };     
118 @@ -436,7 +476,19 @@
119  int                my_postgresql_currval    (B_DB *mdb, char *table_name);
120  void               my_postgresql_field_seek (B_DB *mdb, int row);
121  POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb);
122 -
123 +int my_postgresql_lock_table(B_DB *mdb, const char *table);
124 +int my_postgresql_unlock_table(B_DB *mdb);
125 +int my_postgresql_batch_start(B_DB *mdb);
126 +int my_postgresql_batch_end(B_DB *mdb, const char *error);
127 +typedef struct ATTR_DBR ATTR_DBR;
128 +int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar);
129 +char *my_postgresql_copy_escape(char *dest, char *src, size_t len);
130 +
131 +extern char* my_pg_batch_lock_path_query;
132 +extern char* my_pg_batch_lock_filename_query;
133 +extern char* my_pg_batch_unlock_tables_query;
134 +extern char* my_pg_batch_fill_filename_query;
135 +extern char* my_pg_batch_fill_path_query;
136  
137  /* "Generic" names for easier conversion */
138  #define sql_store_result(x)   ((x)->result)
139 @@ -452,6 +504,17 @@
140  #define sql_field_seek(x, y)  my_postgresql_field_seek((x), (y))
141  #define sql_fetch_field(x)    my_postgresql_fetch_field(x)
142  #define sql_num_fields(x)     ((x)->num_fields)
143 +#define sql_batch_start(x)    my_postgresql_batch_start(x)
144 +#define sql_batch_end(x,y)    my_postgresql_batch_end(x,y)
145 +#define sql_batch_insert(x,y) my_postgresql_batch_insert(x,y)
146 +#define sql_lock_table(x,y)   my_postgresql_lock_table(x, y)
147 +#define sql_unlock_table(x,y) my_postgresql_unlock_table(x)
148 +#define sql_batch_lock_path_query       my_pg_batch_lock_path_query
149 +#define sql_batch_lock_filename_query   my_pg_batch_lock_filename_query
150 +#define sql_batch_unlock_tables_query   my_pg_batch_unlock_tables_query
151 +#define sql_batch_fill_filename_query   my_pg_batch_fill_filename_query
152 +#define sql_batch_fill_path_query       my_pg_batch_fill_path_query
153 +
154  #define SQL_ROW               POSTGRESQL_ROW
155  #define SQL_FIELD             POSTGRESQL_FIELD
156  
157 diff -Naur cvs/src/cats/mysql.c my/src/cats/mysql.c
158 --- cvs/src/cats/mysql.c        2006-12-09 14:41:50.000000000 +0100
159 +++ my/src/cats/mysql.c 2006-12-16 19:18:17.000000000 +0100
160 @@ -121,6 +121,7 @@
161     mdb->fname = get_pool_memory(PM_FNAME);
162     mdb->path = get_pool_memory(PM_FNAME);
163     mdb->esc_name = get_pool_memory(PM_FNAME);
164 +   mdb->esc_name2 = get_pool_memory(PM_FNAME);
165     qinsert(&db_list, &mdb->bq);            /* put db in list */
166     V(mutex);
167     return mdb;
168 @@ -231,6 +232,7 @@
169        free_pool_memory(mdb->fname);
170        free_pool_memory(mdb->path);
171        free_pool_memory(mdb->esc_name);
172 +      free_pool_memory(mdb->esc_name2);
173        if (mdb->db_name) {
174           free(mdb->db_name);
175        }
176 @@ -372,4 +374,34 @@
177  
178  }
179  
180 +char *my_mysql_batch_lock_path_query = "LOCK TABLES Path write,     " 
181 +                                      "            batch write,    " 
182 +                                      "            Path as p write ";
183 +
184 +
185 +char *my_mysql_batch_lock_filename_query = "LOCK TABLES Filename write,     "
186 +                                           "            batch write,        "
187 +                                           "            Filename as f write ";
188 +
189 +char *my_mysql_batch_unlock_tables_query = "UNLOCK TABLES";
190 +
191 +char *my_mysql_batch_fill_path_query = "INSERT IGNORE INTO Path (Path) "
192 +                                       " SELECT a.Path FROM            " 
193 +                                       "  (SELECT DISTINCT Path        "
194 +                                       "     FROM batch) AS a          " 
195 +                                       " WHERE NOT EXISTS              "
196 +                                       "  (SELECT Path                 "
197 +                                       "     FROM Path AS p            "
198 +                                       "    WHERE p.Path = a.Path)     ";     
199 +
200 +char *my_mysql_batch_fill_filename_query = "INSERT IGNORE INTO Filename (Name)"
201 +                                           "  SELECT a.Name FROM              " 
202 +                                           "   (SELECT DISTINCT Name          "
203 +                                           "      FROM batch) AS a            " 
204 +                                           "  WHERE NOT EXISTS                "
205 +                                           "   (SELECT Name                   "
206 +                                           "      FROM Filename AS f          "
207 +                                           "      WHERE f.Name = a.Name)      ";
208 +
209  #endif /* HAVE_MYSQL */
210 +
211 diff -Naur cvs/src/cats/postgresql.c my/src/cats/postgresql.c
212 --- cvs/src/cats/postgresql.c   2006-12-06 15:11:53.000000000 +0100
213 +++ my/src/cats/postgresql.c    2006-12-14 20:28:28.000000000 +0100
214 @@ -124,6 +124,7 @@
215     mdb->fname          = get_pool_memory(PM_FNAME);
216     mdb->path           = get_pool_memory(PM_FNAME);
217     mdb->esc_name       = get_pool_memory(PM_FNAME);
218 +   mdb->esc_name2      = get_pool_memory(PM_FNAME);
219     mdb->allow_transactions = mult_db_connections;
220     qinsert(&db_list, &mdb->bq);            /* put db in list */
221     V(mutex);
222 @@ -228,6 +229,7 @@
223        free_pool_memory(mdb->fname);
224        free_pool_memory(mdb->path);
225        free_pool_memory(mdb->esc_name);
226 +      free_pool_memory(mdb->esc_name2);
227        if (mdb->db_name) {
228           free(mdb->db_name);
229        }
230 @@ -538,5 +540,201 @@
231     return id;
232  }
233  
234 +int my_postgresql_lock_table(B_DB *mdb, const char *table)
235 +{
236 +   my_postgresql_query(mdb, "BEGIN");
237 +   Mmsg(mdb->cmd, "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
238 +   return my_postgresql_query(mdb, mdb->cmd);
239 +}
240 +
241 +int my_postgresql_unlock_table(B_DB *mdb)
242 +{
243 +   return my_postgresql_query(mdb, "COMMIT");
244 +}
245 +
246 +int my_postgresql_batch_start(B_DB *mdb)
247 +{
248 +   Dmsg0(500, "my_postgresql_batch_start started\n");
249 +
250 +   if (my_postgresql_query(mdb,
251 +                          " CREATE TEMPORARY TABLE batch "
252 +                          "        (fileindex int,       "
253 +                          "        jobid int,            "
254 +                          "        path varchar,         "
255 +                          "        name varchar,         "
256 +                          "        lstat varchar,        "
257 +                          "        md5 varchar)") == 1)
258 +   {
259 +      Dmsg0(500, "my_postgresql_batch_start failed\n");
260 +      return 1;
261 +   }
262 +   
263 +   // We are starting a new query.  reset everything.
264 +   mdb->num_rows     = -1;
265 +   mdb->row_number   = -1;
266 +   mdb->field_number = -1;
267 +
268 +   if (mdb->result != NULL) {
269 +      my_postgresql_free_result(mdb);
270 +   }
271 +
272 +   mdb->result = PQexec(mdb->db, "COPY batch FROM STDIN");
273 +   mdb->status = PQresultStatus(mdb->result);
274 +   if (mdb->status == PGRES_COPY_IN) {
275 +      // how many fields in the set?
276 +      mdb->num_fields = (int) PQnfields(mdb->result);
277 +      mdb->num_rows   = 0;
278 +      mdb->status = 0;
279 +   } else {
280 +      Dmsg0(500, "we failed\n");
281 +      mdb->status = 1;
282 +   }
283 +
284 +   Dmsg0(500, "my_postgresql_batch_start finishing\n");
285 +
286 +   return mdb->status;
287 +}
288 +
289 +/* set error to something to abort operation */
290 +int my_postgresql_batch_end(B_DB *mdb, const char *error)
291 +{
292 +   int res;
293 +   int count=30;
294 +   Dmsg0(500, "my_postgresql_batch_end started\n");
295 +
296 +   if (!mdb) {                 /* no files ? */
297 +      return 0;
298 +   }
299 +
300 +   do { 
301 +      res = PQputCopyEnd(mdb->db, error);
302 +   } while (res == 0 && --count > 0);
303 +
304 +   if (res == 1) {
305 +      Dmsg0(500, "ok\n");
306 +      mdb->status = 0;
307 +   }
308 +   
309 +   if (res <= 0) {
310 +      Dmsg0(500, "we failed\n");
311 +      mdb->status = 1;
312 +      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
313 +   }
314 +   
315 +   Dmsg0(500, "my_postgresql_batch_end finishing\n");
316 +
317 +   return mdb->status;
318 +}
319 +
320 +int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar)
321 +{
322 +   int res;
323 +   int count=30;
324 +   size_t len;
325 +   char *digest;
326 +
327 +   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
328 +   my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
329 +
330 +   mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1);
331 +   my_postgresql_copy_escape(mdb->esc_name2, mdb->path, mdb->pnl);
332 +
333 +   if (ar->Digest == NULL || ar->Digest[0] == 0) {
334 +      digest = "0";
335 +   } else {
336 +      digest = ar->Digest;
337 +   }
338 +
339 +   len = Mmsg(mdb->cmd, "%u\t%u\t%s\t%s\t%s\t%s\n", 
340 +             ar->FileIndex, ar->JobId, mdb->path, 
341 +             mdb->fname, ar->attr, digest);
342 +
343 +   do { 
344 +      res = PQputCopyData(mdb->db,
345 +                         mdb->cmd,
346 +                         len);
347 +   } while (res == 0 && --count > 0);
348 +
349 +   if (res == 1) {
350 +      Dmsg0(500, "ok\n");
351 +      mdb->changes++;
352 +      mdb->status = 0;
353 +   }
354 +
355 +   if (res <= 0) {
356 +      Dmsg0(500, "we failed\n");
357 +      mdb->status = 1;
358 +      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
359 +   }
360 +
361 +   Dmsg0(500, "my_postgresql_batch_insert finishing\n");
362 +
363 +   return mdb->status;
364 +}
365 +
366 +/*
367 + * Escape strings so that PostgreSQL is happy on COPY
368 + *
369 + *   NOTE! len is the length of the old string. Your new
370 + *         string must be long enough (max 2*old+1) to hold
371 + *         the escaped output.
372 + */
373 +char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
374 +{
375 +   /* we have to escape \t, \n, \r, \ */
376 +   char c = '\0' ;
377 +
378 +   while (len > 0 && *src) {
379 +      switch (*src) {
380 +      case '\n':
381 +        c = 'n';
382 +        break;
383 +      case '\\':
384 +        c = '\\';
385 +        break;
386 +      case '\t':
387 +        c = 't';
388 +        break;
389 +      case '\r':
390 +        c = 'r';
391 +        break;
392 +      default:
393 +        c = '\0' ;
394 +      }
395 +
396 +      if (c) {
397 +        *dest = '\\';
398 +        dest++;
399 +        *dest = c;
400 +      } else {
401 +        *dest = *src;
402 +      }
403 +
404 +      len--;
405 +      src++;
406 +      dest++;
407 +   }
408 +
409 +   *dest = '\0';
410 +   return dest;
411 +}
412 +
413 +char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
414 +
415 +
416 +char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
417 +
418 +char *my_pg_batch_unlock_tables_query = "COMMIT";
419 +
420 +char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
421 +                                    "  SELECT a.Path FROM                                       "
422 +                                    "      (SELECT DISTINCT Path FROM batch) AS a               "
423 +                                    "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
424 +
425  
426 +char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
427 +                                        "  SELECT a.Name FROM               "
428 +                                        "    (SELECT DISTINCT Name FROM batch) as a "
429 +                                        "    WHERE NOT EXISTS               "
430 +                                        "      (SELECT Name FROM Filename WHERE Name = a.Name)";
431  #endif /* HAVE_POSTGRESQL */
432 diff -Naur cvs/src/cats/protos.h my/src/cats/protos.h
433 --- cvs/src/cats/protos.h       2006-12-06 15:11:53.000000000 +0100
434 +++ my/src/cats/protos.h        2006-12-13 19:03:46.000000000 +0100
435 @@ -67,6 +67,10 @@
436  bool db_create_device_record(JCR *jcr, B_DB *mdb, DEVICE_DBR *dr);
437  bool db_create_storage_record(JCR *jcr, B_DB *mdb, STORAGE_DBR *sr);
438  bool db_create_mediatype_record(JCR *jcr, B_DB *mdb, MEDIATYPE_DBR *mr);
439 +int db_create_batch_file_record(JCR *jcr);
440 +int db_batch_start(B_DB *mdb);
441 +int db_batch_end(B_DB *mdb, const char *error);
442 +int db_batch_insert(B_DB *mdb, ATTR_DBR *ar);
443  
444  /* delete.c */
445  int db_delete_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr);
446 diff -Naur cvs/src/cats/sql_create.c my/src/cats/sql_create.c
447 --- cvs/src/cats/sql_create.c   2006-12-06 15:11:53.000000000 +0100
448 +++ my/src/cats/sql_create.c    2006-12-14 22:06:41.000000000 +0100
449 @@ -664,9 +664,207 @@
450   *  };
451   */
452  
453 +/*  All db_batch_* functions are used to do bulk batch insert in File/Filename/Path
454 + *  tables. This code can be activated by adding "#define HAVE_BATCH_FILE_INSERT 1"
455 + *  in baconfig.h
456 + *  
457 + *  To sum up :
458 + *   - bulk load a temp table
459 + *   - insert missing filenames into filename with a single query (lock filenames 
460 + *   - table before that to avoid possible duplicate inserts with concurrent update)
461 + *   - insert missing paths into path with another single query
462 + *   - then insert the join between the temp, filename and path tables into file.
463 + */
464 +
465 +int db_batch_start(B_DB *mdb)
466 +{
467 +   return sql_query(mdb,
468 +             " CREATE TEMPORARY TABLE batch "
469 +             "        (fileindex integer,   "
470 +             "        jobid integer,        "
471 +             "        path blob,            "
472 +             "        name blob,            "
473 +             "        lstat tinyblob,       "
474 +             "        md5 tinyblob)         ");
475 +}
476 +
477 +int db_batch_insert(B_DB *mdb, ATTR_DBR *ar)
478 +{
479 +   size_t len;
480 +   char *digest;
481 +
482 +   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
483 +   db_escape_string(mdb->esc_name, mdb->fname, mdb->fnl);
484 +
485 +   mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1);
486 +   db_escape_string(mdb->esc_name2, mdb->path, mdb->pnl);
487 +
488 +   if (ar->Digest == NULL || ar->Digest[0] == 0) {
489 +      digest = "0";
490 +   } else {
491 +      digest = ar->Digest;
492 +   }
493 +
494 +   len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%u,'%s','%s','%s','%s')",
495 +              ar->FileIndex, ar->JobId, mdb->path, 
496 +              mdb->fname, ar->attr, digest);
497 +
498 +   sql_query(mdb, mdb->cmd);
499 +
500 +   return mdb->status;
501 +}
502 +
503 +/* set error to something to abort operation */
504 +int db_batch_end(B_DB *mdb, const char *error)
505 +{
506 +   
507 +   Dmsg0(50, "db_batch_end started");
508 +
509 +   if (mdb) {
510 +      mdb->status = 0;
511 +      return mdb->status;
512 +   }
513 +   return 0;
514 +}
515 +
516 +int db_create_batch_file_record(JCR *jcr)
517 +{
518 +   Dmsg0(50,"db_create_file_record : no files");
519 +
520 +   if (!jcr->db_batch) {         /* no files to backup ? */
521 +      Dmsg0(50,"db_create_file_record : no files\n");
522 +      return 0;
523 +   }
524 +
525 +   if (sql_batch_end(jcr->db_batch, NULL)) {
526 +      Jmsg(jcr, M_FATAL, 0, "Bad batch end %s\n", jcr->db_batch->errmsg);
527 +      return 1;
528 +   }
529 +
530 +   /* we have to lock tables */
531 +   if (sql_query(jcr->db_batch, sql_batch_lock_path_query))
532 +   {
533 +      Jmsg(jcr, M_FATAL, 0, "Can't lock Path table %s\n", jcr->db_batch->errmsg);
534 +      return 1;
535 +   }
536 +
537 +   if (sql_query(jcr->db_batch, sql_batch_fill_path_query))
538 +   {
539 +      Jmsg(jcr, M_FATAL, 0, "Can't fill Path table %s\n",jcr->db_batch->errmsg);
540 +      sql_query(jcr->db_batch, sql_batch_unlock_tables_query);
541 +      return 1;
542 +   }
543 +   
544 +   if (sql_query(jcr->db_batch, sql_batch_unlock_tables_query))
545 +   {
546 +      Jmsg(jcr, M_FATAL, 0, "Can't unlock Path table %s\n", jcr->db_batch->errmsg);
547 +      return 1;      
548 +   }
549 +
550 +   /* we have to lock tables */
551 +   if (sql_query(jcr->db_batch, sql_batch_lock_filename_query))
552 +   {
553 +      Jmsg(jcr, M_FATAL, 0, "Can't lock Filename table %s\n", jcr->db_batch->errmsg);
554 +      return 1;
555 +   }
556 +   
557 +   if (sql_query(jcr->db_batch, sql_batch_fill_filename_query))
558 +   {
559 +      Jmsg(jcr,M_FATAL,0,"Can't fill Filename table %s\n",jcr->db_batch->errmsg);
560 +      sql_query(jcr->db_batch, sql_batch_unlock_tables_query);
561 +      return 1;            
562 +   }
563 +
564 +   if (sql_query(jcr->db_batch, sql_batch_unlock_tables_query)) {
565 +      Jmsg(jcr, M_FATAL, 0, "Can't unlock Filename table %s\n", jcr->db_batch->errmsg);
566 +      return 1;
567 +   }
568 +   
569 +   if (sql_query(jcr->db_batch, 
570 +       " INSERT INTO File (FileIndex, JobId, PathId, FilenameId, LStat, MD5)"
571 +       "  SELECT batch.FileIndex, batch.JobId, Path.PathId,               " 
572 +       "         Filename.FilenameId,batch.LStat, batch.MD5               "
573 +       "  FROM batch                                                      "
574 +       "    JOIN Path ON (batch.Path = Path.Path)                         "
575 +       "    JOIN Filename ON (batch.Name = Filename.Name)                 "))
576 +   {
577 +      Jmsg(jcr, M_FATAL, 0, "Can't fill File table %s\n", jcr->db_batch->errmsg);
578 +      return 1;
579 +   }
580 +
581 +   sql_query(jcr->db_batch, "DROP TABLE batch");
582 +
583 +   return 0;
584 +}
585 +
586 +#ifdef HAVE_BATCH_FILE_INSERT
587 +/*
588 + * Create File record in B_DB
589 + *
590 + *  In order to reduce database size, we store the File attributes,
591 + *  the FileName, and the Path separately.  In principle, there
592 + *  is a single FileName record and a single Path record, no matter
593 + *  how many times it occurs.  This is this subroutine, we separate
594 + *  the file and the path and fill temporary tables with this three records.
595 + */
596 +int db_create_file_attributes_record(JCR *jcr, B_DB *_mdb, ATTR_DBR *ar)
597 +{
598 +
599 +   Dmsg1(dbglevel, "Fname=%s\n", ar->fname);
600 +   Dmsg0(dbglevel, "put_file_into_catalog\n");
601 +
602 +   if (!jcr->db_batch) {
603 +      jcr->db_batch = db_init_database(jcr, 
604 +                                      jcr->db->db_name, 
605 +                                      jcr->db->db_user,
606 +                                      jcr->db->db_password, 
607 +                                      jcr->db->db_address,
608 +                                      jcr->db->db_port,
609 +                                      jcr->db->db_socket,
610 +                                      1 /* multi_db = true */);
611 +
612 +      if (!jcr->db_batch || !db_open_database(jcr, jcr->db_batch)) {
613 +         Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"),
614 +              jcr->db->db_name);
615 +         if (jcr->db_batch) {
616 +            Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db_batch));
617 +         }
618 +        return 0;
619 +      }      
620 +      
621 +      sql_batch_start(jcr->db_batch);
622 +   }
623 +
624 +   B_DB *mdb = jcr->db_batch;
625 +
626 +   /*
627 +    * Make sure we have an acceptable attributes record.
628 +    */
629 +   if (!(ar->Stream == STREAM_UNIX_ATTRIBUTES ||
630 +         ar->Stream == STREAM_UNIX_ATTRIBUTES_EX)) {
631 +      Mmsg1(&mdb->errmsg, _("Attempt to put non-attributes into catalog. Stream=%d\n"),
632 +         ar->Stream);
633 +      Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
634 +      return 0;
635 +   }
636 +
637 +   split_path_and_file(jcr, mdb, ar->fname);
638  
639  
640  /*
641 +   if (jcr->changes > 100000) {
642 +      sql_batch_end(mdb, NULL);
643 +      sql_batch_start(mdb);
644 +      jcr->changes = 0;
645 +   }
646 +*/
647 +
648 +   return (sql_batch_insert(mdb, ar) == 0);
649 +}
650 +
651 +#else  /* ! HAVE_BATCH_FILE_INSERT */
652 +
653 +/*
654   * Create File record in B_DB
655   *
656   *  In order to reduce database size, we store the File attributes,
657 @@ -721,6 +919,8 @@
658     return 0;
659  }
660  
661 +#endif /* ! HAVE_BATCH_FILE_INSERT */
662 +
663  /*
664   * This is the master File entry containing the attributes.
665   *  The filename and path records have already been created.
666 diff -Naur cvs/src/cats/sqlite.c my/src/cats/sqlite.c
667 --- cvs/src/cats/sqlite.c       2006-12-06 15:11:53.000000000 +0100
668 +++ my/src/cats/sqlite.c        2006-12-14 22:30:35.000000000 +0100
669 @@ -108,6 +108,7 @@
670     mdb->fname = get_pool_memory(PM_FNAME);
671     mdb->path = get_pool_memory(PM_FNAME);
672     mdb->esc_name = get_pool_memory(PM_FNAME);
673 +   mdb->esc_name2 = get_pool_memory(PM_FNAME);
674     mdb->allow_transactions = mult_db_connections;
675     qinsert(&db_list, &mdb->bq);            /* put db in list */
676     V(mutex);
677 @@ -213,6 +214,7 @@
678        free_pool_memory(mdb->fname);
679        free_pool_memory(mdb->path);
680        free_pool_memory(mdb->esc_name);
681 +      free_pool_memory(mdb->esc_name2);
682        if (mdb->db_name) {
683           free(mdb->db_name);
684        }
685 @@ -433,4 +435,16 @@
686     return mdb->fields[mdb->field++];
687  }
688  
689 +char *my_sqlite_batch_lock_query = "BEGIN";
690 +char *my_sqlite_batch_unlock_query = "COMMIT";
691 +char *my_sqlite_batch_fill_path_query = "INSERT INTO Path (Path)          " 
692 +                                        " SELECT DISTINCT Path FROM batch "
693 +                                        " EXCEPT SELECT Path FROM Path    ";
694 +
695 +char *my_sqlite_batch_fill_filename_query = "INSERT INTO Filename (Name)       " 
696 +                                            " SELECT DISTINCT Name FROM batch  "
697 +                                            " EXCEPT SELECT Name FROM Filename ";
698 +
699 +
700 +
701  #endif /* HAVE_SQLITE */
702 diff -Naur cvs/src/dird/backup.c my/src/dird/backup.c
703 --- cvs/src/dird/backup.c       2006-12-13 11:57:52.000000000 +0100
704 +++ my/src/dird/backup.c        2006-12-13 19:03:46.000000000 +0100
705 @@ -233,6 +233,9 @@
706  
707     /* Pickup Job termination data */
708     stat = wait_for_job_termination(jcr);
709 +#ifdef HAVE_BATCH_FILE_INSERT
710 +   db_create_batch_file_record(jcr);   /* used by bulk batch file insert */
711 +#endif
712     if (stat == JS_Terminated) {
713        backup_cleanup(jcr, stat);
714        return true;
715 diff -Naur cvs/src/dird/jobq.c my/src/dird/jobq.c
716 --- cvs/src/dird/jobq.c 2006-11-24 11:29:37.000000000 +0100
717 +++ my/src/dird/jobq.c  2006-12-13 19:03:46.000000000 +0100
718 @@ -563,6 +563,10 @@
719              db_close_database(jcr, jcr->db);
720              jcr->db = NULL;
721           }
722 +         if (jcr->db_batch) {
723 +            db_close_database(jcr, jcr->db_batch);
724 +            jcr->db_batch = NULL;
725 +         }
726           Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
727           jcr->SDJobStatus = 0;
728           V(jq->mutex);                /* release internal lock */
729 diff -Naur cvs/src/jcr.h my/src/jcr.h
730 --- cvs/src/jcr.h       2006-12-12 21:03:36.000000000 +0100
731 +++ my/src/jcr.h        2006-12-13 19:03:46.000000000 +0100
732 @@ -184,6 +184,7 @@
733     bool cached_attribute;             /* set if attribute is cached */
734     POOLMEM *attr;                     /* Attribute string from SD */
735     B_DB *db;                          /* database pointer */
736 +   B_DB *db_batch;                    /* database pointer for batch insert */
737     ATTR_DBR *ar;                      /* DB attribute record */
738  
739     /* Daemon specific part of JCR */