]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
0f95f6463a2e6040cd121419b2a5a430d81074a9
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Append code for Storage daemon
30  *  Kern Sibbald, May MM
31  *
32  */
33
34 #include "bacula.h"
35 #include "stored.h"
36
37
38 /* Responses sent to the File daemon */
39 static char OK_data[]    = "3000 OK data\n";
40 static char OK_append[]  = "3000 OK append data\n";
41
42 /* Forward referenced functions */
43
44
45 /* 
46  */
47 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
48 {
49 }
50 /*
51  *  Append Data sent from File daemon
52  *
53  */
54 bool do_append_data(JCR *jcr)
55 {
56    int32_t n;
57    int32_t file_index, stream, last_file_index;
58    BSOCK *fd = jcr->file_bsock;
59    bool ok = true;
60    DEV_RECORD rec;
61    char buf1[100], buf2[100];
62    DCR *dcr = jcr->dcr;
63    DEVICE *dev;
64    char ec[50];
65
66
67    if (!dcr) { 
68       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
69       return false;
70    }                                              
71    dev = dcr->dev;
72    if (!dev) { 
73       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
74       return false;
75    }                                              
76
77    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
78
79    memset(&rec, 0, sizeof(rec));
80
81    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
82       jcr->setJobStatus(JS_ErrorTerminated);
83       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
84       return false;
85    }
86
87    if (!acquire_device_for_append(dcr)) {
88       jcr->setJobStatus(JS_ErrorTerminated);
89       return false;
90    }
91
92    jcr->setJobStatus(JS_Running);
93    dir_send_job_status(jcr);
94
95    if (dev->VolCatInfo.VolCatName[0] == 0) {
96       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
97    }
98    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
99
100    begin_data_spool(dcr);
101    begin_attribute_spool(jcr);
102
103    Dmsg0(100, "Just after acquire_device_for_append\n");
104    if (dev->VolCatInfo.VolCatName[0] == 0) {
105       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
106    }
107    /*
108     * Write Begin Session Record
109     */
110    if (!write_session_label(dcr, SOS_LABEL)) {
111       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
112          dev->bstrerror());
113       jcr->setJobStatus(JS_ErrorTerminated);
114       ok = false;
115    }
116    if (dev->VolCatInfo.VolCatName[0] == 0) {
117       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
118    }
119
120    /* Tell File daemon to send data */
121    if (!fd->fsend(OK_data)) {
122       berrno be;
123       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
124             be.bstrerror(fd->b_errno));
125       ok = false;
126    }
127
128    /*
129     * Get Data from File daemon, write to device.  To clarify what is
130     *   going on here.  We expect:
131     *     - A stream header
132     *     - Multiple records of data
133     *     - EOD record
134     *
135     *    The Stream header is just used to sychronize things, and
136     *    none of the stream header is written to tape.
137     *    The Multiple records of data, contain first the Attributes,
138     *    then after another stream header, the file data, then
139     *    after another stream header, the MD5 data if any.
140     *
141     *   So we get the (stream header, data, EOD) three time for each
142     *   file. 1. for the Attributes, 2. for the file data if any,
143     *   and 3. for the MD5 if any.
144     */
145    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
146    jcr->run_time = time(NULL);              /* start counting time for rates */
147    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
148
149       /* Read Stream header from the File daemon.
150        *  The stream header consists of the following:
151        *    file_index (sequential Bacula file index, base 1)
152        *    stream     (Bacula number to distinguish parts of data)
153        *    info       (Info for Storage daemon -- compressed, encrypted, ...)
154        *       info is not currently used, so is read, but ignored!
155        */
156      if ((n=bget_msg(fd)) <= 0) {
157          if (n == BNET_SIGNAL && fd->msglen == BNET_EOD) {
158             break;                    /* end of data */
159          }
160          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
161                fd->bstrerror());
162          possible_incomplete_job(jcr, last_file_index);
163          ok = false;
164          break;
165       }
166
167       if (sscanf(fd->msg, "%ld %ld", &file_index, &stream) != 2) {
168          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), fd->msg);
169          ok = false;
170          possible_incomplete_job(jcr, last_file_index);
171          break;
172       }
173
174       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
175
176       /*
177        * We make sure the file_index is advancing sequentially.
178        * An incomplete job can start the file_index at any number.
179        * otherwise, it must start at 1.
180        */
181       if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
182          goto fi_checked;
183       }
184       if (file_index > 0 && (file_index == last_file_index ||
185           file_index == last_file_index + 1)) {
186          goto fi_checked;
187       }
188       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or sequential=%d\n"),
189             file_index, last_file_index);
190       possible_incomplete_job(jcr, last_file_index);
191       ok = false;
192       break;
193
194 fi_checked:
195       if (file_index != last_file_index) {
196          jcr->JobFiles = file_index;
197          last_file_index = file_index;
198       }
199
200       /* Read data stream from the File daemon.
201        *  The data stream is just raw bytes
202        */
203       while ((n=bget_msg(fd)) > 0 && !jcr->is_job_canceled()) {
204          rec.VolSessionId = jcr->VolSessionId;
205          rec.VolSessionTime = jcr->VolSessionTime;
206          rec.FileIndex = file_index;
207          rec.Stream = stream;
208          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
209          rec.data_len = fd->msglen;
210          rec.data = fd->msg;            /* use message buffer */
211
212          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
213             rec.FileIndex, rec.VolSessionId, 
214             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
215             rec.data_len);
216
217          while (!write_record_to_block(dcr, &rec)) {
218             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
219                        rec.remainder);
220             if (!dcr->write_block_to_device()) {
221                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
222                   dev->print_name(), dev->bstrerror());
223                ok = false;
224                break;
225             }
226          }
227          if (!ok) {
228             Dmsg0(400, "Not OK\n");
229             break;
230          }
231          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
232          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
233             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
234             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
235
236          send_attrs_to_dir(jcr, &rec);
237          Dmsg0(650, "Enter bnet_get\n");
238       }
239       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
240
241       if (fd->is_error()) {
242          if (!jcr->is_job_canceled()) {
243             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
244             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
245                   fd->bstrerror());
246             possible_incomplete_job(jcr, last_file_index);
247          }
248          ok = false;
249          break;
250       }
251    }
252
253    /* Create Job status for end of session label */
254    jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
255
256    if (ok) {
257       /* Terminate connection with FD */
258       fd->fsend(OK_append);
259       do_fd_commands(jcr);               /* finish dialog with FD */
260    } else {
261       fd->fsend("3999 Failed append\n");
262    }
263
264    /*
265     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
266     *   and the subsequent Jmsg() editing will break
267     */
268    int32_t job_elapsed = time(NULL) - jcr->run_time;
269
270    if (job_elapsed <= 0) {
271       job_elapsed = 1;
272    }
273
274    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
275          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
276          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
277
278
279    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
280
281    /*
282     * Check if we can still write. This may not be the case
283     *  if we are at the end of the tape or we got a fatal I/O error.
284     */
285    if (ok || dev->can_write()) {
286       if (!write_session_label(dcr, EOS_LABEL)) {
287          /* Print only if ok and not cancelled to avoid spurious messages */
288          if (ok && !jcr->is_job_canceled()) {
289             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
290                   dev->bstrerror());
291             possible_incomplete_job(jcr, last_file_index);
292          }
293          jcr->setJobStatus(JS_ErrorTerminated);
294          ok = false;
295       }
296       Dmsg0(90, "back from write_end_session_label()\n");
297       /* Flush out final partial block of this session */
298       if (!dcr->write_block_to_device()) {
299          /* Print only if ok and not cancelled to avoid spurious messages */
300          if (ok && !jcr->is_job_canceled()) {
301             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
302                   dev->print_name(), dev->bstrerror());
303             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
304             possible_incomplete_job(jcr, last_file_index);
305          }
306          jcr->setJobStatus(JS_ErrorTerminated);
307          ok = false;
308       }
309    }
310
311
312    if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
313       discard_data_spool(dcr);
314    } else {
315       /* Note: if commit is OK, the device will remain blocked */
316       commit_data_spool(dcr);
317    }
318
319    if (ok) {
320       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
321    }
322    
323    /*
324     * Release the device -- and send final Vol info to DIR
325     *  and unlock it.
326     */
327    release_device(dcr);
328
329    if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
330       discard_attribute_spool(jcr);
331    } else {
332       commit_attribute_spool(jcr);
333    }
334
335    dir_send_job_status(jcr);          /* update director */
336
337    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
338    return ok;
339 }
340
341
342 /* Send attributes and digest to Director for Catalog */
343 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
344 {
345    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    || 
346        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
347        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
348        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
349       if (!jcr->no_attributes) {
350          BSOCK *dir = jcr->dir_bsock;
351          if (are_attributes_spooled(jcr)) {
352             dir->set_spooling();
353          }
354          Dmsg0(850, "Send attributes to dir.\n");
355          if (!dir_update_file_attributes(jcr->dcr, rec)) {
356             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
357                dir->bstrerror());
358             dir->clear_spooling();
359             return false;
360          }
361          dir->clear_spooling();
362       }
363    }
364    return true;
365 }