]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
f9224b1092650f70ecb17e4a327f40b63fc15bca
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Append code for Storage daemon
30  *  Kern Sibbald, May MM
31  *
32  */
33
34 #include "bacula.h"
35 #include "stored.h"
36
37
38 /* Responses sent to the File daemon */
39 static char OK_data[]    = "3000 OK data\n";
40 static char OK_append[]  = "3000 OK append data\n";
41
42 /* Forward referenced functions */
43
44
45 /* 
46  * Check if we can mark this job incomplete
47  *
48  */
49 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
50 {
51    /*
52     * Note, here we decide if it is worthwhile to restart
53     *  the Job at this point. For the moment, if at least
54     *  10 Files have been seen, which is good for testing, but
55     *  for a production system, we probably want something like
56     *  100-1000 files, and some number of bytes of data.
57     *
58     *  ****FIXME**** update this
59     */
60    if (last_file_index > 10) {
61       jcr->setJobStatus(JS_Incomplete);
62    }
63 }
64 /*
65  *  Append Data sent from File daemon
66  *
67  */
68 bool do_append_data(JCR *jcr)
69 {
70    int32_t n;
71    int32_t file_index, stream, last_file_index;
72    BSOCK *fd = jcr->file_bsock;
73    bool ok = true;
74    DEV_RECORD rec;
75    char buf1[100], buf2[100];
76    DCR *dcr = jcr->dcr;
77    DEVICE *dev;
78    char ec[50];
79
80
81    if (!dcr) { 
82       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
83       return false;
84    }                                              
85    dev = dcr->dev;
86    if (!dev) { 
87       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
88       return false;
89    }                                              
90
91    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
92
93    memset(&rec, 0, sizeof(rec));
94
95    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
96       set_jcr_job_status(jcr, JS_ErrorTerminated);
97       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
98       return false;
99    }
100
101    if (!acquire_device_for_append(dcr)) {
102       set_jcr_job_status(jcr, JS_ErrorTerminated);
103       return false;
104    }
105
106    set_jcr_job_status(jcr, JS_Running);
107    dir_send_job_status(jcr);
108
109    if (dev->VolCatInfo.VolCatName[0] == 0) {
110       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
111    }
112    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
113
114    begin_data_spool(dcr);
115    begin_attribute_spool(jcr);
116
117    Dmsg0(100, "Just after acquire_device_for_append\n");
118    if (dev->VolCatInfo.VolCatName[0] == 0) {
119       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
120    }
121    /*
122     * Write Begin Session Record
123     */
124    if (!write_session_label(dcr, SOS_LABEL)) {
125       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
126          dev->bstrerror());
127       set_jcr_job_status(jcr, JS_ErrorTerminated);
128       ok = false;
129    }
130    if (dev->VolCatInfo.VolCatName[0] == 0) {
131       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
132    }
133
134    /* Tell File daemon to send data */
135    if (!fd->fsend(OK_data)) {
136       berrno be;
137       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
138             be.bstrerror(fd->b_errno));
139       ok = false;
140    }
141
142    /*
143     * Get Data from File daemon, write to device.  To clarify what is
144     *   going on here.  We expect:
145     *     - A stream header
146     *     - Multiple records of data
147     *     - EOD record
148     *
149     *    The Stream header is just used to sychronize things, and
150     *    none of the stream header is written to tape.
151     *    The Multiple records of data, contain first the Attributes,
152     *    then after another stream header, the file data, then
153     *    after another stream header, the MD5 data if any.
154     *
155     *   So we get the (stream header, data, EOD) three time for each
156     *   file. 1. for the Attributes, 2. for the file data if any,
157     *   and 3. for the MD5 if any.
158     */
159    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
160    jcr->run_time = time(NULL);              /* start counting time for rates */
161    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
162
163       /* Read Stream header from the File daemon.
164        *  The stream header consists of the following:
165        *    file_index (sequential Bacula file index, base 1)
166        *    stream     (Bacula number to distinguish parts of data)
167        *    info       (Info for Storage daemon -- compressed, encrypted, ...)
168        *       info is not currently used, so is read, but ignored!
169        */
170      if ((n=bget_msg(fd)) <= 0) {
171          if (n == BNET_SIGNAL && fd->msglen == BNET_EOD) {
172             break;                    /* end of data */
173          }
174          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
175                fd->bstrerror());
176          possible_incomplete_job(jcr, last_file_index);
177          ok = false;
178          break;
179       }
180
181       if (sscanf(fd->msg, "%ld %ld", &file_index, &stream) != 2) {
182          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), fd->msg);
183          ok = false;
184          break;
185       }
186
187       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
188
189       /*
190        * We make sure the file_index is advancing sequentially.
191        * An incomplete job can start the file_index at any number.
192        * otherwise, it must start at 1.
193        */
194       if (jcr->incomplete && file_index > 0 && last_file_index == 0) {
195          goto fi_checked;
196       }
197       if (file_index > 0 && (file_index == last_file_index ||
198           file_index == last_file_index + 1)) {
199          goto fi_checked;
200       }
201       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or sequential=%d\n"),
202             file_index, last_file_index);
203       possible_incomplete_job(jcr, last_file_index);
204       ok = false;
205       break;
206
207 fi_checked:
208       if (file_index != last_file_index) {
209          jcr->JobFiles = file_index;
210          last_file_index = file_index;
211       }
212
213       /* Read data stream from the File daemon.
214        *  The data stream is just raw bytes
215        */
216       while ((n=bget_msg(fd)) > 0 && !jcr->is_job_canceled()) {
217          rec.VolSessionId = jcr->VolSessionId;
218          rec.VolSessionTime = jcr->VolSessionTime;
219          rec.FileIndex = file_index;
220          rec.Stream = stream;
221          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
222          rec.data_len = fd->msglen;
223          rec.data = fd->msg;            /* use message buffer */
224
225          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
226             rec.FileIndex, rec.VolSessionId, 
227             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
228             rec.data_len);
229
230          while (!write_record_to_block(dcr->block, &rec)) {
231             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
232                        rec.remainder);
233             if (!write_block_to_device(dcr)) {
234                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
235                   dev->print_name(), dev->bstrerror());
236                ok = false;
237                break;
238             }
239          }
240          if (!ok) {
241             Dmsg0(400, "Not OK\n");
242             break;
243          }
244          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
245          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
246             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
247             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
248
249          send_attrs_to_dir(jcr, &rec);
250          Dmsg0(650, "Enter bnet_get\n");
251       }
252       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
253
254       if (fd->is_error()) {
255          if (!jcr->is_job_canceled()) {
256             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
257             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
258                   fd->bstrerror());
259             possible_incomplete_job(jcr, last_file_index);
260          }
261          ok = false;
262          break;
263       }
264    }
265
266    /* Create Job status for end of session label */
267    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
268
269    if (ok) {
270       /* Terminate connection with FD */
271       fd->fsend(OK_append);
272       do_fd_commands(jcr);               /* finish dialog with FD */
273    } else {
274       fd->fsend("3999 Failed append\n");
275    }
276
277    /*
278     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
279     *   and the subsequent Jmsg() editing will break
280     */
281    int32_t job_elapsed = time(NULL) - jcr->run_time;
282
283    if (job_elapsed <= 0) {
284       job_elapsed = 1;
285    }
286
287    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
288          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
289          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
290
291
292    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
293
294    /*
295     * Check if we can still write. This may not be the case
296     *  if we are at the end of the tape or we got a fatal I/O error.
297     */
298    if (ok || dev->can_write()) {
299       if (!write_session_label(dcr, EOS_LABEL)) {
300          /* Print only if ok and not cancelled to avoid spurious messages */
301          if (ok && !jcr->is_job_canceled()) {
302             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
303                   dev->bstrerror());
304          }
305          set_jcr_job_status(jcr, JS_ErrorTerminated);
306          ok = false;
307       }
308       if (dev->VolCatInfo.VolCatName[0] == 0) {
309          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
310          Dmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
311       }
312       Dmsg0(90, "back from write_end_session_label()\n");
313       /* Flush out final partial block of this session */
314       if (!write_block_to_device(dcr)) {
315          /* Print only if ok and not cancelled to avoid spurious messages */
316          if (ok && !jcr->is_job_canceled()) {
317             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
318                   dev->print_name(), dev->bstrerror());
319             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
320          }
321          set_jcr_job_status(jcr, JS_ErrorTerminated);
322          ok = false;
323       }
324       if (dev->VolCatInfo.VolCatName[0] == 0) {
325          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
326          Dmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
327       }
328    }
329
330
331    if (!ok && (jcr->getJobStatus() != JS_Incomplete)) {
332       discard_data_spool(dcr);
333    } else {
334       /* Note: if commit is OK, the device will remain blocked */
335       commit_data_spool(dcr);
336    }
337
338    if (ok) {
339       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
340    }
341    
342    /*
343     * Release the device -- and send final Vol info to DIR
344     *  and unlock it.
345     */
346    release_device(dcr);
347
348    if ((!ok || jcr->is_job_canceled()) && (jcr->getJobStatus() != JS_Incomplete)) {
349       discard_attribute_spool(jcr);
350    } else {
351       commit_attribute_spool(jcr);
352    }
353
354    dir_send_job_status(jcr);          /* update director */
355
356    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
357    return ok;
358 }
359
360
361 /* Send attributes and digest to Director for Catalog */
362 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
363 {
364    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    || 
365        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
366        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
367        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
368       if (!jcr->no_attributes) {
369          BSOCK *dir = jcr->dir_bsock;
370          if (are_attributes_spooled(jcr)) {
371             dir->set_spooling();
372          }
373          Dmsg0(850, "Send attributes to dir.\n");
374          if (!dir_update_file_attributes(jcr->dcr, rec)) {
375             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
376                dir->bstrerror());
377             dir->clear_spooling();
378             return false;
379          }
380          dir->clear_spooling();
381       }
382    }
383    return true;
384 }