X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fdird%2Fmsgchan.c;h=43d859aa28cc9ced483c6879476e34f8de95287a;hb=82d827763eef307b073a05bc3114a0d1e2b57b14;hp=e35a5a46fc3e64dc26d1e1b1c81d070fd7cadeba;hpb=32803bea0a074ba600a3b70b0d5dbb3acce09fa6;p=bacula%2Fbacula diff --git a/bacula/src/dird/msgchan.c b/bacula/src/dird/msgchan.c index e35a5a46fc..43d859aa28 100644 --- a/bacula/src/dird/msgchan.c +++ b/bacula/src/dird/msgchan.c @@ -1,3 +1,30 @@ +/* + Bacula® - The Network Backup Solution + + Copyright (C) 2000-2007 Free Software Foundation Europe e.V. + + The main author of Bacula is Kern Sibbald, with contributions from + many others, a complete list can be found in the file AUTHORS. + This program is Free Software; you can redistribute it and/or + modify it under the terms of version two of the GNU General Public + License as published by the Free Software Foundation and included + in the file LICENSE. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + + Bacula® is a registered trademark of John Walker. + The licensor of Bacula is the Free Software Foundation Europe + (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, + Switzerland, email:ftf@fsfeurope.org. +*/ /* * * Bacula Director -- msgchan.c -- handles the message channel @@ -15,37 +42,20 @@ * * Version $Id$ */ -/* - Copyright (C) 2000-2005 Kern Sibbald - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of - the License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public - License along with this program; if not, write to the Free - Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, - MA 02111-1307, USA. - - */ #include "bacula.h" #include "dird.h" +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + /* Commands sent to Storage daemon */ -static char jobcmd[] = "JobId=%d job=%s job_name=%s client_name=%s " +static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s " "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s " - "SpoolData=%d WritePartAfterJob=%d NewVol=%d\n"; + "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n"; static char use_storage[] = "use storage=%s media_type=%s pool_name=%s " "pool_type=%s append=%d copy=%d stripe=%d\n"; static char use_device[] = "use device=%s\n"; -//static char query_device[] = "query device=%s"; +//static char query_device[] = _("query device=%s"); /* Response from Storage daemon */ static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n"; @@ -68,18 +78,31 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval, { BSOCK *sd; STORE *store; + utime_t heart_beat; if (jcr->store_bsock) { return true; /* already connected */ } - store = (STORE *)jcr->storage->first(); + + /* If there is a write storage use it */ + if (jcr->wstore) { + store = jcr->wstore; + } else { + store = jcr->rstore; + } + + if (store->heartbeat_interval) { + heart_beat = store->heartbeat_interval; + } else { + heart_beat = director->heartbeat_interval; + } /* * Open message channel with the Storage daemon */ Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address, store->SDport); - sd = bnet_connect(jcr, retry_interval, max_retry_time, + sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"), store->address, NULL, store->SDport, verbose); if (sd == NULL) { @@ -89,7 +112,7 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval, jcr->store_bsock = sd; if (!authenticate_storage_daemon(jcr, store)) { - bnet_close(sd); + sd->close(); jcr->store_bsock = NULL; return false; } @@ -109,9 +132,9 @@ bool update_device_res(JCR *jcr, DEVICE *dev) return false; } sd = jcr->store_bsock; - pm_strcpy(device_name, dev->hdr.name); + pm_strcpy(device_name, dev->name()); bash_spaces(device_name); - bnet_fsend(sd, query_device, device_name.c_str()); + sd->fsend(query_device, device_name.c_str()); Dmsg1(100, ">stored: %s\n", sd->msg); /* The data is returned through Device_update */ if (bget_dirmsg(sd) <= 0) { @@ -124,36 +147,49 @@ bool update_device_res(JCR *jcr, DEVICE *dev) /* * Start a job with the Storage daemon */ -int start_storage_daemon_job(JCR *jcr, alist *store, int append) +bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore) { - bool ok = false; + bool ok = true; STORE *storage; BSOCK *sd; char auth_key[100]; POOL_MEM store_name, device_name, pool_name, pool_type, media_type; - char PoolId[50]; + POOL_MEM job_name, client_name, fileset_name; int copy = 0; int stripe = 0; + char ed1[30], ed2[30]; sd = jcr->store_bsock; /* * Now send JobId and permissions, and get back the authorization key. */ - bash_spaces(jcr->job->hdr.name); - bash_spaces(jcr->client->hdr.name); - bash_spaces(jcr->fileset->hdr.name); + pm_strcpy(job_name, jcr->job->name()); + bash_spaces(job_name); + pm_strcpy(client_name, jcr->client->name()); + bash_spaces(client_name); + pm_strcpy(fileset_name, jcr->fileset->name()); + bash_spaces(fileset_name); if (jcr->fileset->MD5[0] == 0) { bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5)); } - bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name, - jcr->client->hdr.name, jcr->JobType, jcr->JobLevel, - jcr->fileset->hdr.name, !jcr->pool->catalog_files, - jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, - jcr->write_part_after_job, jcr->job->NewVolEachJob); + /* If rescheduling, cancel the previous incarnation of this job + * with the SD, which might be waiting on the FD connection. + * If we do not cancel it the SD will not accept a new connection + * for the same jobid. + */ + if (jcr->reschedule_count) { + sd->fsend("cancel Job=%s\n", jcr->Job); + while (sd->recv() >= 0) + { } + } + sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, + job_name.c_str(), client_name.c_str(), + jcr->JobType, jcr->JobLevel, + fileset_name.c_str(), !jcr->pool->catalog_files, + jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, + jcr->write_part_after_job, jcr->job->PreferMountedVolumes, + edit_int64(jcr->spool_size, ed2)); Dmsg1(100, ">stored: %s\n", sd->msg); - unbash_spaces(jcr->job->hdr.name); - unbash_spaces(jcr->client->hdr.name); - unbash_spaces(jcr->fileset->hdr.name); if (bget_dirmsg(sd) > 0) { Dmsg1(100, "msg); if (sscanf(sd->msg, OKjob, &jcr->VolSessionId, @@ -167,16 +203,10 @@ int start_storage_daemon_job(JCR *jcr, alist *store, int append) } } else { Jmsg(jcr, M_FATAL, 0, _("bstrerror()); return 0; } - pm_strcpy(pool_type, jcr->pool->pool_type); - pm_strcpy(pool_name, jcr->pool->hdr.name); - bash_spaces(pool_type); - bash_spaces(pool_name); - edit_int64(jcr->PoolId, PoolId); - /* * We have two loops here. The first comes from the * Storage = associated with the Job, and we need @@ -185,45 +215,95 @@ int start_storage_daemon_job(JCR *jcr, alist *store, int append) * associated with each Storage. It selects the first * available one. * - * Note, the outer loop is not yet implemented. */ -// foreach_alist(storage, store) { - storage = (STORE *)store->first(); - pm_strcpy(store_name, storage->hdr.name); - bash_spaces(store_name); - pm_strcpy(media_type, storage->media_type); - bash_spaces(media_type); - bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), - pool_name.c_str(), pool_type.c_str(), append, copy, stripe); - - DEVICE *dev; - /* Loop over alternative storage Devices until one is OK */ - foreach_alist(dev, storage->device) { - pm_strcpy(device_name, dev->hdr.name); - bash_spaces(device_name); - bnet_fsend(sd, use_device, device_name.c_str()); - Dmsg1(100, ">stored: %s", sd->msg); + /* Do read side of storage daemon */ + if (ok && rstore) { + /* For the moment, only migrate has rpool */ + if (jcr->JobType == JT_MIGRATE) { + pm_strcpy(pool_type, jcr->rpool->pool_type); + pm_strcpy(pool_name, jcr->rpool->name()); + } else { + pm_strcpy(pool_type, jcr->pool->pool_type); + pm_strcpy(pool_name, jcr->pool->name()); + } + bash_spaces(pool_type); + bash_spaces(pool_name); + foreach_alist(storage, rstore) { + Dmsg1(100, "Rstore=%s\n", storage->name()); + bash_spaces(store_name); + pm_strcpy(media_type, storage->media_type); + bash_spaces(media_type); + sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), + pool_name.c_str(), pool_type.c_str(), 0, copy, stripe); + Dmsg1(100, "rstore >stored: %s", sd->msg); + DEVICE *dev; + /* Loop over alternative storage Devices until one is OK */ + foreach_alist(dev, storage->device) { + pm_strcpy(device_name, dev->name()); + bash_spaces(device_name); + sd->fsend(use_device, device_name.c_str()); + Dmsg1(100, ">stored: %s", sd->msg); + } + sd->signal(BNET_EOD); /* end of Devices */ + } + sd->signal(BNET_EOD); /* end of Storages */ + if (bget_dirmsg(sd) > 0) { + Dmsg1(100, "msg); + /* ****FIXME**** save actual device name */ + ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1; + } else { + ok = false; + } + } + + /* Do write side of storage daemon */ + if (ok && wstore) { + pm_strcpy(pool_type, jcr->pool->pool_type); + pm_strcpy(pool_name, jcr->pool->name()); + bash_spaces(pool_type); + bash_spaces(pool_name); + foreach_alist(storage, wstore) { + pm_strcpy(store_name, storage->name()); + bash_spaces(store_name); + pm_strcpy(media_type, storage->media_type); + bash_spaces(media_type); + sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), + pool_name.c_str(), pool_type.c_str(), 1, copy, stripe); + + Dmsg1(100, "wstore >stored: %s", sd->msg); + DEVICE *dev; + /* Loop over alternative storage Devices until one is OK */ + foreach_alist(dev, storage->device) { + pm_strcpy(device_name, dev->name()); + bash_spaces(device_name); + sd->fsend(use_device, device_name.c_str()); + Dmsg1(100, ">stored: %s", sd->msg); + } + sd->signal(BNET_EOD); /* end of Devices */ } - bnet_sig(sd, BNET_EOD); /* end of Devices */ - bnet_sig(sd, BNET_EOD); /* end of Storages */ + sd->signal(BNET_EOD); /* end of Storages */ if (bget_dirmsg(sd) > 0) { Dmsg1(100, "msg); /* ****FIXME**** save actual device name */ ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1; } else { - POOL_MEM err_msg; + ok = false; + } + } + if (!ok) { + POOL_MEM err_msg; + if (sd->msg[0]) { pm_strcpy(err_msg, sd->msg); /* save message */ - Jmsg(jcr, M_WARNING, 0, _("\n" - " Storage daemon didn't accept Device \"%s\" because:\n %s"), - device_name.c_str(), err_msg.c_str()/* sd->msg */); + Jmsg(jcr, M_FATAL, 0, _("\n" + " Storage daemon didn't accept Device \"%s\" because:\n %s"), + device_name.c_str(), err_msg.c_str()/* sd->msg */); + } else { + Jmsg(jcr, M_FATAL, 0, _("\n" + " Storage daemon didn't accept Device \"%s\" command.\n"), + device_name.c_str()); } -// if (!ok) { -// break; -// } -// } - if (ok) { - ok = bnet_fsend(sd, "run"); - Dmsg1(100, ">stored: %s\n", sd->msg); + } else { + Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str()); } return ok; } @@ -232,40 +312,40 @@ int start_storage_daemon_job(JCR *jcr, alist *store, int append) * Start a thread to handle Storage daemon messages and * Catalog requests. */ -int start_storage_daemon_message_thread(JCR *jcr) +bool start_storage_daemon_message_thread(JCR *jcr) { int status; pthread_t thid; - P(jcr->mutex); - jcr->use_count++; /* mark in use by msg thread */ + jcr->inc_use_count(); /* mark in use by msg thread */ jcr->sd_msg_thread_done = false; jcr->SD_msg_chan = 0; - V(jcr->mutex); Dmsg0(100, "Start SD msg_thread.\n"); if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) { berrno be; - Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status)); + Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status)); } - Dmsg0(100, "SD msg_thread started.\n"); /* Wait for thread to start */ while (jcr->SD_msg_chan == 0) { bmicrosleep(0, 50); + if (job_canceled(jcr) || jcr->sd_msg_thread_done) { + return false; + } } - return 1; + Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count()); + return true; } extern "C" void msg_thread_cleanup(void *arg) { JCR *jcr = (JCR *)arg; - Dmsg0(200, "End msg_thread\n"); db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ - P(jcr->mutex); jcr->sd_msg_thread_done = true; - pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ jcr->SD_msg_chan = 0; - V(jcr->mutex); + pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ + Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count()); free_jcr(jcr); /* release jcr */ + db_thread_cleanup(); /* remove thread specific data */ } /* @@ -284,6 +364,7 @@ extern "C" void *msg_thread(void *arg) int stat; pthread_detach(pthread_self()); + set_jcr_in_tsd(jcr); jcr->SD_msg_chan = pthread_self(); pthread_cleanup_push(msg_thread_cleanup, arg); sd = jcr->store_bsock; @@ -291,23 +372,24 @@ extern "C" void *msg_thread(void *arg) /* Read the Storage daemon's output. */ Dmsg0(100, "Start msg_thread loop\n"); - while ((stat=bget_dirmsg(sd)) >= 0) { - Dmsg1(200, "msg); - if (sscanf(sd->msg, Job_start, &Job) == 1) { + while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) { + Dmsg1(400, "msg); + if (sscanf(sd->msg, Job_start, Job) == 1) { continue; } - if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles, - &JobBytes) == 4) { + if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, + &JobBytes)) == 4) { jcr->SDJobStatus = JobStatus; /* termination status */ jcr->SDJobFiles = JobFiles; jcr->SDJobBytes = JobBytes; break; } + Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count()); } if (is_bnet_error(sd)) { jcr->SDJobStatus = JS_ErrorTerminated; } - pthread_cleanup_pop(1); + pthread_cleanup_pop(1); /* remove and execute the handler */ return NULL; } @@ -315,8 +397,6 @@ void wait_for_storage_daemon_termination(JCR *jcr) { int cancel_count = 0; /* Now wait for Storage daemon to terminate our message thread */ - set_jcr_job_status(jcr, JS_WaitSD); - P(jcr->mutex); while (!jcr->sd_msg_thread_done) { struct timeval tv; struct timezone tz; @@ -324,18 +404,25 @@ void wait_for_storage_daemon_termination(JCR *jcr) gettimeofday(&tv, &tz); timeout.tv_nsec = 0; - timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */ - Dmsg0(300, "I'm waiting for message thread termination.\n"); - pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout); + timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */ + Dmsg0(400, "I'm waiting for message thread termination.\n"); + P(mutex); + pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout); + V(mutex); if (job_canceled(jcr)) { + if (jcr->SD_msg_chan) { + jcr->store_bsock->set_timed_out(); + jcr->store_bsock->set_terminated(); + Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); + pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); + } cancel_count++; } /* Give SD 30 seconds to clean up after cancel */ - if (cancel_count == 3) { + if (cancel_count == 6) { break; } } - V(jcr->mutex); set_jcr_job_status(jcr, JS_Terminated); } @@ -353,15 +440,15 @@ extern "C" void *device_thread(void *arg) jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM); for (i=0; i < MAX_TRIES; i++) { if (!connect_to_storage_daemon(jcr, 10, 30, 1)) { - Dmsg0(000, "Failed connecting to SD.\n"); + Dmsg0(900, "Failed connecting to SD.\n"); continue; } LockRes(); foreach_res(dev, R_DEVICE) { if (!update_device_res(jcr, dev)) { - Dmsg1(900, "Error updating device=%s\n", dev->hdr.name); + Dmsg1(900, "Error updating device=%s\n", dev->name()); } else { - Dmsg1(900, "Updated Device=%s\n", dev->hdr.name); + Dmsg1(900, "Updated Device=%s\n", dev->name()); } } UnlockRes(); @@ -386,7 +473,7 @@ void init_device_resources() Dmsg0(100, "Start Device thread.\n"); if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) { berrno be; - Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status)); + Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status)); } } #endif