2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Append code for Storage daemon
21 * Kern Sibbald, May MM
/* Responses sent to the File daemon */

/* Sent after the start-of-session label is written: the FD may start sending data. */
static char OK_data[] = "3000 OK data\n";

/* Sent when the append session with the client is being terminated. */
static char OK_append[] = "3000 OK append data\n";

/* Forward referenced functions */
36 * Check if we can mark this job incomplete
39 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
42 * Note, here we decide if it is worthwhile to restart
43 * the Job at this point. For the moment, if at least
44 * 10 Files have been seen.
45 * We must be sure that the saved files are safe.
46 * Using this function when their is as comm line problem is probably safe,
47 * it is inappropriate to use it for a any failure that could
48 * involve corrupted data.
50 if (jcr->spool_attributes && last_file_index > 10) {
51 jcr->setJobStatus(JS_Incomplete);
// do_append_data() -- append a backup data stream received from the client
// (FD/SD) on jcr->file_bsock to the job's device (jcr->dcr).
//
// Flow visible in this function:
//   1. Validate jcr->dcr and its device, set the network write buffer size,
//      acquire the device for append, start the job on the device, and
//      begin data and attribute spooling.
//   2. Write the start-of-session label (SOS_LABEL) and send OK_data so the
//      File daemon starts sending.
//   3. Loop: read a "file_index stream stream_len" header, check that the
//      FileIndex is positive and either repeats or advances by one, then
//      read data messages until bget_msg() returns n <= 0; each message
//      becomes a DEV_RECORD written with dcr->write_record() and passed to
//      send_attrs_to_dir().  The loop stops on EOD, cancel, or error.
//   4. Set the job status, reply OK_append (then do_client_commands()) or
//      "3999 Failed append", and, if ok or the device can still write,
//      flush the device, write the end-of-session label (EOS_LABEL) and the
//      final block.  Then commit or discard the data and attribute spools,
//      log elapsed time and transfer rate, and send the job status to the
//      Director.
//
// Several failure paths call possible_incomplete_job(); a job that ends as
// JS_Incomplete commits its data and attribute spools instead of
// discarding them.
// Returns: presumably false on the early setup errors and the final value
// of `ok` otherwise (the last debug message prints ok) -- confirm.
56 * Append Data sent from Client (FD/SD)
59 bool do_append_data(JCR *jcr)
62 int32_t file_index, stream, last_file_index;
64 BSOCK *fd = jcr->file_bsock;
67 char buf1[100], buf2[100];
71 POOLMEM *eblock = NULL;
72 POOL_MEM errmsg(PM_EMSG);
75 pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
76 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
81 pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
82 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
86 Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
// Start from an all-zero record; the per-record fields are filled in the
// data loop below.
88 memset(&rec, 0, sizeof(rec));
90 if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
91 jcr->setJobStatus(JS_ErrorTerminated);
92 pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
93 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
97 if (!acquire_device_for_append(dcr)) {
98 jcr->setJobStatus(JS_ErrorTerminated);
102 dev->start_of_job(dcr);
103 jcr->sendJobStatus(JS_Running);
105 //ASSERT(dev->VolCatInfo.VolCatName[0]);
// Sanity check (repeated after spooling starts and after the SOS label):
// an empty VolCatName is reported with Pmsg0.
106 if (dev->VolCatInfo.VolCatName[0] == 0) {
107 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
109 Dmsg1(50, "Begin append device=%s\n", dev->print_name());
111 begin_data_spool(dcr);
112 begin_attribute_spool(jcr);
114 Dmsg0(100, "Just after acquire_device_for_append\n");
115 //ASSERT(dev->VolCatInfo.VolCatName[0]);
116 if (dev->VolCatInfo.VolCatName[0] == 0) {
117 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
120 * Write Begin Session Record
122 if (!write_session_label(dcr, SOS_LABEL)) {
123 Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
125 jcr->setJobStatus(JS_ErrorTerminated);
129 //ASSERT(dev->VolCatInfo.VolCatName[0]);
130 if (dev->VolCatInfo.VolCatName[0] == 0) {
131 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
134 /* Tell File daemon to send data */
135 if (!fd->fsend(OK_data)) {
137 Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
138 be.bstrerror(fd->b_errno));
143 * Get Data from File daemon, write to device. To clarify what is
144 * going on here. We expect:
146 * - Multiple records of data
149 * The Stream header is just used to synchronize things, and
150 * none of the stream header is written to tape.
151 * The Multiple records of data, contain first the Attributes,
152 * then after another stream header, the file data, then
153 * after another stream header, the MD5 data if any.
155 * So we get the (stream header, data, EOD) three time for each
156 * file. 1. for the Attributes, 2. for the file data if any,
157 * and 3. for the MD5 if any.
// Reset the volume file-index range (VolFirstIndex, VolLastIndex) and start
// the clock used for the transfer-rate report at the end.
159 dcr->VolFirstIndex = dcr->VolLastIndex = 0;
160 jcr->run_time = time(NULL); /* start counting time for rates */
// All reads from the FD go through the GetMsg wrapper qfd; start_read_sock()
// here is paired with wait_read_sock() after the loop.
164 qfd = New(GetMsg(jcr, fd, NULL, GETMSG_MAX_MSG_SIZE));
165 qfd->start_read_sock();
167 for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
169 /* Read Stream header from the File daemon.
170 * The stream header consists of the following:
171 * file_index (sequential Bacula file index, base 1)
172 * stream (Bacula number to distinguish parts of data)
173 * stream_len (Expected length of this stream. This
174 * will be the size backed up if the file does not
175 * grow during the backup.
177 n = qfd->bget_msg(NULL);
179 if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
180 Dmsg0(200, "Got EOD on reading header.\n");
181 break; /* end of data */
183 Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
184 n, qfd->msglen, fd->bstrerror());
185 // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
186 possible_incomplete_job(jcr, last_file_index);
// NOTE(review): file_index and stream are int32_t but are scanned with %ld.
// This is only correct if sscanf here is the project's own scanner where l
// means 32 bits; with the C library sscanf on an LP64 platform %ld writes a
// long and would overrun them -- confirm.
191 if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
192 // TODO ASX already done in bufmsg, should reuse the values
194 Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
196 possible_incomplete_job(jcr, last_file_index);
200 Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
201 file_index, stream, stream_len);
204 * We make sure the file_index is advancing sequentially.
205 * An incomplete job can start the file_index at any number.
206 * otherwise, it must start at 1.
// On a rerun the first header may carry any positive FileIndex; the body of
// this test presumably seeds last_file_index so the sequence check below
// accepts it -- confirm.
208 if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
211 Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
// Accept the same FileIndex (another stream of the current file) or the next
// one; anything else, including FileIndex <= 0, is reported as fatal.
212 if (file_index > 0 && (file_index == last_file_index ||
213 file_index == last_file_index + 1)) {
216 Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
217 file_index, last_file_index);
218 possible_incomplete_job(jcr, last_file_index);
// A new file: JobFiles and last_file_index follow the latest FileIndex.
223 if (file_index != last_file_index) {
224 jcr->JobFiles = file_index;
225 last_file_index = file_index;
// Inner loop: every message with n > 0 is one raw data record for the
// current header; it ends when bget_msg() returns n <= 0 or the job is
// canceled, and control goes back to reading the next header.
228 /* Read data stream from the File daemon.
229 * The data stream is just raw bytes
231 while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
233 rec.VolSessionId = jcr->VolSessionId;
234 rec.VolSessionTime = jcr->VolSessionTime;
235 rec.FileIndex = file_index;
237 rec.StreamLen = stream_len;
238 rec.maskedStream = stream & STREAMMASK_TYPE; /* strip high bits */
239 rec.data_len = qfd->msglen;
240 rec.data = qfd->msg; /* use message buffer */
242 /* Debug code: check if we must hangup or blowup */
243 if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
246 Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
247 rec.FileIndex, rec.VolSessionId,
248 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
250 ok = dcr->write_record(&rec);
252 Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
253 dcr->dev->print_name(), dcr->dev->bstrerror());
// JobBytes counts the record payload plus qfd->bmsg->jobbytes (per the
// inline note: bytes of a block that was downloaded).
256 jcr->JobBytes += rec.data_len; /* increment bytes this job */
257 jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
258 Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
259 FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
260 stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
// send_attrs_to_dir() forwards only attribute, restore-object and digest
// streams to the Director; other records are ignored there.
262 send_attrs_to_dir(jcr, &rec);
263 Dmsg0(650, "Enter bnet_get\n");
265 Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
// After the data loop: a socket error is fatal, and it is reported only
// when the job was not canceled.
267 if (fd->is_error()) {
268 if (!jcr->is_job_canceled()) {
269 Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
270 Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
272 possible_incomplete_job(jcr, last_file_index);
// Stop the reader; the argument is true when the append failed or the job
// was canceled.
279 qfd->wait_read_sock((ok == false) || jcr->is_job_canceled());
282 if (eblock != NULL) {
283 free_pool_memory(eblock);
286 /* Create Job status for end of session label */
287 jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
290 /* Terminate connection with Client */
291 fd->fsend(OK_append);
292 do_client_commands(jcr); /* finish dialog with Client */
294 fd->fsend("3999 Failed append\n");
297 Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
300 * Check if we can still write. This may not be the case
301 * if we are at the end of the tape or we got a fatal I/O error.
304 if (ok || dev->can_write()) {
// NOTE(review): the debug text below names write_block_to_device although
// the failing call is flush_before_eos().
305 if (!dev->flush_before_eos(dcr)) {
306 /* Print only if ok and not cancelled to avoid spurious messages */
307 if (ok && !jcr->is_job_canceled()) {
308 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
309 dev->print_name(), dev->bstrerror());
310 Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
311 possible_incomplete_job(jcr, last_file_index);
313 jcr->setJobStatus(JS_ErrorTerminated);
316 if (!write_session_label(dcr, EOS_LABEL)) {
317 /* Print only if ok and not cancelled to avoid spurious messages */
318 if (ok && !jcr->is_job_canceled()) {
319 Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
321 possible_incomplete_job(jcr, last_file_index);
323 jcr->setJobStatus(JS_ErrorTerminated);
326 /* Flush out final partial block of this session */
327 Dmsg1(200, "=== Flush adata=%d last block.\n", dcr->block->adata);
// Per the ASSERT, the final block of the session must not be an adata block.
328 ASSERT(!dcr->block->adata);
329 if (!dcr->write_final_block_to_device()) {
330 /* Print only if ok and not cancelled to avoid spurious messages */
331 if (ok && !jcr->is_job_canceled()) {
332 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
333 dev->print_name(), dev->bstrerror());
334 Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
335 possible_incomplete_job(jcr, last_file_index);
337 jcr->setJobStatus(JS_ErrorTerminated);
341 flush_jobmedia_queue(jcr);
// Data spool: discarded on failure unless the job was marked Incomplete;
// otherwise committed.
342 if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
343 discard_data_spool(dcr);
345 /* Note: if commit is OK, the device will remain blocked */
346 commit_data_spool(dcr);
350 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
351 * and the subsequent Jmsg() editing will break
353 int32_t job_elapsed = time(NULL) - jcr->run_time;
// The test below presumably forces job_elapsed to a positive value so the
// rate division (JobBytes divided by job_elapsed) cannot divide by zero --
// confirm.
355 if (job_elapsed <= 0) {
359 Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
360 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
361 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
364 * Release the device -- and send final Vol info to DIR
// Attribute spool: discarded on failure or cancel unless the job is
// Incomplete; otherwise committed.
369 if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
370 discard_attribute_spool(jcr);
372 commit_attribute_spool(jcr);
375 jcr->sendJobStatus(); /* update director */
377 Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
382 /* Send attributes and digest to Director for Catalog */
383 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
385 if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES ||
386 rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
387 rec->maskedStream == STREAM_RESTORE_OBJECT ||
388 crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
389 if (!jcr->no_attributes) {
390 BSOCK *dir = jcr->dir_bsock;
391 if (are_attributes_spooled(jcr)) {
394 Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
395 if (!dir_update_file_attributes(jcr->dcr, rec)) {
396 Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
398 dir->clear_spooling();
401 dir->clear_spooling();