]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
6a57c14b47cf0ddf5a8dc8d1533f629e0ef20af6
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Bacula® - The Network Backup Solution
9
10    Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
11
12    The main author of Bacula is Kern Sibbald, with contributions from
13    many others, a complete list can be found in the file AUTHORS.
14    This program is Free Software; you can redistribute it and/or
15    modify it under the terms of version two of the GNU General Public
16    License as published by the Free Software Foundation plus additions
17    that are listed in the file LICENSE.
18
19    This program is distributed in the hope that it will be useful, but
20    WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22    General Public License for more details.
23
24    You should have received a copy of the GNU General Public License
25    along with this program; if not, write to the Free Software
26    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
27    02110-1301, USA.
28
29    Bacula® is a registered trademark of John Walker.
30    The licensor of Bacula is the Free Software Foundation Europe
31    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
32    Switzerland, email:ftf@fsfeurope.org.
33 */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38
39 /* Responses sent to the File daemon */
40 static char OK_data[]    = "3000 OK data\n";
41
42 /* Forward referenced functions */
43
44 /*
45  *  Append Data sent from File daemon
46  *
47  */
48 bool do_append_data(JCR *jcr)
49 {
50    int32_t n;
51    int32_t file_index, stream, last_file_index;
52    BSOCK *ds;
53    BSOCK *fd_sock = jcr->file_bsock;
54    bool ok = true;
55    DEV_RECORD rec;
56    char buf1[100], buf2[100];
57    DCR *dcr = jcr->dcr;
58    DEVICE *dev;
59    char ec[50];
60
61
62    if (!dcr) { 
63       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
64       return false;
65    }                                              
66    dev = dcr->dev;
67    if (!dev) { 
68       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
69       return false;
70    }                                              
71
72    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
73
74    memset(&rec, 0, sizeof(rec));
75
76    ds = fd_sock;
77
78    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
79       set_jcr_job_status(jcr, JS_ErrorTerminated);
80       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
81       return false;
82    }
83
84    if (!acquire_device_for_append(dcr)) {
85       set_jcr_job_status(jcr, JS_ErrorTerminated);
86       return false;
87    }
88
89    set_jcr_job_status(jcr, JS_Running);
90    dir_send_job_status(jcr);
91
92    if (dev->VolCatInfo.VolCatName[0] == 0) {
93       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
94    }
95    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
96
97    begin_data_spool(dcr);
98    begin_attribute_spool(jcr);
99
100    Dmsg0(100, "Just after acquire_device_for_append\n");
101    if (dev->VolCatInfo.VolCatName[0] == 0) {
102       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
103    }
104    /*
105     * Write Begin Session Record
106     */
107    if (!write_session_label(dcr, SOS_LABEL)) {
108       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
109          dev->bstrerror());
110       set_jcr_job_status(jcr, JS_ErrorTerminated);
111       ok = false;
112    }
113    if (dev->VolCatInfo.VolCatName[0] == 0) {
114       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
115    }
116
117    /* Tell File daemon to send data */
118    if (!bnet_fsend(fd_sock, OK_data)) {
119       berrno be;
120       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
121             be.strerror(fd_sock->b_errno));
122       ok = false;
123    }
124
125    /*
126     * Get Data from File daemon, write to device.  To clarify what is
127     *   going on here.  We expect:
128     *     - A stream header
129     *     - Multiple records of data
130     *     - EOD record
131     *
132     *    The Stream header is just used to sychronize things, and
133     *    none of the stream header is written to tape.
134     *    The Multiple records of data, contain first the Attributes,
135     *    then after another stream header, the file data, then
136     *    after another stream header, the MD5 data if any.
137     *
138     *   So we get the (stream header, data, EOD) three time for each
139     *   file. 1. for the Attributes, 2. for the file data if any,
140     *   and 3. for the MD5 if any.
141     */
142    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
143    jcr->run_time = time(NULL);              /* start counting time for rates */
144    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
145
146       /* Read Stream header from the File daemon.
147        *  The stream header consists of the following:
148        *    file_index (sequential Bacula file index, base 1)
149        *    stream     (Bacula number to distinguish parts of data)
150        *    info       (Info for Storage daemon -- compressed, encryped, ...)
151        *       info is not currently used, so is read, but ignored!
152        */
153      if ((n=bget_msg(ds)) <= 0) {
154          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
155             break;                    /* end of data */
156          }
157          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
158                bnet_strerror(ds));
159          ok = false;
160          break;
161       }
162
163       /*
164        * This hand scanning is a bit more complicated than a simple
165        *   sscanf, but it allows us to handle any size integer up to
166        *   int64_t without worrying about whether %d, %ld, %lld, or %q
167        *   is the correct format for each different architecture.
168        * It is a real pity that sscanf() is not portable.
169        */
170       char *p = ds->msg;
171       while (B_ISSPACE(*p)) {
172          p++;
173       }
174       file_index = (int32_t)str_to_int64(p);
175       while (B_ISDIGIT(*p)) {
176          p++;
177       }
178       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
179          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
180          ok = false;
181          break;
182       }
183       stream = (int32_t)str_to_int64(p);
184
185       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
186
187       if (!(file_index > 0 && (file_index == last_file_index ||
188           file_index == last_file_index + 1))) {
189          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
190          ok = false;
191          break;
192       }
193       if (file_index != last_file_index) {
194          jcr->JobFiles = file_index;
195          last_file_index = file_index;
196       }
197
198       /* Read data stream from the File daemon.
199        *  The data stream is just raw bytes
200        */
201       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
202          rec.VolSessionId = jcr->VolSessionId;
203          rec.VolSessionTime = jcr->VolSessionTime;
204          rec.FileIndex = file_index;
205          rec.Stream = stream;
206          rec.data_len = ds->msglen;
207          rec.data = ds->msg;            /* use message buffer */
208
209          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
210             rec.FileIndex, rec.VolSessionId, 
211             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
212             rec.data_len);
213
214          while (!write_record_to_block(dcr->block, &rec)) {
215             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
216                        rec.remainder);
217             if (!write_block_to_device(dcr)) {
218                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
219                   dev->print_name(), dev->bstrerror());
220                ok = false;
221                break;
222             }
223          }
224          if (!ok) {
225             Dmsg0(400, "Not OK\n");
226             break;
227          }
228          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
229          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
230             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
231             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
232
233          /* Send attributes and digest to Director for Catalog */
234          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
235              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
236             if (!jcr->no_attributes) {
237                if (are_attributes_spooled(jcr)) {
238                   jcr->dir_bsock->spool = true;
239                }
240                Dmsg0(850, "Send attributes to dir.\n");
241                if (!dir_update_file_attributes(dcr, &rec)) {
242                   jcr->dir_bsock->spool = false;
243                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
244                      bnet_strerror(jcr->dir_bsock));
245                   ok = false;
246                   break;
247                }
248                jcr->dir_bsock->spool = false;
249             }
250          }
251          Dmsg0(650, "Enter bnet_get\n");
252       }
253       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
254
255       if (is_bnet_error(ds)) {
256          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
257          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
258                bnet_strerror(ds));
259          ok = false;
260          break;
261       }
262    }
263
264    time_t job_elapsed = time(NULL) - jcr->run_time;
265
266    if (job_elapsed <= 0) {
267       job_elapsed = 1;
268    }
269
270    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
271          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
272          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
273
274    /* Create Job status for end of session label */
275    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
276
277    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
278
279    /*
280     * Check if we can still write. This may not be the case
281     *  if we are at the end of the tape or we got a fatal I/O error.
282     */
283    if (dev->can_write()) {
284       if (!write_session_label(dcr, EOS_LABEL)) {
285          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
286                dev->bstrerror());
287          set_jcr_job_status(jcr, JS_ErrorTerminated);
288          ok = false;
289       }
290       if (dev->VolCatInfo.VolCatName[0] == 0) {
291          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
292       }
293       Dmsg0(90, "back from write_end_session_label()\n");
294       /* Flush out final partial block of this session */
295       if (!write_block_to_device(dcr)) {
296          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
297                dev->print_name(), dev->bstrerror());
298          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
299          ok = false;
300       }
301    }
302    if (dev->VolCatInfo.VolCatName[0] == 0) {
303       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
304    }
305
306    if (!ok) {
307       discard_data_spool(dcr);
308    } else {
309       /* Note: if commit is OK, the device will remain locked */
310       commit_data_spool(dcr);
311    }
312
313    if (ok) {
314       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
315    }
316    
317    /*
318     * Release the device -- and send final Vol info to DIR
319     *  and unlock it.
320     */
321    release_device(dcr);
322
323    if (!ok || job_canceled(jcr)) {
324       discard_attribute_spool(jcr);
325    } else {
326       commit_attribute_spool(jcr);
327    }
328
329    dir_send_job_status(jcr);          /* update director */
330
331    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
332    return ok;
333 }