]> git.sur5r.net Git - bacula/bacula/commitdiff
ebl this build with bacula-1.39.30 and 2006-12-19 cvs
authorEric Bollengier <eric@eb.homelinux.org>
Tue, 19 Dec 2006 21:19:38 +0000 (21:19 +0000)
committerEric Bollengier <eric@eb.homelinux.org>
Tue, 19 Dec 2006 21:19:38 +0000 (21:19 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@3816 91ce42f0-d328-0410-95d8-f526ca767f89

bacula/patches/ebl/README.batch-insert [new file with mode: 0644]
bacula/patches/ebl/batch-insert.patch [new file with mode: 0644]

diff --git a/bacula/patches/ebl/README.batch-insert b/bacula/patches/ebl/README.batch-insert
new file mode 100644 (file)
index 0000000..3c9de4c
--- /dev/null
@@ -0,0 +1,26 @@
+This patch allow you to use Batch insert mode with all database.
+It have tested this with postgresql 8.1 and mysql 4.1.
+
+To use it, you MUST change some indexes.
+
+mysql)
+
+ALTER TABLE Filename DROP INDEX Name;
+ALTER TABLE Filename ADD UNIQUE (Name(255));
+
+ALTER TABLE Path DROP INDEX Path;
+ALTER TABLE Path ADD  UNIQUE (Path(255));
+
+postgresql)
+
+DROP INDEX filename_name_idx;
+CREATE UNIQUE INDEX filename_name_idx on filename (name);
+
+DROP INDEX path_name_idx;
+CREATE UNIQUE INDEX path_name_idx on path (path);
+
+
+$Log$
+Revision 1.1  2006/12/19 21:19:38  ricozz
+ebl  this build with bacula-1.39.30 and 2006-12-19 cvs
+
diff --git a/bacula/patches/ebl/batch-insert.patch b/bacula/patches/ebl/batch-insert.patch
new file mode 100644 (file)
index 0000000..19c9fe9
--- /dev/null
@@ -0,0 +1,739 @@
+diff -Naur cvs/src/cats/cats.h my/src/cats/cats.h
+--- cvs/src/cats/cats.h        2006-12-06 15:11:53.000000000 +0100
++++ my/src/cats/cats.h 2006-12-14 21:11:40.000000000 +0100
+@@ -141,6 +141,7 @@
+    POOLMEM *fname;                    /* Filename only */
+    POOLMEM *path;                     /* Path only */
+    POOLMEM *esc_name;                 /* Escaped file/path name */
++   POOLMEM *esc_name2;                /* Escaped file/path name */
+    int fnl;                           /* file name length */
+    int pnl;                           /* path name length */
+ };
+@@ -170,8 +171,14 @@
+ #define sql_fetch_field(x)    my_sqlite_fetch_field(x)
+ #define sql_num_fields(x)     ((x)->ncolumn)
+ #define SQL_ROW               char**
+-
+-
++#define sql_batch_start(x)    db_batch_start(x)
++#define sql_batch_end(x,y)    db_batch_end(x,y)
++#define sql_batch_insert(x,y) db_batch_insert(x,y)
++#define sql_batch_lock_path_query       my_sqlite_batch_lock_query
++#define sql_batch_lock_filename_query   my_sqlite_batch_lock_query
++#define sql_batch_unlock_tables_query   my_sqlite_batch_unlock_query
++#define sql_batch_fill_filename_query   my_sqlite_batch_fill_filename_query
++#define sql_batch_fill_path_query       my_sqlite_batch_fill_path_query    
+ /* In cats/sqlite.c */
+ void       my_sqlite_free_table(B_DB *mdb);
+@@ -179,6 +186,10 @@
+ int        my_sqlite_query(B_DB *mdb, const char *cmd);
+ void       my_sqlite_field_seek(B_DB *mdb, int field);
+ SQL_FIELD *my_sqlite_fetch_field(B_DB *mdb);
++extern char* my_sqlite_batch_lock_query;
++extern char* my_sqlite_batch_unlock_query;
++extern char* my_sqlite_batch_fill_filename_query;
++extern char* my_sqlite_batch_fill_path_query;
+ #else
+@@ -249,6 +260,7 @@
+    POOLMEM *fname;                    /* Filename only */
+    POOLMEM *path;                     /* Path only */
+    POOLMEM *esc_name;                 /* Escaped file/path name */
++   POOLMEM *esc_name2;                /* Escaped file/path name */
+    int fnl;                           /* file name length */
+    int pnl;                           /* path name length */
+ };
+@@ -289,8 +301,14 @@
+ #define sql_fetch_field(x)    my_sqlite_fetch_field(x)
+ #define sql_num_fields(x)     ((x)->ncolumn)
+ #define SQL_ROW               char**
+-
+-
++#define sql_batch_start(x)    db_batch_start(x)
++#define sql_batch_end(x,y)    db_batch_end(x,y)
++#define sql_batch_insert(x,y) db_batch_insert(x,y)
++#define sql_batch_lock_path_query       my_sqlite_batch_lock_query
++#define sql_batch_lock_filename_query   my_sqlite_batch_lock_query
++#define sql_batch_unlock_tables_query   my_sqlite_batch_unlock_query
++#define sql_batch_fill_filename_query   my_sqlite_batch_fill_filename_query
++#define sql_batch_fill_path_query       my_sqlite_batch_fill_path_query
+ /* In cats/sqlite.c */
+ void       my_sqlite_free_table(B_DB *mdb);
+@@ -298,6 +316,10 @@
+ int        my_sqlite_query(B_DB *mdb, const char *cmd);
+ void       my_sqlite_field_seek(B_DB *mdb, int field);
+ SQL_FIELD *my_sqlite_fetch_field(B_DB *mdb);
++extern char* my_sqlite_batch_lock_query;
++extern char* my_sqlite_batch_unlock_query;
++extern char* my_sqlite_batch_fill_filename_query;
++extern char* my_sqlite_batch_fill_path_query;
+ #else
+@@ -341,6 +363,7 @@
+    POOLMEM *fname;                    /* Filename only */
+    POOLMEM *path;                     /* Path only */
+    POOLMEM *esc_name;                 /* Escaped file/path name */
++   POOLMEM *esc_name2;                /* Escaped file/path name */
+    int fnl;                           /* file name length */
+    int pnl;                           /* path name length */
+ };
+@@ -362,9 +385,25 @@
+ #define sql_field_seek(x, y)  mysql_field_seek((x)->result, (y))
+ #define sql_fetch_field(x)    mysql_fetch_field((x)->result)
+ #define sql_num_fields(x)     (int)mysql_num_fields((x)->result)
++#define sql_batch_start(x)    db_batch_start(x)
++#define sql_batch_end(x,y)    db_batch_end(x,y)
++#define sql_batch_insert(x,y) db_batch_insert(x,y)
++#define sql_batch_lock_path_query       my_mysql_batch_lock_path_query
++#define sql_batch_lock_filename_query   my_mysql_batch_lock_filename_query
++#define sql_batch_unlock_tables_query   my_mysql_batch_unlock_tables_query
++#define sql_batch_fill_filename_query   my_mysql_batch_fill_filename_query
++#define sql_batch_fill_path_query       my_mysql_batch_fill_path_query
+ #define SQL_ROW               MYSQL_ROW
+ #define SQL_FIELD             MYSQL_FIELD
++
++int my_mysql_batch_start(B_DB *mdb);
++extern char* my_mysql_batch_lock_path_query;
++extern char* my_mysql_batch_lock_filename_query;
++extern char* my_mysql_batch_unlock_tables_query;
++extern char* my_mysql_batch_fill_filename_query;
++extern char* my_mysql_batch_fill_path_query;
++
+ #else
+ #ifdef HAVE_POSTGRESQL
+@@ -425,6 +464,7 @@
+    POOLMEM *fname;                /* Filename only */
+    POOLMEM *path;                 /* Path only */
+    POOLMEM *esc_name;             /* Escaped file/path name */
++   POOLMEM *esc_name2;            /* Escaped file/path name */
+    int fnl;                       /* file name length */
+    int pnl;                       /* path name length */
+ };     
+@@ -436,7 +476,19 @@
+ int                my_postgresql_currval    (B_DB *mdb, char *table_name);
+ void               my_postgresql_field_seek (B_DB *mdb, int row);
+ POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb);
+-
++int my_postgresql_lock_table(B_DB *mdb, const char *table);
++int my_postgresql_unlock_table(B_DB *mdb);
++int my_postgresql_batch_start(B_DB *mdb);
++int my_postgresql_batch_end(B_DB *mdb, const char *error);
++typedef struct ATTR_DBR ATTR_DBR;
++int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar);
++char *my_postgresql_copy_escape(char *dest, char *src, size_t len);
++
++extern char* my_pg_batch_lock_path_query;
++extern char* my_pg_batch_lock_filename_query;
++extern char* my_pg_batch_unlock_tables_query;
++extern char* my_pg_batch_fill_filename_query;
++extern char* my_pg_batch_fill_path_query;
+ /* "Generic" names for easier conversion */
+ #define sql_store_result(x)   ((x)->result)
+@@ -452,6 +504,17 @@
+ #define sql_field_seek(x, y)  my_postgresql_field_seek((x), (y))
+ #define sql_fetch_field(x)    my_postgresql_fetch_field(x)
+ #define sql_num_fields(x)     ((x)->num_fields)
++#define sql_batch_start(x)    my_postgresql_batch_start(x)
++#define sql_batch_end(x,y)    my_postgresql_batch_end(x,y)
++#define sql_batch_insert(x,y) my_postgresql_batch_insert(x,y)
++#define sql_lock_table(x,y)   my_postgresql_lock_table(x, y)
++#define sql_unlock_table(x,y) my_postgresql_unlock_table(x)
++#define sql_batch_lock_path_query       my_pg_batch_lock_path_query
++#define sql_batch_lock_filename_query   my_pg_batch_lock_filename_query
++#define sql_batch_unlock_tables_query   my_pg_batch_unlock_tables_query
++#define sql_batch_fill_filename_query   my_pg_batch_fill_filename_query
++#define sql_batch_fill_path_query       my_pg_batch_fill_path_query
++
+ #define SQL_ROW               POSTGRESQL_ROW
+ #define SQL_FIELD             POSTGRESQL_FIELD
+diff -Naur cvs/src/cats/mysql.c my/src/cats/mysql.c
+--- cvs/src/cats/mysql.c       2006-12-09 14:41:50.000000000 +0100
++++ my/src/cats/mysql.c        2006-12-16 19:18:17.000000000 +0100
+@@ -121,6 +121,7 @@
+    mdb->fname = get_pool_memory(PM_FNAME);
+    mdb->path = get_pool_memory(PM_FNAME);
+    mdb->esc_name = get_pool_memory(PM_FNAME);
++   mdb->esc_name2 = get_pool_memory(PM_FNAME);
+    qinsert(&db_list, &mdb->bq);            /* put db in list */
+    V(mutex);
+    return mdb;
+@@ -231,6 +232,7 @@
+       free_pool_memory(mdb->fname);
+       free_pool_memory(mdb->path);
+       free_pool_memory(mdb->esc_name);
++      free_pool_memory(mdb->esc_name2);
+       if (mdb->db_name) {
+          free(mdb->db_name);
+       }
+@@ -372,4 +374,34 @@
+ }
++char *my_mysql_batch_lock_path_query = "LOCK TABLES Path write,     " 
++                                     "            batch write,    " 
++                                     "            Path as p write ";
++
++
++char *my_mysql_batch_lock_filename_query = "LOCK TABLES Filename write,     "
++                                           "            batch write,        "
++                                           "            Filename as f write ";
++
++char *my_mysql_batch_unlock_tables_query = "UNLOCK TABLES";
++
++char *my_mysql_batch_fill_path_query = "INSERT IGNORE INTO Path (Path) "
++                                       " SELECT a.Path FROM            " 
++                                       "  (SELECT DISTINCT Path        "
++                                       "     FROM batch) AS a          " 
++                                       " WHERE NOT EXISTS              "
++                                       "  (SELECT Path                 "
++                                       "     FROM Path AS p            "
++                                       "    WHERE p.Path = a.Path)     ";     
++
++char *my_mysql_batch_fill_filename_query = "INSERT IGNORE INTO Filename (Name)"
++                                           "  SELECT a.Name FROM              " 
++                                           "   (SELECT DISTINCT Name          "
++                                           "      FROM batch) AS a            " 
++                                           "  WHERE NOT EXISTS                "
++                                           "   (SELECT Name                   "
++                                           "      FROM Filename AS f          "
++                                           "      WHERE f.Name = a.Name)      ";
++
+ #endif /* HAVE_MYSQL */
++
+diff -Naur cvs/src/cats/postgresql.c my/src/cats/postgresql.c
+--- cvs/src/cats/postgresql.c  2006-12-06 15:11:53.000000000 +0100
++++ my/src/cats/postgresql.c   2006-12-14 20:28:28.000000000 +0100
+@@ -124,6 +124,7 @@
+    mdb->fname          = get_pool_memory(PM_FNAME);
+    mdb->path           = get_pool_memory(PM_FNAME);
+    mdb->esc_name       = get_pool_memory(PM_FNAME);
++   mdb->esc_name2      = get_pool_memory(PM_FNAME);
+    mdb->allow_transactions = mult_db_connections;
+    qinsert(&db_list, &mdb->bq);            /* put db in list */
+    V(mutex);
+@@ -228,6 +229,7 @@
+       free_pool_memory(mdb->fname);
+       free_pool_memory(mdb->path);
+       free_pool_memory(mdb->esc_name);
++      free_pool_memory(mdb->esc_name2);
+       if (mdb->db_name) {
+          free(mdb->db_name);
+       }
+@@ -538,5 +540,201 @@
+    return id;
+ }
++int my_postgresql_lock_table(B_DB *mdb, const char *table)
++{
++   my_postgresql_query(mdb, "BEGIN");
++   Mmsg(mdb->cmd, "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
++   return my_postgresql_query(mdb, mdb->cmd);
++}
++
++int my_postgresql_unlock_table(B_DB *mdb)
++{
++   return my_postgresql_query(mdb, "COMMIT");
++}
++
++int my_postgresql_batch_start(B_DB *mdb)
++{
++   Dmsg0(500, "my_postgresql_batch_start started\n");
++
++   if (my_postgresql_query(mdb,
++                         " CREATE TEMPORARY TABLE batch "
++                         "        (fileindex int,       "
++                         "        jobid int,            "
++                         "        path varchar,         "
++                         "        name varchar,         "
++                         "        lstat varchar,        "
++                         "        md5 varchar)") == 1)
++   {
++      Dmsg0(500, "my_postgresql_batch_start failed\n");
++      return 1;
++   }
++   
++   // We are starting a new query.  reset everything.
++   mdb->num_rows     = -1;
++   mdb->row_number   = -1;
++   mdb->field_number = -1;
++
++   if (mdb->result != NULL) {
++      my_postgresql_free_result(mdb);
++   }
++
++   mdb->result = PQexec(mdb->db, "COPY batch FROM STDIN");
++   mdb->status = PQresultStatus(mdb->result);
++   if (mdb->status == PGRES_COPY_IN) {
++      // how many fields in the set?
++      mdb->num_fields = (int) PQnfields(mdb->result);
++      mdb->num_rows   = 0;
++      mdb->status = 0;
++   } else {
++      Dmsg0(500, "we failed\n");
++      mdb->status = 1;
++   }
++
++   Dmsg0(500, "my_postgresql_batch_start finishing\n");
++
++   return mdb->status;
++}
++
++/* set error to something to abort operation */
++int my_postgresql_batch_end(B_DB *mdb, const char *error)
++{
++   int res;
++   int count=30;
++   Dmsg0(500, "my_postgresql_batch_end started\n");
++
++   if (!mdb) {                        /* no files ? */
++      return 0;
++   }
++
++   do { 
++      res = PQputCopyEnd(mdb->db, error);
++   } while (res == 0 && --count > 0);
++
++   if (res == 1) {
++      Dmsg0(500, "ok\n");
++      mdb->status = 0;
++   }
++   
++   if (res <= 0) {
++      Dmsg0(500, "we failed\n");
++      mdb->status = 1;
++      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
++   }
++   
++   Dmsg0(500, "my_postgresql_batch_end finishing\n");
++
++   return mdb->status;
++}
++
++int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar)
++{
++   int res;
++   int count=30;
++   size_t len;
++   char *digest;
++
++   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
++   my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
++
++   mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1);
++   my_postgresql_copy_escape(mdb->esc_name2, mdb->path, mdb->pnl);
++
++   if (ar->Digest == NULL || ar->Digest[0] == 0) {
++      digest = "0";
++   } else {
++      digest = ar->Digest;
++   }
++
++   len = Mmsg(mdb->cmd, "%u\t%u\t%s\t%s\t%s\t%s\n", 
++            ar->FileIndex, ar->JobId, mdb->path, 
++            mdb->fname, ar->attr, digest);
++
++   do { 
++      res = PQputCopyData(mdb->db,
++                        mdb->cmd,
++                        len);
++   } while (res == 0 && --count > 0);
++
++   if (res == 1) {
++      Dmsg0(500, "ok\n");
++      mdb->changes++;
++      mdb->status = 0;
++   }
++
++   if (res <= 0) {
++      Dmsg0(500, "we failed\n");
++      mdb->status = 1;
++      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
++   }
++
++   Dmsg0(500, "my_postgresql_batch_insert finishing\n");
++
++   return mdb->status;
++}
++
++/*
++ * Escape strings so that PostgreSQL is happy on COPY
++ *
++ *   NOTE! len is the length of the old string. Your new
++ *         string must be long enough (max 2*old+1) to hold
++ *         the escaped output.
++ */
++char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
++{
++   /* we have to escape \t, \n, \r, \ */
++   char c = '\0' ;
++
++   while (len > 0 && *src) {
++      switch (*src) {
++      case '\n':
++       c = 'n';
++       break;
++      case '\\':
++       c = '\\';
++       break;
++      case '\t':
++       c = 't';
++       break;
++      case '\r':
++       c = 'r';
++       break;
++      default:
++       c = '\0' ;
++      }
++
++      if (c) {
++       *dest = '\\';
++       dest++;
++       *dest = c;
++      } else {
++       *dest = *src;
++      }
++
++      len--;
++      src++;
++      dest++;
++   }
++
++   *dest = '\0';
++   return dest;
++}
++
++char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
++
++
++char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
++
++char *my_pg_batch_unlock_tables_query = "COMMIT";
++
++char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
++                                    "  SELECT a.Path FROM                                       "
++                                    "      (SELECT DISTINCT Path FROM batch) AS a               "
++                                    "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
++
++char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
++                                        "  SELECT a.Name FROM               "
++                                        "    (SELECT DISTINCT Name FROM batch) as a "
++                                        "    WHERE NOT EXISTS               "
++                                        "      (SELECT Name FROM Filename WHERE Name = a.Name)";
+ #endif /* HAVE_POSTGRESQL */
+diff -Naur cvs/src/cats/protos.h my/src/cats/protos.h
+--- cvs/src/cats/protos.h      2006-12-06 15:11:53.000000000 +0100
++++ my/src/cats/protos.h       2006-12-13 19:03:46.000000000 +0100
+@@ -67,6 +67,10 @@
+ bool db_create_device_record(JCR *jcr, B_DB *mdb, DEVICE_DBR *dr);
+ bool db_create_storage_record(JCR *jcr, B_DB *mdb, STORAGE_DBR *sr);
+ bool db_create_mediatype_record(JCR *jcr, B_DB *mdb, MEDIATYPE_DBR *mr);
++int db_create_batch_file_record(JCR *jcr);
++int db_batch_start(B_DB *mdb);
++int db_batch_end(B_DB *mdb, const char *error);
++int db_batch_insert(B_DB *mdb, ATTR_DBR *ar);
+ /* delete.c */
+ int db_delete_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr);
+diff -Naur cvs/src/cats/sql_create.c my/src/cats/sql_create.c
+--- cvs/src/cats/sql_create.c  2006-12-06 15:11:53.000000000 +0100
++++ my/src/cats/sql_create.c   2006-12-14 22:06:41.000000000 +0100
+@@ -664,9 +664,207 @@
+  *  };
+  */
++/*  All db_batch_* functions are used to do bulk batch insert in File/Filename/Path
++ *  tables. This code can be activated by adding "#define HAVE_BATCH_FILE_INSERT 1"
++ *  in baconfig.h
++ *  
++ *  To sum up :
++ *   - bulk load a temp table
++ *   - insert missing filenames into filename with a single query (lock filenames 
++ *   - table before that to avoid possible duplicate inserts with concurrent update)
++ *   - insert missing paths into path with another single query
++ *   - then insert the join between the temp, filename and path tables into file.
++ */
++
++int db_batch_start(B_DB *mdb)
++{
++   return sql_query(mdb,
++             " CREATE TEMPORARY TABLE batch "
++             "        (fileindex integer,   "
++             "        jobid integer,        "
++             "        path blob,            "
++             "        name blob,            "
++             "        lstat tinyblob,       "
++             "        md5 tinyblob)         ");
++}
++
++int db_batch_insert(B_DB *mdb, ATTR_DBR *ar)
++{
++   size_t len;
++   char *digest;
++
++   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
++   db_escape_string(mdb->esc_name, mdb->fname, mdb->fnl);
++
++   mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1);
++   db_escape_string(mdb->esc_name2, mdb->path, mdb->pnl);
++
++   if (ar->Digest == NULL || ar->Digest[0] == 0) {
++      digest = "0";
++   } else {
++      digest = ar->Digest;
++   }
++
++   len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%u,'%s','%s','%s','%s')",
++              ar->FileIndex, ar->JobId, mdb->path, 
++              mdb->fname, ar->attr, digest);
++
++   sql_query(mdb, mdb->cmd);
++
++   return mdb->status;
++}
++
++/* set error to something to abort operation */
++int db_batch_end(B_DB *mdb, const char *error)
++{
++   
++   Dmsg0(50, "db_batch_end started");
++
++   if (mdb) {
++      mdb->status = 0;
++      return mdb->status;
++   }
++   return 0;
++}
++
++int db_create_batch_file_record(JCR *jcr)
++{
++   Dmsg0(50,"db_create_file_record : no files");
++
++   if (!jcr->db_batch) {         /* no files to backup ? */
++      Dmsg0(50,"db_create_file_record : no files\n");
++      return 0;
++   }
++
++   if (sql_batch_end(jcr->db_batch, NULL)) {
++      Jmsg(jcr, M_FATAL, 0, "Bad batch end %s\n", jcr->db_batch->errmsg);
++      return 1;
++   }
++
++   /* we have to lock tables */
++   if (sql_query(jcr->db_batch, sql_batch_lock_path_query))
++   {
++      Jmsg(jcr, M_FATAL, 0, "Can't lock Path table %s\n", jcr->db_batch->errmsg);
++      return 1;
++   }
++
++   if (sql_query(jcr->db_batch, sql_batch_fill_path_query))
++   {
++      Jmsg(jcr, M_FATAL, 0, "Can't fill Path table %s\n",jcr->db_batch->errmsg);
++      sql_query(jcr->db_batch, sql_batch_unlock_tables_query);
++      return 1;
++   }
++   
++   if (sql_query(jcr->db_batch, sql_batch_unlock_tables_query))
++   {
++      Jmsg(jcr, M_FATAL, 0, "Can't unlock Path table %s\n", jcr->db_batch->errmsg);
++      return 1;      
++   }
++
++   /* we have to lock tables */
++   if (sql_query(jcr->db_batch, sql_batch_lock_filename_query))
++   {
++      Jmsg(jcr, M_FATAL, 0, "Can't lock Filename table %s\n", jcr->db_batch->errmsg);
++      return 1;
++   }
++   
++   if (sql_query(jcr->db_batch, sql_batch_fill_filename_query))
++   {
++      Jmsg(jcr,M_FATAL,0,"Can't fill Filename table %s\n",jcr->db_batch->errmsg);
++      sql_query(jcr->db_batch, sql_batch_unlock_tables_query);
++      return 1;            
++   }
++
++   if (sql_query(jcr->db_batch, sql_batch_unlock_tables_query)) {
++      Jmsg(jcr, M_FATAL, 0, "Can't unlock Filename table %s\n", jcr->db_batch->errmsg);
++      return 1;
++   }
++   
++   if (sql_query(jcr->db_batch, 
++       " INSERT INTO File (FileIndex, JobId, PathId, FilenameId, LStat, MD5)"
++       "  SELECT batch.FileIndex, batch.JobId, Path.PathId,               " 
++       "         Filename.FilenameId,batch.LStat, batch.MD5               "
++       "  FROM batch                                                      "
++       "    JOIN Path ON (batch.Path = Path.Path)                         "
++       "    JOIN Filename ON (batch.Name = Filename.Name)                 "))
++   {
++      Jmsg(jcr, M_FATAL, 0, "Can't fill File table %s\n", jcr->db_batch->errmsg);
++      return 1;
++   }
++
++   sql_query(jcr->db_batch, "DROP TABLE batch");
++
++   return 0;
++}
++
++#ifdef HAVE_BATCH_FILE_INSERT
++/*
++ * Create File record in B_DB
++ *
++ *  In order to reduce database size, we store the File attributes,
++ *  the FileName, and the Path separately.  In principle, there
++ *  is a single FileName record and a single Path record, no matter
++ *  how many times it occurs.  This is this subroutine, we separate
++ *  the file and the path and fill temporary tables with this three records.
++ */
++int db_create_file_attributes_record(JCR *jcr, B_DB *_mdb, ATTR_DBR *ar)
++{
++
++   Dmsg1(dbglevel, "Fname=%s\n", ar->fname);
++   Dmsg0(dbglevel, "put_file_into_catalog\n");
++
++   if (!jcr->db_batch) {
++      jcr->db_batch = db_init_database(jcr, 
++                                      jcr->db->db_name, 
++                                      jcr->db->db_user,
++                                      jcr->db->db_password, 
++                                      jcr->db->db_address,
++                                      jcr->db->db_port,
++                                      jcr->db->db_socket,
++                                      1 /* multi_db = true */);
++
++      if (!jcr->db_batch || !db_open_database(jcr, jcr->db_batch)) {
++         Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"),
++              jcr->db->db_name);
++         if (jcr->db_batch) {
++            Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db_batch));
++         }
++       return 0;
++      }      
++      
++      sql_batch_start(jcr->db_batch);
++   }
++
++   B_DB *mdb = jcr->db_batch;
++
++   /*
++    * Make sure we have an acceptable attributes record.
++    */
++   if (!(ar->Stream == STREAM_UNIX_ATTRIBUTES ||
++         ar->Stream == STREAM_UNIX_ATTRIBUTES_EX)) {
++      Mmsg1(&mdb->errmsg, _("Attempt to put non-attributes into catalog. Stream=%d\n"),
++         ar->Stream);
++      Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
++      return 0;
++   }
++
++   split_path_and_file(jcr, mdb, ar->fname);
+ /*
++   if (jcr->changes > 100000) {
++      sql_batch_end(mdb, NULL);
++      sql_batch_start(mdb);
++      jcr->changes = 0;
++   }
++*/
++
++   return (sql_batch_insert(mdb, ar) == 0);
++}
++
++#else  /* ! HAVE_BATCH_FILE_INSERT */
++
++/*
+  * Create File record in B_DB
+  *
+  *  In order to reduce database size, we store the File attributes,
+@@ -721,6 +919,8 @@
+    return 0;
+ }
++#endif /* ! HAVE_BATCH_FILE_INSERT */
++
+ /*
+  * This is the master File entry containing the attributes.
+  *  The filename and path records have already been created.
+diff -Naur cvs/src/cats/sqlite.c my/src/cats/sqlite.c
+--- cvs/src/cats/sqlite.c      2006-12-06 15:11:53.000000000 +0100
++++ my/src/cats/sqlite.c       2006-12-14 22:30:35.000000000 +0100
+@@ -108,6 +108,7 @@
+    mdb->fname = get_pool_memory(PM_FNAME);
+    mdb->path = get_pool_memory(PM_FNAME);
+    mdb->esc_name = get_pool_memory(PM_FNAME);
++   mdb->esc_name2 = get_pool_memory(PM_FNAME);
+    mdb->allow_transactions = mult_db_connections;
+    qinsert(&db_list, &mdb->bq);            /* put db in list */
+    V(mutex);
+@@ -213,6 +214,7 @@
+       free_pool_memory(mdb->fname);
+       free_pool_memory(mdb->path);
+       free_pool_memory(mdb->esc_name);
++      free_pool_memory(mdb->esc_name2);
+       if (mdb->db_name) {
+          free(mdb->db_name);
+       }
+@@ -433,4 +435,16 @@
+    return mdb->fields[mdb->field++];
+ }
++char *my_sqlite_batch_lock_query = "BEGIN";
++char *my_sqlite_batch_unlock_query = "COMMIT";
++char *my_sqlite_batch_fill_path_query = "INSERT INTO Path (Path)          " 
++                                        " SELECT DISTINCT Path FROM batch "
++                                        " EXCEPT SELECT Path FROM Path    ";
++
++char *my_sqlite_batch_fill_filename_query = "INSERT INTO Filename (Name)       " 
++                                            " SELECT DISTINCT Name FROM batch  "
++                                            " EXCEPT SELECT Name FROM Filename ";
++
++
++
+ #endif /* HAVE_SQLITE */
+diff -Naur cvs/src/dird/backup.c my/src/dird/backup.c
+--- cvs/src/dird/backup.c      2006-12-13 11:57:52.000000000 +0100
++++ my/src/dird/backup.c       2006-12-13 19:03:46.000000000 +0100
+@@ -233,6 +233,9 @@
+    /* Pickup Job termination data */
+    stat = wait_for_job_termination(jcr);
++#ifdef HAVE_BATCH_FILE_INSERT
++   db_create_batch_file_record(jcr);  /* used by bulk batch file insert */
++#endif
+    if (stat == JS_Terminated) {
+       backup_cleanup(jcr, stat);
+       return true;
+diff -Naur cvs/src/dird/jobq.c my/src/dird/jobq.c
+--- cvs/src/dird/jobq.c        2006-11-24 11:29:37.000000000 +0100
++++ my/src/dird/jobq.c 2006-12-13 19:03:46.000000000 +0100
+@@ -563,6 +563,10 @@
+             db_close_database(jcr, jcr->db);
+             jcr->db = NULL;
+          }
++         if (jcr->db_batch) {
++            db_close_database(jcr, jcr->db_batch);
++            jcr->db_batch = NULL;
++         }
+          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
+          jcr->SDJobStatus = 0;
+          V(jq->mutex);                /* release internal lock */
+diff -Naur cvs/src/jcr.h my/src/jcr.h
+--- cvs/src/jcr.h      2006-12-12 21:03:36.000000000 +0100
++++ my/src/jcr.h       2006-12-13 19:03:46.000000000 +0100
+@@ -184,6 +184,7 @@
+    bool cached_attribute;             /* set if attribute is cached */
+    POOLMEM *attr;                     /* Attribute string from SD */
+    B_DB *db;                          /* database pointer */
++   B_DB *db_batch;                    /* database pointer for batch insert */
+    ATTR_DBR *ar;                      /* DB attribute record */
+    /* Daemon specific part of JCR */