]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
539f63ead5acd318f5220619de11b60c23ff0dff
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2005 Kern Sibbald
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License
12    version 2 as amended with additional clauses defined in the
13    file LICENSE in the main source directory.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
18    the file LICENSE for additional details.
19
20  */
21
22 #include "bacula.h"
23 #include "stored.h"
24
25
26 /* Responses sent to the File daemon */
27 static char OK_data[]    = "3000 OK data\n";
28
29 /* Forward referenced functions */
30
31 /*
32  *  Append Data sent from File daemon
33  *
34  */
35 bool do_append_data(JCR *jcr)
36 {
37    int32_t n;
38    int32_t file_index, stream, last_file_index;
39    BSOCK *ds;
40    BSOCK *fd_sock = jcr->file_bsock;
41    bool ok = true;
42    DEV_RECORD rec;
43    DCR *dcr = jcr->dcr;
44    DEVICE *dev = dcr->dev;
45    char buf1[100], buf2[100];
46
47
48    Dmsg1(100, "Start append data. res=%d\n", dev->reserved_device);
49
50    memset(&rec, 0, sizeof(rec));
51
52    ds = fd_sock;
53
54    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
55       set_jcr_job_status(jcr, JS_ErrorTerminated);
56       Jmsg(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
57       return false;
58    }
59
60    if (!acquire_device_for_append(dcr)) {
61       set_jcr_job_status(jcr, JS_ErrorTerminated);
62       return false;
63    }
64
65    set_jcr_job_status(jcr, JS_Running);
66    dir_send_job_status(jcr);
67
68    if (dev->VolCatInfo.VolCatName[0] == 0) {
69       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
70    }
71    Dmsg1(20, "Begin append device=%s\n", dev->print_name());
72
73    begin_data_spool(dcr);
74    begin_attribute_spool(jcr);
75
76    Dmsg0(100, "Just after acquire_device_for_append\n");
77    if (dev->VolCatInfo.VolCatName[0] == 0) {
78       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
79    }
80    /*
81     * Write Begin Session Record
82     */
83    if (!write_session_label(dcr, SOS_LABEL)) {
84       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
85          strerror_dev(dev));
86       set_jcr_job_status(jcr, JS_ErrorTerminated);
87       ok = false;
88    }
89    if (dev->VolCatInfo.VolCatName[0] == 0) {
90       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
91    }
92
93    /* Tell File daemon to send data */
94    if (!bnet_fsend(fd_sock, OK_data)) {
95       berrno be;
96       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
97             be.strerror(fd_sock->b_errno));
98       ok = false;
99    }
100
101    /*
102     * Get Data from File daemon, write to device.  To clarify what is
103     *   going on here.  We expect:
104     *     - A stream header
105     *     - Multiple records of data
106     *     - EOD record
107     *
108     *    The Stream header is just used to sychronize things, and
109     *    none of the stream header is written to tape.
110     *    The Multiple records of data, contain first the Attributes,
111     *    then after another stream header, the file data, then
112     *    after another stream header, the MD5 data if any.
113     *
114     *   So we get the (stream header, data, EOD) three time for each
115     *   file. 1. for the Attributes, 2. for the file data if any,
116     *   and 3. for the MD5 if any.
117     */
118    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
119    jcr->run_time = time(NULL);              /* start counting time for rates */
120    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
121
122       /* Read Stream header from the File daemon.
123        *  The stream header consists of the following:
124        *    file_index (sequential Bacula file index, base 1)
125        *    stream     (Bacula number to distinguish parts of data)
126        *    info       (Info for Storage daemon -- compressed, encryped, ...)
127        *       info is not currently used, so is read, but ignored!
128        */
129      if ((n=bget_msg(ds)) <= 0) {
130          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
131             break;                    /* end of data */
132          }
133          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
134                bnet_strerror(ds));
135          ok = false;
136          break;
137       }
138
139       /*
140        * This hand scanning is a bit more complicated than a simple
141        *   sscanf, but it allows us to handle any size integer up to
142        *   int64_t without worrying about whether %d, %ld, %lld, or %q
143        *   is the correct format for each different architecture.
144        * It is a real pity that sscanf() is not portable.
145        */
146       char *p = ds->msg;
147       while (B_ISSPACE(*p)) {
148          p++;
149       }
150       file_index = (int32_t)str_to_int64(p);
151       while (B_ISDIGIT(*p)) {
152          p++;
153       }
154       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
155          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
156          ok = false;
157          break;
158       }
159       stream = (int32_t)str_to_int64(p);
160
161       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
162
163       if (!(file_index > 0 && (file_index == last_file_index ||
164           file_index == last_file_index + 1))) {
165          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
166          ok = false;
167          break;
168       }
169       if (file_index != last_file_index) {
170          jcr->JobFiles = file_index;
171          last_file_index = file_index;
172       }
173
174       /* Read data stream from the File daemon.
175        *  The data stream is just raw bytes
176        */
177       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
178          rec.VolSessionId = jcr->VolSessionId;
179          rec.VolSessionTime = jcr->VolSessionTime;
180          rec.FileIndex = file_index;
181          rec.Stream = stream;
182          rec.data_len = ds->msglen;
183          rec.data = ds->msg;            /* use message buffer */
184
185          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
186             rec.FileIndex, rec.VolSessionId, 
187             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
188             rec.data_len);
189
190          while (!write_record_to_block(dcr->block, &rec)) {
191             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
192                        rec.remainder);
193             if (!write_block_to_device(dcr)) {
194                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
195                   dev->print_name(), strerror_dev(dev));
196                Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
197                      dev->print_name(), strerror_dev(dev));
198                ok = false;
199                break;
200             }
201          }
202          if (!ok) {
203             Dmsg0(400, "Not OK\n");
204             break;
205          }
206          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
207          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
208             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
209             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
210
211          /* Send attributes and digest to Director for Catalog */
212          if (stream == STREAM_UNIX_ATTRIBUTES || stream == STREAM_UNIX_ATTRIBUTES_EX ||
213              crypto_digest_stream_type(stream) != CRYPTO_DIGEST_NONE) {
214             if (!jcr->no_attributes) {
215                if (are_attributes_spooled(jcr)) {
216                   jcr->dir_bsock->spool = true;
217                }
218                Dmsg0(850, "Send attributes to dir.\n");
219                if (!dir_update_file_attributes(dcr, &rec)) {
220                   jcr->dir_bsock->spool = false;
221                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
222                      bnet_strerror(jcr->dir_bsock));
223                   ok = false;
224                   break;
225                }
226                jcr->dir_bsock->spool = false;
227             }
228          }
229          Dmsg0(650, "Enter bnet_get\n");
230       }
231       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
232       if (is_bnet_error(ds)) {
233          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
234          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
235                bnet_strerror(ds));
236          ok = false;
237          break;
238       }
239    }
240
241    /* Create Job status for end of session label */
242    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
243
244    Dmsg1(200, "Write session label JobStatus=%d\n", jcr->JobStatus);
245    if ((!ok || job_canceled(jcr)) && dev->VolCatInfo.VolCatName[0] == 0) {
246       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
247    }
248
249    /*
250     * If !OK, check if we can still write. This may not be the case
251     *  if we are at the end of the tape or we got a fatal I/O error.
252     */
253    if (ok || dev->can_write()) {
254       if (!write_session_label(dcr, EOS_LABEL)) {
255          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
256                strerror_dev(dev));
257          set_jcr_job_status(jcr, JS_ErrorTerminated);
258          ok = false;
259       }
260       if (dev->VolCatInfo.VolCatName[0] == 0) {
261          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
262       }
263       Dmsg0(90, "back from write_end_session_label()\n");
264       /* Flush out final partial block of this session */
265       if (!write_block_to_device(dcr)) {
266          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
267                dev->print_name(), strerror_dev(dev));
268          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
269          ok = false;
270       }
271    }
272    if (dev->VolCatInfo.VolCatName[0] == 0) {
273       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
274    }
275
276    if (!ok) {
277       discard_data_spool(dcr);
278    } else {
279       commit_data_spool(dcr);
280    }
281
282    if (ok) {
283       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
284    }
285    
286    /* Release the device -- and send final Vol info to DIR */
287    release_device(dcr);
288
289    if (!ok || job_canceled(jcr)) {
290       discard_attribute_spool(jcr);
291    } else {
292       commit_attribute_spool(jcr);
293    }
294
295    dir_send_job_status(jcr);          /* update director */
296
297    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
298    return ok;
299 }