2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2015 Kern Sibbald
5 Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
7 The original author of Bacula is Kern Sibbald, with contributions
8 from many others, a complete list can be found in the file AUTHORS.
10 You may use this file and others of this release according to the
11 license defined in the LICENSE file, which includes the Affero General
12 Public License, v3.0 ("AGPLv3") and some additional permissions and
13 terms pursuant to its AGPLv3 Section 7.
15 This notice must be preserved when any source code is
16 conveyed and/or propagated.
18 Bacula(R) is a registered trademark of Kern Sibbald.
21 * Read code for Storage daemon
23 * Kern Sibbald, November MM
30 /* Forward referenced subroutines */
/* Both are record callbacks that do_read_data() hands to read_records(). */
31 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
32 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
34 /* Responses sent to the File daemon */
35 static char OK_data[] = "3000 OK data\n";
36 static char FD_error[] = "3000 error\n";
/*
 * Format of the per-record header that read_record_cb() passes to
 * fd->fsend().  The five fields are, in order: VolSessionId,
 * VolSessionTime, FileIndex, Stream and the data length (wsize).
 * NOTE(review): every field is formatted with %ld while wsize is declared
 * uint32_t in read_record_cb() -- confirm the argument types match what
 * fsend() expects for %ld.
 */
37 static char rec_header[] = "rechdr %ld %ld %ld %ld %ld";
40 * Read Data and send to File Daemon
41 * Returns: false on failure
/*
 * do_read_data: drive a read of the job's Volumes through jcr->read_dcr,
 *  using jcr->file_bsock as the outgoing socket.
 *
 *  Steps visible in this function: size the socket write buffer, require
 *  at least one read Volume, acquire the device for reading, run
 *  read_records() with a per-record callback, report elapsed time and
 *  transfer rate, signal BNET_EOD on the socket and release the device.
 *
 *  NOTE(review): the failure-path bodies and return statements are not
 *  visible here; only the statements shown are described.
 */
44 bool do_read_data(JCR *jcr)
46 BSOCK *fd = jcr->file_bsock;
48 DCR *dcr = jcr->read_dcr;
51 Dmsg0(100, "Start read data.\n");
/* Size the socket's write buffer from the device's max_network_buffer_size */
53 if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
/* Nothing can be read without at least one Volume: fatal job message */
57 if (jcr->NumReadVolumes == 0) {
58 Jmsg(jcr, M_FATAL, 0, _("No Volume names found for restore.\n"));
63 Dmsg2(200, "Found %d volumes names to restore. First=%s\n", jcr->NumReadVolumes,
64 jcr->VolList->VolumeName);
66 /* Ready device for reading */
67 if (!acquire_device_for_read(dcr)) {
72 /* Tell File daemon we will send data */
73 if (!jcr->is_ok_data_sent) {
74 /* OK_data may already have been sent for copy/migrate by run_job() to avoid a deadlock */
/* Set the flag tested above; the send call itself is not visible here */
76 jcr->is_ok_data_sent = true;
79 jcr->sendJobStatus(JS_Running);
/* Start of the interval used for the elapsed-time report below */
80 jcr->run_time = time(NULL);
/* Migrate and copy jobs use mac_record_cb; the other call uses read_record_cb */
83 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
84 ok = read_records(dcr, mac_record_cb, mount_next_read_volume);
86 ok = read_records(dcr, read_record_cb, mount_next_read_volume);
90 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
91 * and the subsequent Jmsg() editing will break
93 int32_t job_elapsed = time(NULL) - jcr->run_time;
/*
 * job_elapsed is the divisor of the transfer rate computed below.  The
 * body of this guard is not visible here; presumably it forces a non-zero
 * value so the division is safe -- TODO confirm.
 */
95 if (job_elapsed <= 0) {
/* Report hh:mm:ss and JobBytes per second, edited into ec with a size suffix */
99 Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
100 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
101 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
103 /* Send end of data to FD */
104 fd->signal(BNET_EOD);
/* Release the read device acquired above */
106 if (!release_device(jcr->read_dcr)) {
110 Dmsg0(30, "Done reading.\n");
/*
 * read_record_cb: per-record callback that do_read_data() passes to
 *  read_records() for jobs that are not migrate/copy.
 *
 *  For each record it sends a rec_header line on jcr->file_bsock, tracks
 *  the last seen VolSessionId/VolSessionTime/FileIndex in the record,
 *  honours the debug hangup/blowup thresholds and adds the record's data
 *  length to jcr->JobBytes.
 *
 *  NOTE(review): the data send call, the return statements and the
 *  branch bodies are not visible here; only the statements shown are
 *  described.
 */
114 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
117 BSOCK *fd = jcr->file_bsock;
120 char ec1[50], ec2[50];
121 POOLMEM *wbuf = rec->data; /* send buffer */
122 uint32_t wsize = rec->data_len; /* send size */
/*
 * Negative FileIndex: presumably a label record rather than file data;
 * the handling inside this branch is not visible here -- TODO confirm.
 */
124 if (rec->FileIndex < 0) {
128 Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
129 rec->VolSessionId, rec->VolSessionTime,
130 FI_to_ascii(ec1, rec->FileIndex),
131 stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
134 /* Send record header to File daemon */
/*
 * Header fields, in order: VolSessionId, VolSessionTime, FileIndex,
 * Stream, wsize.
 * NOTE(review): rec_header formats all five with %ld while wsize is
 * uint32_t -- confirm the argument types match.
 */
135 if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
136 rec->FileIndex, rec->Stream, wsize)) {
137 Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
138 Jmsg1(jcr, M_FATAL, 0, _("Error sending header to Client. ERR=%s\n"),
143 * For normal migration jobs, FileIndex values are sequential because
144 * we are dealing with one job. However, for Vbackup (consolidation),
145 * we will be getting records from multiple jobs and writing them back
146 * out, so we need to ensure that the output FileIndex is sequential.
147 * We do so by detecting a FileIndex change and incrementing the
148 * JobFiles, which we then use as the output FileIndex.
150 if (rec->FileIndex >= 0) {
151 /* If something changed, increment FileIndex */
152 if (rec->VolSessionId != rec->last_VolSessionId ||
153 rec->VolSessionTime != rec->last_VolSessionTime ||
154 rec->FileIndex != rec->last_FileIndex) {
/* Remember this record's identity so the next call can detect a change */
156 rec->last_VolSessionId = rec->VolSessionId;
157 rec->last_VolSessionTime = rec->VolSessionTime;
158 rec->last_FileIndex = rec->FileIndex;
162 /* Debug code: check if we must hangup */
/* Active only when get_hangup() > 0; fires once JobFiles exceeds it */
163 if (get_hangup() > 0 && (jcr->JobFiles > (uint32_t)get_hangup())) {
164 jcr->setJobStatus(JS_Incomplete);
165 Jmsg1(jcr, M_FATAL, 0, "Debug hangup requested after %d files.\n",
/* Same threshold test with get_blowup(), reported with M_ABORT */
170 if (get_blowup() > 0 && (jcr->JobFiles > (uint32_t)get_blowup())) {
171 Jmsg1(jcr, M_ABORT, 0, "Debug blowup() requested after %d files.\n",
/*
 * The socket's own message pointer is saved here.  Presumably fd->msg is
 * then pointed at wbuf for the send and restored afterwards, as
 * mac_record_cb() does -- those lines are not visible here, TODO confirm.
 */
176 save_msg = fd->msg; /* save fd message pointer */
179 /* Send data record to File daemon */
180 jcr->JobBytes += wsize; /* increment bytes this job */
182 Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
183 Jmsg1(jcr, M_FATAL, 0, _("Error sending data to Client. ERR=%s\n"),
192 * New routine added after the SD->SD implementation
193 * Called here for each record from read_records()
194 * Returns: true if OK
/*
 * mac_record_cb: per-record callback that do_read_data() passes to
 *  read_records() for migrate and copy jobs.
 *
 *  When the record's VolSessionId, VolSessionTime, FileIndex or Stream
 *  differs from the last one seen, it signals BNET_EOD for the previous
 *  stream (unless this is the first record) and stores the new values.
 *  The record's FileIndex is then replaced by jcr->JobFiles, a
 *  "FileIndex Stream size" header is sent with fsend(), and fd->msg is
 *  temporarily pointed at the record data for the data send.
 *
 *  NOTE(review): the data send call, the return statements and several
 *  branch bodies are not visible here; only the statements shown are
 *  described.
 */
197 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
200 BSOCK *fd = jcr->file_bsock;
201 char buf1[100], buf2[100];
/* NOTE(review): where new_header is set and tested is not visible here;
 * presumably it gates the header send below -- TODO confirm */
202 bool new_header = false;
204 char ec1[50], ec2[50];
/* NOTE(review): the declaration below ends with a stray second ';' */
206 POOLMEM *wbuf = rec->data;; /* send buffer */
207 uint32_t wsize = rec->data_len; /* send size */
210 Pmsg5(000, "on entry JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
212 FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
213 stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
216 /* If label and not for us, discard it */
217 if (rec->FileIndex < 0) {
218 Dmsg1(100, "FileIndex=%d\n", rec->FileIndex);
223 * For normal migration jobs, FileIndex values are sequential because
224 * we are dealing with one job. However, for Vbackup (consolidation),
225 * we will be getting records from multiple jobs and writing them back
226 * out, so we need to ensure that the output FileIndex is sequential.
227 * We do so by detecting a FileIndex change and incrementing the
228 * JobFiles, which we then use as the output FileIndex.
230 if (rec->FileIndex >= 0) {
231 /* If something changed, increment FileIndex */
232 if (rec->VolSessionId != rec->last_VolSessionId ||
233 rec->VolSessionTime != rec->last_VolSessionTime ||
234 rec->FileIndex != rec->last_FileIndex ||
235 rec->Stream != rec->last_Stream) {
237 /* Something changed */
/* last_VolSessionId == 0 means no previous record, so there is no
 * earlier stream to terminate */
238 if (rec->last_VolSessionId != 0) { /* Not first record */
239 Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
240 if (!fd->signal(BNET_EOD)) { /* End of previous stream */
241 Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
/* Body not visible here; per the block comment above, presumably where
 * JobFiles is incremented -- TODO confirm */
247 if (rec->FileIndex != rec->last_FileIndex) {
/* Remember this record's identity so the next call can detect a change */
250 rec->last_VolSessionId = rec->VolSessionId;
251 rec->last_VolSessionTime = rec->VolSessionTime;
252 rec->last_FileIndex = rec->FileIndex;
253 rec->last_Stream = rec->Stream;
/* From here on the record carries the job-wide counter, not the
 * FileIndex read from the Volume */
255 rec->FileIndex = jcr->JobFiles; /* set sequential output FileIndex */
260 Dmsg5(400, "Send header to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%ld\n",
261 rec->VolSessionId, rec->VolSessionTime,
262 FI_to_ascii(ec1, rec->FileIndex),
263 stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
266 /* Send data header to File daemon */
/*
 * Header fields, in order: FileIndex, Stream, wsize.
 * NOTE(review): all three are formatted with %ld while wsize is
 * uint32_t -- confirm the argument types match.
 */
267 if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, wsize)) {
268 Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
269 Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
275 Dmsg1(400, "FI=%d\n", rec->FileIndex);
276 /* Send data record to File daemon */
/* Borrow fd->msg for the record data; the saved pointer is put back at
 * the "restore" line below.  The send call itself is not visible here. */
277 save_msg = fd->msg; /* save fd message pointer */
278 fd->msg = wbuf; /* pass data directly to the FD */
280 jcr->JobBytes += wsize; /* increment bytes this job */
281 Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
283 Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
284 Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
288 fd->msg = save_msg; /* restore fd message pointer */
290 Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
292 FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
293 stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);