]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
Move more bnet functions into the BSOCK class.
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Bacula® - The Network Backup Solution
9
10    Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
11
12    The main author of Bacula is Kern Sibbald, with contributions from
13    many others, a complete list can be found in the file AUTHORS.
14    This program is Free Software; you can redistribute it and/or
15    modify it under the terms of version two of the GNU General Public
16    License as published by the Free Software Foundation plus additions
17    that are listed in the file LICENSE.
18
19    This program is distributed in the hope that it will be useful, but
20    WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22    General Public License for more details.
23
24    You should have received a copy of the GNU General Public License
25    along with this program; if not, write to the Free Software
26    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
27    02110-1301, USA.
28
29    Bacula® is a registered trademark of John Walker.
30    The licensor of Bacula is the Free Software Foundation Europe
31    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
32    Switzerland, email:ftf@fsfeurope.org.
33 */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38
39 /* Responses sent to the File daemon */
40 static char OK_data[]    = "3000 OK data\n";
41 static char OK_append[]  = "3000 OK append data\n";
42
43 /* Forward referenced functions */
44
45 /*
46  *  Append Data sent from File daemon
47  *
48  */
49 bool do_append_data(JCR *jcr)
50 {
51    int32_t n;
52    int32_t file_index, stream, last_file_index;
53    BSOCK *ds;
54    BSOCK *fd_sock = jcr->file_bsock;
55    bool ok = true;
56    DEV_RECORD rec;
57    char buf1[100], buf2[100];
58    DCR *dcr = jcr->dcr;
59    DEVICE *dev;
60    char ec[50];
61
62
63    if (!dcr) { 
64       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
65       return false;
66    }                                              
67    dev = dcr->dev;
68    if (!dev) { 
69       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
70       return false;
71    }                                              
72
73    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
74
75    memset(&rec, 0, sizeof(rec));
76
77    ds = fd_sock;
78
79    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
80       set_jcr_job_status(jcr, JS_ErrorTerminated);
81       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
82       return false;
83    }
84
85    if (!acquire_device_for_append(dcr)) {
86       set_jcr_job_status(jcr, JS_ErrorTerminated);
87       return false;
88    }
89
90    set_jcr_job_status(jcr, JS_Running);
91    dir_send_job_status(jcr);
92
93    if (dev->VolCatInfo.VolCatName[0] == 0) {
94       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
95    }
96    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
97
98    begin_data_spool(dcr);
99    begin_attribute_spool(jcr);
100
101    Dmsg0(100, "Just after acquire_device_for_append\n");
102    if (dev->VolCatInfo.VolCatName[0] == 0) {
103       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
104    }
105    /*
106     * Write Begin Session Record
107     */
108    if (!write_session_label(dcr, SOS_LABEL)) {
109       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
110          dev->bstrerror());
111       set_jcr_job_status(jcr, JS_ErrorTerminated);
112       ok = false;
113    }
114    if (dev->VolCatInfo.VolCatName[0] == 0) {
115       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
116    }
117
118    /* Tell File daemon to send data */
119    if (!bnet_fsend(fd_sock, OK_data)) {
120       berrno be;
121       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
122             be.bstrerror(fd_sock->b_errno));
123       ok = false;
124    }
125
126    /*
127     * Get Data from File daemon, write to device.  To clarify what is
128     *   going on here.  We expect:
129     *     - A stream header
130     *     - Multiple records of data
131     *     - EOD record
132     *
133     *    The Stream header is just used to sychronize things, and
134     *    none of the stream header is written to tape.
135     *    The Multiple records of data, contain first the Attributes,
136     *    then after another stream header, the file data, then
137     *    after another stream header, the MD5 data if any.
138     *
139     *   So we get the (stream header, data, EOD) three time for each
140     *   file. 1. for the Attributes, 2. for the file data if any,
141     *   and 3. for the MD5 if any.
142     */
143    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
144    jcr->run_time = time(NULL);              /* start counting time for rates */
145    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
146
147       /* Read Stream header from the File daemon.
148        *  The stream header consists of the following:
149        *    file_index (sequential Bacula file index, base 1)
150        *    stream     (Bacula number to distinguish parts of data)
151        *    info       (Info for Storage daemon -- compressed, encryped, ...)
152        *       info is not currently used, so is read, but ignored!
153        */
154      if ((n=bget_msg(ds)) <= 0) {
155          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
156             break;                    /* end of data */
157          }
158          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
159                bnet_strerror(ds));
160          ok = false;
161          break;
162       }
163
164       /*
165        * This hand scanning is a bit more complicated than a simple
166        *   sscanf, but it allows us to handle any size integer up to
167        *   int64_t without worrying about whether %d, %ld, %lld, or %q
168        *   is the correct format for each different architecture.
169        * It is a real pity that sscanf() is not portable.
170        */
171       char *p = ds->msg;
172       while (B_ISSPACE(*p)) {
173          p++;
174       }
175       file_index = (int32_t)str_to_int64(p);
176       while (B_ISDIGIT(*p)) {
177          p++;
178       }
179       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
180          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
181          ok = false;
182          break;
183       }
184       stream = (int32_t)str_to_int64(p);
185
186       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
187
188       if (!(file_index > 0 && (file_index == last_file_index ||
189           file_index == last_file_index + 1))) {
190          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
191          ok = false;
192          break;
193       }
194       if (file_index != last_file_index) {
195          jcr->JobFiles = file_index;
196          last_file_index = file_index;
197       }
198
199       /* Read data stream from the File daemon.
200        *  The data stream is just raw bytes
201        */
202       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
203          rec.VolSessionId = jcr->VolSessionId;
204          rec.VolSessionTime = jcr->VolSessionTime;
205          rec.FileIndex = file_index;
206          rec.Stream = stream;
207          rec.data_len = ds->msglen;
208          rec.data = ds->msg;            /* use message buffer */
209
210          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
211             rec.FileIndex, rec.VolSessionId, 
212             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
213             rec.data_len);
214
215          while (!write_record_to_block(dcr->block, &rec)) {
216             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
217                        rec.remainder);
218             if (!write_block_to_device(dcr)) {
219                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
220                   dev->print_name(), dev->bstrerror());
221                ok = false;
222                break;
223             }
224          }
225          if (!ok) {
226             Dmsg0(400, "Not OK\n");
227             break;
228          }
229          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
230          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
231             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
232             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
233
234          /* Send attributes and digest to Director for Catalog */
235          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
236              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
237             if (!jcr->no_attributes) {
238                if (are_attributes_spooled(jcr)) {
239                   jcr->dir_bsock->m_spool = true;
240                }
241                Dmsg0(850, "Send attributes to dir.\n");
242                if (!dir_update_file_attributes(dcr, &rec)) {
243                   jcr->dir_bsock->m_spool = false;
244                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
245                      bnet_strerror(jcr->dir_bsock));
246                   ok = false;
247                   break;
248                }
249                jcr->dir_bsock->m_spool = false;
250             }
251          }
252          Dmsg0(650, "Enter bnet_get\n");
253       }
254       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
255
256       if (is_bnet_error(ds)) {
257          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
258          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
259                bnet_strerror(ds));
260          ok = false;
261          break;
262       }
263    }
264
265    /* Create Job status for end of session label */
266    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
267
268    /* Terminate connection with FD */
269    bnet_fsend(ds, OK_append);
270    do_fd_commands(jcr);               /* finish dialog with FD */
271
272
273    time_t job_elapsed = time(NULL) - jcr->run_time;
274
275    if (job_elapsed <= 0) {
276       job_elapsed = 1;
277    }
278
279    Jmsg(dcr->jcr, M_INFO, 0, _("Job write elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
280          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
281          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
282
283
284    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
285
286    /*
287     * Check if we can still write. This may not be the case
288     *  if we are at the end of the tape or we got a fatal I/O error.
289     */
290    if (dev->can_write()) {
291       if (!write_session_label(dcr, EOS_LABEL)) {
292          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
293                dev->bstrerror());
294          set_jcr_job_status(jcr, JS_ErrorTerminated);
295          ok = false;
296       }
297       if (dev->VolCatInfo.VolCatName[0] == 0) {
298          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
299       }
300       Dmsg0(90, "back from write_end_session_label()\n");
301       /* Flush out final partial block of this session */
302       if (!write_block_to_device(dcr)) {
303          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
304                dev->print_name(), dev->bstrerror());
305          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
306          ok = false;
307       }
308       if (dev->VolCatInfo.VolCatName[0] == 0) {
309          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
310       }
311    }
312
313
314    if (!ok) {
315       discard_data_spool(dcr);
316    } else {
317       /* Note: if commit is OK, the device will remain locked */
318       commit_data_spool(dcr);
319    }
320
321    if (ok) {
322       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
323    }
324    
325    /*
326     * Release the device -- and send final Vol info to DIR
327     *  and unlock it.
328     */
329    release_device(dcr);
330
331    if (!ok || job_canceled(jcr)) {
332       discard_attribute_spool(jcr);
333    } else {
334       commit_attribute_spool(jcr);
335    }
336
337    dir_send_job_status(jcr);          /* update director */
338
339    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
340    return ok;
341 }