]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
Start adding back removed code.
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2006 Kern Sibbald
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License
12    version 2 as amended with additional clauses defined in the
13    file LICENSE in the main source directory.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
18    the file LICENSE for additional details.
19
20  */
21
22 #include "bacula.h"
23 #include "stored.h"
24
25
26 /* Responses sent to the File daemon */
27 static char OK_data[]    = "3000 OK data\n";
28
29 /* Forward referenced functions */
30
31 /*
32  *  Append Data sent from File daemon
33  *
34  */
35 bool do_append_data(JCR *jcr)
36 {
37    int32_t n;
38    int32_t file_index, stream, last_file_index;
39    BSOCK *ds;
40    BSOCK *fd_sock = jcr->file_bsock;
41    bool ok = true;
42    DEV_RECORD rec;
43    char buf1[100], buf2[100];
44    DCR *dcr = jcr->dcr;
45    DEVICE *dev;
46    char ec[50];
47
48
49    if (!dcr) { 
50       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
51       return false;
52    }                                              
53    dev = dcr->dev;
54    if (!dev) { 
55       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
56       return false;
57    }                                              
58
59    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
60
61    memset(&rec, 0, sizeof(rec));
62
63    ds = fd_sock;
64
65    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
66       set_jcr_job_status(jcr, JS_ErrorTerminated);
67       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
68       return false;
69    }
70
71    if (!acquire_device_for_append(dcr)) {
72       set_jcr_job_status(jcr, JS_ErrorTerminated);
73       jcr->dcr = NULL;
74       return false;
75    }
76
77    set_jcr_job_status(jcr, JS_Running);
78    dir_send_job_status(jcr);
79
80    if (dev->VolCatInfo.VolCatName[0] == 0) {
81       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
82    }
83    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
84
85    begin_data_spool(dcr);
86    begin_attribute_spool(jcr);
87
88    Dmsg0(100, "Just after acquire_device_for_append\n");
89    if (dev->VolCatInfo.VolCatName[0] == 0) {
90       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
91    }
92    /*
93     * Write Begin Session Record
94     */
95    if (!write_session_label(dcr, SOS_LABEL)) {
96       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
97          dev->bstrerror());
98       set_jcr_job_status(jcr, JS_ErrorTerminated);
99       ok = false;
100    }
101    if (dev->VolCatInfo.VolCatName[0] == 0) {
102       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
103    }
104
105    /* Tell File daemon to send data */
106    if (!bnet_fsend(fd_sock, OK_data)) {
107       berrno be;
108       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
109             be.strerror(fd_sock->b_errno));
110       ok = false;
111    }
112
113    /*
114     * Get Data from File daemon, write to device.  To clarify what is
115     *   going on here.  We expect:
116     *     - A stream header
117     *     - Multiple records of data
118     *     - EOD record
119     *
120     *    The Stream header is just used to sychronize things, and
121     *    none of the stream header is written to tape.
122     *    The Multiple records of data, contain first the Attributes,
123     *    then after another stream header, the file data, then
124     *    after another stream header, the MD5 data if any.
125     *
126     *   So we get the (stream header, data, EOD) three time for each
127     *   file. 1. for the Attributes, 2. for the file data if any,
128     *   and 3. for the MD5 if any.
129     */
130    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
131    jcr->run_time = time(NULL);              /* start counting time for rates */
132    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
133
134       /* Read Stream header from the File daemon.
135        *  The stream header consists of the following:
136        *    file_index (sequential Bacula file index, base 1)
137        *    stream     (Bacula number to distinguish parts of data)
138        *    info       (Info for Storage daemon -- compressed, encryped, ...)
139        *       info is not currently used, so is read, but ignored!
140        */
141      if ((n=bget_msg(ds)) <= 0) {
142          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
143             break;                    /* end of data */
144          }
145          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
146                bnet_strerror(ds));
147          ok = false;
148          break;
149       }
150
151       /*
152        * This hand scanning is a bit more complicated than a simple
153        *   sscanf, but it allows us to handle any size integer up to
154        *   int64_t without worrying about whether %d, %ld, %lld, or %q
155        *   is the correct format for each different architecture.
156        * It is a real pity that sscanf() is not portable.
157        */
158       char *p = ds->msg;
159       while (B_ISSPACE(*p)) {
160          p++;
161       }
162       file_index = (int32_t)str_to_int64(p);
163       while (B_ISDIGIT(*p)) {
164          p++;
165       }
166       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
167          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
168          ok = false;
169          break;
170       }
171       stream = (int32_t)str_to_int64(p);
172
173       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
174
175       if (!(file_index > 0 && (file_index == last_file_index ||
176           file_index == last_file_index + 1))) {
177          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
178          ok = false;
179          break;
180       }
181       if (file_index != last_file_index) {
182          jcr->JobFiles = file_index;
183          last_file_index = file_index;
184       }
185
186       /* Read data stream from the File daemon.
187        *  The data stream is just raw bytes
188        */
189       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
190          rec.VolSessionId = jcr->VolSessionId;
191          rec.VolSessionTime = jcr->VolSessionTime;
192          rec.FileIndex = file_index;
193          rec.Stream = stream;
194          rec.data_len = ds->msglen;
195          rec.data = ds->msg;            /* use message buffer */
196
197          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
198             rec.FileIndex, rec.VolSessionId, 
199             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
200             rec.data_len);
201
202          while (!write_record_to_block(dcr->block, &rec)) {
203             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
204                        rec.remainder);
205             if (!write_block_to_device(dcr)) {
206                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
207                   dev->print_name(), dev->bstrerror());
208                Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
209                      dev->print_name(), dev->bstrerror());
210                ok = false;
211                break;
212             }
213          }
214          if (!ok) {
215             Dmsg0(400, "Not OK\n");
216             break;
217          }
218          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
219          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
220             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
221             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
222
223          /* Send attributes and digest to Director for Catalog */
224          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
225              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
226             if (!jcr->no_attributes) {
227                if (are_attributes_spooled(jcr)) {
228                   jcr->dir_bsock->spool = true;
229                }
230                Dmsg0(850, "Send attributes to dir.\n");
231                if (!dir_update_file_attributes(dcr, &rec)) {
232                   jcr->dir_bsock->spool = false;
233                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
234                      bnet_strerror(jcr->dir_bsock));
235                   ok = false;
236                   break;
237                }
238                jcr->dir_bsock->spool = false;
239             }
240          }
241          Dmsg0(650, "Enter bnet_get\n");
242       }
243       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
244
245       if (is_bnet_error(ds)) {
246          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
247          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
248                bnet_strerror(ds));
249          ok = false;
250          break;
251       }
252    }
253
254    time_t job_elapsed = time(NULL) - jcr->run_time;
255
256    if (job_elapsed <= 0) {
257       job_elapsed = 1;
258    }
259
260    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
261          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
262          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
263
264    /* Create Job status for end of session label */
265    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
266
267    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
268
269    /*
270     * If !OK, check if we can still write. This may not be the case
271     *  if we are at the end of the tape or we got a fatal I/O error.
272     */
273    if (ok || dev->can_write()) {
274       if (!write_session_label(dcr, EOS_LABEL)) {
275          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
276                dev->bstrerror());
277          set_jcr_job_status(jcr, JS_ErrorTerminated);
278          ok = false;
279       }
280       if (dev->VolCatInfo.VolCatName[0] == 0) {
281          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
282       }
283       Dmsg0(90, "back from write_end_session_label()\n");
284       /* Flush out final partial block of this session */
285       if (!write_block_to_device(dcr)) {
286          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
287                dev->print_name(), dev->bstrerror());
288          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
289          ok = false;
290       }
291    }
292    if (dev->VolCatInfo.VolCatName[0] == 0) {
293       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
294    }
295
296    if (!ok) {
297       discard_data_spool(dcr);
298    } else {
299       commit_data_spool(dcr);
300    }
301
302    if (ok) {
303       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
304    }
305    
306    /* Release the device -- and send final Vol info to DIR */
307    release_device(dcr);
308
309    if (!ok || job_canceled(jcr)) {
310       discard_attribute_spool(jcr);
311    } else {
312       commit_attribute_spool(jcr);
313    }
314
315    dir_send_job_status(jcr);          /* update director */
316
317    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
318    return ok;
319 }