]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
======================= Warning ==========================
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2006 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
37
38 /* Commands sent to Storage daemon */
39 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
40    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43    "pool_type=%s append=%d copy=%d stripe=%d\n";
44 static char use_device[] = "use device=%s\n";
45 //static char query_device[] = _("query device=%s");
46
47 /* Response from Storage daemon */
48 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
49 static char OK_device[]  = "3000 OK use device device=%s\n";
50
51 /* Storage Daemon requests */
52 static char Job_start[]  = "3010 Job %127s start\n";
53 static char Job_end[]    =
54    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
55
56 /* Forward referenced functions */
57 extern "C" void *msg_thread(void *arg);
58
59 /*
60  * Establish a message channel connection with the Storage daemon
61  * and perform authentication.
62  */
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64                               int max_retry_time, int verbose)
65 {
66    BSOCK *sd;
67    STORE *store;
68
69    if (jcr->store_bsock) {
70       return true;                    /* already connected */
71    }
72
73    /* If there is a write storage use it */
74    if (jcr->wstorage) {
75       store = (STORE *)jcr->wstorage->first();
76    } else {
77       store = (STORE *)jcr->rstorage->first();
78    }
79
80    /*
81     *  Open message channel with the Storage daemon
82     */
83    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
84       store->SDport);
85    sd = bnet_connect(jcr, retry_interval, max_retry_time,
86           _("Storage daemon"), store->address,
87           NULL, store->SDport, verbose);
88    if (sd == NULL) {
89       return false;
90    }
91    sd->res = (RES *)store;        /* save pointer to other end */
92    jcr->store_bsock = sd;
93
94    if (!authenticate_storage_daemon(jcr, store)) {
95       bnet_close(sd);
96       jcr->store_bsock = NULL;
97       return false;
98    }
99    return true;
100 }
101
102 /*
103  * Here we ask the SD to send us the info for a 
104  *  particular device resource.
105  */
106 #ifdef needed
107 bool update_device_res(JCR *jcr, DEVICE *dev)
108 {
109    POOL_MEM device_name; 
110    BSOCK *sd;
111    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
112       return false;
113    }
114    sd = jcr->store_bsock;
115    pm_strcpy(device_name, dev->hdr.name);
116    bash_spaces(device_name);
117    bnet_fsend(sd, query_device, device_name.c_str());
118    Dmsg1(100, ">stored: %s\n", sd->msg);
119    /* The data is returned through Device_update */
120    if (bget_dirmsg(sd) <= 0) {
121       return false;
122    }
123    return true;
124 }
125 #endif
126
127 /*
128  * Start a job with the Storage daemon
129  */
130 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
131 {
132    bool ok = true;
133    STORE *storage;
134    BSOCK *sd;
135    char auth_key[100];
136    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
137    POOL_MEM job_name, client_name, fileset_name;
138    int copy = 0;
139    int stripe = 0;
140    char ed1[30];
141
142    sd = jcr->store_bsock;
143    /*
144     * Now send JobId and permissions, and get back the authorization key.
145     */
146    pm_strcpy(job_name, jcr->job->hdr.name);
147    bash_spaces(job_name);
148    pm_strcpy(client_name, jcr->client->hdr.name);
149    bash_spaces(client_name);
150    pm_strcpy(fileset_name, jcr->fileset->hdr.name);
151    bash_spaces(fileset_name);
152    if (jcr->fileset->MD5[0] == 0) {
153       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
154    }
155    /* If rescheduling, cancel the previous incarnation of this job
156     *  with the SD, which might be waiting on the FD connection.
157     *  If we do not cancel it the SD will not accept a new connection
158     *  for the same jobid.
159     */
160    if (jcr->reschedule_count) {
161       bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
162       while (bnet_recv(sd) >= 0)
163          { }
164    } 
165    bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
166               job_name.c_str(), client_name.c_str(), 
167               jcr->JobType, jcr->JobLevel,
168               fileset_name.c_str(), !jcr->pool->catalog_files,
169               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
170               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
171    Dmsg1(100, ">stored: %s\n", sd->msg);
172    if (bget_dirmsg(sd) > 0) {
173        Dmsg1(100, "<stored: %s", sd->msg);
174        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
175                   &jcr->VolSessionTime, &auth_key) != 3) {
176           Dmsg1(100, "BadJob=%s\n", sd->msg);
177           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
178           return 0;
179        } else {
180           jcr->sd_auth_key = bstrdup(auth_key);
181           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
182        }
183    } else {
184       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
185          bnet_strerror(sd));
186       return 0;
187    }
188
189    pm_strcpy(pool_type, jcr->pool->pool_type);
190    pm_strcpy(pool_name, jcr->pool->hdr.name);
191    bash_spaces(pool_type);
192    bash_spaces(pool_name);
193
194    /*
195     * We have two loops here. The first comes from the 
196     *  Storage = associated with the Job, and we need 
197     *  to attach to each one.
198     * The inner loop loops over all the alternative devices
199     *  associated with each Storage. It selects the first
200     *  available one.
201     *
202     */
203    /* Do read side of storage daemon */
204    if (ok && rstore) {
205       foreach_alist(storage, rstore) {
206          pm_strcpy(store_name, storage->hdr.name);
207          bash_spaces(store_name);
208          pm_strcpy(media_type, storage->media_type);
209          bash_spaces(media_type);
210          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
211                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
212
213          DEVICE *dev;
214          /* Loop over alternative storage Devices until one is OK */
215          foreach_alist(dev, storage->device) {
216             pm_strcpy(device_name, dev->hdr.name);
217             bash_spaces(device_name);
218             bnet_fsend(sd, use_device, device_name.c_str());
219             Dmsg1(100, ">stored: %s", sd->msg);
220          }
221          bnet_sig(sd, BNET_EOD);            /* end of Devices */
222       }
223       bnet_sig(sd, BNET_EOD);            /* end of Storages */
224       if (bget_dirmsg(sd) > 0) {
225          Dmsg1(100, "<stored: %s", sd->msg);
226          /* ****FIXME**** save actual device name */
227          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
228       } else {
229          ok = false;
230       }
231    }
232
233    /* Do write side of storage daemon */
234    if (ok && wstore) {
235       foreach_alist(storage, wstore) {
236          pm_strcpy(store_name, storage->hdr.name);
237          bash_spaces(store_name);
238          pm_strcpy(media_type, storage->media_type);
239          bash_spaces(media_type);
240          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
241                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
242
243          DEVICE *dev;
244          /* Loop over alternative storage Devices until one is OK */
245          foreach_alist(dev, storage->device) {
246             pm_strcpy(device_name, dev->hdr.name);
247             bash_spaces(device_name);
248             bnet_fsend(sd, use_device, device_name.c_str());
249             Dmsg1(100, ">stored: %s", sd->msg);
250          }
251          bnet_sig(sd, BNET_EOD);            /* end of Devices */
252       }
253       bnet_sig(sd, BNET_EOD);            /* end of Storages */
254       if (bget_dirmsg(sd) > 0) {
255          Dmsg1(100, "<stored: %s", sd->msg);
256          /* ****FIXME**** save actual device name */
257          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
258       } else {
259          ok = false;
260       }
261    }
262    if (!ok) {
263       POOL_MEM err_msg;
264       if (sd->msg[0]) {
265          pm_strcpy(err_msg, sd->msg); /* save message */
266          Jmsg(jcr, M_FATAL, 0, _("\n"
267               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
268               device_name.c_str(), err_msg.c_str()/* sd->msg */);
269       } else { 
270          Jmsg(jcr, M_FATAL, 0, _("\n"
271               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
272               device_name.c_str());
273       }
274    }
275    return ok;
276 }
277
278 /*
279  * Start a thread to handle Storage daemon messages and
280  *  Catalog requests.
281  */
282 int start_storage_daemon_message_thread(JCR *jcr)
283 {
284    int status;
285    pthread_t thid;
286
287    jcr->inc_use_count();              /* mark in use by msg thread */
288    jcr->sd_msg_thread_done = false;
289    jcr->SD_msg_chan = 0;
290    Dmsg0(100, "Start SD msg_thread.\n");
291    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
292       berrno be;
293       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
294    }
295    /* Wait for thread to start */
296    while (jcr->SD_msg_chan == 0) {
297       bmicrosleep(0, 50);
298    }
299    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
300    return 1;
301 }
302
303 extern "C" void msg_thread_cleanup(void *arg)
304 {
305    JCR *jcr = (JCR *)arg;
306    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
307    jcr->sd_msg_thread_done = true;
308    jcr->SD_msg_chan = 0;
309    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
310    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
311    free_jcr(jcr);                     /* release jcr */
312 }
313
314 /*
315  * Handle the message channel (i.e. requests from the
316  *  Storage daemon).
317  * Note, we are running in a separate thread.
318  */
319 extern "C" void *msg_thread(void *arg)
320 {
321    JCR *jcr = (JCR *)arg;
322    BSOCK *sd;
323    int JobStatus;
324    char Job[MAX_NAME_LENGTH];
325    uint32_t JobFiles;
326    uint64_t JobBytes;
327    int stat;
328
329    pthread_detach(pthread_self());
330    jcr->SD_msg_chan = pthread_self();
331    pthread_cleanup_push(msg_thread_cleanup, arg);
332    sd = jcr->store_bsock;
333
334    /* Read the Storage daemon's output.
335     */
336    Dmsg0(100, "Start msg_thread loop\n");
337    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
338       Dmsg1(400, "<stored: %s", sd->msg);
339       if (sscanf(sd->msg, Job_start, Job) == 1) {
340          continue;
341       }
342       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
343                  &JobBytes)) == 4) {
344          jcr->SDJobStatus = JobStatus; /* termination status */
345          jcr->SDJobFiles = JobFiles;
346          jcr->SDJobBytes = JobBytes;
347          break;
348       }
349       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
350    }
351    if (is_bnet_error(sd)) {
352       jcr->SDJobStatus = JS_ErrorTerminated;
353    }
354    pthread_cleanup_pop(1);            /* remove and execute the handler */
355    return NULL;
356 }
357
358 void wait_for_storage_daemon_termination(JCR *jcr)
359 {
360    int cancel_count = 0;
361    /* Now wait for Storage daemon to terminate our message thread */
362    while (!jcr->sd_msg_thread_done) {
363       struct timeval tv;
364       struct timezone tz;
365       struct timespec timeout;
366
367       gettimeofday(&tv, &tz);
368       timeout.tv_nsec = 0;
369       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
370       Dmsg0(400, "I'm waiting for message thread termination.\n");
371       P(mutex);
372       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
373       V(mutex);
374       if (job_canceled(jcr)) {
375          if (jcr->SD_msg_chan) {
376             jcr->store_bsock->timed_out = 1;
377             jcr->store_bsock->terminated = 1;
378             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
379             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
380          }
381          cancel_count++;
382       }
383       /* Give SD 30 seconds to clean up after cancel */
384       if (cancel_count == 6) {
385          break;
386       }
387    }
388    set_jcr_job_status(jcr, JS_Terminated);
389 }
390
391 #ifdef needed
392 #define MAX_TRIES 30
393 #define WAIT_TIME 2
394 extern "C" void *device_thread(void *arg)
395 {
396    int i;
397    JCR *jcr;
398    DEVICE *dev;
399
400
401    pthread_detach(pthread_self());
402    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
403    for (i=0; i < MAX_TRIES; i++) {
404       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
405          Dmsg0(900, "Failed connecting to SD.\n");
406          continue;
407       }
408       LockRes();
409       foreach_res(dev, R_DEVICE) {
410          if (!update_device_res(jcr, dev)) {
411             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
412          } else {
413             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
414          }
415       }
416       UnlockRes();
417       bnet_close(jcr->store_bsock);
418       jcr->store_bsock = NULL;
419       break;
420
421    }
422    free_jcr(jcr);
423    return NULL;
424 }
425
426 /*
427  * Start a thread to handle getting Device resource information
428  *  from SD. This is called once at startup of the Director.
429  */
430 void init_device_resources()
431 {
432    int status;
433    pthread_t thid;
434
435    Dmsg0(100, "Start Device thread.\n");
436    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
437       berrno be;
438       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
439    }
440 }
441 #endif