]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
42dd9446e57d853cd982f621078a7ce720f7b8b4
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2006 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
37
38 /* Commands sent to Storage daemon */
39 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
40    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43    "pool_type=%s append=%d copy=%d stripe=%d\n";
44 static char use_device[] = "use device=%s\n";
45 //static char query_device[] = _("query device=%s");
46
47 /* Response from Storage daemon */
48 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
49 static char OK_device[]  = "3000 OK use device device=%s\n";
50
51 /* Storage Daemon requests */
52 static char Job_start[]  = "3010 Job %127s start\n";
53 static char Job_end[]    =
54    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld\n";
55
56 /* Forward referenced functions */
57 extern "C" void *msg_thread(void *arg);
58
59 /*
60  * Establish a message channel connection with the Storage daemon
61  * and perform authentication.
62  */
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64                               int max_retry_time, int verbose)
65 {
66    BSOCK *sd;
67    STORE *store;
68
69    if (jcr->store_bsock) {
70       return true;                    /* already connected */
71    }
72    store = (STORE *)jcr->storage->first();
73
74    /*
75     *  Open message channel with the Storage daemon
76     */
77    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
78       store->SDport);
79    sd = bnet_connect(jcr, retry_interval, max_retry_time,
80           _("Storage daemon"), store->address,
81           NULL, store->SDport, verbose);
82    if (sd == NULL) {
83       return false;
84    }
85    sd->res = (RES *)store;        /* save pointer to other end */
86    jcr->store_bsock = sd;
87
88    if (!authenticate_storage_daemon(jcr, store)) {
89       bnet_close(sd);
90       jcr->store_bsock = NULL;
91       return false;
92    }
93    return true;
94 }
95
96 /*
97  * Here we ask the SD to send us the info for a 
98  *  particular device resource.
99  */
100 #ifdef needed
101 bool update_device_res(JCR *jcr, DEVICE *dev)
102 {
103    POOL_MEM device_name; 
104    BSOCK *sd;
105    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
106       return false;
107    }
108    sd = jcr->store_bsock;
109    pm_strcpy(device_name, dev->hdr.name);
110    bash_spaces(device_name);
111    bnet_fsend(sd, query_device, device_name.c_str());
112    Dmsg1(100, ">stored: %s\n", sd->msg);
113    /* The data is returned through Device_update */
114    if (bget_dirmsg(sd) <= 0) {
115       return false;
116    }
117    return true;
118 }
119 #endif
120
121 /*
122  * Start a job with the Storage daemon
123  */
124 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
125 {
126    bool ok = true;
127    STORE *storage;
128    BSOCK *sd;
129    char auth_key[100];
130    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
131    int copy = 0;
132    int stripe = 0;
133
134    sd = jcr->store_bsock;
135    /*
136     * Now send JobId and permissions, and get back the authorization key.
137     */
138    bash_spaces(jcr->job->hdr.name);
139    bash_spaces(jcr->client->hdr.name);
140    bash_spaces(jcr->fileset->hdr.name);
141    if (jcr->fileset->MD5[0] == 0) {
142       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
143    }
144    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
145               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
146               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
147               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
148               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
149    Dmsg1(100, ">stored: %s\n", sd->msg);
150    unbash_spaces(jcr->job->hdr.name);
151    unbash_spaces(jcr->client->hdr.name);
152    unbash_spaces(jcr->fileset->hdr.name);
153    if (bget_dirmsg(sd) > 0) {
154        Dmsg1(100, "<stored: %s", sd->msg);
155        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
156                   &jcr->VolSessionTime, &auth_key) != 3) {
157           Dmsg1(100, "BadJob=%s\n", sd->msg);
158           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
159           return 0;
160        } else {
161           jcr->sd_auth_key = bstrdup(auth_key);
162           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
163        }
164    } else {
165       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
166          bnet_strerror(sd));
167       return 0;
168    }
169
170    pm_strcpy(pool_type, jcr->pool->pool_type);
171    pm_strcpy(pool_name, jcr->pool->hdr.name);
172    bash_spaces(pool_type);
173    bash_spaces(pool_name);
174
175    /*
176     * We have two loops here. The first comes from the 
177     *  Storage = associated with the Job, and we need 
178     *  to attach to each one.
179     * The inner loop loops over all the alternative devices
180     *  associated with each Storage. It selects the first
181     *  available one.
182     *
183     */
184    /* Do read side of storage daemon */
185    if (ok && rstore) {
186       foreach_alist(storage, rstore) {
187          pm_strcpy(store_name, storage->hdr.name);
188          bash_spaces(store_name);
189          pm_strcpy(media_type, storage->media_type);
190          bash_spaces(media_type);
191          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
192                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
193
194          DEVICE *dev;
195          /* Loop over alternative storage Devices until one is OK */
196          foreach_alist(dev, storage->device) {
197             pm_strcpy(device_name, dev->hdr.name);
198             bash_spaces(device_name);
199             bnet_fsend(sd, use_device, device_name.c_str());
200             Dmsg1(100, ">stored: %s", sd->msg);
201          }
202          bnet_sig(sd, BNET_EOD);            /* end of Devices */
203       }
204       bnet_sig(sd, BNET_EOD);            /* end of Storages */
205       if (bget_dirmsg(sd) > 0) {
206          Dmsg1(100, "<stored: %s", sd->msg);
207          /* ****FIXME**** save actual device name */
208          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
209       } else {
210          POOL_MEM err_msg;
211          pm_strcpy(err_msg, sd->msg); /* save message */
212          Jmsg(jcr, M_FATAL, 0, _("\n"
213             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
214             device_name.c_str(), err_msg.c_str()/* sd->msg */);
215          ok = false;
216       }
217    }
218
219    /* Do write side of storage daemon */
220    if (ok && wstore) {
221       foreach_alist(storage, wstore) {
222          pm_strcpy(store_name, storage->hdr.name);
223          bash_spaces(store_name);
224          pm_strcpy(media_type, storage->media_type);
225          bash_spaces(media_type);
226          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
227                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
228
229          DEVICE *dev;
230          /* Loop over alternative storage Devices until one is OK */
231          foreach_alist(dev, storage->device) {
232             pm_strcpy(device_name, dev->hdr.name);
233             bash_spaces(device_name);
234             bnet_fsend(sd, use_device, device_name.c_str());
235             Dmsg1(100, ">stored: %s", sd->msg);
236          }
237          bnet_sig(sd, BNET_EOD);            /* end of Devices */
238       }
239       bnet_sig(sd, BNET_EOD);            /* end of Storages */
240       if (bget_dirmsg(sd) > 0) {
241          Dmsg1(100, "<stored: %s", sd->msg);
242          /* ****FIXME**** save actual device name */
243          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
244       } else {
245          POOL_MEM err_msg;
246          pm_strcpy(err_msg, sd->msg); /* save message */
247          Jmsg(jcr, M_FATAL, 0, _("\n"
248             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
249             device_name.c_str(), err_msg.c_str()/* sd->msg */);
250          ok = false;
251       }
252    }
253    return ok;
254 }
255
256 /*
257  * Start a thread to handle Storage daemon messages and
258  *  Catalog requests.
259  */
260 int start_storage_daemon_message_thread(JCR *jcr)
261 {
262    int status;
263    pthread_t thid;
264
265    jcr->inc_use_count();              /* mark in use by msg thread */
266    jcr->sd_msg_thread_done = false;
267    jcr->SD_msg_chan = 0;
268    Dmsg0(100, "Start SD msg_thread.\n");
269    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
270       berrno be;
271       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
272    }
273    /* Wait for thread to start */
274    while (jcr->SD_msg_chan == 0) {
275       bmicrosleep(0, 50);
276    }
277    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
278    return 1;
279 }
280
281 extern "C" void msg_thread_cleanup(void *arg)
282 {
283    JCR *jcr = (JCR *)arg;
284    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
285    jcr->sd_msg_thread_done = true;
286    jcr->SD_msg_chan = 0;
287    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
288    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
289    free_jcr(jcr);                     /* release jcr */
290 }
291
292 /*
293  * Handle the message channel (i.e. requests from the
294  *  Storage daemon).
295  * Note, we are running in a separate thread.
296  */
297 extern "C" void *msg_thread(void *arg)
298 {
299    JCR *jcr = (JCR *)arg;
300    BSOCK *sd;
301    int JobStatus;
302    char Job[MAX_NAME_LENGTH];
303    uint32_t JobFiles;
304    uint64_t JobBytes;
305    int stat;
306
307    pthread_detach(pthread_self());
308    jcr->SD_msg_chan = pthread_self();
309    pthread_cleanup_push(msg_thread_cleanup, arg);
310    sd = jcr->store_bsock;
311
312    /* Read the Storage daemon's output.
313     */
314    Dmsg0(100, "Start msg_thread loop\n");
315    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
316       Dmsg1(400, "<stored: %s", sd->msg);
317       if (sscanf(sd->msg, Job_start, Job) == 1) {
318          continue;
319       }
320       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
321                  &JobBytes)) == 4) {
322          jcr->SDJobStatus = JobStatus; /* termination status */
323          jcr->SDJobFiles = JobFiles;
324          jcr->SDJobBytes = JobBytes;
325          break;
326       }
327       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
328    }
329    if (is_bnet_error(sd)) {
330       jcr->SDJobStatus = JS_ErrorTerminated;
331    }
332    pthread_cleanup_pop(1);            /* remove and execute the handler */
333    return NULL;
334 }
335
336 void wait_for_storage_daemon_termination(JCR *jcr)
337 {
338    int cancel_count = 0;
339    /* Now wait for Storage daemon to terminate our message thread */
340    while (!jcr->sd_msg_thread_done) {
341       struct timeval tv;
342       struct timezone tz;
343       struct timespec timeout;
344
345       gettimeofday(&tv, &tz);
346       timeout.tv_nsec = 0;
347       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
348       Dmsg0(400, "I'm waiting for message thread termination.\n");
349       P(mutex);
350       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
351       V(mutex);
352       if (job_canceled(jcr)) {
353          if (jcr->SD_msg_chan) {
354             jcr->store_bsock->timed_out = 1;
355             jcr->store_bsock->terminated = 1;
356             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
357             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
358          }
359          cancel_count++;
360       }
361       /* Give SD 30 seconds to clean up after cancel */
362       if (cancel_count == 6) {
363          break;
364       }
365    }
366    set_jcr_job_status(jcr, JS_Terminated);
367 }
368
369 #ifdef needed
370 #define MAX_TRIES 30
371 #define WAIT_TIME 2
372 extern "C" void *device_thread(void *arg)
373 {
374    int i;
375    JCR *jcr;
376    DEVICE *dev;
377
378
379    pthread_detach(pthread_self());
380    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
381    for (i=0; i < MAX_TRIES; i++) {
382       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
383          Dmsg0(900, "Failed connecting to SD.\n");
384          continue;
385       }
386       LockRes();
387       foreach_res(dev, R_DEVICE) {
388          if (!update_device_res(jcr, dev)) {
389             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
390          } else {
391             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
392          }
393       }
394       UnlockRes();
395       bnet_close(jcr->store_bsock);
396       jcr->store_bsock = NULL;
397       break;
398
399    }
400    free_jcr(jcr);
401    return NULL;
402 }
403
404 /*
405  * Start a thread to handle getting Device resource information
406  *  from SD. This is called once at startup of the Director.
407  */
408 void init_device_resources()
409 {
410    int status;
411    pthread_t thid;
412
413    Dmsg0(100, "Start Device thread.\n");
414    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
415       berrno be;
416       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
417    }
418 }
419 #endif