]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/sql_create.c
stop copy/migration using basejobs
[bacula/bacula] / bacula / src / cats / sql_create.c
index d8cca379fe4fac0b1a8172ffbe4b3a2dbbec692f..bc960664a9d1615d521c78e51da139e1bc52ff0a 100644 (file)
@@ -1,3 +1,30 @@
+/*
+   Bacula® - The Network Backup Solution
+
+   Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
+
+   The main author of Bacula is Kern Sibbald, with contributions from
+   many others, a complete list can be found in the file AUTHORS.
+   This program is Free Software; you can redistribute it and/or
+   modify it under the terms of version two of the GNU General Public
+   License as published by the Free Software Foundation and included
+   in the file LICENSE.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA.
+
+   Bacula® is a registered trademark of Kern Sibbald.
+   The licensor of Bacula is the Free Software Foundation Europe
+   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
+   Switzerland, email:ftf@fsfeurope.org.
+*/
 /*
  * Bacula Catalog Database Create record interface routines
  *
@@ -5,20 +32,6 @@
  *
  *    Version $Id$
  */
-/*
-   Copyright (C) 2000-2005 Kern Sibbald
-
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License
-   version 2 as amended with additional clauses defined in the
-   file LICENSE in the main source directory.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
-   the file LICENSE for additional details.
-
- */
 
 /* The following is necessary so that we do not include
  * the dummy external definition of DB.
@@ -28,7 +41,9 @@
 #include "bacula.h"
 #include "cats.h"
 
-#if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL
+static const int dbglevel = 10;
+
+#if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
 
 /* -----------------------------------------------------------------------
  *
  */
 
 /* Forward referenced subroutines */
+#ifndef HAVE_BATCH_FILE_INSERT
 static int db_create_file_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar);
 static int db_create_filename_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar);
 static int db_create_path_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar);
-
-
-/* Imported subroutines */
-extern void print_dashes(B_DB *mdb);
-extern void print_result(B_DB *mdb);
-extern int QueryDB(const char *file, int line, JCR *jcr, B_DB *db, char *select_cmd);
-extern int InsertDB(const char *file, int line, JCR *jcr, B_DB *db, char *select_cmd);
-extern int UpdateDB(const char *file, int line, JCR *jcr, B_DB *db, char *update_cmd);
-extern void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname);
+#endif /* HAVE_BATCH_FILE_INSERT */
 
 
 /* Create a new record for the Job
@@ -64,23 +72,24 @@ db_create_job_record(JCR *jcr, B_DB *mdb, JOB_DBR *jr)
    struct tm tm;
    bool ok;
    utime_t JobTDate;
-   char ed1[30];
+   char ed1[30],ed2[30];
 
    db_lock(mdb);
 
    stime = jr->SchedTime;
    ASSERT(stime != 0);
 
-   localtime_r(&stime, &tm);
-   strftime(dt, sizeof(dt), "%Y-%m-%d %T", &tm);
+   (void)localtime_r(&stime, &tm);
+   strftime(dt, sizeof(dt), "%Y-%m-%d %H:%M:%S", &tm);
    JobTDate = (utime_t)stime;
 
    /* Must create it */
    Mmsg(mdb->cmd,
-"INSERT INTO Job (Job,Name,Type,Level,JobStatus,SchedTime,JobTDate) VALUES "
-"('%s','%s','%c','%c','%c','%s',%s)",
+"INSERT INTO Job (Job,Name,Type,Level,JobStatus,SchedTime,JobTDate,ClientId) "
+"VALUES ('%s','%s','%c','%c','%c','%s',%s,%s)",
            jr->Job, jr->Name, (char)(jr->JobType), (char)(jr->JobLevel),
-           (char)(jr->JobStatus), dt, edit_uint64(JobTDate, ed1));
+           (char)(jr->JobStatus), dt, edit_uint64(JobTDate, ed1),
+           edit_int64(jr->ClientId, ed2));
 
    if (!INSERT_DB(jcr, mdb, mdb->cmd)) {
       Mmsg2(&mdb->errmsg, _("Create DB Job record %s failed. ERR=%s\n"),
@@ -95,55 +104,6 @@ db_create_job_record(JCR *jcr, B_DB *mdb, JOB_DBR *jr)
    return ok;
 }
 
-/* Create a new migration, archive, copy
- * Returns: false on failure
- *          true  on success
- */
-bool
-db_create_mac_record(JCR *jcr, B_DB *mdb, MAC_DBR *mr)
-{
-   char schedt[MAX_TIME_LENGTH], sdt[MAX_TIME_LENGTH], edt[MAX_TIME_LENGTH];
-   time_t stime;
-   struct tm tm;
-   bool ok;
-   utime_t JobTDate;
-   char ed1[30], ed2[30];
-
-   db_lock(mdb);
-
-   stime = mr->SchedTime;
-   ASSERT(stime != 0);
-
-   localtime_r(&stime, &tm);
-   strftime(schedt, sizeof(schedt), "%Y-%m-%d %T", &tm);
-   JobTDate = (utime_t)stime;
-   localtime_r(&mr->StartTime, &tm);
-   strftime(sdt, sizeof(sdt), "%Y-%m-%d %T", &tm);
-   localtime_r(&mr->EndTime, &tm);
-   strftime(edt, sizeof(edt), "%Y-%m-%d %T", &tm);
-
-   /* Must create it */
-   Mmsg(mdb->cmd,
-"INSERT INTO MAC (OriginaJobId,JobType,JobLevel,SchedTime,"
-"StartTime,EndTime,JobTDate) VALUES "
-"('%s','%c','%c','%s','%s','%s',%s)",
-           edit_int64(mr->OriginalJobId, ed1),
-           (char)(mr->JobType), (char)(mr->JobLevel),
-           schedt, sdt, edt, edit_uint64(JobTDate, ed2));
-
-   if (!INSERT_DB(jcr, mdb, mdb->cmd)) {
-      Mmsg2(&mdb->errmsg, _("Create DB MAC record %s failed. ERR=%s\n"),
-            mdb->cmd, sql_strerror(mdb));
-      mr->JobId = 0;
-      ok = false;
-   } else {
-      mr->JobId = sql_insert_id(mdb, NT_("Job"));
-      ok = true;
-   }
-   db_unlock(mdb);
-   return ok;
-}
-
 
 /* Create a JobMedia record for medium used this job
  * Returns: false on failure
@@ -152,7 +112,7 @@ db_create_mac_record(JCR *jcr, B_DB *mdb, MAC_DBR *mr)
 bool
 db_create_jobmedia_record(JCR *jcr, B_DB *mdb, JOBMEDIA_DBR *jm)
 {
-   bool ok = true;;
+   bool ok = true;
    int count;
    char ed1[50], ed2[50];
 
@@ -167,15 +127,19 @@ db_create_jobmedia_record(JCR *jcr, B_DB *mdb, JOBMEDIA_DBR *jm)
    }
    count++;
 
+   /* Note, jm->Strip is not used and is not likely to be used
+    * in the near future, so I have removed it from the insert
+    * to save space in the DB. KES June 2006.
+    */
    Mmsg(mdb->cmd,
         "INSERT INTO JobMedia (JobId,MediaId,FirstIndex,LastIndex,"
-        "StartFile,EndFile,StartBlock,EndBlock,VolIndex,Copy,Stripe) "
-        "VALUES (%s,%s,%u,%u,%u,%u,%u,%u,%u,%u,%u)",
+        "StartFile,EndFile,StartBlock,EndBlock,VolIndex,Copy) "
+        "VALUES (%s,%s,%u,%u,%u,%u,%u,%u,%u,%u)",
         edit_int64(jm->JobId, ed1),
         edit_int64(jm->MediaId, ed2),
         jm->FirstIndex, jm->LastIndex,
         jm->StartFile, jm->EndFile, jm->StartBlock, jm->EndBlock,count,
-        jm->Copy, jm->Stripe);
+        jm->Copy);
 
    Dmsg0(300, mdb->cmd);
    if (!INSERT_DB(jcr, mdb, mdb->cmd)) {
@@ -198,8 +162,6 @@ db_create_jobmedia_record(JCR *jcr, B_DB *mdb, JOBMEDIA_DBR *jm)
    return ok;
 }
 
-
-
 /* Create Unique Pool record
  * Returns: false on failure
  *          true  on success
@@ -208,7 +170,7 @@ bool
 db_create_pool_record(JCR *jcr, B_DB *mdb, POOL_DBR *pr)
 {
    bool stat;        
-   char ed1[30], ed2[30], ed3[50];
+   char ed1[30], ed2[30], ed3[50], ed4[50], ed5[50];
 
    Dmsg0(200, "In create pool\n");
    db_lock(mdb);
@@ -230,8 +192,9 @@ db_create_pool_record(JCR *jcr, B_DB *mdb, POOL_DBR *pr)
    Mmsg(mdb->cmd,
 "INSERT INTO Pool (Name,NumVols,MaxVols,UseOnce,UseCatalog,"
 "AcceptAnyVolume,AutoPrune,Recycle,VolRetention,VolUseDuration,"
-"MaxVolJobs,MaxVolFiles,MaxVolBytes,PoolType,LabelType,LabelFormat) "
-"VALUES ('%s',%u,%u,%d,%d,%d,%d,%d,%s,%s,%u,%u,%s,'%s',%d,'%s')",
+"MaxVolJobs,MaxVolFiles,MaxVolBytes,PoolType,LabelType,LabelFormat,"
+"RecyclePoolId,ScratchPoolId) "
+"VALUES ('%s',%u,%u,%d,%d,%d,%d,%d,%s,%s,%u,%u,%s,'%s',%d,'%s',%s,%s)",
                   pr->Name,
                   pr->NumVols, pr->MaxVols,
                   pr->UseOnce, pr->UseCatalog,
@@ -241,7 +204,9 @@ db_create_pool_record(JCR *jcr, B_DB *mdb, POOL_DBR *pr)
                   edit_uint64(pr->VolUseDuration, ed2),
                   pr->MaxVolJobs, pr->MaxVolFiles,
                   edit_uint64(pr->MaxVolBytes, ed3),
-                  pr->PoolType, pr->LabelType, pr->LabelFormat);
+                  pr->PoolType, pr->LabelType, pr->LabelFormat,
+                  edit_int64(pr->RecyclePoolId,ed4),
+                  edit_int64(pr->ScratchPoolId,ed5));
    Dmsg1(200, "Create Pool: %s\n", mdb->cmd);
    if (!INSERT_DB(jcr, mdb, mdb->cmd)) {
       Mmsg2(&mdb->errmsg, _("Create db Pool record %s failed: ERR=%s\n"),
@@ -421,6 +386,7 @@ db_create_media_record(JCR *jcr, B_DB *mdb, MEDIA_DBR *mr)
 {
    int stat;
    char ed1[50], ed2[50], ed3[50], ed4[50], ed5[50], ed6[50], ed7[50], ed8[50];
+   char ed9[50], ed10[50], ed11[50], ed12[50];
    struct tm tm;
 
    db_lock(mdb);
@@ -444,8 +410,10 @@ db_create_media_record(JCR *jcr, B_DB *mdb, MEDIA_DBR *mr)
 "INSERT INTO Media (VolumeName,MediaType,MediaTypeId,PoolId,MaxVolBytes,"
 "VolCapacityBytes,Recycle,VolRetention,VolUseDuration,MaxVolJobs,MaxVolFiles,"
 "VolStatus,Slot,VolBytes,InChanger,VolReadTime,VolWriteTime,VolParts,"
-"EndFile,EndBlock,LabelType,StorageId,DeviceId,LocationId) "
-"VALUES ('%s','%s',0,%u,%s,%s,%d,%s,%s,%u,%u,'%s',%d,%s,%d,%s,%s,%d,0,0,%d,%s,0,0)",
+"EndFile,EndBlock,LabelType,StorageId,DeviceId,LocationId,"
+"ScratchPoolId,RecyclePoolId,Enabled)"
+"VALUES ('%s','%s',0,%u,%s,%s,%d,%s,%s,%u,%u,'%s',%d,%s,%d,%s,%s,%d,0,0,%d,%s,"
+"%s,%s,%s,%s,%d)",
           mr->VolumeName,
           mr->MediaType, mr->PoolId,
           edit_uint64(mr->MaxVolBytes,ed1),
@@ -459,11 +427,16 @@ db_create_media_record(JCR *jcr, B_DB *mdb, MEDIA_DBR *mr)
           mr->Slot,
           edit_uint64(mr->VolBytes, ed5),
           mr->InChanger,
-          edit_uint64(mr->VolReadTime, ed6),
-          edit_uint64(mr->VolWriteTime, ed7),
+          edit_int64(mr->VolReadTime, ed6),
+          edit_int64(mr->VolWriteTime, ed7),
           mr->VolParts,
           mr->LabelType,
-          edit_int64(mr->StorageId, ed8) 
+          edit_int64(mr->StorageId, ed8), 
+          edit_int64(mr->DeviceId, ed9), 
+          edit_int64(mr->LocationId, ed10), 
+          edit_int64(mr->ScratchPoolId, ed11), 
+          edit_int64(mr->RecyclePoolId, ed12), 
+          mr->Enabled
           );
 
 
@@ -480,20 +453,19 @@ db_create_media_record(JCR *jcr, B_DB *mdb, MEDIA_DBR *mr)
          if (mr->LabelDate == 0) {
             mr->LabelDate = time(NULL);
          }
-         localtime_r(&mr->LabelDate, &tm);
-         strftime(dt, sizeof(dt), "%Y-%m-%d %T", &tm);
+         (void)localtime_r(&mr->LabelDate, &tm);
+         strftime(dt, sizeof(dt), "%Y-%m-%d %H:%M:%S", &tm);
          Mmsg(mdb->cmd, "UPDATE Media SET LabelDate='%s' "
               "WHERE MediaId=%d", dt, mr->MediaId);
          stat = UPDATE_DB(jcr, mdb, mdb->cmd);
       }
+      /*
+       * Make sure that if InChanger is non-zero any other identical slot
+       *   has InChanger zero.
+       */
+      db_make_inchanger_unique(jcr, mdb, mr);
    }
 
-   /*
-    * Make sure that if InChanger is non-zero any other identical slot
-    *   has InChanger zero.
-    */
-   db_make_inchanger_unique(jcr, mdb, mr);
-
    db_unlock(mdb);
    return stat;
 }
@@ -652,8 +624,8 @@ bool db_create_fileset_record(JCR *jcr, B_DB *mdb, FILESET_DBR *fsr)
    if (fsr->CreateTime == 0 && fsr->cCreateTime[0] == 0) {
       fsr->CreateTime = time(NULL);
    }
-   localtime_r(&fsr->CreateTime, &tm);
-   strftime(fsr->cCreateTime, sizeof(fsr->cCreateTime), "%Y-%m-%d %T", &tm);
+   (void)localtime_r(&fsr->CreateTime, &tm);
+   strftime(fsr->cCreateTime, sizeof(fsr->cCreateTime), "%Y-%m-%d %H:%M:%S", &tm);
 
    /* Must create it */
       Mmsg(mdb->cmd, "INSERT INTO FileSet (FileSet,MD5,CreateTime) "
@@ -695,7 +667,170 @@ bool db_create_fileset_record(JCR *jcr, B_DB *mdb, FILESET_DBR *fsr)
  *  };
  */
 
+#ifdef HAVE_BATCH_FILE_INSERT
+
+/*  All sql_batch_* functions are used to do bulk batch insert in File/Filename/Path
+ *  tables. This code can be activated by adding "#define HAVE_BATCH_FILE_INSERT 1"
+ *  in baconfig.h
+ *  
+ *  To sum up :
+ *   - bulk load a temp table
+ *   - insert missing filenames into filename with a single query (lock filenames 
+ *   - table before that to avoid possible duplicate inserts with concurrent update)
+ *   - insert missing paths into path with another single query
+ *   - then insert the join between the temp, filename and path tables into file.
+ */
+
+/* 
+ * Returns 1 if OK
+ *         0 if failed
+ */
+bool my_batch_start(JCR *jcr, B_DB *mdb)
+{
+   bool ok;
+
+   db_lock(mdb);
+   ok =  db_sql_query(mdb,
+             "CREATE TEMPORARY TABLE batch ("
+                "FileIndex integer,"
+                "JobId integer,"
+                "Path blob,"
+                "Name blob,"
+                "LStat tinyblob,"
+                "MD5 tinyblob)",NULL, NULL);
+   db_unlock(mdb);
+   return ok;
+}
 
+/* 
+ * Returns 1 if OK
+ *         0 if failed
+ */
+bool my_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+{
+   size_t len;
+   const char *digest;
+   char ed1[50];
+
+   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
+   db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
+
+   mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
+   db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
+
+   if (ar->Digest == NULL || ar->Digest[0] == 0) {
+      digest = "0";
+   } else {
+      digest = ar->Digest;
+   }
+
+   len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
+              ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
+              mdb->esc_name, ar->attr, digest);
+
+   return INSERT_DB(jcr, mdb, mdb->cmd);
+}
+
+/* set error to something to abort operation */
+/* 
+ * Returns 1 if OK
+ *         0 if failed
+ */
+bool my_batch_end(JCR *jcr, B_DB *mdb, const char *error)
+{
+   
+   Dmsg0(50, "sql_batch_end started\n");
+
+   if (mdb) {
+#ifdef HAVE_DBI 
+      mdb->status = (dbi_error_flag)0;
+#else
+      mdb->status = 0;
+#endif
+   }
+   return true;
+}
+
+/* 
+ * Returns 1 if OK
+ *         0 if failed
+ */
+bool db_write_batch_file_records(JCR *jcr)
+{
+   int JobStatus = jcr->JobStatus;
+
+   if (!jcr->batch_started) {         /* no files to backup ? */
+      Dmsg0(50,"db_create_file_record : no files\n");
+      return true;
+   }
+   if (job_canceled(jcr)) {
+      return false;
+   }
+
+   Dmsg1(50,"db_create_file_record changes=%u\n",jcr->db_batch->changes);
+
+   jcr->JobStatus = JS_AttrInserting;
+   if (!sql_batch_end(jcr, jcr->db_batch, NULL)) {
+      Jmsg1(jcr, M_FATAL, 0, "Batch end %s\n", jcr->db_batch->errmsg);
+      return false;
+   }
+   if (job_canceled(jcr)) {
+      return false;
+   }
+
+
+   /* we have to lock tables */
+   if (!db_sql_query(jcr->db_batch, sql_batch_lock_path_query, NULL, NULL)) {
+      Jmsg1(jcr, M_FATAL, 0, "Lock Path table %s\n", jcr->db_batch->errmsg);
+      return false;
+   }
+
+   if (!db_sql_query(jcr->db_batch, sql_batch_fill_path_query, NULL, NULL)) {
+      Jmsg1(jcr, M_FATAL, 0, "Fill Path table %s\n",jcr->db_batch->errmsg);
+      db_sql_query(jcr->db_batch, sql_batch_unlock_tables_query, NULL, NULL);
+      return false;
+   }
+   
+   if (!db_sql_query(jcr->db_batch, sql_batch_unlock_tables_query,NULL,NULL)) {
+      Jmsg1(jcr, M_FATAL, 0, "Unlock Path table %s\n", jcr->db_batch->errmsg);
+      return false;      
+   }
+
+   /* we have to lock tables */
+   if (!db_sql_query(jcr->db_batch,sql_batch_lock_filename_query,NULL, NULL)) {
+      Jmsg1(jcr, M_FATAL, 0, "Lock Filename table %s\n", jcr->db_batch->errmsg);
+      return false;
+   }
+   
+   if (!db_sql_query(jcr->db_batch,sql_batch_fill_filename_query, NULL,NULL)) {
+      Jmsg1(jcr,M_FATAL,0,"Fill Filename table %s\n",jcr->db_batch->errmsg);
+      db_sql_query(jcr->db_batch, sql_batch_unlock_tables_query, NULL, NULL);
+      return false;            
+   }
+
+   if (!db_sql_query(jcr->db_batch, sql_batch_unlock_tables_query,NULL,NULL)) {
+      Jmsg1(jcr, M_FATAL, 0, "Unlock Filename table %s\n", jcr->db_batch->errmsg);
+      return false;
+   }
+   
+   if (!db_sql_query(jcr->db_batch, 
+       "INSERT INTO File (FileIndex, JobId, PathId, FilenameId, LStat, MD5)"
+         "SELECT batch.FileIndex, batch.JobId, Path.PathId, "
+                "Filename.FilenameId,batch.LStat, batch.MD5 "
+           "FROM batch "
+           "JOIN Path ON (batch.Path = Path.Path) "
+           "JOIN Filename ON (batch.Name = Filename.Name)",
+                     NULL,NULL))
+   {
+      Jmsg1(jcr, M_FATAL, 0, "Fill File table %s\n", jcr->db_batch->errmsg);
+      return false;
+   }
+
+   db_sql_query(jcr->db_batch, "DROP TABLE batch", NULL,NULL);
+
+   jcr->JobStatus = JobStatus;         /* reset entry status */
+   return true;
+}
 
 /*
  * Create File record in B_DB
@@ -704,14 +839,72 @@ bool db_create_fileset_record(JCR *jcr, B_DB *mdb, FILESET_DBR *fsr)
  *  the FileName, and the Path separately.  In principle, there
  *  is a single FileName record and a single Path record, no matter
  *  how many times it occurs.  This is this subroutine, we separate
- *  the file and the path and create three database records.
+ *  the file and the path and fill temporary tables with this three records.
  */
-int db_create_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+bool db_create_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
 {
+   ASSERT(ar->FileType != FT_BASE);
 
+   Dmsg1(dbglevel, "Fname=%s\n", ar->fname);
+   Dmsg0(dbglevel, "put_file_into_catalog\n");
+
+   /* Open the dedicated connexion */
+   if (!jcr->batch_started) {
+
+      if (!db_open_batch_connexion(jcr, mdb)) {
+         return false;
+      }
+      if (!sql_batch_start(jcr, jcr->db_batch)) {
+         Mmsg1(&mdb->errmsg, 
+              "Can't start batch mode: ERR=%s", db_strerror(jcr->db_batch));
+         Jmsg1(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+         return false;
+      }
+      jcr->batch_started = true;
+   }
+   B_DB *bdb = jcr->db_batch;
+
+   /*
+    * Make sure we have an acceptable attributes record.
+    */
+   if (!(ar->Stream == STREAM_UNIX_ATTRIBUTES ||
+         ar->Stream == STREAM_UNIX_ATTRIBUTES_EX)) {
+      Mmsg1(&mdb->errmsg, _("Attempt to put non-attributes into catalog. Stream=%d\n"),
+         ar->Stream);
+      Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+      return false;
+   }
+
+   split_path_and_file(jcr, bdb, ar->fname);
+
+
+/*
+ * if (bdb->changes > 100000) {
+ *    db_write_batch_file_records(jcr);
+ *    bdb->changes = 0;
+ *     sql_batch_start(jcr, bdb);
+ * }
+ */
+
+   return sql_batch_insert(jcr, bdb, ar);
+}
+
+#else  /* ! HAVE_BATCH_FILE_INSERT */
+
+/*
+ * Create File record in B_DB
+ *
+ *  In order to reduce database size, we store the File attributes,
+ *  the FileName, and the Path separately.  In principle, there
+ *  is a single FileName record and a single Path record, no matter
+ *  how many times it occurs.  This is this subroutine, we separate
+ *  the file and the path and create three database records.
+ */
+bool db_create_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+{
    db_lock(mdb);
-   Dmsg1(300, "Fname=%s\n", ar->fname);
-   Dmsg0(500, "put_file_into_catalog\n");
+   Dmsg1(dbglevel, "Fname=%s\n", ar->fname);
+   Dmsg0(dbglevel, "put_file_into_catalog\n");
    /*
     * Make sure we have an acceptable attributes record.
     */
@@ -729,29 +922,30 @@ int db_create_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    if (!db_create_filename_record(jcr, mdb, ar)) {
       goto bail_out;
    }
-   Dmsg1(500, "db_create_filename_record: %s\n", mdb->esc_name);
+   Dmsg1(dbglevel, "db_create_filename_record: %s\n", mdb->esc_name);
 
 
    if (!db_create_path_record(jcr, mdb, ar)) {
       goto bail_out;
    }
-   Dmsg1(500, "db_create_path_record: %s\n", mdb->esc_name);
+   Dmsg1(dbglevel, "db_create_path_record: %s\n", mdb->esc_name);
 
    /* Now create master File record */
    if (!db_create_file_record(jcr, mdb, ar)) {
       goto bail_out;
    }
-   Dmsg0(500, "db_create_file_record OK\n");
+   Dmsg0(dbglevel, "db_create_file_record OK\n");
 
-   Dmsg3(300, "CreateAttributes Path=%s File=%s FilenameId=%d\n", mdb->path, mdb->fname, ar->FilenameId);
+   Dmsg3(dbglevel, "CreateAttributes Path=%s File=%s FilenameId=%d\n", mdb->path, mdb->fname, ar->FilenameId);
    db_unlock(mdb);
-   return 1;
+   return true;
 
 bail_out:
    db_unlock(mdb);
-   return 0;
+   return false;
 }
 
+
 /*
  * This is the master File entry containing the attributes.
  *  The filename and path records have already been created.
@@ -759,8 +953,8 @@ bail_out:
 static int db_create_file_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
 {
    int stat;
-   static char *no_digest = "0";
-   char *digest;
+   static const char *no_digest = "0";
+   const char *digest;
 
    ASSERT(ar->JobId);
    ASSERT(ar->PathId);
@@ -799,7 +993,7 @@ static int db_create_path_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    int stat;
 
    mdb->esc_name = check_pool_memory_size(mdb->esc_name, 2*mdb->pnl+2);
-   db_escape_string(mdb->esc_name, mdb->path, mdb->pnl);
+   db_escape_string(jcr, mdb, mdb->esc_name, mdb->path, mdb->pnl);
 
    if (mdb->cached_path_id != 0 && mdb->cached_path_len == mdb->pnl &&
        strcmp(mdb->cached_path, mdb->path) == 0) {
@@ -869,8 +1063,8 @@ static int db_create_filename_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    SQL_ROW row;
 
    mdb->esc_name = check_pool_memory_size(mdb->esc_name, 2*mdb->fnl+2);
-   db_escape_string(mdb->esc_name, mdb->fname, mdb->fnl);
-
+   db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
+   
    Mmsg(mdb->cmd, "SELECT FilenameId FROM Filename WHERE Name='%s'", mdb->esc_name);
 
    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
@@ -909,4 +1103,173 @@ static int db_create_filename_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
    return ar->FilenameId > 0;
 }
 
-#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL */
+bool db_write_batch_file_records(JCR *jcr)
+{
+   return true;
+}
+
+#endif /* ! HAVE_BATCH_FILE_INSERT */
+
+
+/* List of SQL commands to create temp table and indicies  */
+const char *create_temp_basefile[4] = {
+   /* MySQL */
+   "CREATE TEMPORARY TABLE basefile%lld ("
+   "Path BLOB NOT NULL,"
+   "Name BLOB NOT NULL)",
+
+   /* Postgresql */
+   "CREATE TEMPORARY TABLE basefile%lld (" 
+//   "CREATE TABLE basefile%lld (" 
+   "Path TEXT,"
+   "Name TEXT)",
+
+   /* SQLite */
+   "CREATE TEMPORARY TABLE basefile%lld (" 
+   "Path TEXT,"
+   "Name TEXT)",
+
+   /* SQLite3 */
+   "CREATE TEMPORARY TABLE basefile%lld (" 
+   "Path TEXT,"
+   "Name TEXT)"
+};
+
+/* 
+ * Create file attributes record, or base file attributes record
+ */
+bool db_create_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+{
+   bool ret;
+   if (ar->FileType != FT_BASE) {
+      ret = db_create_file_attributes_record(jcr, mdb, ar);
+
+   } else if (jcr->HasBase) {
+      ret = db_create_base_file_attributes_record(jcr, jcr->db_batch, ar);
+
+   } else {
+      Jmsg0(jcr, M_FATAL, 0, _("Can't Copy/Migrate job using BaseJob"));
+      ret = true;               /* in copy/migration what do we do ? */
+   }
+
+   return ret;
+}
+
+/*
+ * Create Base File record in B_DB
+ *
+ */
+bool db_create_base_file_attributes_record(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+{
+   bool ret;
+   Dmsg1(dbglevel, "create_base_file Fname=%s\n", ar->fname);
+   Dmsg0(dbglevel, "put_base_file_into_catalog\n");
+
+   /*
+    * Make sure we have an acceptable attributes record.
+    */
+   if (!(ar->Stream == STREAM_UNIX_ATTRIBUTES ||
+         ar->Stream == STREAM_UNIX_ATTRIBUTES_EX)) {
+      Mmsg1(&mdb->errmsg, _("Attempt to put non-attributes into catalog. Stream=%d\n"),
+         ar->Stream);
+      Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+      return false;
+   }
+
+   db_lock(mdb); 
+   split_path_and_file(jcr, mdb, ar->fname);
+   
+   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
+   db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
+   
+   mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
+   db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
+   
+   Mmsg(mdb->cmd, "INSERT INTO basefile%lld (Path, Name) VALUES ('%s','%s')",
+        (uint64_t)jcr->JobId, mdb->esc_path, mdb->esc_name);
+
+   ret = INSERT_DB(jcr, mdb, mdb->cmd);
+   db_unlock(mdb);
+
+   return ret;
+}
+
+/* 
+ * Cleanup the base file temporary tables
+ */
+static void db_cleanup_base_file(JCR *jcr, B_DB *mdb)
+{
+   POOL_MEM buf(PM_MESSAGE);
+   Mmsg(buf, "DROP TABLE new_basefile%lld", (uint64_t) jcr->JobId);
+   db_sql_query(mdb, buf.c_str(), NULL, NULL);
+
+   Mmsg(buf, "DROP TABLE basefile%lld", (uint64_t) jcr->JobId);
+   db_sql_query(mdb, buf.c_str(), NULL, NULL);
+}
+
+/*
+ * Put all base file seen in the backup to the BaseFile table
+ * and cleanup temporary tables
+ */
+bool db_commit_base_file_attributes_record(JCR *jcr, B_DB *mdb)
+{
+   bool ret;
+   char ed1[50];
+   POOL_MEM buf(PM_MESSAGE);
+
+   Mmsg(buf, 
+  "INSERT INTO BaseFiles (BaseJobId, JobId, FileId, FileIndex) ( "
+   "SELECT B.JobId AS BaseJobId, %s AS JobId, "
+          "B.FileId, B.FileIndex "
+     "FROM basefile%s AS A, new_basefile%s AS B "
+    "WHERE A.Path = B.Path "
+      "AND A.Name = B.Name "
+    "ORDER BY B.FileId)", 
+        edit_uint64(jcr->JobId, ed1), ed1, ed1);
+   ret = db_sql_query(mdb, buf.c_str(), NULL, NULL);
+   db_cleanup_base_file(jcr, mdb);
+   return ret;
+}
+
+/*
+ * Find the last "accurate" backup state with Base jobs
+ * 1) Get all files with jobid in list (F subquery) 
+ * 2) Take only the last version of each file (Temp subquery) => accurate list is ok
+ * 3) Put the result in a temporary table for the end of job
+ *
+ */
+bool db_create_base_file_list(JCR *jcr, B_DB *mdb, char *jobids)
+{
+   POOL_MEM buf(PM_MESSAGE);
+
+   if (!*jobids) {
+      db_lock(mdb);
+      Mmsg(mdb->errmsg, _("ERR=JobIds are empty\n"));
+      db_unlock(mdb);
+      return false;
+   }
+
+   Mmsg(buf, create_temp_basefile[db_type], (uint64_t) jcr->JobId);
+   if (!db_sql_query(mdb, buf.c_str(), NULL, NULL)) {
+      return false;
+   }
+     
+   Mmsg(buf,
+"CREATE TEMPORARY TABLE new_basefile%lld AS ( "
+//"CREATE TABLE new_basefile%lld AS ( "
+  "SELECT Path.Path AS Path, Filename.Name AS Name, File.FileIndex AS FileIndex,"
+         "File.JobId AS JobId, File.LStat AS LStat, File.FileId AS FileId "
+  "FROM ( "
+   "SELECT max(FileId) as FileId, PathId, FilenameId "
+     "FROM (SELECT FileId, PathId, FilenameId FROM File WHERE JobId IN (%s)) AS F "
+    "GROUP BY PathId, FilenameId "
+   ") AS Temp "
+  "JOIN Filename ON (Filename.FilenameId = Temp.FilenameId) "
+  "JOIN Path ON (Path.PathId = Temp.PathId) "
+  "JOIN File ON (File.FileId = Temp.FileId) "
+ "WHERE File.FileIndex > 0)",
+        (uint64_t)jcr->JobId, jobids);
+   return db_sql_query(mdb, buf.c_str(), NULL, NULL);
+}
+
+#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI */