]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
fbe9d5e13607daf8986b990705a610f6a75dc3e7
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2005 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 /* Commands sent to Storage daemon */
37 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
38    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41    "pool_type=%s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%s\n";
43 //static char query_device[] = _("query device=%s");
44
45 /* Response from Storage daemon */
46 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
47 static char OK_device[]  = "3000 OK use device device=%s\n";
48
49 /* Storage Daemon requests */
50 static char Job_start[]  = "3010 Job %127s start\n";
51 static char Job_end[]    =
52    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
53
54 /* Forward referenced functions */
55 extern "C" void *msg_thread(void *arg);
56
57 /*
58  * Establish a message channel connection with the Storage daemon
59  * and perform authentication.
60  */
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62                               int max_retry_time, int verbose)
63 {
64    BSOCK *sd;
65    STORE *store;
66
67    if (jcr->store_bsock) {
68       return true;                    /* already connected */
69    }
70    store = (STORE *)jcr->storage->first();
71
72    /*
73     *  Open message channel with the Storage daemon
74     */
75    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
76       store->SDport);
77    sd = bnet_connect(jcr, retry_interval, max_retry_time,
78           _("Storage daemon"), store->address,
79           NULL, store->SDport, verbose);
80    if (sd == NULL) {
81       return false;
82    }
83    sd->res = (RES *)store;        /* save pointer to other end */
84    jcr->store_bsock = sd;
85
86    if (!authenticate_storage_daemon(jcr, store)) {
87       bnet_close(sd);
88       jcr->store_bsock = NULL;
89       return false;
90    }
91    return true;
92 }
93
94 /*
95  * Here we ask the SD to send us the info for a 
96  *  particular device resource.
97  */
98 #ifdef needed
99 bool update_device_res(JCR *jcr, DEVICE *dev)
100 {
101    POOL_MEM device_name; 
102    BSOCK *sd;
103    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
104       return false;
105    }
106    sd = jcr->store_bsock;
107    pm_strcpy(device_name, dev->hdr.name);
108    bash_spaces(device_name);
109    bnet_fsend(sd, query_device, device_name.c_str());
110    Dmsg1(100, ">stored: %s\n", sd->msg);
111    /* The data is returned through Device_update */
112    if (bget_dirmsg(sd) <= 0) {
113       return false;
114    }
115    return true;
116 }
117 #endif
118
119 /*
120  * Start a job with the Storage daemon
121  */
122 int start_storage_daemon_job(JCR *jcr, alist *store, int append)
123 {
124    bool ok = false;
125    STORE *storage;
126    BSOCK *sd;
127    char auth_key[100];
128    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
129    char PoolId[50];
130    int copy = 0;
131    int stripe = 0;
132
133    sd = jcr->store_bsock;
134    /*
135     * Now send JobId and permissions, and get back the authorization key.
136     */
137    bash_spaces(jcr->job->hdr.name);
138    bash_spaces(jcr->client->hdr.name);
139    bash_spaces(jcr->fileset->hdr.name);
140    if (jcr->fileset->MD5[0] == 0) {
141       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
142    }
143    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
144               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
145               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
146               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
147               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
148    Dmsg1(100, ">stored: %s\n", sd->msg);
149    unbash_spaces(jcr->job->hdr.name);
150    unbash_spaces(jcr->client->hdr.name);
151    unbash_spaces(jcr->fileset->hdr.name);
152    if (bget_dirmsg(sd) > 0) {
153        Dmsg1(100, "<stored: %s", sd->msg);
154        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
155                   &jcr->VolSessionTime, &auth_key) != 3) {
156           Dmsg1(100, "BadJob=%s\n", sd->msg);
157           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
158           return 0;
159        } else {
160           jcr->sd_auth_key = bstrdup(auth_key);
161           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
162        }
163    } else {
164       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
165          bnet_strerror(sd));
166       return 0;
167    }
168
169    pm_strcpy(pool_type, jcr->pool->pool_type);
170    pm_strcpy(pool_name, jcr->pool->hdr.name);
171    bash_spaces(pool_type);
172    bash_spaces(pool_name);
173    edit_int64(jcr->PoolId, PoolId);
174
175    /*
176     * We have two loops here. The first comes from the 
177     *  Storage = associated with the Job, and we need 
178     *  to attach to each one.
179     * The inner loop loops over all the alternative devices
180     *  associated with each Storage. It selects the first
181     *  available one.
182     *
183     */
184    foreach_alist(storage, store) {
185       pm_strcpy(store_name, storage->hdr.name);
186       bash_spaces(store_name);
187       pm_strcpy(media_type, storage->media_type);
188       bash_spaces(media_type);
189       bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
190                  pool_name.c_str(), pool_type.c_str(), append, copy, stripe);
191
192       DEVICE *dev;
193       /* Loop over alternative storage Devices until one is OK */
194       foreach_alist(dev, storage->device) {
195          pm_strcpy(device_name, dev->hdr.name);
196          bash_spaces(device_name);
197          bnet_fsend(sd, use_device, device_name.c_str());
198          Dmsg1(100, ">stored: %s", sd->msg);
199       }
200       bnet_sig(sd, BNET_EOD);            /* end of Devices */
201       bnet_sig(sd, BNET_EOD);            /* end of Storages */
202       if (bget_dirmsg(sd) > 0) {
203          Dmsg1(100, "<stored: %s", sd->msg);
204          /* ****FIXME**** save actual device name */
205          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
206       } else {
207          POOL_MEM err_msg;
208          pm_strcpy(err_msg, sd->msg); /* save message */
209          Jmsg(jcr, M_FATAL, 0, _("\n"
210             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
211             device_name.c_str(), err_msg.c_str()/* sd->msg */);
212       }
213       break;
214    }
215    if (ok) {
216       ok = bnet_fsend(sd, "run");
217       Dmsg1(100, ">stored: %s\n", sd->msg);
218    }
219    return ok;
220 }
221
222 /*
223  * Start a thread to handle Storage daemon messages and
224  *  Catalog requests.
225  */
226 int start_storage_daemon_message_thread(JCR *jcr)
227 {
228    int status;
229    pthread_t thid;
230
231    P(jcr->mutex);
232    jcr->use_count++;                  /* mark in use by msg thread */
233    jcr->sd_msg_thread_done = false;
234    jcr->SD_msg_chan = 0;
235    V(jcr->mutex);
236    Dmsg0(100, "Start SD msg_thread.\n");
237    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
238       berrno be;
239       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
240    }
241    Dmsg0(100, "SD msg_thread started.\n");
242    /* Wait for thread to start */
243    while (jcr->SD_msg_chan == 0) {
244       bmicrosleep(0, 50);
245    }
246    return 1;
247 }
248
249 extern "C" void msg_thread_cleanup(void *arg)
250 {
251    JCR *jcr = (JCR *)arg;
252    Dmsg0(200, "End msg_thread\n");
253    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
254    P(jcr->mutex);
255    jcr->sd_msg_thread_done = true;
256    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
257    jcr->SD_msg_chan = 0;
258    V(jcr->mutex);
259    free_jcr(jcr);                     /* release jcr */
260 }
261
262 /*
263  * Handle the message channel (i.e. requests from the
264  *  Storage daemon).
265  * Note, we are running in a separate thread.
266  */
267 extern "C" void *msg_thread(void *arg)
268 {
269    JCR *jcr = (JCR *)arg;
270    BSOCK *sd;
271    int JobStatus;
272    char Job[MAX_NAME_LENGTH];
273    uint32_t JobFiles;
274    uint64_t JobBytes;
275    int stat;
276
277    pthread_detach(pthread_self());
278    jcr->SD_msg_chan = pthread_self();
279    pthread_cleanup_push(msg_thread_cleanup, arg);
280    sd = jcr->store_bsock;
281
282    /* Read the Storage daemon's output.
283     */
284    Dmsg0(100, "Start msg_thread loop\n");
285    while ((stat=bget_dirmsg(sd)) >= 0) {
286       Dmsg1(200, "<stored: %s", sd->msg);
287       if (sscanf(sd->msg, Job_start, &Job) == 1) {
288          continue;
289       }
290       if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles,
291                  &JobBytes) == 4) {
292          jcr->SDJobStatus = JobStatus; /* termination status */
293          jcr->SDJobFiles = JobFiles;
294          jcr->SDJobBytes = JobBytes;
295          break;
296       }
297    }
298    if (is_bnet_error(sd)) {
299       jcr->SDJobStatus = JS_ErrorTerminated;
300    }
301    pthread_cleanup_pop(1);            /* remove and execute the handler */
302    return NULL;
303 }
304
305 void wait_for_storage_daemon_termination(JCR *jcr)
306 {
307    int cancel_count = 0;
308    /* Now wait for Storage daemon to terminate our message thread */
309    set_jcr_job_status(jcr, JS_WaitSD);
310    P(jcr->mutex);
311    while (!jcr->sd_msg_thread_done) {
312       struct timeval tv;
313       struct timezone tz;
314       struct timespec timeout;
315
316       gettimeofday(&tv, &tz);
317       timeout.tv_nsec = 0;
318       timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
319       Dmsg0(300, "I'm waiting for message thread termination.\n");
320       pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
321       if (job_canceled(jcr)) {
322          cancel_count++;
323       }
324       /* Give SD 30 seconds to clean up after cancel */
325       if (cancel_count == 3) {
326          break;
327       }
328    }
329    V(jcr->mutex);
330    set_jcr_job_status(jcr, JS_Terminated);
331 }
332
333 #ifdef needed
334 #define MAX_TRIES 30
335 #define WAIT_TIME 2
336 extern "C" void *device_thread(void *arg)
337 {
338    int i;
339    JCR *jcr;
340    DEVICE *dev;
341
342
343    pthread_detach(pthread_self());
344    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
345    for (i=0; i < MAX_TRIES; i++) {
346       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
347          Dmsg0(000, "Failed connecting to SD.\n");
348          continue;
349       }
350       LockRes();
351       foreach_res(dev, R_DEVICE) {
352          if (!update_device_res(jcr, dev)) {
353             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
354          } else {
355             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
356          }
357       }
358       UnlockRes();
359       bnet_close(jcr->store_bsock);
360       jcr->store_bsock = NULL;
361       break;
362
363    }
364    free_jcr(jcr);
365    return NULL;
366 }
367
368 /*
369  * Start a thread to handle getting Device resource information
370  *  from SD. This is called once at startup of the Director.
371  */
372 void init_device_resources()
373 {
374    int status;
375    pthread_t thid;
376
377    Dmsg0(100, "Start Device thread.\n");
378    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
379       berrno be;
380       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
381    }
382 }
383 #endif