From: Eric Bollengier Date: Tue, 19 Dec 2006 21:19:38 +0000 (+0000) Subject: ebl this build with bacula-1.39.30 and 2006-12-19 cvs X-Git-Tag: Release-7.0.0~7217 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=d0744b5e6ee13be90deb7c3ee0ec5f402f7501bd;p=bacula%2Fbacula ebl this build with bacula-1.39.30 and 2006-12-19 cvs git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@3816 91ce42f0-d328-0410-95d8-f526ca767f89 --- diff --git a/bacula/patches/ebl/README.batch-insert b/bacula/patches/ebl/README.batch-insert new file mode 100644 index 0000000000..3c9de4ca70 --- /dev/null +++ b/bacula/patches/ebl/README.batch-insert @@ -0,0 +1,26 @@ +This patch allow you to use Batch insert mode with all database. +It have tested this with postgresql 8.1 and mysql 4.1. + +To use it, you MUST change some indexes. + +mysql) + +ALTER TABLE Filename DROP INDEX Name; +ALTER TABLE Filename ADD UNIQUE (Name(255)); + +ALTER TABLE Path DROP INDEX Path; +ALTER TABLE Path ADD UNIQUE (Path(255)); + +postgresql) + +DROP INDEX filename_name_idx; +CREATE UNIQUE INDEX filename_name_idx on filename (name); + +DROP INDEX path_name_idx; +CREATE UNIQUE INDEX path_name_idx on path (path); + + +$Log$ +Revision 1.1 2006/12/19 21:19:38 ricozz +ebl this build with bacula-1.39.30 and 2006-12-19 cvs + diff --git a/bacula/patches/ebl/batch-insert.patch b/bacula/patches/ebl/batch-insert.patch new file mode 100644 index 0000000000..19c9fe911a --- /dev/null +++ b/bacula/patches/ebl/batch-insert.patch @@ -0,0 +1,739 @@ +diff -Naur cvs/src/cats/cats.h my/src/cats/cats.h +--- cvs/src/cats/cats.h 2006-12-06 15:11:53.000000000 +0100 ++++ my/src/cats/cats.h 2006-12-14 21:11:40.000000000 +0100 +@@ -141,6 +141,7 @@ + POOLMEM *fname; /* Filename only */ + POOLMEM *path; /* Path only */ + POOLMEM *esc_name; /* Escaped file/path name */ ++ POOLMEM *esc_name2; /* Escaped file/path name */ + int fnl; /* file name length */ + int pnl; /* path name length */ + }; +@@ -170,8 +171,14 @@ + #define sql_fetch_field(x) my_sqlite_fetch_field(x) + #define sql_num_fields(x) ((x)->ncolumn) + #define SQL_ROW char** +- +- ++#define sql_batch_start(x) db_batch_start(x) ++#define sql_batch_end(x,y) db_batch_end(x,y) ++#define sql_batch_insert(x,y) db_batch_insert(x,y) ++#define sql_batch_lock_path_query my_sqlite_batch_lock_query ++#define sql_batch_lock_filename_query my_sqlite_batch_lock_query ++#define sql_batch_unlock_tables_query my_sqlite_batch_unlock_query ++#define sql_batch_fill_filename_query my_sqlite_batch_fill_filename_query ++#define sql_batch_fill_path_query my_sqlite_batch_fill_path_query + + /* In cats/sqlite.c */ + void my_sqlite_free_table(B_DB *mdb); +@@ -179,6 +186,10 @@ + int my_sqlite_query(B_DB *mdb, const char *cmd); + void my_sqlite_field_seek(B_DB *mdb, int field); + SQL_FIELD *my_sqlite_fetch_field(B_DB *mdb); ++extern char* my_sqlite_batch_lock_query; ++extern char* my_sqlite_batch_unlock_query; ++extern char* my_sqlite_batch_fill_filename_query; ++extern char* my_sqlite_batch_fill_path_query; + + + #else +@@ -249,6 +260,7 @@ + POOLMEM *fname; /* Filename only */ + POOLMEM *path; /* Path only */ + POOLMEM *esc_name; /* Escaped file/path name */ ++ POOLMEM *esc_name2; /* Escaped file/path name */ + int fnl; /* file name length */ + int pnl; /* path name length */ + }; +@@ -289,8 +301,14 @@ + #define sql_fetch_field(x) my_sqlite_fetch_field(x) + #define sql_num_fields(x) ((x)->ncolumn) + #define SQL_ROW char** +- +- ++#define sql_batch_start(x) db_batch_start(x) ++#define sql_batch_end(x,y) db_batch_end(x,y) ++#define sql_batch_insert(x,y) db_batch_insert(x,y) ++#define sql_batch_lock_path_query my_sqlite_batch_lock_query ++#define sql_batch_lock_filename_query my_sqlite_batch_lock_query ++#define sql_batch_unlock_tables_query my_sqlite_batch_unlock_query ++#define sql_batch_fill_filename_query my_sqlite_batch_fill_filename_query ++#define sql_batch_fill_path_query my_sqlite_batch_fill_path_query + + /* In cats/sqlite.c */ + void my_sqlite_free_table(B_DB *mdb); +@@ -298,6 +316,10 @@ + int my_sqlite_query(B_DB *mdb, const char *cmd); + void my_sqlite_field_seek(B_DB *mdb, int field); + SQL_FIELD *my_sqlite_fetch_field(B_DB *mdb); ++extern char* my_sqlite_batch_lock_query; ++extern char* my_sqlite_batch_unlock_query; ++extern char* my_sqlite_batch_fill_filename_query; ++extern char* my_sqlite_batch_fill_path_query; + + + #else +@@ -341,6 +363,7 @@ + POOLMEM *fname; /* Filename only */ + POOLMEM *path; /* Path only */ + POOLMEM *esc_name; /* Escaped file/path name */ ++ POOLMEM *esc_name2; /* Escaped file/path name */ + int fnl; /* file name length */ + int pnl; /* path name length */ + }; +@@ -362,9 +385,25 @@ + #define sql_field_seek(x, y) mysql_field_seek((x)->result, (y)) + #define sql_fetch_field(x) mysql_fetch_field((x)->result) + #define sql_num_fields(x) (int)mysql_num_fields((x)->result) ++#define sql_batch_start(x) db_batch_start(x) ++#define sql_batch_end(x,y) db_batch_end(x,y) ++#define sql_batch_insert(x,y) db_batch_insert(x,y) ++#define sql_batch_lock_path_query my_mysql_batch_lock_path_query ++#define sql_batch_lock_filename_query my_mysql_batch_lock_filename_query ++#define sql_batch_unlock_tables_query my_mysql_batch_unlock_tables_query ++#define sql_batch_fill_filename_query my_mysql_batch_fill_filename_query ++#define sql_batch_fill_path_query my_mysql_batch_fill_path_query + #define SQL_ROW MYSQL_ROW + #define SQL_FIELD MYSQL_FIELD + ++ ++int my_mysql_batch_start(B_DB *mdb); ++extern char* my_mysql_batch_lock_path_query; ++extern char* my_mysql_batch_lock_filename_query; ++extern char* my_mysql_batch_unlock_tables_query; ++extern char* my_mysql_batch_fill_filename_query; ++extern char* my_mysql_batch_fill_path_query; ++ + #else + + #ifdef HAVE_POSTGRESQL +@@ -425,6 +464,7 @@ + POOLMEM *fname; /* Filename only */ + POOLMEM *path; /* Path only */ + POOLMEM *esc_name; /* Escaped file/path name */ ++ POOLMEM *esc_name2; /* Escaped file/path name */ + int fnl; /* file name length */ + int pnl; /* path name length */ + }; +@@ -436,7 +476,19 @@ + int my_postgresql_currval (B_DB *mdb, char *table_name); + void my_postgresql_field_seek (B_DB *mdb, int row); + POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb); +- ++int my_postgresql_lock_table(B_DB *mdb, const char *table); ++int my_postgresql_unlock_table(B_DB *mdb); ++int my_postgresql_batch_start(B_DB *mdb); ++int my_postgresql_batch_end(B_DB *mdb, const char *error); ++typedef struct ATTR_DBR ATTR_DBR; ++int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar); ++char *my_postgresql_copy_escape(char *dest, char *src, size_t len); ++ ++extern char* my_pg_batch_lock_path_query; ++extern char* my_pg_batch_lock_filename_query; ++extern char* my_pg_batch_unlock_tables_query; ++extern char* my_pg_batch_fill_filename_query; ++extern char* my_pg_batch_fill_path_query; + + /* "Generic" names for easier conversion */ + #define sql_store_result(x) ((x)->result) +@@ -452,6 +504,17 @@ + #define sql_field_seek(x, y) my_postgresql_field_seek((x), (y)) + #define sql_fetch_field(x) my_postgresql_fetch_field(x) + #define sql_num_fields(x) ((x)->num_fields) ++#define sql_batch_start(x) my_postgresql_batch_start(x) ++#define sql_batch_end(x,y) my_postgresql_batch_end(x,y) ++#define sql_batch_insert(x,y) my_postgresql_batch_insert(x,y) ++#define sql_lock_table(x,y) my_postgresql_lock_table(x, y) ++#define sql_unlock_table(x,y) my_postgresql_unlock_table(x) ++#define sql_batch_lock_path_query my_pg_batch_lock_path_query ++#define sql_batch_lock_filename_query my_pg_batch_lock_filename_query ++#define sql_batch_unlock_tables_query my_pg_batch_unlock_tables_query ++#define sql_batch_fill_filename_query my_pg_batch_fill_filename_query ++#define sql_batch_fill_path_query my_pg_batch_fill_path_query ++ + #define SQL_ROW POSTGRESQL_ROW + #define SQL_FIELD POSTGRESQL_FIELD + +diff -Naur cvs/src/cats/mysql.c my/src/cats/mysql.c +--- cvs/src/cats/mysql.c 2006-12-09 14:41:50.000000000 +0100 ++++ my/src/cats/mysql.c 2006-12-16 19:18:17.000000000 +0100 +@@ -121,6 +121,7 @@ + mdb->fname = get_pool_memory(PM_FNAME); + mdb->path = get_pool_memory(PM_FNAME); + mdb->esc_name = get_pool_memory(PM_FNAME); ++ mdb->esc_name2 = get_pool_memory(PM_FNAME); + qinsert(&db_list, &mdb->bq); /* put db in list */ + V(mutex); + return mdb; +@@ -231,6 +232,7 @@ + free_pool_memory(mdb->fname); + free_pool_memory(mdb->path); + free_pool_memory(mdb->esc_name); ++ free_pool_memory(mdb->esc_name2); + if (mdb->db_name) { + free(mdb->db_name); + } +@@ -372,4 +374,34 @@ + + } + ++char *my_mysql_batch_lock_path_query = "LOCK TABLES Path write, " ++ " batch write, " ++ " Path as p write "; ++ ++ ++char *my_mysql_batch_lock_filename_query = "LOCK TABLES Filename write, " ++ " batch write, " ++ " Filename as f write "; ++ ++char *my_mysql_batch_unlock_tables_query = "UNLOCK TABLES"; ++ ++char *my_mysql_batch_fill_path_query = "INSERT IGNORE INTO Path (Path) " ++ " SELECT a.Path FROM " ++ " (SELECT DISTINCT Path " ++ " FROM batch) AS a " ++ " WHERE NOT EXISTS " ++ " (SELECT Path " ++ " FROM Path AS p " ++ " WHERE p.Path = a.Path) "; ++ ++char *my_mysql_batch_fill_filename_query = "INSERT IGNORE INTO Filename (Name)" ++ " SELECT a.Name FROM " ++ " (SELECT DISTINCT Name " ++ " FROM batch) AS a " ++ " WHERE NOT EXISTS " ++ " (SELECT Name " ++ " FROM Filename AS f " ++ " WHERE f.Name = a.Name) "; ++ + #endif /* HAVE_MYSQL */ ++ +diff -Naur cvs/src/cats/postgresql.c my/src/cats/postgresql.c +--- cvs/src/cats/postgresql.c 2006-12-06 15:11:53.000000000 +0100 ++++ my/src/cats/postgresql.c 2006-12-14 20:28:28.000000000 +0100 +@@ -124,6 +124,7 @@ + mdb->fname = get_pool_memory(PM_FNAME); + mdb->path = get_pool_memory(PM_FNAME); + mdb->esc_name = get_pool_memory(PM_FNAME); ++ mdb->esc_name2 = get_pool_memory(PM_FNAME); + mdb->allow_transactions = mult_db_connections; + qinsert(&db_list, &mdb->bq); /* put db in list */ + V(mutex); +@@ -228,6 +229,7 @@ + free_pool_memory(mdb->fname); + free_pool_memory(mdb->path); + free_pool_memory(mdb->esc_name); ++ free_pool_memory(mdb->esc_name2); + if (mdb->db_name) { + free(mdb->db_name); + } +@@ -538,5 +540,201 @@ + return id; + } + ++int my_postgresql_lock_table(B_DB *mdb, const char *table) ++{ ++ my_postgresql_query(mdb, "BEGIN"); ++ Mmsg(mdb->cmd, "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table); ++ return my_postgresql_query(mdb, mdb->cmd); ++} ++ ++int my_postgresql_unlock_table(B_DB *mdb) ++{ ++ return my_postgresql_query(mdb, "COMMIT"); ++} ++ ++int my_postgresql_batch_start(B_DB *mdb) ++{ ++ Dmsg0(500, "my_postgresql_batch_start started\n"); ++ ++ if (my_postgresql_query(mdb, ++ " CREATE TEMPORARY TABLE batch " ++ " (fileindex int, " ++ " jobid int, " ++ " path varchar, " ++ " name varchar, " ++ " lstat varchar, " ++ " md5 varchar)") == 1) ++ { ++ Dmsg0(500, "my_postgresql_batch_start failed\n"); ++ return 1; ++ } ++ ++ // We are starting a new query. reset everything. ++ mdb->num_rows = -1; ++ mdb->row_number = -1; ++ mdb->field_number = -1; ++ ++ if (mdb->result != NULL) { ++ my_postgresql_free_result(mdb); ++ } ++ ++ mdb->result = PQexec(mdb->db, "COPY batch FROM STDIN"); ++ mdb->status = PQresultStatus(mdb->result); ++ if (mdb->status == PGRES_COPY_IN) { ++ // how many fields in the set? ++ mdb->num_fields = (int) PQnfields(mdb->result); ++ mdb->num_rows = 0; ++ mdb->status = 0; ++ } else { ++ Dmsg0(500, "we failed\n"); ++ mdb->status = 1; ++ } ++ ++ Dmsg0(500, "my_postgresql_batch_start finishing\n"); ++ ++ return mdb->status; ++} ++ ++/* set error to something to abort operation */ ++int my_postgresql_batch_end(B_DB *mdb, const char *error) ++{ ++ int res; ++ int count=30; ++ Dmsg0(500, "my_postgresql_batch_end started\n"); ++ ++ if (!mdb) { /* no files ? */ ++ return 0; ++ } ++ ++ do { ++ res = PQputCopyEnd(mdb->db, error); ++ } while (res == 0 && --count > 0); ++ ++ if (res == 1) { ++ Dmsg0(500, "ok\n"); ++ mdb->status = 0; ++ } ++ ++ if (res <= 0) { ++ Dmsg0(500, "we failed\n"); ++ mdb->status = 1; ++ Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db)); ++ } ++ ++ Dmsg0(500, "my_postgresql_batch_end finishing\n"); ++ ++ return mdb->status; ++} ++ ++int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar) ++{ ++ int res; ++ int count=30; ++ size_t len; ++ char *digest; ++ ++ mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1); ++ my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl); ++ ++ mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1); ++ my_postgresql_copy_escape(mdb->esc_name2, mdb->path, mdb->pnl); ++ ++ if (ar->Digest == NULL || ar->Digest[0] == 0) { ++ digest = "0"; ++ } else { ++ digest = ar->Digest; ++ } ++ ++ len = Mmsg(mdb->cmd, "%u\t%u\t%s\t%s\t%s\t%s\n", ++ ar->FileIndex, ar->JobId, mdb->path, ++ mdb->fname, ar->attr, digest); ++ ++ do { ++ res = PQputCopyData(mdb->db, ++ mdb->cmd, ++ len); ++ } while (res == 0 && --count > 0); ++ ++ if (res == 1) { ++ Dmsg0(500, "ok\n"); ++ mdb->changes++; ++ mdb->status = 0; ++ } ++ ++ if (res <= 0) { ++ Dmsg0(500, "we failed\n"); ++ mdb->status = 1; ++ Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db)); ++ } ++ ++ Dmsg0(500, "my_postgresql_batch_insert finishing\n"); ++ ++ return mdb->status; ++} ++ ++/* ++ * Escape strings so that PostgreSQL is happy on COPY ++ * ++ * NOTE! len is the length of the old string. Your new ++ * string must be long enough (max 2*old+1) to hold ++ * the escaped output. ++ */ ++char *my_postgresql_copy_escape(char *dest, char *src, size_t len) ++{ ++ /* we have to escape \t, \n, \r, \ */ ++ char c = '\0' ; ++ ++ while (len > 0 && *src) { ++ switch (*src) { ++ case '\n': ++ c = 'n'; ++ break; ++ case '\\': ++ c = '\\'; ++ break; ++ case '\t': ++ c = 't'; ++ break; ++ case '\r': ++ c = 'r'; ++ break; ++ default: ++ c = '\0' ; ++ } ++ ++ if (c) { ++ *dest = '\\'; ++ dest++; ++ *dest = c; ++ } else { ++ *dest = *src; ++ } ++ ++ len--; ++ src++; ++ dest++; ++ } ++ ++ *dest = '\0'; ++ return dest; ++} ++ ++char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE"; ++ ++ ++char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE"; ++ ++char *my_pg_batch_unlock_tables_query = "COMMIT"; ++ ++char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path) " ++ " SELECT a.Path FROM " ++ " (SELECT DISTINCT Path FROM batch) AS a " ++ " WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "; ++ + ++char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name) " ++ " SELECT a.Name FROM " ++ " (SELECT DISTINCT Name FROM batch) as a " ++ " WHERE NOT EXISTS " ++ " (SELECT Name FROM Filename WHERE Name = a.Name)"; + #endif /* HAVE_POSTGRESQL */ +diff -Naur cvs/src/cats/protos.h my/src/cats/protos.h +--- cvs/src/cats/protos.h 2006-12-06 15:11:53.000000000 +0100 ++++ my/src/cats/protos.h 2006-12-13 19:03:46.000000000 +0100 +@@ -67,6 +67,10 @@ + bool db_create_device_record(JCR *jcr, B_DB *mdb, DEVICE_DBR *dr); + bool db_create_storage_record(JCR *jcr, B_DB *mdb, STORAGE_DBR *sr); + bool db_create_mediatype_record(JCR *jcr, B_DB *mdb, MEDIATYPE_DBR *mr); ++int db_create_batch_file_record(JCR *jcr); ++int db_batch_start(B_DB *mdb); ++int db_batch_end(B_DB *mdb, const char *error); ++int db_batch_insert(B_DB *mdb, ATTR_DBR *ar); + + /* delete.c */ + int db_delete_pool_record(JCR *jcr, B_DB *db, POOL_DBR *pool_dbr); +diff -Naur cvs/src/cats/sql_create.c my/src/cats/sql_create.c +--- cvs/src/cats/sql_create.c 2006-12-06 15:11:53.000000000 +0100 ++++ my/src/cats/sql_create.c 2006-12-14 22:06:41.000000000 +0100 +@@ -664,9 +664,207 @@ + * }; + */ + ++/* All db_batch_* functions are used to do bulk batch insert in File/Filename/Path ++ * tables. This code can be activated by adding "#define HAVE_BATCH_FILE_INSERT 1" ++ * in baconfig.h ++ * ++ * To sum up : ++ * - bulk load a temp table ++ * - insert missing filenames into filename with a single query (lock filenames ++ * - table before that to avoid possible duplicate inserts with concurrent update) ++ * - insert missing paths into path with another single query ++ * - then insert the join between the temp, filename and path tables into file. ++ */ ++ ++int db_batch_start(B_DB *mdb) ++{ ++ return sql_query(mdb, ++ " CREATE TEMPORARY TABLE batch " ++ " (fileindex integer, " ++ " jobid integer, " ++ " path blob, " ++ " name blob, " ++ " lstat tinyblob, " ++ " md5 tinyblob) "); ++} ++ ++int db_batch_insert(B_DB *mdb, ATTR_DBR *ar) ++{ ++ size_t len; ++ char *digest; ++ ++ mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1); ++ db_escape_string(mdb->esc_name, mdb->fname, mdb->fnl); ++ ++ mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1); ++ db_escape_string(mdb->esc_name2, mdb->path, mdb->pnl); ++ ++ if (ar->Digest == NULL || ar->Digest[0] == 0) { ++ digest = "0"; ++ } else { ++ digest = ar->Digest; ++ } ++ ++ len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%u,'%s','%s','%s','%s')", ++ ar->FileIndex, ar->JobId, mdb->path, ++ mdb->fname, ar->attr, digest); ++ ++ sql_query(mdb, mdb->cmd); ++ ++ return mdb->status; ++} ++ ++/* set error to something to abort operation */ ++int db_batch_end(B_DB *mdb, const char *error) ++{ ++ ++ Dmsg0(50, "db_batch_end started"); ++ ++ if (mdb) { ++ mdb->status = 0; ++ return mdb->status; ++ } ++ return 0; ++} ++ ++int db_create_batch_file_record(JCR *jcr) ++{ ++ Dmsg0(50,"db_create_file_record : no files"); ++ ++ if (!jcr->db_batch) { /* no files to backup ? */ ++ Dmsg0(50,"db_create_file_record : no files\n"); ++ return 0; ++ } ++ ++ if (sql_batch_end(jcr->db_batch, NULL)) { ++ Jmsg(jcr, M_FATAL, 0, "Bad batch end %s\n", jcr->db_batch->errmsg); ++ return 1; ++ } ++ ++ /* we have to lock tables */ ++ if (sql_query(jcr->db_batch, sql_batch_lock_path_query)) ++ { ++ Jmsg(jcr, M_FATAL, 0, "Can't lock Path table %s\n", jcr->db_batch->errmsg); ++ return 1; ++ } ++ ++ if (sql_query(jcr->db_batch, sql_batch_fill_path_query)) ++ { ++ Jmsg(jcr, M_FATAL, 0, "Can't fill Path table %s\n",jcr->db_batch->errmsg); ++ sql_query(jcr->db_batch, sql_batch_unlock_tables_query); ++ return 1; ++ } ++ ++ if (sql_query(jcr->db_batch, sql_batch_unlock_tables_query)) ++ { ++ Jmsg(jcr, M_FATAL, 0, "Can't unlock Path table %s\n", jcr->db_batch->errmsg); ++ return 1; ++ } ++ ++ /* we have to lock tables */ ++ if (sql_query(jcr->db_batch, sql_batch_lock_filename_query)) ++ { ++ Jmsg(jcr, M_FATAL, 0, "Can't lock Filename table %s\n", jcr->db_batch->errmsg); ++ return 1; ++ } ++ ++ if (sql_query(jcr->db_batch, sql_batch_fill_filename_query)) ++ { ++ Jmsg(jcr,M_FATAL,0,"Can't fill Filename table %s\n",jcr->db_batch->errmsg); ++ sql_query(jcr->db_batch, sql_batch_unlock_tables_query); ++ return 1; ++ } ++ ++ if (sql_query(jcr->db_batch, sql_batch_unlock_tables_query)) { ++ Jmsg(jcr, M_FATAL, 0, "Can't unlock Filename table %s\n", jcr->db_batch->errmsg); ++ return 1; ++ } ++ ++ if (sql_query(jcr->db_batch, ++ " INSERT INTO File (FileIndex, JobId, PathId, FilenameId, LStat, MD5)" ++ " SELECT batch.FileIndex, batch.JobId, Path.PathId, " ++ " Filename.FilenameId,batch.LStat, batch.MD5 " ++ " FROM batch " ++ " JOIN Path ON (batch.Path = Path.Path) " ++ " JOIN Filename ON (batch.Name = Filename.Name) ")) ++ { ++ Jmsg(jcr, M_FATAL, 0, "Can't fill File table %s\n", jcr->db_batch->errmsg); ++ return 1; ++ } ++ ++ sql_query(jcr->db_batch, "DROP TABLE batch"); ++ ++ return 0; ++} ++ ++#ifdef HAVE_BATCH_FILE_INSERT ++/* ++ * Create File record in B_DB ++ * ++ * In order to reduce database size, we store the File attributes, ++ * the FileName, and the Path separately. In principle, there ++ * is a single FileName record and a single Path record, no matter ++ * how many times it occurs. This is this subroutine, we separate ++ * the file and the path and fill temporary tables with this three records. ++ */ ++int db_create_file_attributes_record(JCR *jcr, B_DB *_mdb, ATTR_DBR *ar) ++{ ++ ++ Dmsg1(dbglevel, "Fname=%s\n", ar->fname); ++ Dmsg0(dbglevel, "put_file_into_catalog\n"); ++ ++ if (!jcr->db_batch) { ++ jcr->db_batch = db_init_database(jcr, ++ jcr->db->db_name, ++ jcr->db->db_user, ++ jcr->db->db_password, ++ jcr->db->db_address, ++ jcr->db->db_port, ++ jcr->db->db_socket, ++ 1 /* multi_db = true */); ++ ++ if (!jcr->db_batch || !db_open_database(jcr, jcr->db_batch)) { ++ Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"), ++ jcr->db->db_name); ++ if (jcr->db_batch) { ++ Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db_batch)); ++ } ++ return 0; ++ } ++ ++ sql_batch_start(jcr->db_batch); ++ } ++ ++ B_DB *mdb = jcr->db_batch; ++ ++ /* ++ * Make sure we have an acceptable attributes record. ++ */ ++ if (!(ar->Stream == STREAM_UNIX_ATTRIBUTES || ++ ar->Stream == STREAM_UNIX_ATTRIBUTES_EX)) { ++ Mmsg1(&mdb->errmsg, _("Attempt to put non-attributes into catalog. Stream=%d\n"), ++ ar->Stream); ++ Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg); ++ return 0; ++ } ++ ++ split_path_and_file(jcr, mdb, ar->fname); + + + /* ++ if (jcr->changes > 100000) { ++ sql_batch_end(mdb, NULL); ++ sql_batch_start(mdb); ++ jcr->changes = 0; ++ } ++*/ ++ ++ return (sql_batch_insert(mdb, ar) == 0); ++} ++ ++#else /* ! HAVE_BATCH_FILE_INSERT */ ++ ++/* + * Create File record in B_DB + * + * In order to reduce database size, we store the File attributes, +@@ -721,6 +919,8 @@ + return 0; + } + ++#endif /* ! HAVE_BATCH_FILE_INSERT */ ++ + /* + * This is the master File entry containing the attributes. + * The filename and path records have already been created. +diff -Naur cvs/src/cats/sqlite.c my/src/cats/sqlite.c +--- cvs/src/cats/sqlite.c 2006-12-06 15:11:53.000000000 +0100 ++++ my/src/cats/sqlite.c 2006-12-14 22:30:35.000000000 +0100 +@@ -108,6 +108,7 @@ + mdb->fname = get_pool_memory(PM_FNAME); + mdb->path = get_pool_memory(PM_FNAME); + mdb->esc_name = get_pool_memory(PM_FNAME); ++ mdb->esc_name2 = get_pool_memory(PM_FNAME); + mdb->allow_transactions = mult_db_connections; + qinsert(&db_list, &mdb->bq); /* put db in list */ + V(mutex); +@@ -213,6 +214,7 @@ + free_pool_memory(mdb->fname); + free_pool_memory(mdb->path); + free_pool_memory(mdb->esc_name); ++ free_pool_memory(mdb->esc_name2); + if (mdb->db_name) { + free(mdb->db_name); + } +@@ -433,4 +435,16 @@ + return mdb->fields[mdb->field++]; + } + ++char *my_sqlite_batch_lock_query = "BEGIN"; ++char *my_sqlite_batch_unlock_query = "COMMIT"; ++char *my_sqlite_batch_fill_path_query = "INSERT INTO Path (Path) " ++ " SELECT DISTINCT Path FROM batch " ++ " EXCEPT SELECT Path FROM Path "; ++ ++char *my_sqlite_batch_fill_filename_query = "INSERT INTO Filename (Name) " ++ " SELECT DISTINCT Name FROM batch " ++ " EXCEPT SELECT Name FROM Filename "; ++ ++ ++ + #endif /* HAVE_SQLITE */ +diff -Naur cvs/src/dird/backup.c my/src/dird/backup.c +--- cvs/src/dird/backup.c 2006-12-13 11:57:52.000000000 +0100 ++++ my/src/dird/backup.c 2006-12-13 19:03:46.000000000 +0100 +@@ -233,6 +233,9 @@ + + /* Pickup Job termination data */ + stat = wait_for_job_termination(jcr); ++#ifdef HAVE_BATCH_FILE_INSERT ++ db_create_batch_file_record(jcr); /* used by bulk batch file insert */ ++#endif + if (stat == JS_Terminated) { + backup_cleanup(jcr, stat); + return true; +diff -Naur cvs/src/dird/jobq.c my/src/dird/jobq.c +--- cvs/src/dird/jobq.c 2006-11-24 11:29:37.000000000 +0100 ++++ my/src/dird/jobq.c 2006-12-13 19:03:46.000000000 +0100 +@@ -563,6 +563,10 @@ + db_close_database(jcr, jcr->db); + jcr->db = NULL; + } ++ if (jcr->db_batch) { ++ db_close_database(jcr, jcr->db_batch); ++ jcr->db_batch = NULL; ++ } + Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count()); + jcr->SDJobStatus = 0; + V(jq->mutex); /* release internal lock */ +diff -Naur cvs/src/jcr.h my/src/jcr.h +--- cvs/src/jcr.h 2006-12-12 21:03:36.000000000 +0100 ++++ my/src/jcr.h 2006-12-13 19:03:46.000000000 +0100 +@@ -184,6 +184,7 @@ + bool cached_attribute; /* set if attribute is cached */ + POOLMEM *attr; /* Attribute string from SD */ + B_DB *db; /* database pointer */ ++ B_DB *db_batch; /* database pointer for batch insert */ + ATTR_DBR *ar; /* DB attribute record */ + + /* Daemon specific part of JCR */