]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/read.c
Make 5 min Heartbeat the default
[bacula/bacula] / bacula / src / stored / read.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  * Read code for Storage daemon
18  *
19  *     Kern Sibbald, November MM
20  *
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
28 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
29
30
31 /* Responses sent to the File daemon */
32 static char OK_data[]    = "3000 OK data\n";
33 static char FD_error[]   = "3000 error\n";
34 static char rec_header[] = "rechdr %ld %ld %ld %ld %ld";
35
36 /*
37  *  Read Data and send to File Daemon
38  *   Returns: false on failure
39  *            true  on success
40  */
41 bool do_read_data(JCR *jcr)
42 {
43    BSOCK *fd = jcr->file_bsock;
44    bool ok = true;
45    DCR *dcr = jcr->read_dcr;
46    char ec[50];
47
48    Dmsg0(100, "Start read data.\n");
49
50    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
51       return false;
52    }
53
54    if (jcr->NumReadVolumes == 0) {
55       Jmsg(jcr, M_FATAL, 0, _("No Volume names found for restore.\n"));
56       fd->fsend(FD_error);
57       return false;
58    }
59
60    Dmsg2(200, "Found %d volumes names to restore. First=%s\n", jcr->NumReadVolumes,
61       jcr->VolList->VolumeName);
62
63    /* Ready device for reading */
64    if (!acquire_device_for_read(dcr)) {
65       fd->fsend(FD_error);
66       return false;
67    }
68
69    /* Tell File daemon we will send data */
70    fd->fsend(OK_data);
71
72    jcr->sendJobStatus(JS_Running);
73    jcr->run_time = time(NULL);
74    jcr->JobFiles = 0;
75
76    if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
77       ok = read_records(dcr, mac_record_cb, mount_next_read_volume);
78    } else {
79       ok = read_records(dcr, read_record_cb, mount_next_read_volume);
80    }
81
82    /*
83     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
84     *   and the subsequent Jmsg() editing will break
85     */
86    int32_t job_elapsed = time(NULL) - jcr->run_time;
87
88    if (job_elapsed <= 0) {
89       job_elapsed = 1;
90    }
91
92    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
93          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
94          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
95
96    /* Send end of data to FD */
97    fd->signal(BNET_EOD);
98
99    if (!release_device(jcr->read_dcr)) {
100       ok = false;
101    }
102
103    Dmsg0(30, "Done reading.\n");
104    return ok;
105 }
106
107 /*
108  * Called here for reading (restore) for each record from read_records()
109  *  Returns: true if OK
110  *           false if error
111  */
112 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
113 {
114    JCR *jcr = dcr->jcr;
115    BSOCK *fd = jcr->file_bsock;
116    bool ok = true;
117    POOLMEM *save_msg;
118    char ec1[50], ec2[50];
119
120    if (rec->FileIndex < 0) {
121       return true;
122    }
123    Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
124       rec->VolSessionId, rec->VolSessionTime,
125       FI_to_ascii(ec1, rec->FileIndex),
126       stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
127       rec->data_len);
128
129    /* Send record header to File daemon */
130    if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
131           rec->FileIndex, rec->Stream, rec->data_len)) {
132       Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
133       Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
134          fd->bstrerror());
135       return false;
136    }
137
138    /*
139     * For normal migration jobs, FileIndex values are sequential because
140     *  we are dealing with one job.  However, for Vbackup (consolidation),
141     *  we will be getting records from multiple jobs and writing them back
142     *  out, so we need to ensure that the output FileIndex is sequential.
143     *  We do so by detecting a FileIndex change and incrementing the
144     *  JobFiles, which we then use as the output FileIndex.
145     */
146    if (rec->FileIndex >= 0) {
147       /* If something changed, increment FileIndex */
148       if (rec->VolSessionId != rec->last_VolSessionId ||
149           rec->VolSessionTime != rec->last_VolSessionTime ||
150           rec->FileIndex != rec->last_FileIndex) {
151          jcr->JobFiles++;
152          rec->last_VolSessionId = rec->VolSessionId;
153          rec->last_VolSessionTime = rec->VolSessionTime;
154          rec->last_FileIndex = rec->FileIndex;
155       }
156    }
157
158    /* Send data record to File daemon */
159    save_msg = fd->msg;          /* save fd message pointer */
160    fd->msg = rec->data;         /* pass data directly to the FD */
161    fd->msglen = rec->data_len;
162    jcr->JobBytes += rec->data_len;   /* increment bytes this job */
163    Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
164    if (!fd->send()) {
165       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
166       Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
167          fd->bstrerror());
168
169       ok = false;
170    }
171    fd->msg = save_msg;                /* restore fd message pointer */
172    return ok;
173 }
174
175 /*
176  * New routine after to SD->SD implementation
177  * Called here for each record from read_records()
178  *  Returns: true if OK
179  *           false if error
180  */
181 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
182 {
183    JCR *jcr = dcr->jcr;
184    BSOCK *fd = jcr->file_bsock;
185    char buf1[100], buf2[100];
186    bool new_header = false;
187    POOLMEM *save_msg;
188    char ec1[50], ec2[50];
189    bool ok = true;
190
191 #ifdef xxx
192    Pmsg5(000, "on entry     JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
193       jcr->JobId,
194       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
195       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
196 #endif
197
198    /* If label and not for us, discard it */
199    if (rec->FileIndex < 0) {
200       Dmsg1(100, "FileIndex=%d\n", rec->FileIndex);
201       return true;
202    }
203
204    /*
205     * For normal migration jobs, FileIndex values are sequential because
206     *  we are dealing with one job.  However, for Vbackup (consolidation),
207     *  we will be getting records from multiple jobs and writing them back
208     *  out, so we need to ensure that the output FileIndex is sequential.
209     *  We do so by detecting a FileIndex change and incrementing the
210     *  JobFiles, which we then use as the output FileIndex.
211     */
212    if (rec->FileIndex >= 0) {
213       /* If something changed, increment FileIndex */
214       if (rec->VolSessionId != rec->last_VolSessionId ||
215           rec->VolSessionTime != rec->last_VolSessionTime ||
216           rec->FileIndex != rec->last_FileIndex ||
217           rec->Stream != rec->last_Stream) {
218
219          /* Something changed */
220          if (rec->last_VolSessionId != 0) {        /* Not first record */
221             Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
222             if (!fd->signal(BNET_EOD)) {  /* End of previous stream */
223                Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
224                         fd->bstrerror());
225                return false;
226             }
227          }
228          new_header = true;
229          if (rec->FileIndex != rec->last_FileIndex) {
230             jcr->JobFiles++;
231          }
232          rec->last_VolSessionId = rec->VolSessionId;
233          rec->last_VolSessionTime = rec->VolSessionTime;
234          rec->last_FileIndex = rec->FileIndex;
235          rec->last_Stream = rec->Stream;
236       }
237       rec->FileIndex = jcr->JobFiles;     /* set sequential output FileIndex */
238    }
239
240    if (new_header) {
241       new_header = false;
242       Dmsg5(400, "Send header to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
243          rec->VolSessionId, rec->VolSessionTime,
244          FI_to_ascii(ec1, rec->FileIndex),
245          stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
246          rec->data_len);
247
248       /* Send data header to File daemon */
249       if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, rec->data_len)) {
250          Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
251          Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
252             fd->bstrerror());
253          return false;
254       }
255    }
256
257    Dmsg1(400, "FI=%d\n", rec->FileIndex);
258    /* Send data record to File daemon */
259    save_msg = fd->msg;          /* save fd message pointer */
260    fd->msg = rec->data;         /* pass data directly to the FD */
261    fd->msglen = rec->data_len;
262    jcr->JobBytes += rec->data_len;   /* increment bytes this job */
263    Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
264    if (!fd->send()) {
265       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
266       Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
267          fd->bstrerror());
268       ok = false;
269    }
270    fd->msg = save_msg;                /* restore fd message pointer */
271
272    Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
273       jcr->JobId,
274       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
275       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
276
277    return ok;
278 }