]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
eeae53420bd481965aec63da5ae870ba0f4ca95a
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2006 Kern Sibbald
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License
12    version 2 as amended with additional clauses defined in the
13    file LICENSE in the main source directory.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
18    the file LICENSE for additional details.
19
20  */
21
22 #include "bacula.h"
23 #include "stored.h"
24
25
26 /* Responses sent to the File daemon */
27 static char OK_data[]    = "3000 OK data\n";
28
29 /* Forward referenced functions */
30
31 /*
32  *  Append Data sent from File daemon
33  *
34  */
35 bool do_append_data(JCR *jcr)
36 {
37    int32_t n;
38    int32_t file_index, stream, last_file_index;
39    BSOCK *ds;
40    BSOCK *fd_sock = jcr->file_bsock;
41    bool ok = true;
42    DEV_RECORD rec;
43    char buf1[100], buf2[100];
44    DCR *dcr = jcr->dcr;
45    DEVICE *dev;
46
47
48    if (!dcr) { 
49       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
50       return false;
51    }                                              
52    dev = dcr->dev;
53    if (!dev) { 
54       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
55       return false;
56    }                                              
57
58    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
59
60    memset(&rec, 0, sizeof(rec));
61
62    ds = fd_sock;
63
64    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
65       set_jcr_job_status(jcr, JS_ErrorTerminated);
66       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
67       return false;
68    }
69
70    if (!acquire_device_for_append(dcr)) {
71       set_jcr_job_status(jcr, JS_ErrorTerminated);
72       jcr->dcr = NULL;
73       return false;
74    }
75
76    set_jcr_job_status(jcr, JS_Running);
77    dir_send_job_status(jcr);
78
79    if (dev->VolCatInfo.VolCatName[0] == 0) {
80       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
81    }
82    Dmsg1(20, "Begin append device=%s\n", dev->print_name());
83
84    begin_data_spool(dcr);
85    begin_attribute_spool(jcr);
86
87    Dmsg0(100, "Just after acquire_device_for_append\n");
88    if (dev->VolCatInfo.VolCatName[0] == 0) {
89       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
90    }
91    /*
92     * Write Begin Session Record
93     */
94    if (!write_session_label(dcr, SOS_LABEL)) {
95       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
96          dev->bstrerror());
97       set_jcr_job_status(jcr, JS_ErrorTerminated);
98       ok = false;
99    }
100    if (dev->VolCatInfo.VolCatName[0] == 0) {
101       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
102    }
103
104    /* Tell File daemon to send data */
105    if (!bnet_fsend(fd_sock, OK_data)) {
106       berrno be;
107       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
108             be.strerror(fd_sock->b_errno));
109       ok = false;
110    }
111
112    /*
113     * Get Data from File daemon, write to device.  To clarify what is
114     *   going on here.  We expect:
115     *     - A stream header
116     *     - Multiple records of data
117     *     - EOD record
118     *
119     *    The Stream header is just used to sychronize things, and
120     *    none of the stream header is written to tape.
121     *    The Multiple records of data, contain first the Attributes,
122     *    then after another stream header, the file data, then
123     *    after another stream header, the MD5 data if any.
124     *
125     *   So we get the (stream header, data, EOD) three time for each
126     *   file. 1. for the Attributes, 2. for the file data if any,
127     *   and 3. for the MD5 if any.
128     */
129    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
130    jcr->run_time = time(NULL);              /* start counting time for rates */
131    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
132
133       /* Read Stream header from the File daemon.
134        *  The stream header consists of the following:
135        *    file_index (sequential Bacula file index, base 1)
136        *    stream     (Bacula number to distinguish parts of data)
137        *    info       (Info for Storage daemon -- compressed, encryped, ...)
138        *       info is not currently used, so is read, but ignored!
139        */
140      if ((n=bget_msg(ds)) <= 0) {
141          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
142             break;                    /* end of data */
143          }
144          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
145                bnet_strerror(ds));
146          ok = false;
147          break;
148       }
149
150       /*
151        * This hand scanning is a bit more complicated than a simple
152        *   sscanf, but it allows us to handle any size integer up to
153        *   int64_t without worrying about whether %d, %ld, %lld, or %q
154        *   is the correct format for each different architecture.
155        * It is a real pity that sscanf() is not portable.
156        */
157       char *p = ds->msg;
158       while (B_ISSPACE(*p)) {
159          p++;
160       }
161       file_index = (int32_t)str_to_int64(p);
162       while (B_ISDIGIT(*p)) {
163          p++;
164       }
165       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
166          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
167          ok = false;
168          break;
169       }
170       stream = (int32_t)str_to_int64(p);
171
172       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
173
174       if (!(file_index > 0 && (file_index == last_file_index ||
175           file_index == last_file_index + 1))) {
176          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
177          ok = false;
178          break;
179       }
180       if (file_index != last_file_index) {
181          jcr->JobFiles = file_index;
182          last_file_index = file_index;
183       }
184
185       /* Read data stream from the File daemon.
186        *  The data stream is just raw bytes
187        */
188       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
189          rec.VolSessionId = jcr->VolSessionId;
190          rec.VolSessionTime = jcr->VolSessionTime;
191          rec.FileIndex = file_index;
192          rec.Stream = stream;
193          rec.data_len = ds->msglen;
194          rec.data = ds->msg;            /* use message buffer */
195
196          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
197             rec.FileIndex, rec.VolSessionId, 
198             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
199             rec.data_len);
200
201          while (!write_record_to_block(dcr->block, &rec)) {
202             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
203                        rec.remainder);
204             if (!write_block_to_device(dcr)) {
205                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
206                   dev->print_name(), dev->bstrerror());
207                Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
208                      dev->print_name(), dev->bstrerror());
209                ok = false;
210                break;
211             }
212          }
213          if (!ok) {
214             Dmsg0(400, "Not OK\n");
215             break;
216          }
217          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
218          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
219             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
220             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
221
222          /* Send attributes and digest to Director for Catalog */
223          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
224              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
225             if (!jcr->no_attributes) {
226                if (are_attributes_spooled(jcr)) {
227                   jcr->dir_bsock->spool = true;
228                }
229                Dmsg0(850, "Send attributes to dir.\n");
230                if (!dir_update_file_attributes(dcr, &rec)) {
231                   jcr->dir_bsock->spool = false;
232                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
233                      bnet_strerror(jcr->dir_bsock));
234                   ok = false;
235                   break;
236                }
237                jcr->dir_bsock->spool = false;
238             }
239          }
240          Dmsg0(650, "Enter bnet_get\n");
241       }
242       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
243       if (is_bnet_error(ds)) {
244          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
245          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
246                bnet_strerror(ds));
247          ok = false;
248          break;
249       }
250    }
251
252    /* Create Job status for end of session label */
253    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
254
255    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
256
257    /*
258     * If !OK, check if we can still write. This may not be the case
259     *  if we are at the end of the tape or we got a fatal I/O error.
260     */
261    if (ok || dev->can_write()) {
262       if (!write_session_label(dcr, EOS_LABEL)) {
263          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
264                dev->bstrerror());
265          set_jcr_job_status(jcr, JS_ErrorTerminated);
266          ok = false;
267       }
268       if (dev->VolCatInfo.VolCatName[0] == 0) {
269          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
270       }
271       Dmsg0(90, "back from write_end_session_label()\n");
272       /* Flush out final partial block of this session */
273       if (!write_block_to_device(dcr)) {
274          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
275                dev->print_name(), dev->bstrerror());
276          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
277          ok = false;
278       }
279    }
280    if (dev->VolCatInfo.VolCatName[0] == 0) {
281       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
282    }
283
284    if (!ok) {
285       discard_data_spool(dcr);
286    } else {
287       commit_data_spool(dcr);
288    }
289
290    if (ok) {
291       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
292    }
293    
294    /* Release the device -- and send final Vol info to DIR */
295    release_device(dcr);
296
297    if (!ok || job_canceled(jcr)) {
298       discard_attribute_spool(jcr);
299    } else {
300       commit_attribute_spool(jcr);
301    }
302
303    dir_send_job_status(jcr);          /* update director */
304
305    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
306    return ok;
307 }