2 * Append code for Storage daemon
8 Copyright (C) 2000-2005 Kern Sibbald
10 This program is free software; you can redistribute it and/or
11 modify it under the terms of the GNU General Public License
12 version 2 as amended with additional clauses defined in the
13 file LICENSE in the main source directory.
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 the file LICENSE for additional details.
26 /* Responses sent to the File daemon: OK_data tells it to start sending data */
27 static char OK_data[] = "3000 OK data\n";
29 /* Forward referenced functions */
32 * Append Data sent from File daemon
35 bool do_append_data(JCR *jcr)
38 int32_t file_index, stream, last_file_index;
40 BSOCK *fd_sock = jcr->file_bsock;
44 DEVICE *dev = dcr->dev;
45 char buf1[100], buf2[100];
48 Dmsg0(100, "Start append data.\n");
50 memset(&rec, 0, sizeof(rec));
54 if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
55 set_jcr_job_status(jcr, JS_ErrorTerminated);
56 Jmsg(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
60 if (!acquire_device_for_append(dcr)) {
61 set_jcr_job_status(jcr, JS_ErrorTerminated);
65 set_jcr_job_status(jcr, JS_Running);
66 dir_send_job_status(jcr);
68 if (dev->VolCatInfo.VolCatName[0] == 0) {
69 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
71 Dmsg1(20, "Begin append device=%s\n", dev->print_name());
73 begin_data_spool(dcr);
74 begin_attribute_spool(jcr);
76 Dmsg0(100, "Just after acquire_device_for_append\n");
77 if (dev->VolCatInfo.VolCatName[0] == 0) {
78 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
81 * Write Begin Session Record
83 if (!write_session_label(dcr, SOS_LABEL)) {
84 Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
86 set_jcr_job_status(jcr, JS_ErrorTerminated);
89 if (dev->VolCatInfo.VolCatName[0] == 0) {
90 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
93 /* Tell File daemon to send data */
94 if (!bnet_fsend(fd_sock, OK_data)) {
96 Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
97 be.strerror(fd_sock->b_errno));
102 * Get Data from File daemon, write to device. To clarify what is
103 * going on here. We expect:
105 * - Multiple records of data
108 * The Stream header is just used to synchronize things, and
109 * none of the stream header is written to tape.
110 * The Multiple records of data, contain first the Attributes,
111 * then after another stream header, the file data, then
112 * after another stream header, the MD5 data if any.
114 * So we get the (stream header, data, EOD) three times for each
115 * file. 1. for the Attributes, 2. for the file data if any,
116 * and 3. for the MD5 if any.
118 dcr->VolFirstIndex = dcr->VolLastIndex = 0;
119 jcr->run_time = time(NULL); /* start counting time for rates */
120 for (last_file_index = 0; ok && !job_canceled(jcr); ) {
122 /* Read Stream header from the File daemon.
123 * The stream header consists of the following:
124 * file_index (sequential Bacula file index, base 1)
125 * stream (Bacula number to distinguish parts of data)
126 * info (Info for Storage daemon -- compressed, encrypted, ...)
127 * info is not currently used, so is read, but ignored!
129 if ((n=bget_msg(ds)) <= 0) {
130 if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
131 break; /* end of data */
133 Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
140 * This hand scanning is a bit more complicated than a simple
141 * sscanf, but it allows us to handle any size integer up to
142 * int64_t without worrying about whether %d, %ld, %lld, or %q
143 * is the correct format for each different architecture.
144 * It is a real pity that sscanf() is not portable.
147 while (B_ISSPACE(*p)) {
150 file_index = (int32_t)str_to_int64(p);
151 while (B_ISDIGIT(*p)) {
154 if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
155 Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
159 stream = (int32_t)str_to_int64(p);
161 Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
163 if (!(file_index > 0 && (file_index == last_file_index ||
164 file_index == last_file_index + 1))) {
165 Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
169 if (file_index != last_file_index) {
170 jcr->JobFiles = file_index;
171 last_file_index = file_index;
174 /* Read data stream from the File daemon.
175 * The data stream is just raw bytes
177 while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
178 rec.VolSessionId = jcr->VolSessionId;
179 rec.VolSessionTime = jcr->VolSessionTime;
180 rec.FileIndex = file_index;
182 rec.data_len = ds->msglen;
183 rec.data = ds->msg; /* use message buffer */
185 Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
186 rec.FileIndex, rec.VolSessionId,
187 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
190 while (!write_record_to_block(dcr->block, &rec)) {
191 Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
193 if (!write_block_to_device(dcr)) {
194 Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
195 dev->print_name(), strerror_dev(dev));
196 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
197 dev->print_name(), strerror_dev(dev));
203 Dmsg0(400, "Not OK\n");
206 jcr->JobBytes += rec.data_len; /* increment bytes this job */
207 Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
208 FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
209 stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
211 /* Send attributes and digest to Director for Catalog */
212 if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
213 crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
214 if (!jcr->no_attributes) {
215 if (are_attributes_spooled(jcr)) {
216 jcr->dir_bsock->spool = true;
218 Dmsg0(850, "Send attributes to dir.\n");
219 if (!dir_update_file_attributes(dcr, &rec)) {
220 jcr->dir_bsock->spool = false;
221 Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
222 bnet_strerror(jcr->dir_bsock));
226 jcr->dir_bsock->spool = false;
229 Dmsg0(650, "Enter bnet_get\n");
231 Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
232 if (is_bnet_error(ds)) {
233 Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
234 Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
241 /* Create Job status for end of session label */
242 set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
244 Dmsg1(200, "Write session label JobStatus=%d\n", jcr->JobStatus);
245 if ((!ok || job_canceled(jcr)) && dev->VolCatInfo.VolCatName[0] == 0) {
246 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
250 * If !OK, check if we can still write. This may not be the case
251 * if we are at the end of the tape or we got a fatal I/O error.
253 if (ok || dev->can_write()) {
254 if (!write_session_label(dcr, EOS_LABEL)) {
255 Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
257 set_jcr_job_status(jcr, JS_ErrorTerminated);
260 if (dev->VolCatInfo.VolCatName[0] == 0) {
261 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
263 Dmsg0(90, "back from write_end_session_label()\n");
264 /* Flush out final partial block of this session */
265 if (!write_block_to_device(dcr)) {
266 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
267 dev->print_name(), strerror_dev(dev));
268 Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
272 if (dev->VolCatInfo.VolCatName[0] == 0) {
273 Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
277 discard_data_spool(dcr);
279 commit_data_spool(dcr);
283 ok = dvd_close_job(dcr); /* do DVD cleanup if any */
286 /* Release the device -- and send final Vol info to DIR */
289 if (!ok || job_canceled(jcr)) {
290 discard_attribute_spool(jcr);
292 commit_attribute_spool(jcr);
295 dir_send_job_status(jcr); /* update director */
297 Dmsg1(100, "return from do_append_data() ok=%d\n", ok);