]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/append.c
aed73b3cbfcceefed52b32270f3bde0cb54d70c7
[bacula/bacula] / bacula / src / stored / append.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Append code for Storage daemon
21  *  Kern Sibbald, May MM
22  */
23
24 #include "bacula.h"
25 #include "stored.h"
26
27
28 /* Responses sent to the File daemon */
29 static char OK_data[]    = "3000 OK data\n";
30 static char OK_append[]  = "3000 OK append data\n";
31
32 /* Forward referenced functions */
33
34
35 /*
36  * Check if we can mark this job incomplete
37  *
38  */
39 void possible_incomplete_job(JCR *jcr, int32_t last_file_index)
40 {
41    /*
42     * Note, here we decide if it is worthwhile to restart
43     *  the Job at this point. For the moment, if at least
44     *  10 Files have been seen.
45     *  We must be sure that the saved files are safe.
46     *  Using this function when their is as comm line problem is probably safe,
47     *  it is inappropriate to use it for a any failure that could
48     *  involve corrupted data.
49     */
50    if (jcr->spool_attributes && last_file_index > 10) {
51       jcr->setJobStatus(JS_Incomplete);
52    }
53 }
54
55 /*
56  *  Append Data sent from Client (FD/SD)
57  *
58  */
59 bool do_append_data(JCR *jcr)
60 {
61    int32_t n;
62    int32_t file_index, stream, last_file_index;
63    uint64_t stream_len;
64    BSOCK *fd = jcr->file_bsock;
65    bool ok = true;
66    DEV_RECORD rec;
67    char buf1[100], buf2[100];
68    DCR *dcr = jcr->dcr;
69    DEVICE *dev;
70    char ec[50];
71    POOLMEM *eblock = NULL;
72    POOL_MEM errmsg(PM_EMSG);
73
74    if (!dcr) {
75       pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
76       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
77       return false;
78    }
79    dev = dcr->dev;
80    if (!dev) {
81       pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
82       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
83       return false;
84    }
85
86    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
87
88    memset(&rec, 0, sizeof(rec));
89
90    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
91       jcr->setJobStatus(JS_ErrorTerminated);
92       pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
93       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
94       return false;
95    }
96
97    if (!acquire_device_for_append(dcr)) {
98       jcr->setJobStatus(JS_ErrorTerminated);
99       return false;
100    }
101
102    dev->start_of_job(dcr);
103    jcr->sendJobStatus(JS_Running);
104
105    //ASSERT(dev->VolCatInfo.VolCatName[0]);
106    if (dev->VolCatInfo.VolCatName[0] == 0) {
107       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
108    }
109    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
110
111    begin_data_spool(dcr);
112    begin_attribute_spool(jcr);
113
114    Dmsg0(100, "Just after acquire_device_for_append\n");
115    //ASSERT(dev->VolCatInfo.VolCatName[0]);
116    if (dev->VolCatInfo.VolCatName[0] == 0) {
117       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
118    }
119    /*
120     * Write Begin Session Record
121     */
122    if (!write_session_label(dcr, SOS_LABEL)) {
123       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
124          dev->bstrerror());
125       jcr->setJobStatus(JS_ErrorTerminated);
126       ok = false;
127    }
128
129    //ASSERT(dev->VolCatInfo.VolCatName[0]);
130    if (dev->VolCatInfo.VolCatName[0] == 0) {
131       Pmsg0(000, _("NULL Volume name. This shouldn't happen!!!\n"));
132    }
133
134    /* Tell File daemon to send data */
135    if (!fd->fsend(OK_data)) {
136       berrno be;
137       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
138             be.bstrerror(fd->b_errno));
139       ok = false;
140    }
141
142    /*
143     * Get Data from File daemon, write to device.  To clarify what is
144     *   going on here.  We expect:
145     *     - A stream header
146     *     - Multiple records of data
147     *     - EOD record
148     *
149     *    The Stream header is just used to synchronize things, and
150     *    none of the stream header is written to tape.
151     *    The Multiple records of data, contain first the Attributes,
152     *    then after another stream header, the file data, then
153     *    after another stream header, the MD5 data if any.
154     *
155     *   So we get the (stream header, data, EOD) three time for each
156     *   file. 1. for the Attributes, 2. for the file data if any,
157     *   and 3. for the MD5 if any.
158     */
159    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
160    jcr->run_time = time(NULL);              /* start counting time for rates */
161
162    GetMsg *qfd;
163
164    qfd = New(GetMsg(jcr, fd, NULL, GETMSG_MAX_MSG_SIZE));
165    qfd->start_read_sock();
166
167    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
168
169       /* Read Stream header from the File daemon.
170        *  The stream header consists of the following:
171        *    file_index (sequential Bacula file index, base 1)
172        *    stream     (Bacula number to distinguish parts of data)
173        *    stream_len (Expected length of this stream. This
174        *       will be the size backed up if the file does not
175        *       grow during the backup.
176        */
177       n = qfd->bget_msg(NULL);
178       if (n <= 0) {
179          if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
180             Dmsg0(200, "Got EOD on reading header.\n");
181             break;                    /* end of data */
182          }
183          Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
184                n, qfd->msglen, fd->bstrerror());
185          // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
186          possible_incomplete_job(jcr, last_file_index);
187          ok = false;
188          break;
189       }
190
191       if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
192          // TODO ASX already done in bufmsg, should reuse the values
193          char buf[256];
194          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
195          ok = false;
196          possible_incomplete_job(jcr, last_file_index);
197          break;
198       }
199
200       Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
201          file_index, stream, stream_len);
202
203       /*
204        * We make sure the file_index is advancing sequentially.
205        * An incomplete job can start the file_index at any number.
206        * otherwise, it must start at 1.
207        */
208       if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
209          goto fi_checked;
210       }
211       Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
212       if (file_index > 0 && (file_index == last_file_index ||
213           file_index == last_file_index + 1)) {
214          goto fi_checked;
215       }
216       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
217             file_index, last_file_index);
218       possible_incomplete_job(jcr, last_file_index);
219       ok = false;
220       break;
221
222 fi_checked:
223       if (file_index != last_file_index) {
224          jcr->JobFiles = file_index;
225          last_file_index = file_index;
226       }
227
228       /* Read data stream from the File daemon.
229        *  The data stream is just raw bytes
230        */
231       while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
232
233          rec.VolSessionId = jcr->VolSessionId;
234          rec.VolSessionTime = jcr->VolSessionTime;
235          rec.FileIndex = file_index;
236          rec.Stream = stream;
237          rec.StreamLen = stream_len;
238          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
239          rec.data_len = qfd->msglen;
240          rec.data = qfd->msg;            /* use message buffer */
241
242          /* Debug code: check if we must hangup or blowup */
243          if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
244             return false;
245          }
246          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
247             rec.FileIndex, rec.VolSessionId,
248             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
249             rec.data_len);
250          ok = dcr->write_record(&rec);
251          if (!ok) {
252             Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
253                   dcr->dev->print_name(), dcr->dev->bstrerror());
254             break;
255          }
256          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
257          jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
258          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
259             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
260             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
261
262          send_attrs_to_dir(jcr, &rec);
263          Dmsg0(650, "Enter bnet_get\n");
264       }
265       Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
266
267       if (fd->is_error()) {
268          if (!jcr->is_job_canceled()) {
269             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
270             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
271                   fd->bstrerror());
272             possible_incomplete_job(jcr, last_file_index);
273          }
274          ok = false;
275          break;
276       }
277    }
278
279    qfd->wait_read_sock((ok == false) || jcr->is_job_canceled());
280    free_GetMsg(qfd);
281
282    if (eblock != NULL) {
283       free_pool_memory(eblock);
284    }
285
286    /* Create Job status for end of session label */
287    jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
288
289    if (ok) {
290       /* Terminate connection with Client */
291       fd->fsend(OK_append);
292       do_client_commands(jcr);            /* finish dialog with Client */
293    } else {
294       fd->fsend("3999 Failed append\n");
295    }
296
297    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
298
299    /*
300     * Check if we can still write. This may not be the case
301     *  if we are at the end of the tape or we got a fatal I/O error.
302     */
303    dcr->set_ameta();
304    if (ok || dev->can_write()) {
305       if (!dev->flush_before_eos(dcr)) {
306          /* Print only if ok and not cancelled to avoid spurious messages */
307          if (!jcr->is_job_canceled()) {
308             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
309                   dev->print_name(), dev->bstrerror());
310             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
311             possible_incomplete_job(jcr, last_file_index);
312          }
313          jcr->setJobStatus(JS_ErrorTerminated);
314          ok = false;
315       }
316       if (!write_session_label(dcr, EOS_LABEL)) {
317          /* Print only if ok and not cancelled to avoid spurious messages */
318          if (ok && !jcr->is_job_canceled()) {
319             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
320                   dev->bstrerror());
321             possible_incomplete_job(jcr, last_file_index);
322          }
323          jcr->setJobStatus(JS_ErrorTerminated);
324          ok = false;
325       }
326       /* Flush out final partial block of this session */
327       Dmsg1(200, "=== Flush adata=%d last block.\n", dcr->block->adata);
328       ASSERT(!dcr->block->adata);
329       if (!dcr->write_final_block_to_device()) {
330          /* Print only if ok and not cancelled to avoid spurious messages */
331          if (ok && !jcr->is_job_canceled()) {
332             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
333                   dev->print_name(), dev->bstrerror());
334             Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
335             possible_incomplete_job(jcr, last_file_index);
336          }
337          jcr->setJobStatus(JS_ErrorTerminated);
338          ok = false;
339       }
340    }
341    flush_jobmedia_queue(jcr);
342    if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
343       discard_data_spool(dcr);
344    } else {
345       /* Note: if commit is OK, the device will remain blocked */
346       commit_data_spool(dcr);
347    }
348
349    /*
350     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
351     *   and the subsequent Jmsg() editing will break
352     */
353    int32_t job_elapsed = time(NULL) - jcr->run_time;
354
355    if (job_elapsed <= 0) {
356       job_elapsed = 1;
357    }
358
359    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
360          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
361          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
362
363    /*
364     * Release the device -- and send final Vol info to DIR
365     *  and unlock it.
366     */
367    release_device(dcr);
368
369    if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
370       discard_attribute_spool(jcr);
371    } else {
372       commit_attribute_spool(jcr);
373    }
374
375    jcr->sendJobStatus();          /* update director */
376
377    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
378    return ok;
379 }
380
381
382 /* Send attributes and digest to Director for Catalog */
383 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
384 {
385    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    ||
386        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
387        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
388        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
389       if (!jcr->no_attributes) {
390          BSOCK *dir = jcr->dir_bsock;
391          if (are_attributes_spooled(jcr)) {
392             dir->set_spooling();
393          }
394          Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
395          if (!dir_update_file_attributes(jcr->dcr, rec)) {
396             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
397                dir->bstrerror());
398             dir->clear_spooling();
399             return false;
400          }
401          dir->clear_spooling();
402       }
403    }
404    return true;
405 }