X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;ds=inline;f=bacula%2Fsrc%2Fdird%2Fmsgchan.c;h=1e7cfc7038cdcdeb8610e70260f7b022badb73e3;hb=2b9fce40ad3cf7677d06b876c19f1fabffd9e0c2;hp=e6f85cee4958d7987cb2917d74849f55d12dbe29;hpb=404b02b216516d968df165d9d2ca17dfac620636;p=bacula%2Fbacula diff --git a/bacula/src/dird/msgchan.c b/bacula/src/dird/msgchan.c index e6f85cee49..1e7cfc7038 100644 --- a/bacula/src/dird/msgchan.c +++ b/bacula/src/dird/msgchan.c @@ -1,26 +1,26 @@ /* Bacula® - The Network Backup Solution - Copyright (C) 2000-20076 Free Software Foundation Europe e.V. + Copyright (C) 2000-2009 Free Software Foundation Europe e.V. The main author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. This program is Free Software; you can redistribute it and/or - modify it under the terms of version two of the GNU General Public - License as published by the Free Software Foundation plus additions - that are listed in the file LICENSE. + modify it under the terms of version three of the GNU Affero General Public + License as published by the Free Software Foundation and included + in the file LICENSE. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - You should have received a copy of the GNU General Public License + You should have received a copy of the GNU Affero General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - Bacula® is a registered trademark of John Walker. + Bacula® is a registered trademark of Kern Sibbald. The licensor of Bacula is the Free Software Foundation Europe (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, Switzerland, email:ftf@fsfeurope.org. @@ -40,7 +40,6 @@ * Create a thread to interact with the Storage daemon * who returns a job status and requests Catalog services, etc. * - * Version $Id$ */ #include "bacula.h" @@ -51,7 +50,8 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* Commands sent to Storage daemon */ static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s " "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s " - "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n"; + "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s " + "rerunning=%d VolSessionId=%d VolSessionTime=%d\n"; static char use_storage[] = "use storage=%s media_type=%s pool_name=%s " "pool_type=%s append=%d copy=%d stripe=%d\n"; static char use_device[] = "use device=%s\n"; @@ -64,7 +64,7 @@ static char OK_device[] = "3000 OK use device device=%s\n"; /* Storage Daemon requests */ static char Job_start[] = "3010 Job %127s start\n"; static char Job_end[] = - "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n"; + "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n"; /* Forward referenced functions */ extern "C" void *msg_thread(void *arg); @@ -76,7 +76,7 @@ extern "C" void *msg_thread(void *arg); bool connect_to_storage_daemon(JCR *jcr, int retry_interval, int max_retry_time, int verbose) { - BSOCK *sd; + BSOCK *sd = new_bsock(); STORE *store; utime_t heart_beat; @@ -102,9 +102,13 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval, */ Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address, store->SDport); - sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat, - _("Storage daemon"), store->address, - NULL, store->SDport, verbose); + sd->set_source_address(director->DIRsrc_addr); + if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"), + store->address, NULL, store->SDport, verbose)) { + sd->destroy(); + sd = NULL; + } + if (sd == NULL) { return false; } @@ -112,7 +116,7 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval, jcr->store_bsock = sd; if (!authenticate_storage_daemon(jcr, store)) { - bnet_close(sd); + sd->close(); jcr->store_bsock = NULL; return false; } @@ -123,7 +127,7 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval, * Here we ask the SD to send us the info for a * particular device resource. */ -#ifdef needed +#ifdef xxx bool update_device_res(JCR *jcr, DEVICE *dev) { POOL_MEM device_name; @@ -134,7 +138,7 @@ bool update_device_res(JCR *jcr, DEVICE *dev) sd = jcr->store_bsock; pm_strcpy(device_name, dev->name()); bash_spaces(device_name); - bnet_fsend(sd, query_device, device_name.c_str()); + sd->fsend(query_device, device_name.c_str()); Dmsg1(100, ">stored: %s\n", sd->msg); /* The data is returned through Device_update */ if (bget_dirmsg(sd) <= 0) { @@ -144,10 +148,12 @@ bool update_device_res(JCR *jcr, DEVICE *dev) } #endif +static char OKbootstrap[] = "3000 OK bootstrap\n"; + /* * Start a job with the Storage daemon */ -bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) +bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr) { bool ok = true; STORE *storage; @@ -157,7 +163,7 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) POOL_MEM job_name, client_name, fileset_name; int copy = 0; int stripe = 0; - char ed1[30]; + char ed1[30], ed2[30]; sd = jcr->store_bsock; /* @@ -178,32 +184,40 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) * for the same jobid. */ if (jcr->reschedule_count) { - bnet_fsend(sd, "cancel Job=%s\n", jcr->Job); - while (bnet_recv(sd) >= 0) + sd->fsend("cancel Job=%s\n", jcr->Job); + while (sd->recv() >= 0) { } } - bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, - job_name.c_str(), client_name.c_str(), - jcr->JobType, jcr->JobLevel, - fileset_name.c_str(), !jcr->pool->catalog_files, - jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, - jcr->write_part_after_job, jcr->job->PreferMountedVolumes); - Dmsg1(100, ">stored: %s\n", sd->msg); + sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, + job_name.c_str(), client_name.c_str(), + jcr->getJobType(), jcr->getJobLevel(), + fileset_name.c_str(), !jcr->pool->catalog_files, + jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, + jcr->write_part_after_job, jcr->job->PreferMountedVolumes, + edit_int64(jcr->spool_size, ed2), jcr->rerunning, + jcr->VolSessionId, jcr->VolSessionTime); + Dmsg1(100, ">stored: %s", sd->msg); if (bget_dirmsg(sd) > 0) { Dmsg1(100, "msg); if (sscanf(sd->msg, OKjob, &jcr->VolSessionId, &jcr->VolSessionTime, &auth_key) != 3) { Dmsg1(100, "BadJob=%s\n", sd->msg); Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg); - return 0; + return false; } else { + bfree_and_null(jcr->sd_auth_key); jcr->sd_auth_key = bstrdup(auth_key); Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key); } } else { Jmsg(jcr, M_FATAL, 0, _("bstrerror()); + return false; + } + + if (send_bsr && (!send_bootstrap_file(jcr, sd) || + !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) { + return false; } /* @@ -217,8 +231,9 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) */ /* Do read side of storage daemon */ if (ok && rstore) { - /* For the moment, only migrate has rpool */ - if (jcr->JobType == JT_MIGRATE) { + /* For the moment, only migrate, copy and vbackup have rpool */ + if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) || + (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) { pm_strcpy(pool_type, jcr->rpool->pool_type); pm_strcpy(pool_name, jcr->rpool->name()); } else { @@ -229,23 +244,24 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) bash_spaces(pool_name); foreach_alist(storage, rstore) { Dmsg1(100, "Rstore=%s\n", storage->name()); + pm_strcpy(store_name, storage->name()); bash_spaces(store_name); pm_strcpy(media_type, storage->media_type); bash_spaces(media_type); - bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), - pool_name.c_str(), pool_type.c_str(), 0, copy, stripe); + sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), + pool_name.c_str(), pool_type.c_str(), 0, copy, stripe); Dmsg1(100, "rstore >stored: %s", sd->msg); DEVICE *dev; /* Loop over alternative storage Devices until one is OK */ foreach_alist(dev, storage->device) { pm_strcpy(device_name, dev->name()); bash_spaces(device_name); - bnet_fsend(sd, use_device, device_name.c_str()); + sd->fsend(use_device, device_name.c_str()); Dmsg1(100, ">stored: %s", sd->msg); } - bnet_sig(sd, BNET_EOD); /* end of Devices */ + sd->signal(BNET_EOD); /* end of Devices */ } - bnet_sig(sd, BNET_EOD); /* end of Storages */ + sd->signal(BNET_EOD); /* end of Storages */ if (bget_dirmsg(sd) > 0) { Dmsg1(100, "msg); /* ****FIXME**** save actual device name */ @@ -266,8 +282,8 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) bash_spaces(store_name); pm_strcpy(media_type, storage->media_type); bash_spaces(media_type); - bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), - pool_name.c_str(), pool_type.c_str(), 1, copy, stripe); + sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), + pool_name.c_str(), pool_type.c_str(), 1, copy, stripe); Dmsg1(100, "wstore >stored: %s", sd->msg); DEVICE *dev; @@ -275,12 +291,12 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) foreach_alist(dev, storage->device) { pm_strcpy(device_name, dev->name()); bash_spaces(device_name); - bnet_fsend(sd, use_device, device_name.c_str()); + sd->fsend(use_device, device_name.c_str()); Dmsg1(100, ">stored: %s", sd->msg); } - bnet_sig(sd, BNET_EOD); /* end of Devices */ + sd->signal(BNET_EOD); /* end of Devices */ } - bnet_sig(sd, BNET_EOD); /* end of Storages */ + sd->signal(BNET_EOD); /* end of Storages */ if (bget_dirmsg(sd) > 0) { Dmsg1(100, "msg); /* ****FIXME**** save actual device name */ @@ -338,13 +354,15 @@ bool start_storage_daemon_message_thread(JCR *jcr) extern "C" void msg_thread_cleanup(void *arg) { JCR *jcr = (JCR *)arg; - db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ + db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ + jcr->lock(); jcr->sd_msg_thread_done = true; jcr->SD_msg_chan = 0; + jcr->unlock(); pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ - Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count()); - free_jcr(jcr); /* release jcr */ - db_thread_cleanup(); /* remove thread specific data */ + Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count()); + db_thread_cleanup(jcr->db); /* remove thread specific data */ + free_jcr(jcr); /* release jcr */ } /* @@ -357,12 +375,13 @@ extern "C" void *msg_thread(void *arg) JCR *jcr = (JCR *)arg; BSOCK *sd; int JobStatus; + int n; char Job[MAX_NAME_LENGTH]; - uint32_t JobFiles; + uint32_t JobFiles, JobErrors; uint64_t JobBytes; - int stat; pthread_detach(pthread_self()); + set_jcr_in_tsd(jcr); jcr->SD_msg_chan = pthread_self(); pthread_cleanup_push(msg_thread_cleanup, arg); sd = jcr->store_bsock; @@ -370,19 +389,28 @@ extern "C" void *msg_thread(void *arg) /* Read the Storage daemon's output. */ Dmsg0(100, "Start msg_thread loop\n"); - while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) { + n = 0; + while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) { Dmsg1(400, "msg); if (sscanf(sd->msg, Job_start, Job) == 1) { continue; } - if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, - &JobBytes)) == 4) { + if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, + &JobBytes, &JobErrors) == 5) { jcr->SDJobStatus = JobStatus; /* termination status */ jcr->SDJobFiles = JobFiles; jcr->SDJobBytes = JobBytes; + jcr->SDErrors = JobErrors; break; } - Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count()); + Dmsg1(400, "end loop use=%d\n", jcr->use_count()); + } + if (n == BNET_HARDEOF) { + /* + * This probably should be M_FATAL, but I am not 100% sure + * that this return *always* corresponds to a dropped line. + */ + Qmsg(jcr, M_ERROR, 0, _("Director's comm line to SD dropped.\n")); } if (is_bnet_error(sd)) { jcr->SDJobStatus = JS_ErrorTerminated; @@ -407,12 +435,11 @@ void wait_for_storage_daemon_termination(JCR *jcr) P(mutex); pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout); V(mutex); - if (job_canceled(jcr)) { + if (jcr->is_canceled()) { if (jcr->SD_msg_chan) { - jcr->store_bsock->timed_out = 1; - jcr->store_bsock->terminated = 1; - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); + jcr->store_bsock->set_timed_out(); + jcr->store_bsock->set_terminated(); + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); } cancel_count++; } @@ -421,9 +448,46 @@ void wait_for_storage_daemon_termination(JCR *jcr) break; } } - set_jcr_job_status(jcr, JS_Terminated); + jcr->setJobStatus(JS_Terminated); } +/* + * Send bootstrap file to Storage daemon. + * This is used for restore, verify VolumeToCatalog, migration, + * and copy Jobs. + */ +bool send_bootstrap_file(JCR *jcr, BSOCK *sd) +{ + FILE *bs; + char buf[1000]; + const char *bootstrap = "bootstrap\n"; + + Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap); + if (!jcr->RestoreBootstrap) { + return true; + } + bs = fopen(jcr->RestoreBootstrap, "rb"); + if (!bs) { + berrno be; + Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"), + jcr->RestoreBootstrap, be.bstrerror()); + jcr->setJobStatus(JS_ErrorTerminated); + return false; + } + sd->fsend(bootstrap); + while (fgets(buf, sizeof(buf), bs)) { + sd->fsend("%s", buf); + } + sd->signal(BNET_EOD); + fclose(bs); + if (jcr->unlink_bsr) { + unlink(jcr->RestoreBootstrap); + jcr->unlink_bsr = false; + } + return true; +} + + #ifdef needed #define MAX_TRIES 30 #define WAIT_TIME 2