]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/read.c
df3f13d2d9214d0e132edce66d0b36895a7675b1
[bacula/bacula] / bacula / src / stored / read.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Read code for Storage daemon
21  *
22  *     Kern Sibbald, November MM
23  *
24  */
25
26 #include "bacula.h"
27 #include "stored.h"
28
29 /* Forward referenced subroutines */
30 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
31 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
32
33 /* Responses sent to the File daemon */
34 static char OK_data[]    = "3000 OK data\n";
35 static char FD_error[]   = "3000 error\n";
36 static char rec_header[] = "rechdr %ld %ld %ld %ld %ld";
37
38 /*
39  *  Read Data and send to File Daemon
40  *   Returns: false on failure
41  *            true  on success
42  */
43 bool do_read_data(JCR *jcr)
44 {
45    BSOCK *fd = jcr->file_bsock;
46    bool ok = true;
47    DCR *dcr = jcr->read_dcr;
48    char ec[50];
49
50    Dmsg0(100, "Start read data.\n");
51
52    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
53       return false;
54    }
55
56    if (jcr->NumReadVolumes == 0) {
57       Jmsg(jcr, M_FATAL, 0, _("No Volume names found for restore.\n"));
58       fd->fsend(FD_error);
59       return false;
60    }
61
62    Dmsg2(200, "Found %d volumes names to restore. First=%s\n", jcr->NumReadVolumes,
63       jcr->VolList->VolumeName);
64
65    /* Ready device for reading */
66    if (!acquire_device_for_read(dcr)) {
67       fd->fsend(FD_error);
68       return false;
69    }
70    dcr->dev->start_of_job(dcr);
71
72    /* Tell File daemon we will send data */
73    if (!jcr->is_ok_data_sent) {
74       fd->fsend(OK_data);
75       jcr->is_ok_data_sent = true;
76    }
77
78    jcr->sendJobStatus(JS_Running);
79    jcr->run_time = time(NULL);
80    jcr->JobFiles = 0;
81
82    if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
83       ok = read_records(dcr, mac_record_cb, mount_next_read_volume);
84    } else {
85       ok = read_records(dcr, read_record_cb, mount_next_read_volume);
86    }
87
88    /*
89     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
90     *   and the subsequent Jmsg() editing will break
91     */
92    int32_t job_elapsed = time(NULL) - jcr->run_time;
93
94    if (job_elapsed <= 0) {
95       job_elapsed = 1;
96    }
97
98    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
99          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
100          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
101
102    /* Send end of data to FD */
103    fd->signal(BNET_EOD);
104
105    if (!release_device(jcr->read_dcr)) {
106       ok = false;
107    }
108
109    Dmsg0(30, "Done reading.\n");
110    return ok;
111 }
112
113 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
114 {
115    JCR *jcr = dcr->jcr;
116    BSOCK *fd = jcr->file_bsock;
117    bool ok = true;
118    POOLMEM *save_msg;
119    char ec1[50], ec2[50];
120    POOLMEM *wbuf = rec->data;                 /* send buffer */
121    uint32_t wsize = rec->data_len;            /* send size */
122
123    if (rec->FileIndex < 0) {
124       return true;
125    }
126
127    Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
128       rec->VolSessionId, rec->VolSessionTime,
129       FI_to_ascii(ec1, rec->FileIndex),
130       stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
131       wsize);
132
133    Dmsg2(640, ">filed: send header stream=0x%lx len=%ld\n", rec->Stream, wsize);
134    /* Send record header to File daemon */
135    if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
136           rec->FileIndex, rec->Stream, wsize)) {
137       Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
138       Jmsg1(jcr, M_FATAL, 0, _("Error sending header to Client. ERR=%s\n"),
139          fd->bstrerror());
140       return false;
141    }
142    /*
143     * For normal migration jobs, FileIndex values are sequential because
144     *  we are dealing with one job.  However, for Vbackup (consolidation),
145     *  we will be getting records from multiple jobs and writing them back
146     *  out, so we need to ensure that the output FileIndex is sequential.
147     *  We do so by detecting a FileIndex change and incrementing the
148     *  JobFiles, which we then use as the output FileIndex.
149     */
150    if (rec->FileIndex >= 0) {
151       /* If something changed, increment FileIndex */
152       if (rec->VolSessionId != rec->last_VolSessionId ||
153           rec->VolSessionTime != rec->last_VolSessionTime ||
154           rec->FileIndex != rec->last_FileIndex) {
155          jcr->JobFiles++;
156          rec->last_VolSessionId = rec->VolSessionId;
157          rec->last_VolSessionTime = rec->VolSessionTime;
158          rec->last_FileIndex = rec->FileIndex;
159       }
160    }
161
162    /* Debug code: check if we must hangup or blowup */
163    if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
164       return false;
165    }
166
167    save_msg = fd->msg;          /* save fd message pointer */
168    fd->msg = wbuf;
169    fd->msglen = wsize;
170    /* Send data record to File daemon */
171    jcr->JobBytes += wsize;   /* increment bytes this job */
172    Dmsg1(640, ">filed: send %d bytes data.\n", fd->msglen);
173    if (!fd->send()) {
174       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
175       Jmsg1(jcr, M_FATAL, 0, _("Error sending data to Client. ERR=%s\n"),
176          fd->bstrerror());
177       ok = false;
178    }
179    fd->msg = save_msg;
180    return ok;
181 }
182
183 /*
184  * New routine after to SD->SD implementation
185  * Called here for each record from read_records()
186  *  Returns: true if OK
187  *           false if error
188  */
189 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
190 {
191    JCR *jcr = dcr->jcr;
192    BSOCK *fd = jcr->file_bsock;
193    char buf1[100], buf2[100];
194    bool new_header = false;
195    POOLMEM *save_msg;
196    char ec1[50], ec2[50];
197    bool ok = true;
198    POOLMEM *wbuf = rec->data;;                 /* send buffer */
199    uint32_t wsize = rec->data_len;             /* send size */
200
201 #ifdef xxx
202    Pmsg5(000, "on entry     JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
203       jcr->JobId,
204       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
205       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
206 #endif
207
208    /* If label and not for us, discard it */
209    if (rec->FileIndex < 0) {
210       Dmsg1(100, "FileIndex=%d\n", rec->FileIndex);
211       return true;
212    }
213
214    /*
215     * For normal migration jobs, FileIndex values are sequential because
216     *  we are dealing with one job.  However, for Vbackup (consolidation),
217     *  we will be getting records from multiple jobs and writing them back
218     *  out, so we need to ensure that the output FileIndex is sequential.
219     *  We do so by detecting a FileIndex change and incrementing the
220     *  JobFiles, which we then use as the output FileIndex.
221     */
222    if (rec->FileIndex >= 0) {
223       /* If something changed, increment FileIndex */
224       if (rec->VolSessionId != rec->last_VolSessionId ||
225           rec->VolSessionTime != rec->last_VolSessionTime ||
226           rec->FileIndex != rec->last_FileIndex ||
227           rec->Stream != rec->last_Stream) {
228
229          /* Something changed */
230          if (rec->last_VolSessionId != 0) {        /* Not first record */
231             Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
232             if (!fd->signal(BNET_EOD)) {  /* End of previous stream */
233                Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
234                         fd->bstrerror());
235                return false;
236             }
237          }
238          new_header = true;
239          if (rec->FileIndex != rec->last_FileIndex) {
240             jcr->JobFiles++;
241          }
242          rec->last_VolSessionId = rec->VolSessionId;
243          rec->last_VolSessionTime = rec->VolSessionTime;
244          rec->last_FileIndex = rec->FileIndex;
245          rec->last_Stream = rec->Stream;
246       }
247       rec->FileIndex = jcr->JobFiles;     /* set sequential output FileIndex */
248    }
249
250    if (new_header) {
251       new_header = false;
252       Dmsg5(400, "Send header to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%ld\n",
253          rec->VolSessionId, rec->VolSessionTime,
254          FI_to_ascii(ec1, rec->FileIndex),
255          stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
256          wsize);
257
258       /* Send data header to File daemon */
259       if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, wsize)) {
260          Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
261          Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
262             fd->bstrerror());
263          return false;
264       }
265    }
266
267    Dmsg1(400, "FI=%d\n", rec->FileIndex);
268    /* Send data record to File daemon */
269    save_msg = fd->msg;          /* save fd message pointer */
270    fd->msg = wbuf;         /* pass data directly to the FD */
271    fd->msglen = wsize;
272    jcr->JobBytes += wsize;   /* increment bytes this job */
273    Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
274    if (!fd->send()) {
275       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
276       Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
277          fd->bstrerror());
278       ok = false;
279    }
280    fd->msg = save_msg;                /* restore fd message pointer */
281
282    Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
283       jcr->JobId,
284       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
285       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
286
287    return ok;
288 }