]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
Minor bsock cleanup in stored
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Append code for Storage daemon
30  *  Kern Sibbald, May MM
31  *
32  *  Version $Id$
33  */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38
39 /* Responses sent to the File daemon */
40 static char OK_data[]    = "3000 OK data\n";
41 static char OK_append[]  = "3000 OK append data\n";
42
43 /* Forward referenced functions */
44
45 /*
46  *  Append Data sent from File daemon
47  *
48  */
49 bool do_append_data(JCR *jcr)
50 {
51    int32_t n;
52    int32_t file_index, stream, last_file_index;
53    BSOCK *ds;
54    BSOCK *fd_sock = jcr->file_bsock;
55    bool ok = true;
56    DEV_RECORD rec;
57    char buf1[100], buf2[100];
58    DCR *dcr = jcr->dcr;
59    DEVICE *dev;
60    char ec[50];
61
62
63    if (!dcr) { 
64       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
65       return false;
66    }                                              
67    dev = dcr->dev;
68    if (!dev) { 
69       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
70       return false;
71    }                                              
72
73    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
74
75    memset(&rec, 0, sizeof(rec));
76
77    ds = fd_sock;
78
79    if (!ds->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
80       set_jcr_job_status(jcr, JS_ErrorTerminated);
81       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
82       return false;
83    }
84
85    if (!acquire_device_for_append(dcr)) {
86       set_jcr_job_status(jcr, JS_ErrorTerminated);
87       return false;
88    }
89
90    set_jcr_job_status(jcr, JS_Running);
91    dir_send_job_status(jcr);
92
93    if (dev->VolCatInfo.VolCatName[0] == 0) {
94       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
95    }
96    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
97
98    begin_data_spool(dcr);
99    begin_attribute_spool(jcr);
100
101    Dmsg0(100, "Just after acquire_device_for_append\n");
102    if (dev->VolCatInfo.VolCatName[0] == 0) {
103       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
104    }
105    /*
106     * Write Begin Session Record
107     */
108    if (!write_session_label(dcr, SOS_LABEL)) {
109       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
110          dev->bstrerror());
111       set_jcr_job_status(jcr, JS_ErrorTerminated);
112       ok = false;
113    }
114    if (dev->VolCatInfo.VolCatName[0] == 0) {
115       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
116    }
117
118    /* Tell File daemon to send data */
119    if (!fd_sock->fsend(OK_data)) {
120       berrno be;
121       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
122             be.bstrerror(fd_sock->b_errno));
123       ok = false;
124    }
125
126    /*
127     * Get Data from File daemon, write to device.  To clarify what is
128     *   going on here.  We expect:
129     *     - A stream header
130     *     - Multiple records of data
131     *     - EOD record
132     *
133     *    The Stream header is just used to sychronize things, and
134     *    none of the stream header is written to tape.
135     *    The Multiple records of data, contain first the Attributes,
136     *    then after another stream header, the file data, then
137     *    after another stream header, the MD5 data if any.
138     *
139     *   So we get the (stream header, data, EOD) three time for each
140     *   file. 1. for the Attributes, 2. for the file data if any,
141     *   and 3. for the MD5 if any.
142     */
143    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
144    jcr->run_time = time(NULL);              /* start counting time for rates */
145    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
146
147       /* Read Stream header from the File daemon.
148        *  The stream header consists of the following:
149        *    file_index (sequential Bacula file index, base 1)
150        *    stream     (Bacula number to distinguish parts of data)
151        *    info       (Info for Storage daemon -- compressed, encryped, ...)
152        *       info is not currently used, so is read, but ignored!
153        */
154      if ((n=bget_msg(ds)) <= 0) {
155          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
156             break;                    /* end of data */
157          }
158          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
159                ds->bstrerror());
160          ok = false;
161          break;
162       }
163
164       if (sscanf(ds->msg, "%ld %ld", &file_index, &stream) != 2) {
165          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
166          ok = false;
167          break;
168       }
169
170       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
171
172       if (!(file_index > 0 && (file_index == last_file_index ||
173           file_index == last_file_index + 1))) {
174          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
175          ok = false;
176          break;
177       }
178       if (file_index != last_file_index) {
179          jcr->JobFiles = file_index;
180          last_file_index = file_index;
181       }
182
183       /* Read data stream from the File daemon.
184        *  The data stream is just raw bytes
185        */
186       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
187          rec.VolSessionId = jcr->VolSessionId;
188          rec.VolSessionTime = jcr->VolSessionTime;
189          rec.FileIndex = file_index;
190          rec.Stream = stream;
191          rec.data_len = ds->msglen;
192          rec.data = ds->msg;            /* use message buffer */
193
194          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
195             rec.FileIndex, rec.VolSessionId, 
196             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
197             rec.data_len);
198
199          while (!write_record_to_block(dcr->block, &rec)) {
200             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
201                        rec.remainder);
202             if (!write_block_to_device(dcr)) {
203                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
204                   dev->print_name(), dev->bstrerror());
205                ok = false;
206                break;
207             }
208          }
209          if (!ok) {
210             Dmsg0(400, "Not OK\n");
211             break;
212          }
213          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
214          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
215             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
216             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
217
218          /* Send attributes and digest to Director for Catalog */
219          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
220              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
221             if (!jcr->no_attributes) {
222                BSOCK *dir = jcr->dir_bsock;
223                if (are_attributes_spooled(jcr)) {
224                   dir->set_spooling();
225                }
226                Dmsg0(850, "Send attributes to dir.\n");
227                if (!dir_update_file_attributes(dcr, &rec)) {
228                   dir->clear_spooling();
229                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
230                      dir->bstrerror());
231                   ok = false;
232                   break;
233                }
234                dir->clear_spooling();
235             }
236          }
237          Dmsg0(650, "Enter bnet_get\n");
238       }
239       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
240
241       if (ds->is_error()) {
242          Dmsg1(350, "Network read error from FD. ERR=%s\n", ds->bstrerror());
243          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
244                ds->bstrerror());
245          ok = false;
246          break;
247       }
248    }
249
250    /* Create Job status for end of session label */
251    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
252
253    /* Terminate connection with FD */
254    ds->fsend(OK_append);
255    do_fd_commands(jcr);               /* finish dialog with FD */
256
257    /*
258     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
259     *   and the subsequent Jmsg() editing will break
260     */
261    int32_t job_elapsed = time(NULL) - jcr->run_time;
262
263    if (job_elapsed <= 0) {
264       job_elapsed = 1;
265    }
266
267    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
268          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
269          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
270
271
272    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
273
274    /*
275     * Check if we can still write. This may not be the case
276     *  if we are at the end of the tape or we got a fatal I/O error.
277     */
278    if (ok || dev->can_write()) {
279       if (!write_session_label(dcr, EOS_LABEL)) {
280          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
281                dev->bstrerror());
282          set_jcr_job_status(jcr, JS_ErrorTerminated);
283          ok = false;
284       }
285       if (dev->VolCatInfo.VolCatName[0] == 0) {
286          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
287          Dmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
288       }
289       Dmsg0(90, "back from write_end_session_label()\n");
290       /* Flush out final partial block of this session */
291       if (!write_block_to_device(dcr)) {
292          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
293                dev->print_name(), dev->bstrerror());
294          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
295          ok = false;
296       }
297       if (dev->VolCatInfo.VolCatName[0] == 0) {
298          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
299          Dmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
300       }
301    }
302
303
304    if (!ok) {
305       discard_data_spool(dcr);
306    } else {
307       /* Note: if commit is OK, the device will remain locked */
308       commit_data_spool(dcr);
309    }
310
311    if (ok) {
312       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
313    }
314    
315    /*
316     * Release the device -- and send final Vol info to DIR
317     *  and unlock it.
318     */
319    release_device(dcr);
320
321    if (!ok || job_canceled(jcr)) {
322       discard_attribute_spool(jcr);
323    } else {
324       commit_attribute_spool(jcr);
325    }
326
327    dir_send_job_status(jcr);          /* update director */
328
329    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
330    return ok;
331 }