]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
Prepare for adding high bits to Stream
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Append code for Storage daemon
30  *  Kern Sibbald, May MM
31  *
32  */
33
34 #include "bacula.h"
35 #include "stored.h"
36
37
38 /* Responses sent to the File daemon */
39 static char OK_data[]    = "3000 OK data\n";
40 static char OK_append[]  = "3000 OK append data\n";
41
42 /* Forward referenced functions */
43
44 /*
45  *  Append Data sent from File daemon
46  *
47  */
48 bool do_append_data(JCR *jcr)
49 {
50    int32_t n;
51    int32_t file_index, stream, last_file_index;
52    BSOCK *fd = jcr->file_bsock;
53    bool ok = true;
54    DEV_RECORD rec;
55    char buf1[100], buf2[100];
56    DCR *dcr = jcr->dcr;
57    DEVICE *dev;
58    char ec[50];
59
60
61    if (!dcr) { 
62       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
63       return false;
64    }                                              
65    dev = dcr->dev;
66    if (!dev) { 
67       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
68       return false;
69    }                                              
70
71    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
72
73    memset(&rec, 0, sizeof(rec));
74
75    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
76       set_jcr_job_status(jcr, JS_ErrorTerminated);
77       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
78       return false;
79    }
80
81    if (!acquire_device_for_append(dcr)) {
82       set_jcr_job_status(jcr, JS_ErrorTerminated);
83       return false;
84    }
85
86    set_jcr_job_status(jcr, JS_Running);
87    dir_send_job_status(jcr);
88
89    if (dev->VolCatInfo.VolCatName[0] == 0) {
90       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
91    }
92    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
93
94    begin_data_spool(dcr);
95    begin_attribute_spool(jcr);
96
97    Dmsg0(100, "Just after acquire_device_for_append\n");
98    if (dev->VolCatInfo.VolCatName[0] == 0) {
99       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
100    }
101    /*
102     * Write Begin Session Record
103     */
104    if (!write_session_label(dcr, SOS_LABEL)) {
105       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
106          dev->bstrerror());
107       set_jcr_job_status(jcr, JS_ErrorTerminated);
108       ok = false;
109    }
110    if (dev->VolCatInfo.VolCatName[0] == 0) {
111       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
112    }
113
114    /* Tell File daemon to send data */
115    if (!fd->fsend(OK_data)) {
116       berrno be;
117       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
118             be.bstrerror(fd->b_errno));
119       ok = false;
120    }
121
122    /*
123     * Get Data from File daemon, write to device.  To clarify what is
124     *   going on here.  We expect:
125     *     - A stream header
126     *     - Multiple records of data
127     *     - EOD record
128     *
129     *    The Stream header is just used to sychronize things, and
130     *    none of the stream header is written to tape.
131     *    The Multiple records of data, contain first the Attributes,
132     *    then after another stream header, the file data, then
133     *    after another stream header, the MD5 data if any.
134     *
135     *   So we get the (stream header, data, EOD) three time for each
136     *   file. 1. for the Attributes, 2. for the file data if any,
137     *   and 3. for the MD5 if any.
138     */
139    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
140    jcr->run_time = time(NULL);              /* start counting time for rates */
141    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
142
143       /* Read Stream header from the File daemon.
144        *  The stream header consists of the following:
145        *    file_index (sequential Bacula file index, base 1)
146        *    stream     (Bacula number to distinguish parts of data)
147        *    info       (Info for Storage daemon -- compressed, encrypted, ...)
148        *       info is not currently used, so is read, but ignored!
149        */
150      if ((n=bget_msg(fd)) <= 0) {
151          if (n == BNET_SIGNAL && fd->msglen == BNET_EOD) {
152             break;                    /* end of data */
153          }
154          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
155                fd->bstrerror());
156          ok = false;
157          break;
158       }
159
160       if (sscanf(fd->msg, "%ld %ld", &file_index, &stream) != 2) {
161          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), fd->msg);
162          ok = false;
163          break;
164       }
165
166       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
167
168       if (!(file_index > 0 && (file_index == last_file_index ||
169           file_index == last_file_index + 1))) {
170          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
171          ok = false;
172          break;
173       }
174       if (file_index != last_file_index) {
175          jcr->JobFiles = file_index;
176          last_file_index = file_index;
177       }
178
179       /* Read data stream from the File daemon.
180        *  The data stream is just raw bytes
181        */
182       while ((n=bget_msg(fd)) > 0 && !jcr->is_job_canceled()) {
183          rec.VolSessionId = jcr->VolSessionId;
184          rec.VolSessionTime = jcr->VolSessionTime;
185          rec.FileIndex = file_index;
186          rec.Stream = stream;
187          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
188          rec.data_len = fd->msglen;
189          rec.data = fd->msg;            /* use message buffer */
190
191          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
192             rec.FileIndex, rec.VolSessionId, 
193             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
194             rec.data_len);
195
196          while (!write_record_to_block(dcr->block, &rec)) {
197             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
198                        rec.remainder);
199             if (!write_block_to_device(dcr)) {
200                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
201                   dev->print_name(), dev->bstrerror());
202                ok = false;
203                break;
204             }
205          }
206          if (!ok) {
207             Dmsg0(400, "Not OK\n");
208             break;
209          }
210          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
211          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
212             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
213             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
214
215          send_attrs_to_dir(jcr, &rec);
216          Dmsg0(650, "Enter bnet_get\n");
217       }
218       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
219
220       if (fd->is_error()) {
221          if (!jcr->is_job_canceled()) {
222             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
223             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
224                   fd->bstrerror());
225          }
226          ok = false;
227          break;
228       }
229    }
230
231    /* Create Job status for end of session label */
232    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
233
234    if (ok) {
235       /* Terminate connection with FD */
236       fd->fsend(OK_append);
237       do_fd_commands(jcr);               /* finish dialog with FD */
238    } else {
239       fd->fsend("3999 Failed append\n");
240    }
241
242    /*
243     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
244     *   and the subsequent Jmsg() editing will break
245     */
246    int32_t job_elapsed = time(NULL) - jcr->run_time;
247
248    if (job_elapsed <= 0) {
249       job_elapsed = 1;
250    }
251
252    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
253          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
254          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
255
256
257    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
258
259    /*
260     * Check if we can still write. This may not be the case
261     *  if we are at the end of the tape or we got a fatal I/O error.
262     */
263    if (ok || dev->can_write()) {
264       if (!write_session_label(dcr, EOS_LABEL)) {
265          /* Print only if ok and not cancelled to avoid spurious messages */
266          if (ok && !jcr->is_job_canceled()) {
267             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
268                   dev->bstrerror());
269          }
270          set_jcr_job_status(jcr, JS_ErrorTerminated);
271          ok = false;
272       }
273       if (dev->VolCatInfo.VolCatName[0] == 0) {
274          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
275          Dmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
276       }
277       Dmsg0(90, "back from write_end_session_label()\n");
278       /* Flush out final partial block of this session */
279       if (!write_block_to_device(dcr)) {
280          /* Print only if ok and not cancelled to avoid spurious messages */
281          if (ok && !jcr->is_job_canceled()) {
282             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
283                   dev->print_name(), dev->bstrerror());
284             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
285          }
286          set_jcr_job_status(jcr, JS_ErrorTerminated);
287          ok = false;
288       }
289       if (dev->VolCatInfo.VolCatName[0] == 0) {
290          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
291          Dmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
292       }
293    }
294
295
296    if (!ok) {
297       discard_data_spool(dcr);
298    } else {
299       /* Note: if commit is OK, the device will remain blocked */
300       commit_data_spool(dcr);
301    }
302
303    if (ok) {
304       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
305    }
306    
307    /*
308     * Release the device -- and send final Vol info to DIR
309     *  and unlock it.
310     */
311    release_device(dcr);
312
313    if (!ok || jcr->is_job_canceled()) {
314       discard_attribute_spool(jcr);
315    } else {
316       commit_attribute_spool(jcr);
317    }
318
319    dir_send_job_status(jcr);          /* update director */
320
321    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
322    return ok;
323 }
324
325
326 /* Send attributes and digest to Director for Catalog */
327 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
328 {
329    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    || 
330        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
331        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
332        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
333       if (!jcr->no_attributes) {
334          BSOCK *dir = jcr->dir_bsock;
335          if (are_attributes_spooled(jcr)) {
336             dir->set_spooling();
337          }
338          Dmsg0(850, "Send attributes to dir.\n");
339          if (!dir_update_file_attributes(jcr->dcr, rec)) {
340             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
341                dir->bstrerror());
342             dir->clear_spooling();
343             return false;
344          }
345          dir->clear_spooling();
346       }
347    }
348    return true;
349 }