]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
648d44cab08fb8a5c2520d63e6d37d180efad404
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2004 Kern Sibbald and John Walker
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License as
12    published by the Free Software Foundation; either version 2 of
13    the License, or (at your option) any later version.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18    General Public License for more details.
19
20    You should have received a copy of the GNU General Public
21    License along with this program; if not, write to the Free
22    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
23    MA 02111-1307, USA.
24
25  */
26
27 #include "bacula.h"
28 #include "stored.h"
29
30
31 /* Responses sent to the File daemon */
32 static char OK_data[]    = "3000 OK data\n";
33
34 /* Forward referenced functions */
35
36 /* 
37  *  Append Data sent from File daemon   
38  *
39  */
40 bool do_append_data(JCR *jcr) 
41 {
42    int32_t n;
43    int32_t file_index, stream, last_file_index;
44    BSOCK *ds;
45    BSOCK *fd_sock = jcr->file_bsock;
46    bool ok = true;
47    DEVICE *dev;
48    DEV_RECORD rec;
49    DCR *dcr;
50    
51    Dmsg0(10, "Start append data.\n");
52
53    ds = fd_sock;
54
55    if (!bnet_set_buffer_size(ds, jcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
56       set_jcr_job_status(jcr, JS_ErrorTerminated);
57       Jmsg(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
58       return false;
59    }
60
61    /* 
62     * Acquire output device for writing.  Note, after acquiring a
63     *   device, we MUST release it, which is done at the end of this
64     *   subroutine.
65     */
66    Dmsg0(100, "just before acquire_device\n");
67    if (!(dcr=acquire_device_for_append(jcr))) {
68       set_jcr_job_status(jcr, JS_ErrorTerminated);
69       return false;
70    }
71    dev = dcr->dev;
72    memset(&rec, 0, sizeof(rec));
73
74    if (dev->VolCatInfo.VolCatName[0] == 0) {
75       Dmsg0(000, "NULL Volume name. This shouldn't happen!!!\n");
76    }
77    Dmsg1(20, "Begin append device=%s\n", dev_name(dev));
78
79    begin_data_spool(jcr);
80    begin_attribute_spool(jcr);
81
82    Dmsg0(100, "Just after acquire_device_for_append\n");
83    if (dev->VolCatInfo.VolCatName[0] == 0) {
84       Dmsg0(000, "NULL Volume name. This shouldn't happen!!!\n");
85    }
86    /*
87     * Write Begin Session Record
88     */
89    if (!write_session_label(dcr, SOS_LABEL)) {
90       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
91          strerror_dev(dev));
92       set_jcr_job_status(jcr, JS_ErrorTerminated);
93       ok = false;
94    }
95    if (dev->VolCatInfo.VolCatName[0] == 0) {
96       Dmsg0(000, "NULL Volume name. This shouldn't happen!!!\n");
97    }
98
99    /* Tell File daemon to send data */
100    if (!bnet_fsend(fd_sock, OK_data)) {
101       berrno be;
102       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
103             be.strerror(fd_sock->b_errno));
104       ok = false;
105    }
106
107    /* 
108     * Get Data from File daemon, write to device.  To clarify what is
109     *   going on here.  We expect:        
110     *     - A stream header
111     *     - Multiple records of data
112     *     - EOD record
113     *
114     *    The Stream header is just used to sychronize things, and
115     *    none of the stream header is written to tape.
116     *    The Multiple records of data, contain first the Attributes,
117     *    then after another stream header, the file data, then
118     *    after another stream header, the MD5 data if any.  
119     *
120     *   So we get the (stream header, data, EOD) three time for each
121     *   file. 1. for the Attributes, 2. for the file data if any, 
122     *   and 3. for the MD5 if any.
123     */
124    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
125    jcr->run_time = time(NULL);              /* start counting time for rates */
126    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
127
128       /* Read Stream header from the File daemon.
129        *  The stream header consists of the following:
130        *    file_index (sequential Bacula file index, base 1)
131        *    stream     (Bacula number to distinguish parts of data)
132        *    info       (Info for Storage daemon -- compressed, encryped, ...)
133        *       info is not currently used, so is read, but ignored!
134        */
135      if ((n=bget_msg(ds)) <= 0) {
136          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
137             break;                    /* end of data */
138          }
139          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
140                bnet_strerror(ds));
141          ok = false;
142          break;
143       }
144         
145       /* 
146        * This hand scanning is a bit more complicated than a simple
147        *   sscanf, but it allows us to handle any size integer up to
148        *   int64_t without worrying about whether %d, %ld, %lld, or %q 
149        *   is the correct format for each different architecture.
150        * It is a real pity that sscanf() is not portable.
151        */
152       char *p = ds->msg;
153       while (B_ISSPACE(*p)) {
154          p++;
155       }
156       file_index = (int32_t)str_to_int64(p);
157       while (B_ISDIGIT(*p)) {
158          p++;
159       }
160       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
161          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
162          ok = false;
163          break;
164       }
165       stream = (int32_t)str_to_int64(p);
166
167       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
168
169       if (!(file_index > 0 && (file_index == last_file_index ||
170           file_index == last_file_index + 1))) {
171          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
172          ok = false;
173          break;
174       }
175       if (file_index != last_file_index) {
176          jcr->JobFiles = file_index;
177          last_file_index = file_index;
178       }
179       
180       /* Read data stream from the File daemon.
181        *  The data stream is just raw bytes
182        */
183       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
184          rec.VolSessionId = jcr->VolSessionId;
185          rec.VolSessionTime = jcr->VolSessionTime;
186          rec.FileIndex = file_index;
187          rec.Stream = stream;
188          rec.data_len = ds->msglen;
189          rec.data = ds->msg;            /* use message buffer */
190
191          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
192             rec.FileIndex, rec.VolSessionId, stream_to_ascii(rec.Stream,rec.FileIndex), 
193             rec.data_len);
194           
195          while (!write_record_to_block(dcr->block, &rec)) {
196             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
197                        rec.remainder);
198             if (!write_block_to_device(dcr)) {
199                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
200                   dev_name(dev), strerror_dev(dev));
201                Jmsg(jcr, M_FATAL, 0, _("Fatal device error: ERR=%s\n"),
202                      strerror_dev(dev));
203                ok = false;
204                break;
205             }
206          }
207          if (!ok) {
208             Dmsg0(400, "Not OK\n");
209             break;
210          }
211          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
212          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
213             FI_to_ascii(rec.FileIndex), rec.VolSessionId, 
214             stream_to_ascii(rec.Stream, rec.FileIndex), rec.data_len);
215
216          /* Send attributes and MD5 to Director for Catalog */
217          if (stream == STREAM_UNIX_ATTRIBUTES    || stream == STREAM_MD5_SIGNATURE ||
218              stream == STREAM_UNIX_ATTRIBUTES_EX || stream == STREAM_SHA1_SIGNATURE) { 
219             if (!jcr->no_attributes) {
220                if (are_attributes_spooled(jcr)) {
221                   jcr->dir_bsock->spool = true;
222                }
223                Dmsg0(850, "Send attributes to dir.\n");
224                if (!dir_update_file_attributes(dcr, &rec)) {
225                   jcr->dir_bsock->spool = false;
226                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
227                      bnet_strerror(jcr->dir_bsock));
228                   ok = false;
229                   break;
230                }
231                jcr->dir_bsock->spool = false;
232             }
233          }
234          Dmsg0(350, "Enter bnet_get\n");
235       }
236       Dmsg1(350, "End read loop with FD. Stat=%d\n", n);
237       if (is_bnet_error(ds)) {
238          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
239          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
240                bnet_strerror(ds));
241          ok = false;
242          break;
243       }
244    }
245
246    /* Create Job status for end of session label */
247    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
248
249    Dmsg1(200, "Write session label JobStatus=%d\n", jcr->JobStatus);
250    if (dev->VolCatInfo.VolCatName[0] == 0) {
251       Dmsg0(000, "NULL Volume name. This shouldn't happen!!!\n");
252    }
253
254    /*
255     * If !OK, check if we can still write. This may not be the case
256     *  if we are at the end of the tape or we got a fatal I/O error.
257     */
258    if (ok || dev_can_write(dev)) {
259       if (!write_session_label(dcr, EOS_LABEL)) {
260          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
261                strerror_dev(dev));
262          set_jcr_job_status(jcr, JS_ErrorTerminated);
263          ok = false;
264       }
265       if (dev->VolCatInfo.VolCatName[0] == 0) {
266          Dmsg0(000, "NULL Volume name. This shouldn't happen!!!\n");
267       }
268       Dmsg0(90, "back from write_end_session_label()\n");
269       /* Flush out final partial block of this session */
270       if (!write_block_to_device(dcr)) {
271          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
272          set_jcr_job_status(jcr, JS_ErrorTerminated);
273          ok = false;
274       }
275    }
276    if (dev->VolCatInfo.VolCatName[0] == 0) {
277       Dmsg0(000, "NULL Volume name. This shouldn't happen!!!\n");
278    }
279
280    if (!ok) {
281       discard_data_spool(jcr);
282    } else {
283       commit_data_spool(jcr);
284    }
285
286    Dmsg1(200, "calling release device JobStatus=%d\n", jcr->JobStatus);
287    /* Release the device */
288    if (!release_device(jcr)) {
289       Pmsg0(000, _("Error in release_device\n"));
290       set_jcr_job_status(jcr, JS_ErrorTerminated);
291       ok = false;
292    }
293
294    if (!ok) {
295       discard_attribute_spool(jcr);
296    } else {
297       commit_attribute_spool(jcr);
298    }
299
300    dir_send_job_status(jcr);          /* update director */
301
302    Dmsg1(100, "return from do_append_data() stat=%d\n", ok);
303    return ok;
304 }