]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/read.c
329cd599eb41a8310b30b93a5ef68b8e7cbd121e
[bacula/bacula] / bacula / src / stored / read.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  * Read code for Storage daemon
22  *
23  *     Kern Sibbald, November MM
24  *
25  */
26
27 #include "bacula.h"
28 #include "stored.h"
29
30 /* Forward referenced subroutines */
31 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
32 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
33
34 /* Responses sent to the File daemon */
35 static char OK_data[]    = "3000 OK data\n";
36 static char FD_error[]   = "3000 error\n";
37 static char rec_header[] = "rechdr %ld %ld %ld %ld %ld";
38
39 /*
40  *  Read Data and send to File Daemon
41  *   Returns: false on failure
42  *            true  on success
43  */
44 bool do_read_data(JCR *jcr)
45 {
46    BSOCK *fd = jcr->file_bsock;
47    bool ok = true;
48    DCR *dcr = jcr->read_dcr;
49    char ec[50];
50
51    Dmsg0(100, "Start read data.\n");
52
53    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
54       return false;
55    }
56
57    if (jcr->NumReadVolumes == 0) {
58       Jmsg(jcr, M_FATAL, 0, _("No Volume names found for restore.\n"));
59       fd->fsend(FD_error);
60       return false;
61    }
62
63    Dmsg2(200, "Found %d volumes names to restore. First=%s\n", jcr->NumReadVolumes,
64       jcr->VolList->VolumeName);
65
66    /* Ready device for reading */
67    if (!acquire_device_for_read(dcr)) {
68       fd->fsend(FD_error);
69       return false;
70    }
71
72    /* Tell File daemon we will send data */
73    if (!jcr->is_ok_data_sent) {
74       /* OK_DATA can have been already sent for copy/migrate by run_job() to avoid dead lock*/
75       fd->fsend(OK_data);
76       jcr->is_ok_data_sent = true;
77    }
78
79    jcr->sendJobStatus(JS_Running);
80    jcr->run_time = time(NULL);
81    jcr->JobFiles = 0;
82
83    if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
84       ok = read_records(dcr, mac_record_cb, mount_next_read_volume);
85    } else {
86       ok = read_records(dcr, read_record_cb, mount_next_read_volume);
87    }
88
89    /*
90     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
91     *   and the subsequent Jmsg() editing will break
92     */
93    int32_t job_elapsed = time(NULL) - jcr->run_time;
94
95    if (job_elapsed <= 0) {
96       job_elapsed = 1;
97    }
98
99    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
100          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
101          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
102
103    /* Send end of data to FD */
104    fd->signal(BNET_EOD);
105
106    if (!release_device(jcr->read_dcr)) {
107       ok = false;
108    }
109
110    Dmsg0(30, "Done reading.\n");
111    return ok;
112 }
113
114 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
115 {
116    JCR *jcr = dcr->jcr;
117    BSOCK *fd = jcr->file_bsock;
118    bool ok = true;
119    POOLMEM *save_msg;
120    char ec1[50], ec2[50];
121    POOLMEM *wbuf = rec->data;                 /* send buffer */
122    uint32_t wsize = rec->data_len;            /* send size */
123
124    if (rec->FileIndex < 0) {
125       return true;
126    }
127
128    Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
129       rec->VolSessionId, rec->VolSessionTime,
130       FI_to_ascii(ec1, rec->FileIndex),
131       stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
132       wsize);
133
134    /* Send record header to File daemon */
135    if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
136           rec->FileIndex, rec->Stream, wsize)) {
137       Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
138       Jmsg1(jcr, M_FATAL, 0, _("Error sending header to Client. ERR=%s\n"),
139          fd->bstrerror());
140       return false;
141    }
142    /*
143     * For normal migration jobs, FileIndex values are sequential because
144     *  we are dealing with one job.  However, for Vbackup (consolidation),
145     *  we will be getting records from multiple jobs and writing them back
146     *  out, so we need to ensure that the output FileIndex is sequential.
147     *  We do so by detecting a FileIndex change and incrementing the
148     *  JobFiles, which we then use as the output FileIndex.
149     */
150    if (rec->FileIndex >= 0) {
151       /* If something changed, increment FileIndex */
152       if (rec->VolSessionId != rec->last_VolSessionId ||
153           rec->VolSessionTime != rec->last_VolSessionTime ||
154           rec->FileIndex != rec->last_FileIndex) {
155          jcr->JobFiles++;
156          rec->last_VolSessionId = rec->VolSessionId;
157          rec->last_VolSessionTime = rec->VolSessionTime;
158          rec->last_FileIndex = rec->FileIndex;
159       }
160    }
161
162    /* Debug code: check if we must hangup */
163    if (get_hangup() > 0 && (jcr->JobFiles > (uint32_t)get_hangup())) {
164       jcr->setJobStatus(JS_Incomplete);
165       Jmsg1(jcr, M_FATAL, 0, "Debug hangup requested after %d files.\n",
166          get_hangup());
167       set_hangup(0);
168       return false;
169    }
170    if (get_blowup() > 0 && (jcr->JobFiles > (uint32_t)get_blowup())) {
171       Jmsg1(jcr, M_ABORT, 0, "Debug blowup() requested after %d files.\n",
172          get_blowup());
173       return false;
174    }
175
176    save_msg = fd->msg;          /* save fd message pointer */
177    fd->msg = wbuf;
178    fd->msglen = wsize;
179    /* Send data record to File daemon */
180    jcr->JobBytes += wsize;   /* increment bytes this job */
181    if (!fd->send()) {
182       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
183       Jmsg1(jcr, M_FATAL, 0, _("Error sending data to Client. ERR=%s\n"),
184          fd->bstrerror());
185       ok = false;
186    }
187    fd->msg = save_msg;
188    return ok;
189 }
190
191 /*
192  * New routine after to SD->SD implementation
193  * Called here for each record from read_records()
194  *  Returns: true if OK
195  *           false if error
196  */
197 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
198 {
199    JCR *jcr = dcr->jcr;
200    BSOCK *fd = jcr->file_bsock;
201    char buf1[100], buf2[100];
202    bool new_header = false;
203    POOLMEM *save_msg;
204    char ec1[50], ec2[50];
205    bool ok = true;
206    POOLMEM *wbuf = rec->data;;                 /* send buffer */
207    uint32_t wsize = rec->data_len;             /* send size */
208
209 #ifdef xxx
210    Pmsg5(000, "on entry     JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
211       jcr->JobId,
212       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
213       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
214 #endif
215
216    /* If label and not for us, discard it */
217    if (rec->FileIndex < 0) {
218       Dmsg1(100, "FileIndex=%d\n", rec->FileIndex);
219       return true;
220    }
221
222    /*
223     * For normal migration jobs, FileIndex values are sequential because
224     *  we are dealing with one job.  However, for Vbackup (consolidation),
225     *  we will be getting records from multiple jobs and writing them back
226     *  out, so we need to ensure that the output FileIndex is sequential.
227     *  We do so by detecting a FileIndex change and incrementing the
228     *  JobFiles, which we then use as the output FileIndex.
229     */
230    if (rec->FileIndex >= 0) {
231       /* If something changed, increment FileIndex */
232       if (rec->VolSessionId != rec->last_VolSessionId ||
233           rec->VolSessionTime != rec->last_VolSessionTime ||
234           rec->FileIndex != rec->last_FileIndex ||
235           rec->Stream != rec->last_Stream) {
236
237          /* Something changed */
238          if (rec->last_VolSessionId != 0) {        /* Not first record */
239             Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
240             if (!fd->signal(BNET_EOD)) {  /* End of previous stream */
241                Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
242                         fd->bstrerror());
243                return false;
244             }
245          }
246          new_header = true;
247          if (rec->FileIndex != rec->last_FileIndex) {
248             jcr->JobFiles++;
249          }
250          rec->last_VolSessionId = rec->VolSessionId;
251          rec->last_VolSessionTime = rec->VolSessionTime;
252          rec->last_FileIndex = rec->FileIndex;
253          rec->last_Stream = rec->Stream;
254       }
255       rec->FileIndex = jcr->JobFiles;     /* set sequential output FileIndex */
256    }
257
258    if (new_header) {
259       new_header = false;
260       Dmsg5(400, "Send header to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%ld\n",
261          rec->VolSessionId, rec->VolSessionTime,
262          FI_to_ascii(ec1, rec->FileIndex),
263          stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
264          wsize);
265
266       /* Send data header to File daemon */
267       if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, wsize)) {
268          Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
269          Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
270             fd->bstrerror());
271          return false;
272       }
273    }
274
275    Dmsg1(400, "FI=%d\n", rec->FileIndex);
276    /* Send data record to File daemon */
277    save_msg = fd->msg;          /* save fd message pointer */
278    fd->msg = wbuf;         /* pass data directly to the FD */
279    fd->msglen = wsize;
280    jcr->JobBytes += wsize;   /* increment bytes this job */
281    Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
282    if (!fd->send()) {
283       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
284       Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
285          fd->bstrerror());
286       ok = false;
287    }
288    fd->msg = save_msg;                /* restore fd message pointer */
289
290    Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
291       jcr->JobId,
292       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
293       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
294
295    return ok;
296 }