]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
- Add VolumePurged method to Python JobEvents class. Fixes
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2  * Append code for Storage daemon
3  *  Kern Sibbald, May MM
4  *
5  *  Version $Id$
6  */
7 /*
8    Copyright (C) 2000-2005 Kern Sibbald
9
10    This program is free software; you can redistribute it and/or
11    modify it under the terms of the GNU General Public License
12    version 2 as amended with additional clauses defined in the
13    file LICENSE in the main source directory.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
18    the file LICENSE for additional details.
19
20  */
21
22 #include "bacula.h"
23 #include "stored.h"
24
25
26 /* Responses sent to the File daemon */
27 static char OK_data[]    = "3000 OK data\n";
28
29 /* Forward referenced functions */
30
31 /*
32  *  Append Data sent from File daemon
33  *
34  */
35 bool do_append_data(JCR *jcr)
36 {
37    int32_t n;
38    int32_t file_index, stream, last_file_index;
39    BSOCK *ds;
40    BSOCK *fd_sock = jcr->file_bsock;
41    bool ok = true;
42    DEV_RECORD rec;
43    DCR *dcr = jcr->dcr;
44    DEVICE *dev = dcr->dev;
45
46
47    Dmsg0(100, "Start append data.\n");
48
49    memset(&rec, 0, sizeof(rec));
50
51    ds = fd_sock;
52
53    if (!bnet_set_buffer_size(ds, dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
54       set_jcr_job_status(jcr, JS_ErrorTerminated);
55       Jmsg(jcr, M_FATAL, 0, _("Unable to set network buffer size.\n"));
56       return false;
57    }
58
59    if (!acquire_device_for_append(dcr)) {
60       set_jcr_job_status(jcr, JS_ErrorTerminated);
61       return false;
62    }
63
64    set_jcr_job_status(jcr, JS_Running);
65    dir_send_job_status(jcr);
66
67    if (dev->VolCatInfo.VolCatName[0] == 0) {
68       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
69    }
70    Dmsg1(20, "Begin append device=%s\n", dev->print_name());
71
72    begin_data_spool(dcr);
73    begin_attribute_spool(jcr);
74
75    Dmsg0(100, "Just after acquire_device_for_append\n");
76    if (dev->VolCatInfo.VolCatName[0] == 0) {
77       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
78    }
79    /*
80     * Write Begin Session Record
81     */
82    if (!write_session_label(dcr, SOS_LABEL)) {
83       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
84          strerror_dev(dev));
85       set_jcr_job_status(jcr, JS_ErrorTerminated);
86       ok = false;
87    }
88    if (dev->VolCatInfo.VolCatName[0] == 0) {
89       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
90    }
91
92    /* Tell File daemon to send data */
93    if (!bnet_fsend(fd_sock, OK_data)) {
94       berrno be;
95       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
96             be.strerror(fd_sock->b_errno));
97       ok = false;
98    }
99
100    /*
101     * Get Data from File daemon, write to device.  To clarify what is
102     *   going on here.  We expect:
103     *     - A stream header
104     *     - Multiple records of data
105     *     - EOD record
106     *
107     *    The Stream header is just used to sychronize things, and
108     *    none of the stream header is written to tape.
109     *    The Multiple records of data, contain first the Attributes,
110     *    then after another stream header, the file data, then
111     *    after another stream header, the MD5 data if any.
112     *
113     *   So we get the (stream header, data, EOD) three time for each
114     *   file. 1. for the Attributes, 2. for the file data if any,
115     *   and 3. for the MD5 if any.
116     */
117    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
118    jcr->run_time = time(NULL);              /* start counting time for rates */
119    for (last_file_index = 0; ok && !job_canceled(jcr); ) {
120
121       /* Read Stream header from the File daemon.
122        *  The stream header consists of the following:
123        *    file_index (sequential Bacula file index, base 1)
124        *    stream     (Bacula number to distinguish parts of data)
125        *    info       (Info for Storage daemon -- compressed, encryped, ...)
126        *       info is not currently used, so is read, but ignored!
127        */
128      if ((n=bget_msg(ds)) <= 0) {
129          if (n == BNET_SIGNAL && ds->msglen == BNET_EOD) {
130             break;                    /* end of data */
131          }
132          Jmsg1(jcr, M_FATAL, 0, _("Error reading data header from FD. ERR=%s\n"),
133                bnet_strerror(ds));
134          ok = false;
135          break;
136       }
137
138       /*
139        * This hand scanning is a bit more complicated than a simple
140        *   sscanf, but it allows us to handle any size integer up to
141        *   int64_t without worrying about whether %d, %ld, %lld, or %q
142        *   is the correct format for each different architecture.
143        * It is a real pity that sscanf() is not portable.
144        */
145       char *p = ds->msg;
146       while (B_ISSPACE(*p)) {
147          p++;
148       }
149       file_index = (int32_t)str_to_int64(p);
150       while (B_ISDIGIT(*p)) {
151          p++;
152       }
153       if (!B_ISSPACE(*p) || !B_ISDIGIT(*(p+1))) {
154          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), ds->msg);
155          ok = false;
156          break;
157       }
158       stream = (int32_t)str_to_int64(p);
159
160       Dmsg2(890, "<filed: Header FilInx=%d stream=%d\n", file_index, stream);
161
162       if (!(file_index > 0 && (file_index == last_file_index ||
163           file_index == last_file_index + 1))) {
164          Jmsg0(jcr, M_FATAL, 0, _("File index from FD not positive or sequential\n"));
165          ok = false;
166          break;
167       }
168       if (file_index != last_file_index) {
169          jcr->JobFiles = file_index;
170          last_file_index = file_index;
171       }
172
173       /* Read data stream from the File daemon.
174        *  The data stream is just raw bytes
175        */
176       while ((n=bget_msg(ds)) > 0 && !job_canceled(jcr)) {
177          rec.VolSessionId = jcr->VolSessionId;
178          rec.VolSessionTime = jcr->VolSessionTime;
179          rec.FileIndex = file_index;
180          rec.Stream = stream;
181          rec.data_len = ds->msglen;
182          rec.data = ds->msg;            /* use message buffer */
183
184          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
185             rec.FileIndex, rec.VolSessionId, stream_to_ascii(rec.Stream,rec.FileIndex),
186             rec.data_len);
187
188          while (!write_record_to_block(dcr->block, &rec)) {
189             Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec.data_len,
190                        rec.remainder);
191             if (!write_block_to_device(dcr)) {
192                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
193                   dev->print_name(), strerror_dev(dev));
194                Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
195                      dev->print_name(), strerror_dev(dev));
196                ok = false;
197                break;
198             }
199          }
200          if (!ok) {
201             Dmsg0(400, "Not OK\n");
202             break;
203          }
204          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
205          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
206             FI_to_ascii(rec.FileIndex), rec.VolSessionId,
207             stream_to_ascii(rec.Stream, rec.FileIndex), rec.data_len);
208
209          /* Send attributes and MD5 to Director for Catalog */
210          if (stream == STREAM_UNIX_ATTRIBUTES    || stream == STREAM_MD5_SIGNATURE ||
211              stream == STREAM_UNIX_ATTRIBUTES_EX || stream == STREAM_SHA1_SIGNATURE) {
212             if (!jcr->no_attributes) {
213                if (are_attributes_spooled(jcr)) {
214                   jcr->dir_bsock->spool = true;
215                }
216                Dmsg0(850, "Send attributes to dir.\n");
217                if (!dir_update_file_attributes(dcr, &rec)) {
218                   jcr->dir_bsock->spool = false;
219                   Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
220                      bnet_strerror(jcr->dir_bsock));
221                   ok = false;
222                   break;
223                }
224                jcr->dir_bsock->spool = false;
225             }
226          }
227          Dmsg0(650, "Enter bnet_get\n");
228       }
229       Dmsg1(650, "End read loop with FD. Stat=%d\n", n);
230       if (is_bnet_error(ds)) {
231          Dmsg1(350, "Network read error from FD. ERR=%s\n", bnet_strerror(ds));
232          Jmsg1(jcr, M_FATAL, 0, _("Network error on data channel. ERR=%s\n"),
233                bnet_strerror(ds));
234          ok = false;
235          break;
236       }
237    }
238
239    /* Create Job status for end of session label */
240    set_jcr_job_status(jcr, ok?JS_Terminated:JS_ErrorTerminated);
241
242    Dmsg1(200, "Write session label JobStatus=%d\n", jcr->JobStatus);
243    if ((!ok || job_canceled(jcr)) && dev->VolCatInfo.VolCatName[0] == 0) {
244       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
245    }
246
247    /*
248     * If !OK, check if we can still write. This may not be the case
249     *  if we are at the end of the tape or we got a fatal I/O error.
250     */
251    if (ok || dev->can_write()) {
252       if (!write_session_label(dcr, EOS_LABEL)) {
253          Jmsg1(jcr, M_FATAL, 0, _("Error writting end session label. ERR=%s\n"),
254                strerror_dev(dev));
255          set_jcr_job_status(jcr, JS_ErrorTerminated);
256          ok = false;
257       }
258       if (dev->VolCatInfo.VolCatName[0] == 0) {
259          Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
260       }
261       Dmsg0(90, "back from write_end_session_label()\n");
262       /* Flush out final partial block of this session */
263       if (!write_block_to_device(dcr)) {
264          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
265                dev->print_name(), strerror_dev(dev));
266          Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
267          ok = false;
268       }
269    }
270    if (dev->VolCatInfo.VolCatName[0] == 0) {
271       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
272    }
273
274    if (!ok) {
275       discard_data_spool(dcr);
276    } else {
277       commit_data_spool(dcr);
278    }
279
280    if (ok) {
281       ok = dvd_close_job(dcr);  /* do DVD cleanup if any */
282    }
283    
284    /* Release the device -- and send final Vol info to DIR */
285    release_device(dcr);
286
287    if (!ok || job_canceled(jcr)) {
288       discard_attribute_spool(jcr);
289    } else {
290       commit_attribute_spool(jcr);
291    }
292
293    dir_send_job_status(jcr);          /* update director */
294
295    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
296    return ok;
297 }