]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
kes Change error message 'illegal' to 'invalid' -- bug #707
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2006 Kern Sibbald
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License
12    version 2 as amended with additional clauses defined in the
13    file LICENSE in the main source directory.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
18    the file LICENSE for additional details.
19
20  */
21
22 #include "bacula.h"
23 #include "stored.h"
24
25
26 /* Responses sent to the File daemon */
27 static char OK_data[]    = "3000 OK data\n";
28
29 /* Forward referenced functions */
30
31 /*
32  *  Append Data sent from File daemon
33  *
34  */
35 bool do_append_data(JCR *jcr)
36 {
37    int32_t n;
38    int32_t file_index, stream, last_file_index;
39    BSOCK *ds;
40    BSOCK *fd_sock = jcr->file_bsock;
41    bool ok = true;
42    DEV_RECORD rec;
43    char buf1[100], buf2[100];
44    DCR *dcr = jcr->dcr;
45    DEVICE *dev;
46    char ec[50];
47
48
49    if (!dcr) { 
50       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
51       return false;
52    }                                              
53    dev = dcr->dev;
54    if (!dev) { 
55       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
56       return false;
57    }                                              
58
59    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
60
61    memset(&rec, 0, sizeof(rec));
62
63    ds = fd_sock;
64
65    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
66       set_jcr_job_status(jcr, JS_ErrorTerminated);
67       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
68       return false;
69    }
70
71    if (!acquire_device_for_append(dcr)) {
72       set_jcr_job_status(jcr, JS_ErrorTerminated);
73       jcr->dcr = NULL;
74       return false;
75    }
76
77    set_jcr_job_status(jcr, JS_Running);
78    dir_send_job_status(jcr);
79
80    if (dev->VolCatInfo.VolCatName[0] == 0) {
81       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
82    }
83    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
84
85    begin_data_spool(dcr);
86    begin_attribute_spool(jcr);
87
88    Dmsg0(100, "Just after acquire_device_for_append\n");
89    if (dev->VolCatInfo.VolCatName[0] == 0) {
90       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
91    }
92    /*
93     * Write Begin Session Record
94     */
95    if (!write_session_label(dcr, SOS_LABEL)) {
96       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
97          dev->bstrerror());
98       set_jcr_job_status(jcr, JS_ErrorTerminated);
99       ok = false;
100    }
101    if (dev->VolCatInfo.VolCatName[0] == 0) {
102       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
103    }
104
105    /* Tell File daemon to send data */
106    if (!bnet_fsend(fd_sock, OK_data)) {
107       berrno be;
108       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
109             be.strerror(fd_sock->b_errno));
110       ok = false;
111    }
112
113    /*
114     * Get Data from File daemon, write to device.  To clarify what is
115     *   going on here.  We expect:
116     *     - A stream header
117     *     - Multiple records of data
118     *     - EOD record
119     *
120     *    The Stream header is just used to sychronize things, and
121     *    none of the stream header is written to tape.
122     *    The Multiple records of data, contain first the Attributes,
123     *    then after another stream header, the file data, then
124     *    after another stream header, the MD5 data if any.
125     *
126     *   So we get the (stream header, data, EOD) three time for each
127     *   file. 1. for the Attributes, 2. for the file data if any,
128     *   and 3. for the MD5 if any.
129     */
130    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
131    jcr->run_time = time(NULL);              /* start counting time for rates */
132    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
133
134       /* Read Stream header from the File daemon.
135        *  The stream header consists of the following:
136        *    file_index (sequential Bacula file index, base 1)
137        *    stream     (Bacula number to distinguish parts of data)
138        *    info       (Info for Storage daemon -- compressed, encryped, ...)
139        *       info is not currently used, so is read, but ignored!
140        */
141      if ((n=bget_msg(ds)) <= 0) {
142          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
143             break;                    /* end of data */
144          }
145          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
146                bnet_strerror(ds));
147          ok = false;
148          break;
149       }
150
151       /*
152        * This hand scanning is a bit more complicated than a simple
153        *   sscanf, but it allows us to handle any size integer up to
154        *   int64_t without worrying about whether %d, %ld, %lld, or %q
155        *   is the correct format for each different architecture.
156        * It is a real pity that sscanf() is not portable.
157        */
158       char *p = ds->msg;
159       while (B_ISSPACE(*p)) {
160          p++;
161       }
162       file_index = (int32_t)str_to_int64(p);
163       while (B_ISDIGIT(*p)) {
164          p++;
165       }
166       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
167          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
168          ok = false;
169          break;
170       }
171       stream = (int32_t)str_to_int64(p);
172
173       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
174
175       if (!(file_index > 0 && (file_index == last_file_index ||
176           file_index == last_file_index + 1))) {
177          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
178          ok = false;
179          break;
180       }
181       if (file_index != last_file_index) {
182          jcr->JobFiles = file_index;
183          last_file_index = file_index;
184       }
185
186       /* Read data stream from the File daemon.
187        *  The data stream is just raw bytes
188        */
189       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
190          rec.VolSessionId = jcr->VolSessionId;
191          rec.VolSessionTime = jcr->VolSessionTime;
192          rec.FileIndex = file_index;
193          rec.Stream = stream;
194          rec.data_len = ds->msglen;
195          rec.data = ds->msg;            /* use message buffer */
196
197          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
198             rec.FileIndex, rec.VolSessionId, 
199             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
200             rec.data_len);
201
202          while (!write_record_to_block(dcr->block, &rec)) {
203             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
204                        rec.remainder);
205             if (!write_block_to_device(dcr)) {
206                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
207                   dev->print_name(), dev->bstrerror());
208                ok = false;
209                break;
210             }
211          }
212          if (!ok) {
213             Dmsg0(400, "Not OK\n");
214             break;
215          }
216          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
217          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
218             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
219             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
220
221          /* Send attributes and digest to Director for Catalog */
222          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
223              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
224             if (!jcr->no_attributes) {
225                if (are_attributes_spooled(jcr)) {
226                   jcr->dir_bsock->spool = true;
227                }
228                Dmsg0(850, "Send attributes to dir.\n");
229                if (!dir_update_file_attributes(dcr, &rec)) {
230                   jcr->dir_bsock->spool = false;
231                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
232                      bnet_strerror(jcr->dir_bsock));
233                   ok = false;
234                   break;
235                }
236                jcr->dir_bsock->spool = false;
237             }
238          }
239          Dmsg0(650, "Enter bnet_get\n");
240       }
241       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
242
243       if (is_bnet_error(ds)) {
244          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
245          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
246                bnet_strerror(ds));
247          ok = false;
248          break;
249       }
250    }
251
252    time_t job_elapsed = time(NULL) - jcr->run_time;
253
254    if (job_elapsed <= 0) {
255       job_elapsed = 1;
256    }
257
258    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
259          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
260          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
261
262    /* Create Job status for end of session label */
263    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
264
265    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
266
267    /*
268     * If !OK, check if we can still write. This may not be the case
269     *  if we are at the end of the tape or we got a fatal I/O error.
270     */
271    if (ok || dev->can_write()) {
272       if (!write_session_label(dcr, EOS_LABEL)) {
273          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
274                dev->bstrerror());
275          set_jcr_job_status(jcr, JS_ErrorTerminated);
276          ok = false;
277       }
278       if (dev->VolCatInfo.VolCatName[0] == 0) {
279          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
280       }
281       Dmsg0(90, "back from write_end_session_label()\n");
282       /* Flush out final partial block of this session */
283       if (!write_block_to_device(dcr)) {
284          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
285                dev->print_name(), dev->bstrerror());
286          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
287          ok = false;
288       }
289    }
290    if (dev->VolCatInfo.VolCatName[0] == 0) {
291       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
292    }
293
294    if (!ok) {
295       discard_data_spool(dcr);
296    } else {
297       /* Note: if commit is OK, the device will remain locked */
298       commit_data_spool(dcr);
299    }
300
301    if (ok) {
302       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
303    }
304    
305    /*
306     * Release the device -- and send final Vol info to DIR
307     *  and unlock it.
308     */
309    release_device(dcr);
310
311    if (!ok || job_canceled(jcr)) {
312       discard_attribute_spool(jcr);
313    } else {
314       commit_attribute_spool(jcr);
315    }
316
317    dir_send_job_status(jcr);          /* update director */
318
319    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
320    return ok;
321 }