]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/vbackup.c
Apply patches from bugs #2325 and #2326 to fix FIFO bugs
[bacula/bacula] / bacula / src / stored / vbackup.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * SD -- vbackup.c --  responsible for doing virtual backup jobs.
21  *
22  *     Kern Sibbald, January MMVI
23  *
24  */
25
26 #include "bacula.h"
27 #include "stored.h"
28
29 /* Import functions */
30 extern char Job_end[];
31
32 /* Forward referenced subroutines */
33 static bool record_cb(DCR *dcr, DEV_RECORD *rec);
34
35 /*
36  *  Read Data and send to File Daemon
37  *   Returns: false on failure
38  *            true  on success
39  */
40 bool do_vbackup(JCR *jcr)
41 {
42    bool ok = true;
43    BSOCK *dir = jcr->dir_bsock;
44    const char *Type;
45    char ec1[50];
46    DEVICE *dev;
47
48    switch(jcr->getJobType()) {
49    case JT_MIGRATE:
50       Type = "Migration";
51       break;
52    case JT_ARCHIVE:
53       Type = "Archive";
54       break;
55    case JT_COPY:
56       Type = "Copy";
57       break;
58    case JT_BACKUP:
59       Type = "Virtual Backup";
60       break;
61    default:
62       Type = "Unknown";
63       break;
64    }
65
66    /* TODO: Remove when the new match_all is well tested */
67    jcr->use_new_match_all = use_new_match_all;
68
69    Dmsg1(20, "Start read data. newbsr=%d\n", jcr->use_new_match_all);
70
71    if (!jcr->read_dcr || !jcr->dcr) {
72       Jmsg(jcr, M_FATAL, 0, _("Read and write devices not properly initialized.\n"));
73       goto bail_out;
74    }
75    Dmsg2(100, "read_dcr=%p write_dcr=%p\n", jcr->read_dcr, jcr->dcr);
76
77    if (jcr->NumReadVolumes == 0) {
78       Jmsg(jcr, M_FATAL, 0, _("No Volume names found for %s.\n"), Type);
79       goto bail_out;
80    }
81
82    Dmsg3(200, "Found %d volumes names for %s. First=%s\n", jcr->NumReadVolumes,
83       jcr->VolList->VolumeName, Type);
84
85    ASSERT(jcr->read_dcr != jcr->dcr);
86    ASSERT(jcr->read_dcr->dev != jcr->dcr->dev);
87    /* Ready devices for reading and writing */
88    if (!acquire_device_for_read(jcr->read_dcr) ||
89        !acquire_device_for_append(jcr->dcr)) {
90       jcr->setJobStatus(JS_ErrorTerminated);
91       goto bail_out;
92    }
93    jcr->dcr->dev->start_of_job(jcr->dcr);
94
95    Dmsg2(200, "===== After acquire pos %u:%u\n", jcr->dcr->dev->file, jcr->dcr->dev->block_num);
96    jcr->sendJobStatus(JS_Running);
97
98    begin_data_spool(jcr->dcr);
99    begin_attribute_spool(jcr);
100
101    jcr->dcr->VolFirstIndex = jcr->dcr->VolLastIndex = 0;
102    jcr->run_time = time(NULL);
103    set_start_vol_position(jcr->dcr);
104
105    jcr->JobFiles = 0;
106    jcr->dcr->set_ameta();
107    jcr->read_dcr->set_ameta();
108    ok = read_records(jcr->read_dcr, record_cb, mount_next_read_volume);
109    goto ok_out;
110
111 bail_out:
112    ok = false;
113
114 ok_out:
115    if (jcr->dcr) {
116       jcr->dcr->set_ameta();
117       dev = jcr->dcr->dev;
118       Dmsg1(100, "ok=%d\n", ok);
119       if (ok || dev->can_write()) {
120          if (!dev->flush_before_eos(jcr->dcr)) {
121             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
122                   dev->print_name(), dev->bstrerror());
123             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
124             //possible_incomplete_job(jcr, last_file_index);
125             ok = false;
126          }
127          /* Flush out final ameta partial block of this session */
128          if (!jcr->dcr->write_final_block_to_device()) {
129             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
130                   dev->print_name(), dev->bstrerror());
131             Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
132             ok = false;
133          }
134          Dmsg2(200, "Flush block to device pos %u:%u\n", dev->file, dev->block_num);
135       }
136       flush_jobmedia_queue(jcr);
137       if (!ok) {
138          discard_data_spool(jcr->dcr);
139       } else {
140          /* Note: if commit is OK, the device will remain blocked */
141          commit_data_spool(jcr->dcr);
142       }
143
144       /*
145        * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
146        *   and the subsequent Jmsg() editing will break
147        */
148       int32_t job_elapsed = time(NULL) - jcr->run_time;
149
150       if (job_elapsed <= 0) {
151          job_elapsed = 1;
152       }
153
154       Jmsg(jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
155             job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
156             edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec1));
157
158       /* Release the device -- and send final Vol info to DIR */
159       release_device(jcr->dcr);
160
161       if (!ok || job_canceled(jcr)) {
162          discard_attribute_spool(jcr);
163       } else {
164          commit_attribute_spool(jcr);
165       }
166    }
167
168    if (jcr->read_dcr) {
169       if (!release_device(jcr->read_dcr)) {
170          ok = false;
171       }
172    }
173
174    jcr->sendJobStatus();              /* update director */
175
176    Dmsg0(30, "Done reading.\n");
177    jcr->end_time = time(NULL);
178    dequeue_messages(jcr);             /* send any queued messages */
179    if (ok) {
180       jcr->setJobStatus(JS_Terminated);
181    }
182    generate_daemon_event(jcr, "JobEnd");
183    generate_plugin_event(jcr, bsdEventJobEnd);
184    dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
185       edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors, jcr->StatusErrMsg);
186    Dmsg6(100, Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles, ec1, jcr->JobErrors, jcr->StatusErrMsg);
187
188    dir->signal(BNET_EOD);             /* send EOD to Director daemon */
189    free_plugins(jcr);                 /* release instantiated plugins */
190
191    return ok;
192 }
193
194
195 /*
196  * Called here for each record from read_records()
197  *  Returns: true if OK
198  *           false if error
199  */
200 static bool record_cb(DCR *dcr, DEV_RECORD *rec)
201 {
202    JCR *jcr = dcr->jcr;
203    DEVICE *dev = jcr->dcr->dev;
204    char buf1[100], buf2[100];
205    bool     restoredatap = false;
206    POOLMEM *orgdata = NULL;
207    uint32_t orgdata_len = 0;
208    bool ret = false;
209
210    /* If label and not for us, discard it */
211    if (rec->FileIndex < 0 && rec->match_stat <= 0) {
212       ret = true;
213       goto bail_out;
214    }
215    /* We want to write SOS_LABEL and EOS_LABEL discard all others */
216    switch (rec->FileIndex) {
217    case PRE_LABEL:
218    case VOL_LABEL:
219    case EOT_LABEL:
220    case EOM_LABEL:
221       ret = true;                    /* don't write vol labels */
222       goto bail_out;
223    }
224
225    /*
226     * For normal migration jobs, FileIndex values are sequential because
227     *  we are dealing with one job.  However, for Vbackup (consolidation),
228     *  we will be getting records from multiple jobs and writing them back
229     *  out, so we need to ensure that the output FileIndex is sequential.
230     *  We do so by detecting a FileIndex change and incrementing the
231     *  JobFiles, which we then use as the output FileIndex.
232     */
233    if (rec->FileIndex >= 0) {
234       /* If something changed, increment FileIndex */
235       if (rec->VolSessionId != rec->last_VolSessionId ||
236           rec->VolSessionTime != rec->last_VolSessionTime ||
237           rec->FileIndex != rec->last_FileIndex) {
238          jcr->JobFiles++;
239          rec->last_VolSessionId = rec->VolSessionId;
240          rec->last_VolSessionTime = rec->VolSessionTime;
241          rec->last_FileIndex = rec->FileIndex;
242       }
243       rec->FileIndex = jcr->JobFiles;     /* set sequential output FileIndex */
244    }
245
246    /* TODO: If user really wants to do rehydrate the data, we should propose
247     * this option.
248     */
249
250    /*
251     * Modify record SessionId and SessionTime to correspond to
252     * output.
253     */
254    rec->VolSessionId = jcr->VolSessionId;
255    rec->VolSessionTime = jcr->VolSessionTime;
256    Dmsg5(200, "before write JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
257       jcr->JobId,
258       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
259       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
260
261    if (!jcr->dcr->write_record(rec)) {
262       Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
263             dev->print_name(), dev->bstrerror());
264       goto bail_out;
265    }
266    /* Restore packet */
267    rec->VolSessionId = rec->last_VolSessionId;
268    rec->VolSessionTime = rec->last_VolSessionTime;
269    if (rec->FileIndex < 0) {
270       ret = true;                    /* don't send LABELs to Dir */
271       goto bail_out;
272    }
273    jcr->JobBytes += rec->data_len;   /* increment bytes this job */
274    Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
275       jcr->JobId,
276       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
277       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
278
279    send_attrs_to_dir(jcr, rec);
280    ret = true;
281
282 bail_out:
283    if (restoredatap) {
284       rec->data = orgdata;
285       rec->data_len = orgdata_len;
286    }
287    return ret;
288 }