2 * Append code for Storage daemon
8 Copyright (C) 2000-2006 Kern Sibbald
10 This program is free software; you can redistribute it and/or
11 modify it under the terms of the GNU General Public License
12 version 2 as amended with additional clauses defined in the
13 file LICENSE in the main source directory.
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 the file LICENSE for additional details.
26 /* Responses sent to the File daemon */
27 static char OK_data[] = "3000 OK data\n";
29 /* Forward referenced functions */
32 * Append Data sent from File daemon
35 bool do_append_data(JCR *jcr)
38 int32_t file_index, stream, last_file_index;
40 BSOCK *fd_sock = jcr->file_bsock;
43 char buf1[100], buf2[100];
50 Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
55 Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
59 Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
61 memset(&rec, 0, sizeof(rec));
65 if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
66 set_jcr_job_status(jcr, JS_ErrorTerminated);
67 Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
71 if (!acquire_device_for_append(dcr)) {
72 set_jcr_job_status(jcr, JS_ErrorTerminated);
77 set_jcr_job_status(jcr, JS_Running);
78 dir_send_job_status(jcr);
80 if (dev->VolCatInfo.VolCatName[0] == 0) {
81 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
83 Dmsg1(20, "Begin append device=%s\n", dev->print_name());
85 begin_data_spool(dcr);
86 begin_attribute_spool(jcr);
88 Dmsg0(100, "Just after acquire_device_for_append\n");
89 if (dev->VolCatInfo.VolCatName[0] == 0) {
90 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
93 * Write Begin Session Record
95 if (!write_session_label(dcr, SOS_LABEL)) {
96 Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
98 set_jcr_job_status(jcr, JS_ErrorTerminated);
101 if (dev->VolCatInfo.VolCatName[0] == 0) {
102 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
105 /* Tell File daemon to send data */
106 if (!bnet_fsend(fd_sock, OK_data)) {
108 Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
109 be.strerror(fd_sock->b_errno));
114 * Get Data from File daemon, write to device. To clarify what is
115 * going on here. We expect:
117 * - Multiple records of data
120 * The Stream header is just used to sychronize things, and
121 * none of the stream header is written to tape.
122 * The Multiple records of data, contain first the Attributes,
123 * then after another stream header, the file data, then
124 * after another stream header, the MD5 data if any.
126 * So we get the (stream header, data, EOD) three time for each
127 * file. 1. for the Attributes, 2. for the file data if any,
128 * and 3. for the MD5 if any.
130 dcr->VolFirstIndex = dcr->VolLastIndex = 0;
131 jcr->run_time = time(NULL); /* start counting time for rates */
132 for (last_file_index = 0; ok && !job_canceled(jcr); ) {
134 /* Read Stream header from the File daemon.
135 * The stream header consists of the following:
136 * file_index (sequential Bacula file index, base 1)
137 * stream (Bacula number to distinguish parts of data)
138 * info (Info for Storage daemon -- compressed, encryped, ...)
139 * info is not currently used, so is read, but ignored!
141 if ((n=bget_msg(ds)) <= 0) {
142 if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
143 break; /* end of data */
145 Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
152 * This hand scanning is a bit more complicated than a simple
153 * sscanf, but it allows us to handle any size integer up to
154 * int64_t without worrying about whether %d, %ld, %lld, or %q
155 * is the correct format for each different architecture.
156 * It is a real pity that sscanf() is not portable.
159 while (B_ISSPACE(*p)) {
162 file_index = (int32_t)str_to_int64(p);
163 while (B_ISDIGIT(*p)) {
166 if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
167 Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
171 stream = (int32_t)str_to_int64(p);
173 Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
175 if (!(file_index > 0 && (file_index == last_file_index ||
176 file_index == last_file_index + 1))) {
177 Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
181 if (file_index != last_file_index) {
182 jcr->JobFiles = file_index;
183 last_file_index = file_index;
186 /* Read data stream from the File daemon.
187 * The data stream is just raw bytes
189 while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
190 rec.VolSessionId = jcr->VolSessionId;
191 rec.VolSessionTime = jcr->VolSessionTime;
192 rec.FileIndex = file_index;
194 rec.data_len = ds->msglen;
195 rec.data = ds->msg; /* use message buffer */
197 Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
198 rec.FileIndex, rec.VolSessionId,
199 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
202 while (!write_record_to_block(dcr->block, &rec)) {
203 Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
205 if (!write_block_to_device(dcr)) {
206 Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
207 dev->print_name(), dev->bstrerror());
208 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
209 dev->print_name(), dev->bstrerror());
215 Dmsg0(400, "Not OK\n");
218 jcr->JobBytes += rec.data_len; /* increment bytes this job */
219 Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
220 FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
221 stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
223 /* Send attributes and digest to Director for Catalog */
224 if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
225 crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
226 if (!jcr->no_attributes) {
227 if (are_attributes_spooled(jcr)) {
228 jcr->dir_bsock->spool = true;
230 Dmsg0(850, "Send attributes to dir.\n");
231 if (!dir_update_file_attributes(dcr, &rec)) {
232 jcr->dir_bsock->spool = false;
233 Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
234 bnet_strerror(jcr->dir_bsock));
238 jcr->dir_bsock->spool = false;
241 Dmsg0(650, "Enter bnet_get\n");
243 Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
245 if (is_bnet_error(ds)) {
246 Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
247 Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
254 time_t job_elapsed = time(NULL) - jcr->run_time;
256 if (job_elapsed == 0)
259 Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
260 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
261 edit_uint64_with_commas(jcr->dcr->job_spool_size / job_elapsed, ec));
263 /* Create Job status for end of session label */
264 set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
266 Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
269 * If !OK, check if we can still write. This may not be the case
270 * if we are at the end of the tape or we got a fatal I/O error.
272 if (ok || dev->can_write()) {
273 if (!write_session_label(dcr, EOS_LABEL)) {
274 Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
276 set_jcr_job_status(jcr, JS_ErrorTerminated);
279 if (dev->VolCatInfo.VolCatName[0] == 0) {
280 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
282 Dmsg0(90, "back from write_end_session_label()\n");
283 /* Flush out final partial block of this session */
284 if (!write_block_to_device(dcr)) {
285 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
286 dev->print_name(), dev->bstrerror());
287 Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
291 if (dev->VolCatInfo.VolCatName[0] == 0) {
292 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
296 discard_data_spool(dcr);
298 commit_data_spool(dcr);
302 ok = dvd_close_job(dcr); /* do DVD cleanup if any */
305 /* Release the device -- and send final Vol info to DIR */
308 if (!ok || job_canceled(jcr)) {
309 discard_attribute_spool(jcr);
311 commit_attribute_spool(jcr);
314 dir_send_job_status(jcr); /* update director */
316 Dmsg1(100, "return from do_append_data() ok=%d\n", ok);