/*
Bacula® - The Network Backup Solution
- Copyright (C) 2000-2011 Free Software Foundation Europe e.V.
+ Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
- The main author of Bacula is Kern Sibbald, with contributions from
- many others, a complete list can be found in the file AUTHORS.
- This program is Free Software; you can redistribute it and/or
- modify it under the terms of version three of the GNU Affero General Public
- License as published by the Free Software Foundation and included
- in the file LICENSE.
+ The main author of Bacula is Kern Sibbald, with contributions from many
+ others, a complete list can be found in the file AUTHORS.
- This program is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- 02110-1301, USA.
+ You may use this file and others of this release according to the
+ license defined in the LICENSE file, which includes the Affero General
+ Public License, v3.0 ("AGPLv3") and some additional permissions and
+ terms pursuant to its AGPLv3 Section 7.
Bacula® is a registered trademark of Kern Sibbald.
- The licensor of Bacula is the Free Software Foundation Europe
- (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
- Switzerland, email:ftf@fsfeurope.org.
*/
/*
* Read code for Storage daemon
#include "stored.h"
/* Forward referenced subroutines */
-static bool record_cb(DCR *dcr, DEV_RECORD *rec);
+static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
+static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
/* Responses sent to the File daemon */
BSOCK *fd = jcr->file_bsock;
bool ok = true;
DCR *dcr = jcr->read_dcr;
+ char ec[50];
- Dmsg0(20, "Start read data.\n");
+ Dmsg0(100, "Start read data.\n");
- if (!bnet_set_buffer_size(fd, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
+ if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
return false;
}
/* Tell File daemon we will send data */
fd->fsend(OK_data);
+
jcr->sendJobStatus(JS_Running);
- ok = read_records(dcr, record_cb, mount_next_read_volume);
+ jcr->run_time = time(NULL);
+ jcr->JobFiles = 0;
+
+ if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
+ ok = read_records(dcr, mac_record_cb, mount_next_read_volume);
+ } else {
+ ok = read_records(dcr, read_record_cb, mount_next_read_volume);
+ }
+
+ /*
+ * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
+ * and the subsequent Jmsg() editing will break
+ */
+ int32_t job_elapsed = time(NULL) - jcr->run_time;
+
+ if (job_elapsed <= 0) {
+ job_elapsed = 1;
+ }
+
+ Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
+ job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
+ edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
/* Send end of data to FD */
fd->signal(BNET_EOD);
}
/*
- * Called here for each record from read_records()
+ * Called here for reading (restore) for each record from read_records()
* Returns: true if OK
* false if error
*/
-static bool record_cb(DCR *dcr, DEV_RECORD *rec)
+static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
{
JCR *jcr = dcr->jcr;
BSOCK *fd = jcr->file_bsock;
return true;
}
Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
- rec->VolSessionId, rec->VolSessionTime,
+ rec->VolSessionId, rec->VolSessionTime,
FI_to_ascii(ec1, rec->FileIndex),
stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
rec->data_len);
Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
fd->bstrerror());
return false;
- } else {
- Dmsg1(400, ">filed: Hdr=%s\n", fd->msg);
}
+ /*
+ * For normal migration jobs, FileIndex values are sequential because
+ * we are dealing with one job. However, for Vbackup (consolidation),
+ * we will be getting records from multiple jobs and writing them back
+ * out, so we need to ensure that the output FileIndex is sequential.
+ * We do so by detecting a FileIndex change and incrementing the
+ * JobFiles, which we then use as the output FileIndex.
+ */
+ if (rec->FileIndex >= 0) {
+ /* If something changed, increment FileIndex */
+ if (rec->VolSessionId != rec->last_VolSessionId ||
+ rec->VolSessionTime != rec->last_VolSessionTime ||
+ rec->FileIndex != rec->last_FileIndex) {
+ jcr->JobFiles++;
+ rec->last_VolSessionId = rec->VolSessionId;
+ rec->last_VolSessionTime = rec->VolSessionTime;
+ rec->last_FileIndex = rec->FileIndex;
+ }
+ }
/* Send data record to File daemon */
save_msg = fd->msg; /* save fd message pointer */
fd->msg = rec->data; /* pass data directly to the FD */
fd->msglen = rec->data_len;
+ jcr->JobBytes += rec->data_len; /* increment bytes this job */
Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
if (!fd->send()) {
Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
fd->msg = save_msg; /* restore fd message pointer */
return ok;
}
+
+/*
+ * New routine after to SD->SD implementation
+ * Called here for each record from read_records()
+ * Returns: true if OK
+ * false if error
+ */
+static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
+{
+ JCR *jcr = dcr->jcr;
+ BSOCK *fd = jcr->file_bsock;
+ char buf1[100], buf2[100];
+ bool new_header = false;
+ POOLMEM *save_msg;
+ char ec1[50], ec2[50];
+ bool ok = true;
+
+#ifdef xxx
+ Pmsg5(000, "on entry JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
+ jcr->JobId,
+ FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
+ stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
+#endif
+
+ /* If label and not for us, discard it */
+ if (rec->FileIndex < 0) {
+ Dmsg1(100, "FileIndex=%d\n", rec->FileIndex);
+ return true;
+ }
+
+ /*
+ * For normal migration jobs, FileIndex values are sequential because
+ * we are dealing with one job. However, for Vbackup (consolidation),
+ * we will be getting records from multiple jobs and writing them back
+ * out, so we need to ensure that the output FileIndex is sequential.
+ * We do so by detecting a FileIndex change and incrementing the
+ * JobFiles, which we then use as the output FileIndex.
+ */
+ if (rec->FileIndex >= 0) {
+ /* If something changed, increment FileIndex */
+ if (rec->VolSessionId != rec->last_VolSessionId ||
+ rec->VolSessionTime != rec->last_VolSessionTime ||
+ rec->FileIndex != rec->last_FileIndex ||
+ rec->Stream != rec->last_Stream) {
+
+ /* Something changed */
+ if (rec->last_VolSessionId != 0) { /* Not first record */
+ Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
+ if (!fd->signal(BNET_EOD)) { /* End of previous stream */
+ Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
+ fd->bstrerror());
+ return false;
+ }
+ }
+ new_header = true;
+ if (rec->FileIndex != rec->last_FileIndex) {
+ jcr->JobFiles++;
+ }
+ rec->last_VolSessionId = rec->VolSessionId;
+ rec->last_VolSessionTime = rec->VolSessionTime;
+ rec->last_FileIndex = rec->FileIndex;
+ rec->last_Stream = rec->Stream;
+ }
+ rec->FileIndex = jcr->JobFiles; /* set sequential output FileIndex */
+ }
+
+ if (new_header) {
+ new_header = false;
+ Dmsg5(400, "Send header to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
+ rec->VolSessionId, rec->VolSessionTime,
+ FI_to_ascii(ec1, rec->FileIndex),
+ stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
+ rec->data_len);
+
+ /* Send data header to File daemon */
+ if (!fd->fsend("%ld %ld %lld", rec->FileIndex, rec->Stream, rec->data_len)) {
+ Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
+ Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
+ fd->bstrerror());
+ return false;
+ }
+ }
+
+ Dmsg1(400, "FI=%d\n", rec->FileIndex);
+ /* Send data record to File daemon */
+ save_msg = fd->msg; /* save fd message pointer */
+ fd->msg = rec->data; /* pass data directly to the FD */
+ fd->msglen = rec->data_len;
+ jcr->JobBytes += rec->data_len; /* increment bytes this job */
+ Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
+ if (!fd->send()) {
+ Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
+ Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
+ fd->bstrerror());
+ ok = false;
+ }
+ fd->msg = save_msg; /* restore fd message pointer */
+
+ Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
+ jcr->JobId,
+ FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
+ stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
+
+ return ok;
+}