2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Read code for Storage daemon
22 * Kern Sibbald, November MM
29 /* Forward referenced subroutines */
30 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
31 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
33 /* Responses sent to the File daemon */
34 static char OK_data[] = "3000 OK data\n";
35 static char FD_error[] = "3000 error\n";
36 static char rec_header[] = "rechdr %ld %ld %ld %ld %ld";
39 * Read Data and send to File Daemon
40 * Returns: false on failure
43 bool do_read_data(JCR *jcr)
45 BSOCK *fd = jcr->file_bsock;
47 DCR *dcr = jcr->read_dcr;
50 Dmsg0(100, "Start read data.\n");
52 if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
56 if (jcr->NumReadVolumes == 0) {
57 Jmsg(jcr, M_FATAL, 0, _("No Volume names found for restore.\n"));
62 Dmsg2(200, "Found %d volumes names to restore. First=%s\n", jcr->NumReadVolumes,
63 jcr->VolList->VolumeName);
65 /* Ready device for reading */
66 if (!acquire_device_for_read(dcr)) {
70 dcr->dev->start_of_job(dcr);
72 /* Tell File daemon we will send data */
73 if (!jcr->is_ok_data_sent) {
75 jcr->is_ok_data_sent = true;
78 jcr->sendJobStatus(JS_Running);
79 jcr->run_time = time(NULL);
82 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
83 ok = read_records(dcr, mac_record_cb, mount_next_read_volume);
85 ok = read_records(dcr, read_record_cb, mount_next_read_volume);
89 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
90 * and the subsequent Jmsg() editing will break
92 int32_t job_elapsed = time(NULL) - jcr->run_time;
94 if (job_elapsed <= 0) {
98 Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
99 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
100 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
102 /* Send end of data to FD */
103 fd->signal(BNET_EOD);
105 if (!release_device(jcr->read_dcr)) {
109 Dmsg0(30, "Done reading.\n");
113 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
116 BSOCK *fd = jcr->file_bsock;
119 char ec1[50], ec2[50];
120 POOLMEM *wbuf = rec->data; /* send buffer */
121 uint32_t wsize = rec->data_len; /* send size */
123 if (rec->FileIndex < 0) {
127 Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
128 rec->VolSessionId, rec->VolSessionTime,
129 FI_to_ascii(ec1, rec->FileIndex),
130 stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
133 Dmsg2(640, ">filed: send header stream=0x%lx len=%ld\n", rec->Stream, wsize);
134 /* Send record header to File daemon */
135 if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
136 rec->FileIndex, rec->Stream, wsize)) {
137 Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
138 Jmsg1(jcr, M_FATAL, 0, _("Error sending header to Client. ERR=%s\n"),
143 * For normal migration jobs, FileIndex values are sequential because
144 * we are dealing with one job. However, for Vbackup (consolidation),
145 * we will be getting records from multiple jobs and writing them back
146 * out, so we need to ensure that the output FileIndex is sequential.
147 * We do so by detecting a FileIndex change and incrementing the
148 * JobFiles, which we then use as the output FileIndex.
150 if (rec->FileIndex >= 0) {
151 /* If something changed, increment FileIndex */
152 if (rec->VolSessionId != rec->last_VolSessionId ||
153 rec->VolSessionTime != rec->last_VolSessionTime ||
154 rec->FileIndex != rec->last_FileIndex) {
156 rec->last_VolSessionId = rec->VolSessionId;
157 rec->last_VolSessionTime = rec->VolSessionTime;
158 rec->last_FileIndex = rec->FileIndex;
162 /* Debug code: check if we must hangup or blowup */
163 if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
167 save_msg = fd->msg; /* save fd message pointer */
170 /* Send data record to File daemon */
171 jcr->JobBytes += wsize; /* increment bytes this job */
172 Dmsg1(640, ">filed: send %d bytes data.\n", fd->msglen);
174 Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
175 Jmsg1(jcr, M_FATAL, 0, _("Error sending data to Client. ERR=%s\n"),
184 * New routine after to SD->SD implementation
185 * Called here for each record from read_records()
186 * Returns: true if OK
189 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
192 BSOCK *fd = jcr->file_bsock;
193 char buf1[100], buf2[100];
194 bool new_header = false;
196 char ec1[50], ec2[50];
198 POOLMEM *wbuf = rec->data;; /* send buffer */
199 uint32_t wsize = rec->data_len; /* send size */
202 Pmsg5(000, "on entry JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
204 FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
205 stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
208 /* If label and not for us, discard it */
209 if (rec->FileIndex < 0) {
210 Dmsg1(100, "FileIndex=%d\n", rec->FileIndex);
215 * For normal migration jobs, FileIndex values are sequential because
216 * we are dealing with one job. However, for Vbackup (consolidation),
217 * we will be getting records from multiple jobs and writing them back
218 * out, so we need to ensure that the output FileIndex is sequential.
219 * We do so by detecting a FileIndex change and incrementing the
220 * JobFiles, which we then use as the output FileIndex.
222 if (rec->FileIndex >= 0) {
223 /* If something changed, increment FileIndex */
224 if (rec->VolSessionId != rec->last_VolSessionId ||
225 rec->VolSessionTime != rec->last_VolSessionTime ||
226 rec->FileIndex != rec->last_FileIndex ||
227 rec->Stream != rec->last_Stream) {
229 /* Something changed */
230 if (rec->last_VolSessionId != 0) { /* Not first record */
231 Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
232 if (!fd->signal(BNET_EOD)) { /* End of previous stream */
233 Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
239 if (rec->FileIndex != rec->last_FileIndex) {
242 rec->last_VolSessionId = rec->VolSessionId;
243 rec->last_VolSessionTime = rec->VolSessionTime;
244 rec->last_FileIndex = rec->FileIndex;
245 rec->last_Stream = rec->Stream;
247 rec->FileIndex = jcr->JobFiles; /* set sequential output FileIndex */
252 Dmsg5(400, "Send header to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%ld\n",
253 rec->VolSessionId, rec->VolSessionTime,
254 FI_to_ascii(ec1, rec->FileIndex),
255 stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
258 /* Send data header to File daemon */
259 if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, wsize)) {
260 Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
261 Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
267 Dmsg1(400, "FI=%d\n", rec->FileIndex);
268 /* Send data record to File daemon */
269 save_msg = fd->msg; /* save fd message pointer */
270 fd->msg = wbuf; /* pass data directly to the FD */
272 jcr->JobBytes += wsize; /* increment bytes this job */
273 Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
275 Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
276 Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
280 fd->msg = save_msg; /* restore fd message pointer */
282 Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
284 FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
285 stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);