]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
ad8d28ff8aef45d4575a4bd4141c7e42aff0ca2b
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2016 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Append code for Storage daemon
21  *  Kern Sibbald, May MM
22  *
23  */
24
25 #include "bacula.h"
26 #include "stored.h"
27
28
29 /* Responses sent to the File daemon */
30 static char OK_data[]    = "3000 OK data\n";
31 static char OK_append[]  = "3000 OK append data\n";
32
33 /* Forward referenced functions */
34
35
36 /*
37  * Check if we can mark this job incomplete
38  *
39  */
40 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
41 {
42    /*
43     * Note, here we decide if it is worthwhile to restart
44     *  the Job at this point. For the moment, if at least
45     *  10 Files have been seen.
46     *  We must be sure that the saved files are safe.
47     *  Using this function when their is as comm line problem is probably safe,
48     *  it is inappropriate to use it for a any failure that could
49     *  involve corrupted data.
50     */
51    if (jcr->spool_attributes && last_file_index > 10) {
52       jcr->setJobStatus(JS_Incomplete);
53    }
54 }
55 /*
56  *  Append Data sent from Client (FD/SD)
57  *
58  */
59 bool do_append_data(JCR *jcr)
60 {
61    int32_t n;
62    int32_t file_index, stream, last_file_index;
63    uint64_t stream_len;
64    BSOCK *fd = jcr->file_bsock;
65    bool ok = true;
66    DEV_RECORD rec;
67    char buf1[100], buf2[100];
68    DCR *dcr = jcr->dcr;
69    DEVICE *dev;
70    char ec[50];
71    POOLMEM *eblock = NULL;
72    POOL_MEM errmsg(PM_EMSG);
73
74    if (!dcr) {
75       pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
76       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
77       return false;
78    }
79    dev = dcr->dev;
80    if (!dev) {
81       pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
82       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
83       return false;
84    }
85
86    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
87
88    memset(&rec, 0, sizeof(rec));
89
90    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
91       jcr->setJobStatus(JS_ErrorTerminated);
92       pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
93       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
94       return false;
95    }
96
97    if (!acquire_device_for_append(dcr)) {
98       jcr->setJobStatus(JS_ErrorTerminated);
99       return false;
100    }
101
102    jcr->sendJobStatus(JS_Running);
103
104    //ASSERT(dev->VolCatInfo.VolCatName[0]);
105    if (dev->VolCatInfo.VolCatName[0] == 0) {
106       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
107    }
108    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
109
110    begin_data_spool(dcr);
111    begin_attribute_spool(jcr);
112
113    Dmsg0(100, "Just after acquire_device_for_append\n");
114    //ASSERT(dev->VolCatInfo.VolCatName[0]);
115    if (dev->VolCatInfo.VolCatName[0] == 0) {
116       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
117    }
118    /*
119     * Write Begin Session Record
120     */
121    if (!write_session_label(dcr, SOS_LABEL)) {
122       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
123          dev->bstrerror());
124       jcr->setJobStatus(JS_ErrorTerminated);
125       ok = false;
126    }
127
128    //ASSERT(dev->VolCatInfo.VolCatName[0]);
129    if (dev->VolCatInfo.VolCatName[0] == 0) {
130       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
131    }
132
133    /* Tell File daemon to send data */
134    if (!fd->fsend(OK_data)) {
135       berrno be;
136       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
137             be.bstrerror(fd->b_errno));
138       ok = false;
139    }
140
141    /*
142     * Get Data from File daemon, write to device.  To clarify what is
143     *   going on here.  We expect:
144     *     - A stream header
145     *     - Multiple records of data
146     *     - EOD record
147     *
148     *    The Stream header is just used to synchronize things, and
149     *    none of the stream header is written to tape.
150     *    The Multiple records of data, contain first the Attributes,
151     *    then after another stream header, the file data, then
152     *    after another stream header, the MD5 data if any.
153     *
154     *   So we get the (stream header, data, EOD) three time for each
155     *   file. 1. for the Attributes, 2. for the file data if any,
156     *   and 3. for the MD5 if any.
157     */
158    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
159    jcr->run_time = time(NULL);              /* start counting time for rates */
160
161    GetMsg *qfd;
162
163    qfd = New(GetMsg(jcr, fd, NULL, GETMSG_MAX_MSG_SIZE));
164    qfd->start_read_sock();
165
166    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
167
168       /* Read Stream header from the File daemon.
169        *  The stream header consists of the following:
170        *    file_index (sequential Bacula file index, base 1)
171        *    stream     (Bacula number to distinguish parts of data)
172        *    stream_len (Expected length of this stream. This
173        *       will be the size backed up if the file does not
174        *       grow during the backup.
175        */
176       n = qfd->bget_msg(NULL);
177       if (n <= 0) {
178          if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
179             Dmsg0(200, "Got EOD on reading header.\n");
180             break;                    /* end of data */
181          }
182          Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
183                n, qfd->msglen, fd->bstrerror());
184          // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
185          possible_incomplete_job(jcr, last_file_index);
186          ok = false;
187          break;
188       }
189
190       if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
191          // TODO ASX already done in bufmsg, should reuse the values
192          char buf[256];
193          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
194          ok = false;
195          possible_incomplete_job(jcr, last_file_index);
196          break;
197       }
198
199       Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
200          file_index, stream, stream_len);
201
202       /*
203        * We make sure the file_index is advancing sequentially.
204        * An incomplete job can start the file_index at any number.
205        * otherwise, it must start at 1.
206        */
207       if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
208          goto fi_checked;
209       }
210       Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
211       if (file_index > 0 && (file_index == last_file_index ||
212           file_index == last_file_index + 1)) {
213          goto fi_checked;
214       }
215       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
216             file_index, last_file_index);
217       possible_incomplete_job(jcr, last_file_index);
218       ok = false;
219       break;
220
221 fi_checked:
222       if (file_index != last_file_index) {
223          jcr->JobFiles = file_index;
224          last_file_index = file_index;
225       }
226
227       /* Read data stream from the File daemon.
228        *  The data stream is just raw bytes
229        */
230       while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
231
232          rec.VolSessionId = jcr->VolSessionId;
233          rec.VolSessionTime = jcr->VolSessionTime;
234          rec.FileIndex = file_index;
235          rec.Stream = stream;
236          rec.StreamLen = stream_len;
237          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
238          rec.data_len = qfd->msglen;
239          rec.data = qfd->msg;            /* use message buffer */
240
241          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
242             rec.FileIndex, rec.VolSessionId,
243             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
244             rec.data_len);
245          ok = dcr->write_record(&rec);
246          if (!ok) {
247             Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
248                   dcr->dev->print_name(), dcr->dev->bstrerror());
249             break;
250          }
251          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
252          jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
253          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
254             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
255             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
256
257          send_attrs_to_dir(jcr, &rec);
258          Dmsg0(650, "Enter bnet_get\n");
259       }
260       Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
261
262       if (fd->is_error()) {
263          if (!jcr->is_job_canceled()) {
264             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
265             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
266                   fd->bstrerror());
267             possible_incomplete_job(jcr, last_file_index);
268          }
269          ok = false;
270          break;
271       }
272    }
273
274    qfd->wait_read_sock();
275    free_GetMsg(qfd);
276
277    if (eblock != NULL) {
278       free_pool_memory(eblock);
279    }
280
281    /* Create Job status for end of session label */
282    jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
283
284    if (ok) {
285       /* Terminate connection with Client */
286       fd->fsend(OK_append);
287       do_client_commands(jcr);            /* finish dialog with Client */
288    } else {
289       fd->fsend("3999 Failed append\n");
290    }
291
292    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
293
294    /*
295     * Check if we can still write. This may not be the case
296     *  if we are at the end of the tape or we got a fatal I/O error.
297     */
298    if (ok || dev->can_write()) {
299       if (!write_session_label(dcr, EOS_LABEL)) {
300          /* Print only if ok and not cancelled to avoid spurious messages */
301          if (ok && !jcr->is_job_canceled()) {
302             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
303                   dev->bstrerror());
304             possible_incomplete_job(jcr, last_file_index);
305          }
306          jcr->setJobStatus(JS_ErrorTerminated);
307          ok = false;
308       }
309       /* Flush out final partial block of this session */
310       if (!dcr->write_final_block_to_device()) {
311          /* Print only if ok and not cancelled to avoid spurious messages */
312          if (ok && !jcr->is_job_canceled()) {
313             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
314                   dev->print_name(), dev->bstrerror());
315             Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
316             possible_incomplete_job(jcr, last_file_index);
317          }
318          jcr->setJobStatus(JS_ErrorTerminated);
319          ok = false;
320       }
321    }
322    flush_jobmedia_queue(jcr);
323    if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
324       discard_data_spool(dcr);
325    } else {
326       /* Note: if commit is OK, the device will remain blocked */
327       commit_data_spool(dcr);
328    }
329
330    /*
331     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
332     *   and the subsequent Jmsg() editing will break
333     */
334    int32_t job_elapsed = time(NULL) - jcr->run_time;
335
336    if (job_elapsed <= 0) {
337       job_elapsed = 1;
338    }
339
340    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
341          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
342          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
343
344    /*
345     * Release the device -- and send final Vol info to DIR
346     *  and unlock it.
347     */
348    release_device(dcr);
349
350    if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
351       discard_attribute_spool(jcr);
352    } else {
353       commit_attribute_spool(jcr);
354    }
355
356    jcr->sendJobStatus();          /* update director */
357
358    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
359    return ok;
360 }
361
362
363 /* Send attributes and digest to Director for Catalog */
364 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
365 {
366    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    ||
367        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
368        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
369        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
370       if (!jcr->no_attributes) {
371          BSOCK *dir = jcr->dir_bsock;
372          if (are_attributes_spooled(jcr)) {
373             dir->set_spooling();
374          }
375          Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
376          if (!dir_update_file_attributes(jcr->dcr, rec)) {
377             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
378                dir->bstrerror());
379             dir->clear_spooling();
380             return false;
381          }
382          dir->clear_spooling();
383       }
384    }
385    return true;
386 }