X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fdird%2Fmsgchan.c;h=935a9d08ab0764188a620e7cc613db6b79971e5a;hb=78c7af9eaa6aa257ab4a6820e030a952bfa069d5;hp=8b8ee68f298a57784997938e995c09e2404b1ca4;hpb=28a45733fdb9131af7db083f5e40a44e173c679d;p=bacula%2Fbacula diff --git a/bacula/src/dird/msgchan.c b/bacula/src/dird/msgchan.c index 8b8ee68f29..935a9d08ab 100644 --- a/bacula/src/dird/msgchan.c +++ b/bacula/src/dird/msgchan.c @@ -1,3 +1,30 @@ +/* + Bacula® - The Network Backup Solution + + Copyright (C) 2000-2007 Free Software Foundation Europe e.V. + + The main author of Bacula is Kern Sibbald, with contributions from + many others, a complete list can be found in the file AUTHORS. + This program is Free Software; you can redistribute it and/or + modify it under the terms of version two of the GNU General Public + License as published by the Free Software Foundation and included + in the file LICENSE. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + + Bacula® is a registered trademark of John Walker. + The licensor of Bacula is the Free Software Foundation Europe + (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, + Switzerland, email:ftf@fsfeurope.org. +*/ /* * * Bacula Director -- msgchan.c -- handles the message channel @@ -9,198 +36,315 @@ * * Basic tasks done here: * Open a message channel with the Storage daemon - * to authenticate ourself and to pass the JobId. + * to authenticate ourself and to pass the JobId. * Create a thread to interact with the Storage daemon - * who returns a job status and requests Catalog services, etc. + * who returns a job status and requests Catalog services, etc. * * Version $Id$ */ -/* - Copyright (C) 2000-2004 Kern Sibbald and John Walker - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of - the License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public - License along with this program; if not, write to the Free - Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, - MA 02111-1307, USA. - - */ #include "bacula.h" #include "dird.h" +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + /* Commands sent to Storage daemon */ -static char jobcmd[] = "JobId=%d job=%s job_name=%s client_name=%s \ -type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s SpoolData=%d"; -static char use_device[] = "use device=%s media_type=%s pool_name=%s pool_type=%s\n"; +static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s " + "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s " + "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n"; +static char use_storage[] = "use storage=%s media_type=%s pool_name=%s " + "pool_type=%s append=%d copy=%d stripe=%d\n"; +static char use_device[] = "use device=%s\n"; +//static char query_device[] = _("query device=%s"); /* Response from Storage daemon */ static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n"; -static char OK_device[] = "3000 OK use device\n"; +static char OK_device[] = "3000 OK use device device=%s\n"; /* Storage Daemon requests */ static char Job_start[] = "3010 Job %127s start\n"; -static char Job_end[] = +static char Job_end[] = "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n"; -static char Job_status[] = "3012 Job %127s jobstatus %d\n"; /* Forward referenced functions */ extern "C" void *msg_thread(void *arg); /* * Establish a message channel connection with the Storage daemon - * and perform authentication. + * and perform authentication. */ -int connect_to_storage_daemon(JCR *jcr, int retry_interval, - int max_retry_time, int verbose) +bool connect_to_storage_daemon(JCR *jcr, int retry_interval, + int max_retry_time, int verbose) { BSOCK *sd; + STORE *store; + utime_t heart_beat; + + if (jcr->store_bsock) { + return true; /* already connected */ + } + + /* If there is a write storage use it */ + if (jcr->wstore) { + store = jcr->wstore; + } else { + store = jcr->rstore; + } + + if (store->heartbeat_interval) { + heart_beat = store->heartbeat_interval; + } else { + heart_beat = director->heartbeat_interval; + } /* - * Open message channel with the Storage daemon + * Open message channel with the Storage daemon */ - Dmsg2(200, "bnet_connect to Storage daemon %s:%d\n", jcr->store->address, - jcr->store->SDport); - sd = bnet_connect(jcr, retry_interval, max_retry_time, - _("Storage daemon"), jcr->store->address, - NULL, jcr->store->SDport, verbose); + Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address, + store->SDport); + sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat, + _("Storage daemon"), store->address, + NULL, store->SDport, verbose); if (sd == NULL) { - return 0; + return false; } - sd->res = (RES *)jcr->store; /* save pointer to other end */ + sd->res = (RES *)store; /* save pointer to other end */ jcr->store_bsock = sd; - if (!authenticate_storage_daemon(jcr)) { - return 0; + if (!authenticate_storage_daemon(jcr, store)) { + sd->close(); + jcr->store_bsock = NULL; + return false; } - return 1; + return true; } +/* + * Here we ask the SD to send us the info for a + * particular device resource. + */ +#ifdef needed +bool update_device_res(JCR *jcr, DEVICE *dev) +{ + POOL_MEM device_name; + BSOCK *sd; + if (!connect_to_storage_daemon(jcr, 5, 30, 0)) { + return false; + } + sd = jcr->store_bsock; + pm_strcpy(device_name, dev->name()); + bash_spaces(device_name); + sd->fsend(query_device, device_name.c_str()); + Dmsg1(100, ">stored: %s\n", sd->msg); + /* The data is returned through Device_update */ + if (bget_dirmsg(sd) <= 0) { + return false; + } + return true; +} +#endif + /* * Start a job with the Storage daemon */ -int start_storage_daemon_job(JCR *jcr) +bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) { - int status; + bool ok = true; STORE *storage; BSOCK *sd; char auth_key[100]; - POOLMEM *device_name, *pool_name, *pool_type, *media_type; + POOL_MEM store_name, device_name, pool_name, pool_type, media_type; + POOL_MEM job_name, client_name, fileset_name; + int copy = 0; + int stripe = 0; + char ed1[30]; - storage = jcr->store; sd = jcr->store_bsock; /* * Now send JobId and permissions, and get back the authorization key. */ - bash_spaces(jcr->job->hdr.name); - bash_spaces(jcr->client->hdr.name); - bash_spaces(jcr->fileset->hdr.name); + pm_strcpy(job_name, jcr->job->name()); + bash_spaces(job_name); + pm_strcpy(client_name, jcr->client->name()); + bash_spaces(client_name); + pm_strcpy(fileset_name, jcr->fileset->name()); + bash_spaces(fileset_name); if (jcr->fileset->MD5[0] == 0) { - strcpy(jcr->fileset->MD5, "**Dummy**"); + bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5)); } - bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name, - jcr->client->hdr.name, jcr->JobType, jcr->JobLevel, - jcr->fileset->hdr.name, !jcr->pool->catalog_files, - jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data); - Dmsg1(200, "Jobcmd=%s\n", sd->msg); - unbash_spaces(jcr->job->hdr.name); - unbash_spaces(jcr->client->hdr.name); - unbash_spaces(jcr->fileset->hdr.name); + /* If rescheduling, cancel the previous incarnation of this job + * with the SD, which might be waiting on the FD connection. + * If we do not cancel it the SD will not accept a new connection + * for the same jobid. + */ + if (jcr->reschedule_count) { + sd->fsend("cancel Job=%s\n", jcr->Job); + while (sd->recv() >= 0) + { } + } + sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, + job_name.c_str(), client_name.c_str(), + jcr->JobType, jcr->JobLevel, + fileset_name.c_str(), !jcr->pool->catalog_files, + jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, + jcr->write_part_after_job, jcr->job->PreferMountedVolumes); + Dmsg1(100, ">stored: %s\n", sd->msg); if (bget_dirmsg(sd) > 0) { - Dmsg1(110, "msg); - if (sscanf(sd->msg, OKjob, &jcr->VolSessionId, - &jcr->VolSessionTime, &auth_key) != 3) { + Dmsg1(100, "msg); + if (sscanf(sd->msg, OKjob, &jcr->VolSessionId, + &jcr->VolSessionTime, &auth_key) != 3) { Dmsg1(100, "BadJob=%s\n", sd->msg); Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg); - return 0; + return 0; } else { - jcr->sd_auth_key = bstrdup(auth_key); + jcr->sd_auth_key = bstrdup(auth_key); Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key); } } else { Jmsg(jcr, M_FATAL, 0, _("bstrerror()); return 0; } /* - * Send use device = xxx media = yyy pool = zzz + * We have two loops here. The first comes from the + * Storage = associated with the Job, and we need + * to attach to each one. + * The inner loop loops over all the alternative devices + * associated with each Storage. It selects the first + * available one. + * */ - device_name = get_pool_memory(PM_NAME); - pool_name = get_pool_memory(PM_NAME); - pool_type = get_pool_memory(PM_NAME); - media_type = get_pool_memory(PM_NAME); - pm_strcpy(&device_name, storage->dev_name); - pm_strcpy(&media_type, storage->media_type); - pm_strcpy(&pool_type, jcr->pool->pool_type); - pm_strcpy(&pool_name, jcr->pool->hdr.name); - bash_spaces(device_name); - bash_spaces(media_type); - bash_spaces(pool_type); - bash_spaces(pool_name); - bnet_fsend(sd, use_device, device_name, media_type, pool_name, pool_type); - Dmsg1(110, ">stored: %s", sd->msg); - status = response(jcr, sd, OK_device, "Use Device", NO_DISPLAY); - if (!status) { - pm_strcpy(&pool_type, sd->msg); /* save message */ - Jmsg(jcr, M_FATAL, 0, _("\n" - " Storage daemon didn't accept Device \"%s\" because:\n %s"), - device_name, pool_type/* sd->msg */); + /* Do read side of storage daemon */ + if (ok && rstore) { + /* For the moment, only migrate has rpool */ + if (jcr->JobType == JT_MIGRATE) { + pm_strcpy(pool_type, jcr->rpool->pool_type); + pm_strcpy(pool_name, jcr->rpool->name()); + } else { + pm_strcpy(pool_type, jcr->pool->pool_type); + pm_strcpy(pool_name, jcr->pool->name()); + } + bash_spaces(pool_type); + bash_spaces(pool_name); + foreach_alist(storage, rstore) { + Dmsg1(100, "Rstore=%s\n", storage->name()); + bash_spaces(store_name); + pm_strcpy(media_type, storage->media_type); + bash_spaces(media_type); + sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), + pool_name.c_str(), pool_type.c_str(), 0, copy, stripe); + Dmsg1(100, "rstore >stored: %s", sd->msg); + DEVICE *dev; + /* Loop over alternative storage Devices until one is OK */ + foreach_alist(dev, storage->device) { + pm_strcpy(device_name, dev->name()); + bash_spaces(device_name); + sd->fsend(use_device, device_name.c_str()); + Dmsg1(100, ">stored: %s", sd->msg); + } + sd->signal(BNET_EOD); /* end of Devices */ + } + sd->signal(BNET_EOD); /* end of Storages */ + if (bget_dirmsg(sd) > 0) { + Dmsg1(100, "msg); + /* ****FIXME**** save actual device name */ + ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1; + } else { + ok = false; + } } - free_memory(device_name); - free_memory(media_type); - free_memory(pool_name); - free_memory(pool_type); - return status; + /* Do write side of storage daemon */ + if (ok && wstore) { + pm_strcpy(pool_type, jcr->pool->pool_type); + pm_strcpy(pool_name, jcr->pool->name()); + bash_spaces(pool_type); + bash_spaces(pool_name); + foreach_alist(storage, wstore) { + pm_strcpy(store_name, storage->name()); + bash_spaces(store_name); + pm_strcpy(media_type, storage->media_type); + bash_spaces(media_type); + sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), + pool_name.c_str(), pool_type.c_str(), 1, copy, stripe); + + Dmsg1(100, "wstore >stored: %s", sd->msg); + DEVICE *dev; + /* Loop over alternative storage Devices until one is OK */ + foreach_alist(dev, storage->device) { + pm_strcpy(device_name, dev->name()); + bash_spaces(device_name); + sd->fsend(use_device, device_name.c_str()); + Dmsg1(100, ">stored: %s", sd->msg); + } + sd->signal(BNET_EOD); /* end of Devices */ + } + sd->signal(BNET_EOD); /* end of Storages */ + if (bget_dirmsg(sd) > 0) { + Dmsg1(100, "msg); + /* ****FIXME**** save actual device name */ + ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1; + } else { + ok = false; + } + } + if (!ok) { + POOL_MEM err_msg; + if (sd->msg[0]) { + pm_strcpy(err_msg, sd->msg); /* save message */ + Jmsg(jcr, M_FATAL, 0, _("\n" + " Storage daemon didn't accept Device \"%s\" because:\n %s"), + device_name.c_str(), err_msg.c_str()/* sd->msg */); + } else { + Jmsg(jcr, M_FATAL, 0, _("\n" + " Storage daemon didn't accept Device \"%s\" command.\n"), + device_name.c_str()); + } + } else { + Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str()); + } + return ok; } -/* +/* * Start a thread to handle Storage daemon messages and * Catalog requests. */ -int start_storage_daemon_message_thread(JCR *jcr) +bool start_storage_daemon_message_thread(JCR *jcr) { int status; pthread_t thid; - P(jcr->mutex); - jcr->use_count++; /* mark in use by msg thread */ + jcr->inc_use_count(); /* mark in use by msg thread */ jcr->sd_msg_thread_done = false; jcr->SD_msg_chan = 0; - V(jcr->mutex); Dmsg0(100, "Start SD msg_thread.\n"); if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) { - Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), strerror(status)); - } - Dmsg0(100, "SD msg_thread started.\n"); + berrno be; + Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status)); + } /* Wait for thread to start */ while (jcr->SD_msg_chan == 0) { bmicrosleep(0, 50); - } - return 1; + if (job_canceled(jcr) || jcr->sd_msg_thread_done) { + return false; + } + } + Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count()); + return true; } extern "C" void msg_thread_cleanup(void *arg) { JCR *jcr = (JCR *)arg; - Dmsg0(200, "End msg_thread\n"); - db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ - P(jcr->mutex); + db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ jcr->sd_msg_thread_done = true; - pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ jcr->SD_msg_chan = 0; - V(jcr->mutex); - free_jcr(jcr); /* release jcr */ + pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ + Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count()); + free_jcr(jcr); /* release jcr */ + db_thread_cleanup(); /* remove thread specific data */ } /* @@ -208,8 +352,7 @@ extern "C" void msg_thread_cleanup(void *arg) * Storage daemon). * Note, we are running in a separate thread. */ -extern "C" -void *msg_thread(void *arg) +extern "C" void *msg_thread(void *arg) { JCR *jcr = (JCR *)arg; BSOCK *sd; @@ -227,27 +370,24 @@ void *msg_thread(void *arg) /* Read the Storage daemon's output. */ Dmsg0(100, "Start msg_thread loop\n"); - while ((stat=bget_dirmsg(sd)) >= 0) { - Dmsg1(200, "msg); - if (sscanf(sd->msg, Job_start, &Job) == 1) { - continue; + while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) { + Dmsg1(400, "msg); + if (sscanf(sd->msg, Job_start, Job) == 1) { + continue; } - if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles, - &JobBytes) == 4) { - jcr->SDJobStatus = JobStatus; /* termination status */ - jcr->SDJobFiles = JobFiles; - jcr->SDJobBytes = JobBytes; - break; - } - if (sscanf(sd->msg, Job_status, &Job, &JobStatus) == 2) { - jcr->SDJobStatus = JobStatus; /* current status */ - continue; + if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, + &JobBytes)) == 4) { + jcr->SDJobStatus = JobStatus; /* termination status */ + jcr->SDJobFiles = JobFiles; + jcr->SDJobBytes = JobBytes; + break; } + Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count()); } - if (is_bnet_error(sd)) { + if (is_bnet_error(sd)) { jcr->SDJobStatus = JS_ErrorTerminated; } - pthread_cleanup_pop(1); + pthread_cleanup_pop(1); /* remove and execute the handler */ return NULL; } @@ -255,8 +395,6 @@ void wait_for_storage_daemon_termination(JCR *jcr) { int cancel_count = 0; /* Now wait for Storage daemon to terminate our message thread */ - set_jcr_job_status(jcr, JS_WaitSD); - P(jcr->mutex); while (!jcr->sd_msg_thread_done) { struct timeval tv; struct timezone tz; @@ -264,17 +402,76 @@ void wait_for_storage_daemon_termination(JCR *jcr) gettimeofday(&tv, &tz); timeout.tv_nsec = 0; - timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */ - Dmsg0(300, "I'm waiting for message thread termination.\n"); - pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout); + timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */ + Dmsg0(400, "I'm waiting for message thread termination.\n"); + P(mutex); + pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout); + V(mutex); if (job_canceled(jcr)) { - cancel_count++; + if (jcr->SD_msg_chan) { + jcr->store_bsock->set_timed_out(); + jcr->store_bsock->set_terminated(); + Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); + pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); + } + cancel_count++; } /* Give SD 30 seconds to clean up after cancel */ - if (cancel_count == 3) { - break; + if (cancel_count == 6) { + break; } } - V(jcr->mutex); set_jcr_job_status(jcr, JS_Terminated); } + +#ifdef needed +#define MAX_TRIES 30 +#define WAIT_TIME 2 +extern "C" void *device_thread(void *arg) +{ + int i; + JCR *jcr; + DEVICE *dev; + + + pthread_detach(pthread_self()); + jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM); + for (i=0; i < MAX_TRIES; i++) { + if (!connect_to_storage_daemon(jcr, 10, 30, 1)) { + Dmsg0(900, "Failed connecting to SD.\n"); + continue; + } + LockRes(); + foreach_res(dev, R_DEVICE) { + if (!update_device_res(jcr, dev)) { + Dmsg1(900, "Error updating device=%s\n", dev->name()); + } else { + Dmsg1(900, "Updated Device=%s\n", dev->name()); + } + } + UnlockRes(); + bnet_close(jcr->store_bsock); + jcr->store_bsock = NULL; + break; + + } + free_jcr(jcr); + return NULL; +} + +/* + * Start a thread to handle getting Device resource information + * from SD. This is called once at startup of the Director. + */ +void init_device_resources() +{ + int status; + pthread_t thid; + + Dmsg0(100, "Start Device thread.\n"); + if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) { + berrno be; + Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status)); + } +} +#endif