]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2005 Kern Sibbald
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License
12    version 2 as amended with additional clauses defined in the
13    file LICENSE in the main source directory.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
18    the file LICENSE for additional details.
19
20  */
21
22 #include "bacula.h"
23 #include "stored.h"
24
25
26 /* Responses sent to the File daemon */
27 static char OK_data[]    = "3000 OK data\n";
28
29 /* Forward referenced functions */
30
31 /*
32  *  Append Data sent from File daemon
33  *
34  */
35 bool do_append_data(JCR *jcr)
36 {
37    int32_t n;
38    int32_t file_index, stream, last_file_index;
39    BSOCK *ds;
40    BSOCK *fd_sock = jcr->file_bsock;
41    bool ok = true;
42    DEV_RECORD rec;
43    char buf1[100], buf2[100];
44    DCR *dcr = jcr->dcr;
45    DEVICE *dev;
46
47
48    if (!dcr) { 
49       Jmsg0(jcr, M_FATAL, 0, _("DCR is NULL!!!\n"));
50       return false;
51    }                                              
52    dev = dcr->dev;
53    if (!dev) { 
54       Jmsg0(jcr, M_FATAL, 0, _("DEVICE is NULL!!!\n"));
55       return false;
56    }                                              
57
58    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
59
60    memset(&rec, 0, sizeof(rec));
61
62    ds = fd_sock;
63
64    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
65       set_jcr_job_status(jcr, JS_ErrorTerminated);
66       Jmsg0(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
67       return false;
68    }
69
70    if (!acquire_device_for_append(dcr)) {
71       set_jcr_job_status(jcr, JS_ErrorTerminated);
72       return false;
73    }
74
75    set_jcr_job_status(jcr, JS_Running);
76    dir_send_job_status(jcr);
77
78    if (dev->VolCatInfo.VolCatName[0] == 0) {
79       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
80    }
81    Dmsg1(20, "Begin append device=%s\n", dev->print_name());
82
83    begin_data_spool(dcr);
84    begin_attribute_spool(jcr);
85
86    Dmsg0(100, "Just after acquire_device_for_append\n");
87    if (dev->VolCatInfo.VolCatName[0] == 0) {
88       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
89    }
90    /*
91     * Write Begin Session Record
92     */
93    if (!write_session_label(dcr, SOS_LABEL)) {
94       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
95          strerror_dev(dev));
96       set_jcr_job_status(jcr, JS_ErrorTerminated);
97       ok = false;
98    }
99    if (dev->VolCatInfo.VolCatName[0] == 0) {
100       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
101    }
102
103    /* Tell File daemon to send data */
104    if (!bnet_fsend(fd_sock, OK_data)) {
105       berrno be;
106       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
107             be.strerror(fd_sock->b_errno));
108       ok = false;
109    }
110
111    /*
112     * Get Data from File daemon, write to device.  To clarify what is
113     *   going on here.  We expect:
114     *     - A stream header
115     *     - Multiple records of data
116     *     - EOD record
117     *
118     *    The Stream header is just used to sychronize things, and
119     *    none of the stream header is written to tape.
120     *    The Multiple records of data, contain first the Attributes,
121     *    then after another stream header, the file data, then
122     *    after another stream header, the MD5 data if any.
123     *
124     *   So we get the (stream header, data, EOD) three time for each
125     *   file. 1. for the Attributes, 2. for the file data if any,
126     *   and 3. for the MD5 if any.
127     */
128    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
129    jcr->run_time = time(NULL);              /* start counting time for rates */
130    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
131
132       /* Read Stream header from the File daemon.
133        *  The stream header consists of the following:
134        *    file_index (sequential Bacula file index, base 1)
135        *    stream     (Bacula number to distinguish parts of data)
136        *    info       (Info for Storage daemon -- compressed, encryped, ...)
137        *       info is not currently used, so is read, but ignored!
138        */
139      if ((n=bget_msg(ds)) <= 0) {
140          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
141             break;                    /* end of data */
142          }
143          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
144                bnet_strerror(ds));
145          ok = false;
146          break;
147       }
148
149       /*
150        * This hand scanning is a bit more complicated than a simple
151        *   sscanf, but it allows us to handle any size integer up to
152        *   int64_t without worrying about whether %d, %ld, %lld, or %q
153        *   is the correct format for each different architecture.
154        * It is a real pity that sscanf() is not portable.
155        */
156       char *p = ds->msg;
157       while (B_ISSPACE(*p)) {
158          p++;
159       }
160       file_index = (int32_t)str_to_int64(p);
161       while (B_ISDIGIT(*p)) {
162          p++;
163       }
164       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
165          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
166          ok = false;
167          break;
168       }
169       stream = (int32_t)str_to_int64(p);
170
171       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
172
173       if (!(file_index > 0 && (file_index == last_file_index ||
174           file_index == last_file_index + 1))) {
175          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
176          ok = false;
177          break;
178       }
179       if (file_index != last_file_index) {
180          jcr->JobFiles = file_index;
181          last_file_index = file_index;
182       }
183
184       /* Read data stream from the File daemon.
185        *  The data stream is just raw bytes
186        */
187       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
188          rec.VolSessionId = jcr->VolSessionId;
189          rec.VolSessionTime = jcr->VolSessionTime;
190          rec.FileIndex = file_index;
191          rec.Stream = stream;
192          rec.data_len = ds->msglen;
193          rec.data = ds->msg;            /* use message buffer */
194
195          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
196             rec.FileIndex, rec.VolSessionId, 
197             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
198             rec.data_len);
199
200          while (!write_record_to_block(dcr->block, &rec)) {
201             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
202                        rec.remainder);
203             if (!write_block_to_device(dcr)) {
204                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
205                   dev->print_name(), strerror_dev(dev));
206                Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
207                      dev->print_name(), strerror_dev(dev));
208                ok = false;
209                break;
210             }
211          }
212          if (!ok) {
213             Dmsg0(400, "Not OK\n");
214             break;
215          }
216          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
217          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
218             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
219             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
220
221          /* Send attributes and digest to Director for Catalog */
222          if (stream == STREAM_UNIX_ATTRIBUTES    || stream == STREAM_MD5_SIGNATURE ||
223              stream == STREAM_UNIX_ATTRIBUTES_EX || stream == STREAM_SHA1_SIGNATURE) {
224             if (!jcr->no_attributes) {
225                if (are_attributes_spooled(jcr)) {
226                   jcr->dir_bsock->spool = true;
227                }
228                Dmsg0(850, "Send attributes to dir.\n");
229                if (!dir_update_file_attributes(dcr, &rec)) {
230                   jcr->dir_bsock->spool = false;
231                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
232                      bnet_strerror(jcr->dir_bsock));
233                   ok = false;
234                   break;
235                }
236                jcr->dir_bsock->spool = false;
237             }
238          }
239          Dmsg0(650, "Enter bnet_get\n");
240       }
241       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
242       if (is_bnet_error(ds)) {
243          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
244          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
245                bnet_strerror(ds));
246          ok = false;
247          break;
248       }
249    }
250
251    /* Create Job status for end of session label */
252    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
253
254    Dmsg1(200, "Write session label JobStatus=%d\n", jcr->JobStatus);
255    if ((!ok || job_canceled(jcr)) && dev->VolCatInfo.VolCatName[0] == 0) {
256       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
257    }
258
259    /*
260     * If !OK, check if we can still write. This may not be the case
261     *  if we are at the end of the tape or we got a fatal I/O error.
262     */
263    if (ok || dev->can_write()) {
264       if (!write_session_label(dcr, EOS_LABEL)) {
265          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
266                strerror_dev(dev));
267          set_jcr_job_status(jcr, JS_ErrorTerminated);
268          ok = false;
269       }
270       if (dev->VolCatInfo.VolCatName[0] == 0) {
271          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
272       }
273       Dmsg0(90, "back from write_end_session_label()\n");
274       /* Flush out final partial block of this session */
275       if (!write_block_to_device(dcr)) {
276          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
277                dev->print_name(), strerror_dev(dev));
278          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
279          ok = false;
280       }
281    }
282    if (dev->VolCatInfo.VolCatName[0] == 0) {
283       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
284    }
285
286    if (!ok) {
287       discard_data_spool(dcr);
288    } else {
289       commit_data_spool(dcr);
290    }
291
292    if (ok) {
293       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
294    }
295    
296    /* Release the device -- and send final Vol info to DIR */
297    release_device(dcr);
298
299    if (!ok || job_canceled(jcr)) {
300       discard_attribute_spool(jcr);
301    } else {
302       commit_attribute_spool(jcr);
303    }
304
305    dir_send_job_status(jcr);          /* update director */
306
307    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
308    return ok;
309 }