2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2016 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Append code for Storage daemon
21 * Kern Sibbald, May MM
29 /* Responses sent to the File daemon */
/* Sent by do_append_data() to tell the File daemon to start sending data. */
30 static char OK_data[] = "3000 OK data\n";
/*
 * Sent by do_append_data() when terminating the connection with the client.
 * The alternative reply there is "3999 Failed append"; the condition that
 * selects between the two is not visible in this listing.
 */
31 static char OK_append[] = "3000 OK append data\n";
33 /* Forward referenced functions */
37 * Check if we can mark this job incomplete
40 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
/*
 * jcr             - job control record whose status may be changed.
 * last_file_index - index of the last file seen from the File daemon.
 *
 * Sets the job status to JS_Incomplete only when jcr->spool_attributes is
 * set and last_file_index is strictly greater than 10; otherwise the job
 * status is left untouched.
 */
43 * Note, here we decide if it is worthwhile to restart
44 * the Job at this point. For the moment, if at least
45 * 10 Files have been seen.
46 * We must be sure that the saved files are safe.
47 * Using this function when their is as comm line problem is probably safe,
48 * it is inappropriate to use it for a any failure that could
49 * involve corrupted data.
/*
 * NOTE(review): the text above says "at least 10" files, but the test
 * below is "> 10", i.e. it needs a file index of 11 or more.
 */
51 if (jcr->spool_attributes && last_file_index > 10) {
52 jcr->setJobStatus(JS_Incomplete);
56 * Append Data sent from Client (FD/SD)
59 bool do_append_data(JCR *jcr)
/*
 * Reads backup data from the client socket (jcr->file_bsock) and writes it
 * to the device as records through dcr->write_record(), bracketed by a
 * begin-of-session (SOS_LABEL) and an end-of-session (EOS_LABEL) label.
 *
 * On the way out it sets the job status from `ok`, flushes the final block,
 * commits or discards the data and attribute spools, reports elapsed time
 * and transfer rate, and sends the job status to the Director.
 *
 * NOTE(review): several statements of this function are not visible in this
 * listing (declarations of n, ok, rec, dcr, dev, ec, buf, stream_len and
 * qfd, most closing braces, else branches and return statements). The
 * comments below describe only what the visible lines show.
 */
62 int32_t file_index, stream, last_file_index;
64 BSOCK *fd = jcr->file_bsock;
/* Scratch buffers for the FI_to_ascii()/stream_to_ascii() debug output. */
67 char buf1[100], buf2[100];
71 POOLMEM *eblock = NULL;
72 POOL_MEM errmsg(PM_EMSG);
/*
 * Sanity checks: a fatal job message is issued when the DCR or the DEVICE
 * is missing (the guarding conditions are not visible here).
 */
75 pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
76 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
81 pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
82 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
86 Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
/* Start from a zeroed record; its fields are refilled for every message. */
88 memset(&rec, 0, sizeof(rec));
/* Size the client socket buffer from the device's max_network_buffer_size. */
90 if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
91 jcr->setJobStatus(JS_ErrorTerminated);
92 pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
93 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
/* Failing to acquire the device for append terminates the job in error. */
97 if (!acquire_device_for_append(dcr)) {
98 jcr->setJobStatus(JS_ErrorTerminated);
102 jcr->sendJobStatus(JS_Running);
104 //ASSERT(dev->VolCatInfo.VolCatName[0]);
/* An empty volume name is only reported here, not treated as fatal. */
105 if (dev->VolCatInfo.VolCatName[0] == 0) {
106 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
108 Dmsg1(50, "Begin append device=%s\n", dev->print_name());
/* Start data and attribute spooling before any record is written. */
110 begin_data_spool(dcr);
111 begin_attribute_spool(jcr);
113 Dmsg0(100, "Just after acquire_device_for_append\n");
114 //ASSERT(dev->VolCatInfo.VolCatName[0]);
115 if (dev->VolCatInfo.VolCatName[0] == 0) {
116 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
119 * Write Begin Session Record
121 if (!write_session_label(dcr, SOS_LABEL)) {
122 Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
124 jcr->setJobStatus(JS_ErrorTerminated);
128 //ASSERT(dev->VolCatInfo.VolCatName[0]);
129 if (dev->VolCatInfo.VolCatName[0] == 0) {
130 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
133 /* Tell File daemon to send data */
134 if (!fd->fsend(OK_data)) {
136 Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
137 be.bstrerror(fd->b_errno));
142 * Get Data from File daemon, write to device. To clarify what is
143 * going on here. We expect:
145 * - Multiple records of data
148 * The Stream header is just used to synchronize things, and
149 * none of the stream header is written to tape.
150 * The Multiple records of data, contain first the Attributes,
151 * then after another stream header, the file data, then
152 * after another stream header, the MD5 data if any.
154 * So we get the (stream header, data, EOD) three time for each
155 * file. 1. for the Attributes, 2. for the file data if any,
156 * and 3. for the MD5 if any.
158 dcr->VolFirstIndex = dcr->VolLastIndex = 0;
159 jcr->run_time = time(NULL); /* start counting time for rates */
/*
 * All reads below go through a GetMsg object wrapped around the client
 * socket; start_read_sock() here is matched by wait_read_sock() after the
 * loop.
 */
163 qfd = New(GetMsg(jcr, fd, NULL, GETMSG_MAX_MSG_SIZE));
164 qfd->start_read_sock();
/*
 * Outer loop: one iteration per stream header, for as long as `ok` holds
 * and the job has not been canceled.
 */
166 for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
168 /* Read Stream header from the File daemon.
169 * The stream header consists of the following:
170 * file_index (sequential Bacula file index, base 1)
171 * stream (Bacula number to distinguish parts of data)
172 * stream_len (Expected length of this stream. This
173 * will be the size backed up if the file does not
174 * grow during the backup.
176 n = qfd->bget_msg(NULL);
178 if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
179 Dmsg0(200, "Got EOD on reading header.\n");
180 break; /* end of data */
182 Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
183 n, qfd->msglen, fd->bstrerror());
184 // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
185 possible_incomplete_job(jcr, last_file_index);
/*
 * The header must parse into exactly three numeric fields.
 * NOTE(review): "%ld" is used with the int32_t targets file_index and
 * stream; that is only safe if sscanf here resolves to a project scanner
 * that stores 32-bit values for "%ld" -- confirm.
 */
190 if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
191 // TODO ASX already done in bufmsg, should reuse the values
193 Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
195 possible_incomplete_job(jcr, last_file_index);
199 Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
200 file_index, stream, stream_len);
203 * We make sure the file_index is advancing sequentially.
204 * An incomplete job can start the file_index at any number.
205 * otherwise, it must start at 1.
207 if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
210 Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
211 if (file_index > 0 && (file_index == last_file_index ||
212 file_index == last_file_index + 1)) {
215 Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
216 file_index, last_file_index);
217 possible_incomplete_job(jcr, last_file_index);
/* A new file index updates both the job file counter and last_file_index. */
222 if (file_index != last_file_index) {
223 jcr->JobFiles = file_index;
224 last_file_index = file_index;
227 /* Read data stream from the File daemon.
228 * The data stream is just raw bytes
/*
 * Inner loop: each message with a positive length becomes one record that
 * points directly at the message buffer (no copy) and is written to the
 * device.
 */
230 while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
232 rec.VolSessionId = jcr->VolSessionId;
233 rec.VolSessionTime = jcr->VolSessionTime;
234 rec.FileIndex = file_index;
236 rec.StreamLen = stream_len;
237 rec.maskedStream = stream & STREAMMASK_TYPE; /* strip high bits */
238 rec.data_len = qfd->msglen;
239 rec.data = qfd->msg; /* use message buffer */
241 Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
242 rec.FileIndex, rec.VolSessionId,
243 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
245 ok = dcr->write_record(&rec);
247 Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
248 dcr->dev->print_name(), dcr->dev->bstrerror());
251 jcr->JobBytes += rec.data_len; /* increment bytes this job */
252 jcr->JobBytes += qfd->bmsg->jobbytes; // if the block has been downloaded, count it
253 Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
254 FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
255 stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
/* Forward attribute and digest records to the Director. */
257 send_attrs_to_dir(jcr, &rec);
258 Dmsg0(650, "Enter bnet_get\n");
260 Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
/* A socket error is reported as fatal only if the job was not canceled. */
262 if (fd->is_error()) {
263 if (!jcr->is_job_canceled()) {
264 Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
265 Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
267 possible_incomplete_job(jcr, last_file_index);
274 qfd->wait_read_sock();
277 if (eblock != NULL) {
278 free_pool_memory(eblock);
281 /* Create Job status for end of session label */
282 jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
285 /* Terminate connection with Client */
286 fd->fsend(OK_append);
287 do_client_commands(jcr); /* finish dialog with Client */
289 fd->fsend("3999 Failed append\n");
292 Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
295 * Check if we can still write. This may not be the case
296 * if we are at the end of the tape or we got a fatal I/O error.
298 if (ok || dev->can_write()) {
299 if (!write_session_label(dcr, EOS_LABEL)) {
300 /* Print only if ok and not cancelled to avoid spurious messages */
301 if (ok && !jcr->is_job_canceled()) {
302 Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
304 possible_incomplete_job(jcr, last_file_index);
306 jcr->setJobStatus(JS_ErrorTerminated);
309 /* Flush out final partial block of this session */
310 if (!dcr->write_final_block_to_device()) {
311 /* Print only if ok and not cancelled to avoid spurious messages */
312 if (ok && !jcr->is_job_canceled()) {
313 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
314 dev->print_name(), dev->bstrerror());
315 Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
316 possible_incomplete_job(jcr, last_file_index);
318 jcr->setJobStatus(JS_ErrorTerminated);
/*
 * Spooled data is discarded only when the run failed and the job was not
 * marked JS_Incomplete. commit_data_spool() is presumably the else branch
 * (the "else" line is not visible here).
 */
322 flush_jobmedia_queue(jcr);
323 if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
324 discard_data_spool(dcr);
326 /* Note: if commit is OK, the device will remain blocked */
327 commit_data_spool(dcr);
331 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
332 * and the subsequent Jmsg() editing will break
334 int32_t job_elapsed = time(NULL) - jcr->run_time;
/*
 * Presumably clamps job_elapsed to a positive value so that the rate
 * division below cannot divide by zero (the assignment is not visible).
 */
336 if (job_elapsed <= 0) {
340 Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
341 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
342 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
345 * Release the device -- and send final Vol info to DIR
/*
 * Spooled attributes are discarded on failure or cancel, unless the job is
 * JS_Incomplete. commit_attribute_spool() is presumably the else branch.
 */
350 if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
351 discard_attribute_spool(jcr);
353 commit_attribute_spool(jcr);
356 jcr->sendJobStatus(); /* update director */
358 Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
363 /* Send attributes and digest to Director for Catalog */
364 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
/*
 * jcr - job control record; supplies dir_bsock, dcr and the no_attributes
 *       flag.
 * rec - record just written; only its maskedStream and FileIndex are read
 *       here.
 *
 * Only records whose masked stream is STREAM_UNIX_ATTRIBUTES,
 * STREAM_UNIX_ATTRIBUTES_EX, STREAM_RESTORE_OBJECT or a crypto digest
 * stream are forwarded, and only when jcr->no_attributes is not set.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably false is returned when dir_update_file_attributes() fails and
 * true otherwise -- confirm.
 */
366 if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES ||
367 rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
368 rec->maskedStream == STREAM_RESTORE_OBJECT ||
369 crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
370 if (!jcr->no_attributes) {
371 BSOCK *dir = jcr->dir_bsock;
/*
 * The body of this branch is not visible; presumably it enables spooling
 * on the Director socket, since clear_spooling() is called on both exits
 * below.
 */
372 if (are_attributes_spooled(jcr)) {
375 Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
376 if (!dir_update_file_attributes(jcr->dcr, rec)) {
377 Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
/* Spooling is cleared on the failure path as well as on the normal path. */
379 dir->clear_spooling();
382 dir->clear_spooling();