]> git.sur5r.net Git - bacula/bacula/blob - bacula/patches/testing/blastattributes.patch
ebl Change Win32 by Win64 in status
[bacula/bacula] / bacula / patches / testing / blastattributes.patch
1 Index: src/dird/catreq.c
2 ===================================================================
3 --- src/dird/catreq.c   (révision 8270)
4 +++ src/dird/catreq.c   (copie de travail)
5 @@ -357,13 +357,11 @@
6  }
7  
8  /*
9 - * Update File Attributes in the catalog with data
10 - *  sent by the Storage daemon.  Note, we receive the whole
11 - *  attribute record, but we select out only the stat packet,
12 - *  VolSessionId, VolSessionTime, FileIndex, file type, and 
13 - *  file name to store in the catalog.
14 + * Note, we receive the whole attribute record, but we select out only the stat
15 + * packet, VolSessionId, VolSessionTime, FileIndex, file type, and file name to
16 + * store in the catalog.
17   */
18 -void catalog_update(JCR *jcr, BSOCK *bs)
19 +static void update_attribute(JCR *jcr, char *msg, int32_t msglen)
20  {
21     unser_declare;
22     uint32_t VolSessionId, VolSessionTime;
23 @@ -375,20 +373,7 @@
24     int len;
25     char *fname, *attr;
26     ATTR_DBR *ar = NULL;
27 -   POOLMEM *omsg;
28  
29 -   if (job_canceled(jcr) || !jcr->pool->catalog_files) {
30 -      goto bail_out;                  /* user disabled cataloging */
31 -   }
32 -   if (!jcr->db) {
33 -      omsg = get_memory(bs->msglen+1);
34 -      pm_strcpy(omsg, bs->msg);
35 -      bs->fsend(_("1994 Invalid Catalog Update: %s"), omsg);    
36 -      Jmsg1(jcr, M_FATAL, 0, _("Invalid Catalog Update; DB not open: %s"), omsg);
37 -      free_memory(omsg);
38 -      goto bail_out;
39 -   }
40 -
41     /* Start transaction allocates jcr->attr and jcr->ar if needed */
42     db_start_transaction(jcr, jcr->db);     /* start transaction if not already open */
43     ar = jcr->ar;      
44 @@ -397,7 +382,7 @@
45      *  there may be a cached attr so we cannot yet write into
46      *  jcr->attr or jcr->ar  
47      */
48 -   p = bs->msg;
49 +   p = msg;
50     skip_nonspaces(&p);                /* UpdCat */
51     skip_spaces(&p);
52     skip_nonspaces(&p);                /* Job=nnn */
53 @@ -412,7 +397,7 @@
54     unser_uint32(data_len);
55     p += unser_length(p);
56  
57 -   Dmsg1(400, "UpdCat msg=%s\n", bs->msg);
58 +   Dmsg1(400, "UpdCat msg=%s\n", msg);
59     Dmsg5(400, "UpdCat VolSessId=%d VolSessT=%d FI=%d Strm=%d data_len=%d\n",
60        VolSessionId, VolSessionTime, FileIndex, Stream, data_len);
61  
62 @@ -424,9 +409,9 @@
63           }
64        }
65        /* Any cached attr is flushed so we can reuse jcr->attr and jcr->ar */
66 -      jcr->attr = check_pool_memory_size(jcr->attr, bs->msglen);
67 -      memcpy(jcr->attr, bs->msg, bs->msglen);
68 -      p = jcr->attr - bs->msg + p;    /* point p into jcr->attr */
69 +      jcr->attr = check_pool_memory_size(jcr->attr, msglen);
70 +      memcpy(jcr->attr, msg, msglen);
71 +      p = jcr->attr - msg + p;    /* point p into jcr->attr */
72        skip_nonspaces(&p);             /* skip FileIndex */
73        skip_spaces(&p);
74        filetype = str_to_int32(p);     /* TODO: choose between unserialize and str_to_int32 */
75 @@ -510,8 +495,110 @@
76           }
77        }
78     }
79 +}
80 +
81 +/*
82 + * Update File Attributes in the catalog with data
83 + *  sent by the Storage daemon.
84 + */
85 +void catalog_update(JCR *jcr, BSOCK *bs)
86 +{
87 +   POOLMEM *omsg;
88 +
89 +   if (job_canceled(jcr) || !jcr->pool->catalog_files) {
90 +      goto bail_out;                  /* user disabled cataloging */
91 +   }
92 +   if (!jcr->db) {
93 +      omsg = get_memory(bs->msglen+1);
94 +      pm_strcpy(omsg, bs->msg);
95 +      bs->fsend(_("1994 Invalid Catalog Update: %s"), omsg);    
96 +      Jmsg1(jcr, M_FATAL, 0, _("Invalid Catalog Update; DB not open: %s"), omsg);
97 +      free_memory(omsg);
98 +      goto bail_out;
99 +
100 +   }
101 +
102 +   update_attribute(jcr, bs->msg, bs->msglen);
103 +
104  bail_out:
105     if (job_canceled(jcr)) {
106        cancel_storage_daemon_job(jcr);
107     }
108  }
109 +
110 +/*
111 + * Update File Attributes in the catalog with data read from
112 + * the storage daemon spool file. We receive the filename and
113 + * we try to read it.
114 + */
115 +bool despool_attributes_from_file(JCR *jcr, const char *file)
116 +{
117 +   bool ret=false;
118 +   int32_t pktsiz;
119 +   size_t nbytes;
120 +   ssize_t last = 0, size = 0;
121 +   int count = 0;
122 +   int32_t msglen;                    /* message length */
123 +   POOLMEM *msg = get_pool_memory(PM_MESSAGE);
124 +   FILE *spool_fd=NULL;
125 +
126 +   Dmsg0(0, "Begin despool_attributes_from_file\n");
127 +
128 +   if (job_canceled(jcr) || !jcr->pool->catalog_files || !jcr->db) {
129 +      goto bail_out;                  /* user disabled cataloging */
130 +   }
131 +
132 +   spool_fd = fopen(file, "rb");
133 +   if (!spool_fd) {
134 +      Dmsg0(0, "cancel despool_attributes_from_file\n");
135 +      /* send an error message */
136 +      goto bail_out;
137 +   }
138 +#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
139 +   posix_fadvise(fileno(spool_fd), 0, 0, POSIX_FADV_WILLNEED);
140 +#endif
141 +
142 +   while (fread((char *)&pktsiz, 1, sizeof(int32_t), spool_fd) ==
143 +          sizeof(int32_t)) {
144 +      size += sizeof(int32_t);
145 +      msglen = ntohl(pktsiz);
146 +      if (msglen > 0) {
147 +         if (msglen > (int32_t) sizeof_pool_memory(msg)) {
148 +            msg = realloc_pool_memory(msg, msglen + 1);
149 +         }
150 +         nbytes = fread(msg, 1, msglen, spool_fd);
151 +         if (nbytes != (size_t) msglen) {
152 +            berrno be;
153 +            Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
154 +            Qmsg1(jcr, M_FATAL, 0, _("fread attr spool error. ERR=%s\n"),
155 +                  be.bstrerror());
156 +            goto bail_out;
157 +         }
158 +         size += nbytes;
159 +         if ((++count & 0x3F) == 0) {
160 +            last = size;
161 +         }
162 +      }
163 +      update_attribute(jcr, msg, msglen);
164 +   }
165 +   if (ferror(spool_fd)) {
166 +      berrno be;
167 +      Qmsg1(jcr, M_FATAL, 0, _("fread attr spool error. ERR=%s\n"),
168 +            be.bstrerror());
169 +      goto bail_out;
170 +   }
171 +   ret = true;
172 +
173 +bail_out:
174 +   if (spool_fd) {
175 +      fclose(spool_fd);
176 +   }
177 +
178 +   if (job_canceled(jcr)) {
179 +      cancel_storage_daemon_job(jcr);
180 +   }
181 +
182 +   free_pool_memory(msg);
183 +   Dmsg1(0, "End despool_attributes_from_file ret=%i\n", ret);
184 +   return ret;
185 +}
186 Index: src/dird/getmsg.c
187 ===================================================================
188 --- src/dird/getmsg.c   (révision 8270)
189 +++ src/dird/getmsg.c   (copie de travail)
190 @@ -135,7 +135,7 @@
191  
192     for (;;) {
193        n = bs->recv();
194 -      Dmsg2(300, "bget_dirmsg %d: %s\n", n, bs->msg);
195 +      Dmsg2(100, "bget_dirmsg %d: %s\n", n, bs->msg);
196  
197        if (is_bnet_stop(bs)) {
198           return n;                    /* error or terminate */
199 @@ -236,6 +236,22 @@
200           catalog_update(jcr, bs);
201           continue;
202        }
203 +      if (bs->msg[0] == 'B') {        /* SD sending file spool attributes */
204 +         Dmsg2(0, "Blast attributes jcr 0x%x: %s", jcr, bs->msg);
205 +         char filename[MAX_NAME_LENGTH];
206 +         if (sscanf(bs->msg, "BlastAttr Job=%127s File=%127s", 
207 +                    Job, filename) != 2) {
208 +            Jmsg1(jcr, M_ERROR, 0, _("Malformed message: %s\n"), bs->msg);
209 +            continue;
210 +         }
211 +         unbash_spaces(filename);
212 +         if (despool_attributes_from_file(jcr, filename)) {
213 +            bs->fsend("1000 OK BlastAttr\n");
214 +         } else {
215 +            bs->fsend("1990 ERROR BlastAttr\n");
216 +         }
217 +         continue;
218 +      }
219        if (bs->msg[0] == 'M') {        /* Mount request */
220           Dmsg1(900, "Mount req: %s", bs->msg);
221           mount_request(jcr, bs, msg);
222 Index: src/dird/protos.h
223 ===================================================================
224 --- src/dird/protos.h   (révision 8270)
225 +++ src/dird/protos.h   (copie de travail)
226 @@ -80,6 +80,7 @@
227  /* catreq.c */
228  extern void catalog_request(JCR *jcr, BSOCK *bs);
229  extern void catalog_update(JCR *jcr, BSOCK *bs);
230 +extern bool despool_attributes_from_file(JCR *jcr, const char *file);
231  
232  /* dird_conf.c */
233  extern const char *level_to_str(int level);
234 Index: src/stored/spool.c
235 ===================================================================
236 --- src/stored/spool.c  (révision 8270)
237 +++ src/stored/spool.c  (copie de travail)
238 @@ -622,13 +622,40 @@
239     V(mutex);
240  }
241  
242 +static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
243 +{
244 +   Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
245 +      jcr->Job, fd);
246 +}
247 +
248 +static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
249 +{
250 +   /* send full spool file name */
251 +   POOLMEM *name  = get_pool_memory(PM_MESSAGE);
252 +   make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
253 +   bash_spaces(name);
254 +   jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n",
255 +                         jcr->Job, name);
256 +   free_pool_memory(name);
257 +   
258 +   if (jcr->dir_bsock->recv() <= 0) {
259 +      Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
260 +      return false;
261 +   }
262 +   
263 +   if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
264 +      return false;
265 +   }
266 +   return true;
267 +}
268 +
269  bool commit_attribute_spool(JCR *jcr)
270  {
271     boffset_t size;
272     char ec1[30];
273     char tbuf[100];
274  
275 -   Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
276 +   Dmsg1(0, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
277           ( utime_t)time(NULL)));
278     if (are_attributes_spooled(jcr)) {
279        if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
280 @@ -654,7 +681,13 @@
281        dir_send_job_status(jcr);
282        Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
283              edit_uint64_with_commas(size, ec1));
284 -      jcr->dir_bsock->despool(update_attr_spool_size, size);
285 +
286 +      if (!blast_attr_spool_file(jcr, size)) {
287 +         /* Can't read spool file from director side,
288 +          * send content over network.
289 +          */
290 +         jcr->dir_bsock->despool(update_attr_spool_size, size);
291 +      }
292        return close_attr_spool_file(jcr, jcr->dir_bsock);
293     }
294     return true;
295 @@ -664,13 +697,6 @@
296     return false;
297  }
298  
299 -static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
300 -{
301 -   Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
302 -      jcr->Job, fd);
303 -}
304 -
305 -
306  bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
307  {
308     POOLMEM *name  = get_pool_memory(PM_MESSAGE);