]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
Backport more from master
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2012 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Append code for Storage daemon
30  *  Kern Sibbald, May MM
31  *
32  */
33
34 #include "bacula.h"
35 #include "stored.h"
36
37
38 /* Responses sent to the File daemon */
39 static char OK_data[]    = "3000 OK data\n";
40 static char OK_append[]  = "3000 OK append data\n";
41
42 /* Forward referenced functions */
43
44
45 /* 
46  */
47 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
48 {
49 }
50 /*
51  *  Append Data sent from File daemon
52  *
53  */
54 bool do_append_data(JCR *jcr)
55 {
56    int32_t n;
57    int32_t file_index, stream, last_file_index;
58    BSOCK *fd = jcr->file_bsock;
59    bool ok = true;
60    DEV_RECORD rec;
61    char buf1[100], buf2[100];
62    DCR *dcr = jcr->dcr;
63    DEVICE *dev;
64    char ec[50];
65
66
67    if (!dcr) { 
68       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
69       return false;
70    }                                              
71    dev = dcr->dev;
72    if (!dev) { 
73       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
74       return false;
75    }                                              
76
77    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
78
79    memset(&rec, 0, sizeof(rec));
80
81    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
82       jcr->setJobStatus(JS_ErrorTerminated);
83       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
84       return false;
85    }
86
87    if (!acquire_device_for_append(dcr)) {
88       jcr->setJobStatus(JS_ErrorTerminated);
89       return false;
90    }
91
92    jcr->sendJobStatus(JS_Running);
93
94    if (dev->VolCatInfo.VolCatName[0] == 0) {
95       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
96    }
97    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
98
99    begin_data_spool(dcr);
100    begin_attribute_spool(jcr);
101
102    Dmsg0(100, "Just after acquire_device_for_append\n");
103    if (dev->VolCatInfo.VolCatName[0] == 0) {
104       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
105    }
106    /*
107     * Write Begin Session Record
108     */
109    if (!write_session_label(dcr, SOS_LABEL)) {
110       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
111          dev->bstrerror());
112       jcr->setJobStatus(JS_ErrorTerminated);
113       ok = false;
114    }
115    if (dev->VolCatInfo.VolCatName[0] == 0) {
116       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
117    }
118
119    /* Tell File daemon to send data */
120    if (!fd->fsend(OK_data)) {
121       berrno be;
122       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
123             be.bstrerror(fd->b_errno));
124       ok = false;
125    }
126
127    /*
128     * Get Data from File daemon, write to device.  To clarify what is
129     *   going on here.  We expect:
130     *     - A stream header
131     *     - Multiple records of data
132     *     - EOD record
133     *
134     *    The Stream header is just used to sychronize things, and
135     *    none of the stream header is written to tape.
136     *    The Multiple records of data, contain first the Attributes,
137     *    then after another stream header, the file data, then
138     *    after another stream header, the MD5 data if any.
139     *
140     *   So we get the (stream header, data, EOD) three time for each
141     *   file. 1. for the Attributes, 2. for the file data if any,
142     *   and 3. for the MD5 if any.
143     */
144    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
145    jcr->run_time = time(NULL);              /* start counting time for rates */
146    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
147
148       /* Read Stream header from the File daemon.
149        *  The stream header consists of the following:
150        *    file_index (sequential Bacula file index, base 1)
151        *    stream     (Bacula number to distinguish parts of data)
152        *    info       (Info for Storage daemon -- compressed, encrypted, ...)
153        *       info is not currently used, so is read, but ignored!
154        */
155      if ((n=bget_msg(fd)) <= 0) {
156          if (n == BNET_SIGNAL && fd->msglen == BNET_EOD) {
157             break;                    /* end of data */
158          }
159          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
160                fd->bstrerror());
161          possible_incomplete_job(jcr, last_file_index);
162          ok = false;
163          break;
164       }
165
166       if (sscanf(fd->msg, "%ld %ld", &file_index, &stream) != 2) {
167          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), fd->msg);
168          ok = false;
169          possible_incomplete_job(jcr, last_file_index);
170          break;
171       }
172
173       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
174
175       /*
176        * We make sure the file_index is advancing sequentially.
177        * An incomplete job can start the file_index at any number.
178        * otherwise, it must start at 1.
179        */
180       if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
181          goto fi_checked;
182       }
183       if (file_index > 0 && (file_index == last_file_index ||
184           file_index == last_file_index + 1)) {
185          goto fi_checked;
186       }
187       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or sequential=%d\n"),
188             file_index, last_file_index);
189       possible_incomplete_job(jcr, last_file_index);
190       ok = false;
191       break;
192
193 fi_checked:
194       if (file_index != last_file_index) {
195          jcr->JobFiles = file_index;
196          last_file_index = file_index;
197       }
198
199       /* Read data stream from the File daemon.
200        *  The data stream is just raw bytes
201        */
202       while ((n=bget_msg(fd)) > 0 && !jcr->is_job_canceled()) {
203          rec.VolSessionId = jcr->VolSessionId;
204          rec.VolSessionTime = jcr->VolSessionTime;
205          rec.FileIndex = file_index;
206          rec.Stream = stream;
207          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
208          rec.data_len = fd->msglen;
209          rec.data = fd->msg;            /* use message buffer */
210
211          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
212             rec.FileIndex, rec.VolSessionId, 
213             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
214             rec.data_len);
215
216          ok = dcr->write_record(&rec);
217          if (!ok) {
218             Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
219                   dcr->dev->print_name(), dcr->dev->bstrerror());
220             break;
221          }
222          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
223          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
224             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
225             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
226
227          send_attrs_to_dir(jcr, &rec);
228          Dmsg0(650, "Enter bnet_get\n");
229       }
230       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
231
232       if (fd->is_error()) {
233          if (!jcr->is_job_canceled()) {
234             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
235             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
236                   fd->bstrerror());
237             possible_incomplete_job(jcr, last_file_index);
238          }
239          ok = false;
240          break;
241       }
242    }
243
244    /* Create Job status for end of session label */
245    jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
246
247    if (ok) {
248       /* Terminate connection with FD */
249       fd->fsend(OK_append);
250       do_fd_commands(jcr);               /* finish dialog with FD */
251    } else {
252       fd->fsend("3999 Failed append\n");
253    }
254
255    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
256
257    /*
258     * Check if we can still write. This may not be the case
259     *  if we are at the end of the tape or we got a fatal I/O error.
260     */
261    if (ok || dev->can_write()) {
262       if (!write_session_label(dcr, EOS_LABEL)) {
263          /* Print only if ok and not cancelled to avoid spurious messages */
264          if (ok && !jcr->is_job_canceled()) {
265             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
266                   dev->bstrerror());
267             possible_incomplete_job(jcr, last_file_index);
268          }
269          jcr->setJobStatus(JS_ErrorTerminated);
270          ok = false;
271       }
272       Dmsg0(90, "back from write_end_session_label()\n");
273       /* Flush out final partial block of this session */
274       if (!dcr->write_block_to_device()) {
275          /* Print only if ok and not cancelled to avoid spurious messages */
276          if (ok && !jcr->is_job_canceled()) {
277             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
278                   dev->print_name(), dev->bstrerror());
279             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
280             possible_incomplete_job(jcr, last_file_index);
281          }
282          jcr->setJobStatus(JS_ErrorTerminated);
283          ok = false;
284       }
285    }
286
287    if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
288       discard_data_spool(dcr);
289    } else {
290       /* Note: if commit is OK, the device will remain blocked */
291       commit_data_spool(dcr);
292    }
293
294    if (ok) {
295       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
296    }
297
298    /*
299     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
300     *   and the subsequent Jmsg() editing will break
301     */
302    int32_t job_elapsed = time(NULL) - jcr->run_time;
303
304    if (job_elapsed <= 0) {
305       job_elapsed = 1;
306    }
307
308    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
309          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
310          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
311    
312    /*
313     * Release the device -- and send final Vol info to DIR
314     *  and unlock it.
315     */
316    release_device(dcr);
317
318    if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
319       discard_attribute_spool(jcr);
320    } else {
321       commit_attribute_spool(jcr);
322    }
323
324    jcr->sendJobStatus();          /* update director */
325
326    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
327    return ok;
328 }
329
330
331 /* Send attributes and digest to Director for Catalog */
332 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
333 {
334    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    || 
335        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
336        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
337        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
338       if (!jcr->no_attributes) {
339          BSOCK *dir = jcr->dir_bsock;
340          if (are_attributes_spooled(jcr)) {
341             dir->set_spooling();
342          }
343          Dmsg0(850, "Send attributes to dir.\n");
344          if (!dir_update_file_attributes(jcr->dcr, rec)) {
345             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
346                dir->bstrerror());
347             dir->clear_spooling();
348             return false;
349          }
350          dir->clear_spooling();
351       }
352    }
353    return true;
354 }