From: Kern Sibbald Date: Fri, 2 May 2003 19:33:24 +0000 (+0000) Subject: Add SD heartbeat X-Git-Tag: Release-1.31~177 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=ad240c831eef5857ddc5bb0c59a8272323f8fbc9;p=bacula%2Fbacula Add SD heartbeat git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@479 91ce42f0-d328-0410-95d8-f526ca767f89 --- diff --git a/bacula/kernstodo b/bacula/kernstodo index 0168026ba1..40ec0450ac 100644 --- a/bacula/kernstodo +++ b/bacula/kernstodo @@ -15,7 +15,7 @@ Testing to do: (painful) - blocksize recognition code. - multiple simultaneous Volumes -For 1.30 release: +For 1.30a release: - Examine Bare Metal restore problem. - Test multiple simultaneous Volumes - Document FInclude ... @@ -23,6 +23,10 @@ For 1.30 release: - Figure out how to use ssh or stunnel to protect Bacula communications. After 1.30: +- Fix command prompt in gnome-console by checking on Ready. +- Implement HEART_BEAT while SD waiting for tapes. +- Include RunBeforeJob and RunAfterJob output in the message + stream. - Check if Job/File retentions apply to multivolume jobs. - Change M_INFO to M_RESTORED for all restored files. - Remove subsysdir from conf files (used only in autostart scripts). diff --git a/bacula/src/dird/getmsg.c b/bacula/src/dird/getmsg.c index 0676e1d092..8383a7ab5b 100644 --- a/bacula/src/dird/getmsg.c +++ b/bacula/src/dird/getmsg.c @@ -83,28 +83,28 @@ int32_t bget_msg(BSOCK *bs, int rtn) if (n == BNET_SIGNAL) { /* handle signal */ /* BNET_SIGNAL (-1) return from bnet_recv() => network signal */ switch (bs->msglen) { - case BNET_EOD: /* end of data */ - return n; - case BNET_EOD_POLL: - bnet_fsend(bs, OK_msg);/* send response */ - return n; /* end of data */ - case BNET_TERMINATE: - bs->terminated = 1; - return n; - case BNET_POLL: - bnet_fsend(bs, OK_msg); /* send response */ - break; - case BNET_HEARTBEAT: - bnet_sig(bs, BNET_HB_RESPONSE); - break; - case BNET_STATUS: - /* *****FIXME***** Implement */ - bnet_fsend(bs, "Status OK\n"); - bnet_sig(bs, BNET_EOD); - break; - default: - Emsg1(M_WARNING, 0, _("bget_msg: unknown signal %d\n"), bs->msglen); - return n; + case BNET_EOD: /* end of data */ + return n; + case BNET_EOD_POLL: + bnet_fsend(bs, OK_msg);/* send response */ + return n; /* end of data */ + case BNET_TERMINATE: + bs->terminated = 1; + return n; + case BNET_POLL: + bnet_fsend(bs, OK_msg); /* send response */ + break; + case BNET_HEARTBEAT: + case BNET_HB_RESPONSE: + break; + case BNET_STATUS: + /* *****FIXME***** Implement */ + bnet_fsend(bs, "Status OK\n"); + bnet_sig(bs, BNET_EOD); + break; + default: + Emsg1(M_WARNING, 0, _("bget_msg: unknown signal %d\n"), bs->msglen); + return n; } continue; } diff --git a/bacula/src/filed/backup.c b/bacula/src/filed/backup.c index 98b0988006..61fe9c79e0 100644 --- a/bacula/src/filed/backup.c +++ b/bacula/src/filed/backup.c @@ -32,9 +32,70 @@ static int save_file(FF_PKT *ff_pkt, void *pkt); +static void *heartbeat_thread(void *arg) +{ + int32_t n; + JCR *jcr = (JCR *)arg; + BSOCK *sd, *dir; + + jcr->heartbeat_id = pthread_self(); + + /* Get our own local copy */ + sd = dup_bsock(jcr->store_bsock); + dir = dup_bsock(jcr->dir_bsock); + + jcr->duped_sd = sd; + + /* Hang reading the socket to the SD, and every time we get + * a heartbeat, we simply send it on to the Director to + * keep him alive. + */ + for ( ;; ) { + n = bnet_recv(sd); + if (is_bnet_stop(sd)) { + break; + } + if (n == BNET_SIGNAL && sd->msglen == BNET_HEARTBEAT) { + bnet_sig(dir, BNET_HEARTBEAT); + } + } + bnet_close(sd); + bnet_close(dir); + return NULL; +} + +/* Startup the heartbeat thread -- see above */ +static void start_heartbeat_monitor(JCR *jcr) +{ + pthread_t hbtid; + jcr->duped_sd = NULL; + pthread_create(&hbtid, NULL, heartbeat_thread, (void *)jcr); +} + +/* Terminate the heartbeat thread */ +static void stop_heartbeat_monitor(JCR *jcr) +{ + pthread_t hbtid = jcr->heartbeat_id; + + while (jcr->duped_sd == NULL) { + bmicrosleep(0, 500); /* avoid race */ + } + jcr->duped_sd->timed_out = 1; /* set timed_out to terminate read */ + + pthread_kill(hbtid, TIMEOUT_SIGNAL); /* make heartbeat thread go away */ + pthread_join(hbtid, NULL); /* wait for him to clean up */ +} + /* * Find all the requested files and send them - * to the Storage daemon. + * to the Storage daemon. + * + * Note, we normally carry on a one-way + * conversation from this point on with the SD, simply blasting + * data to him. To properly know what is going on, we + * also run a "heartbeat" monitor which reads the socket and + * reacts accordingly (at the moment it has nothing to do + * except echo the heartbeat to the Director). * */ int blast_data_to_storage_daemon(JCR *jcr, char *addr) @@ -67,12 +128,16 @@ int blast_data_to_storage_daemon(JCR *jcr, char *addr) set_find_options((FF_PKT *)jcr->ff, jcr->incremental, jcr->mtime); Dmsg0(110, "start find files\n"); + start_heartbeat_monitor(jcr); + /* Subroutine save_file() is called for each file */ if (!find_files(jcr, (FF_PKT *)jcr->ff, save_file, (void *)jcr)) { stat = 0; /* error */ set_jcr_job_status(jcr, JS_ErrorTerminated); } + stop_heartbeat_monitor(jcr); + bnet_sig(sd, BNET_EOD); /* end data connection */ if (jcr->big_buf) { diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 6092d10381..ed4d390a5a 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -151,6 +151,7 @@ struct s_jcr { int replace; /* Replace option */ #endif /* DIRECTOR_DAEMON */ + #ifdef FILE_DAEMON /* File Daemon specific part of JCR */ uint32_t num_files_examined; /* files examined this job */ @@ -176,8 +177,11 @@ struct s_jcr { uint32_t StartBlock; uint32_t EndBlock; int use_win_backup_api; /* set to use native Win API */ + pthread_t heartbeat_id; /* id of heartbeat thread */ + BSOCK *duped_sd; /* duped SD socket */ #endif /* FILE_DAEMON */ + #ifdef STORAGE_DAEMON /* Storage Daemon specific part of JCR */ struct s_jcr *next_dev; /* next JCR attached to device */ diff --git a/bacula/src/lib/bnet.c b/bacula/src/lib/bnet.c index ca6bc75dd1..66aa09add0 100644 --- a/bacula/src/lib/bnet.c +++ b/bacula/src/lib/bnet.c @@ -341,7 +341,7 @@ bnet_send(BSOCK *bsock) } } else { Jmsg5(bsock->jcr, M_ERROR, 0, _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"), - bsock->who, bsock->host, bsock->port, bsock->msglen, rc); + bsock->msglen, bsock->who, bsock->host, bsock->port, rc); } return 0; } @@ -722,6 +722,12 @@ dup_bsock(BSOCK *osock) memcpy(bsock, osock, sizeof(BSOCK)); bsock->msg = get_pool_memory(PM_MESSAGE); bsock->errmsg = get_pool_memory(PM_MESSAGE); + if (osock->who) { + bsock->who = bstrdup(osock->who); + } + if (osock->host) { + bsock->host = bstrdup(osock->host); + } bsock->duped = TRUE; return bsock; } @@ -735,12 +741,9 @@ bnet_close(BSOCK *bsock) for ( ; bsock != NULL; bsock = next) { next = bsock->next; if (!bsock->duped) { -// shutdown(bsock->fd, SHUT_RDWR); close(bsock->fd); - term_bsock(bsock); - } else { - free(bsock); } + term_bsock(bsock); } return; } diff --git a/bacula/src/stored/askdir.c b/bacula/src/stored/askdir.c index c0e3470a42..2283834f92 100644 --- a/bacula/src/stored/askdir.c +++ b/bacula/src/stored/askdir.c @@ -30,9 +30,8 @@ #include "stored.h" /* pull in Storage Deamon headers */ /* Requests sent to the Director */ -static char Find_media[] = "CatReq Job=%s FindMedia=%d\n"; +static char Find_media[] = "CatReq Job=%s FindMedia=%d\n"; static char Get_Vol_Info[] = "CatReq Job=%s GetVolInfo VolName=%s write=%d\n"; - static char Update_media[] = "CatReq Job=%s UpdateMedia VolName=%s\ VolJobs=%u VolFiles=%u VolBlocks=%u VolBytes=%s VolMounts=%u\ VolErrors=%u VolWrites=%u MaxVolBytes=%s EndTime=%d VolStatus=%s\ @@ -41,11 +40,8 @@ static char Update_media[] = "CatReq Job=%s UpdateMedia VolName=%s\ static char Create_job_media[] = "CatReq Job=%s CreateJobMedia \ FirstIndex=%u LastIndex=%u StartFile=%u EndFile=%u \ StartBlock=%u EndBlock=%u\n"; - - static char FileAttributes[] = "UpdCat Job=%s FileAttributes "; - -static char Job_status[] = "3012 Job %s jobstatus %d\n"; +static char Job_status[] = "3012 Job %s jobstatus %d\n"; /* Responses received from the Director */ @@ -53,9 +49,10 @@ static char OK_media[] = "1000 OK VolName=%127s VolJobs=%u VolFiles=%u\ VolBlocks=%u VolBytes=%" lld " VolMounts=%u VolErrors=%u VolWrites=%u\ MaxVolBytes=%" lld " VolCapacityBytes=%" lld " VolStatus=%20s\ Slot=%d MaxVolJobs=%u MaxVolFiles=%u\n"; - static char OK_update[] = "1000 OK UpdateMedia\n"; +/* Forward referenced functions */ +static int device_wait(JCR *jcr, DEVICE *dev, int wait_sec); /* * Send current JobStatus to Director @@ -252,9 +249,6 @@ int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec) */ int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev) { - struct timeval tv; - struct timezone tz; - struct timespec timeout; int stat = 0, jstat; /* ******FIXME******* put these on config variable */ int min_wait = 60 * 60; @@ -263,7 +257,6 @@ int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev) int wait_sec; int num_wait = 0; - int dev_blocked; Dmsg0(130, "enter dir_ask_sysop_to_mount_next_volume\n"); ASSERT(dev->dev_blocked); @@ -305,36 +298,11 @@ Please use the \"label\" command to create a new Volume for:\n\ jcr->media_type, jcr->pool_name); } - /* - * Wait then send message again - */ - gettimeofday(&tv, &tz); - timeout.tv_nsec = tv.tv_usec * 1000; - timeout.tv_sec = tv.tv_sec + wait_sec; - P(dev->mutex); - dev_blocked = dev->dev_blocked; - dev->dev_blocked = BST_WAITING_FOR_SYSOP; /* indicate waiting for mount */ jcr->JobStatus = jstat; dir_send_job_status(jcr); - for ( ;!job_canceled(jcr); ) { - Dmsg1(190, "I'm going to sleep on device %s\n", dev->dev_name); - stat = pthread_cond_timedwait(&dev->wait_next_vol, &dev->mutex, &timeout); - if (dev->dev_blocked == BST_WAITING_FOR_SYSOP) { - break; - } - /* - * Someone other than us blocked the device (probably the - * user via the Console program. - * So, we continue waiting. - */ - gettimeofday(&tv, &tz); - timeout.tv_nsec = 0; - timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */ - } - dev->dev_blocked = dev_blocked; - V(dev->mutex); + stat = device_wait(jcr, dev, wait_sec); if (stat == ETIMEDOUT) { wait_sec *= 2; /* double wait time */ @@ -402,11 +370,7 @@ int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev) int max_num_wait = 9; /* 5 waits =~ 1 day, then 1 day at a time */ int wait_sec; int num_wait = 0; - int dev_blocked; char *msg; - struct timeval tv; - struct timezone tz; - struct timespec timeout; Dmsg0(130, "enter dir_ask_sysop_to_mount_next_volume\n"); if (!jcr->VolumeName[0]) { @@ -427,36 +391,10 @@ int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev) Dmsg3(190, "Mount %s on %s for Job %s\n", jcr->VolumeName, jcr->dev_name, jcr->Job); - /* - * Wait then send message again - */ - gettimeofday(&tv, &tz); - timeout.tv_nsec = tv.tv_usec * 1000; - timeout.tv_sec = tv.tv_sec + wait_sec; - - P(dev->mutex); - dev_blocked = dev->dev_blocked; - dev->dev_blocked = BST_WAITING_FOR_SYSOP; /* indicate waiting for mount */ jcr->JobStatus = JS_WaitMount; dir_send_job_status(jcr); - for ( ;!job_canceled(jcr); ) { - Dmsg1(190, "I'm going to sleep on device %s\n", dev->dev_name); - stat = pthread_cond_timedwait(&dev->wait_next_vol, &dev->mutex, &timeout); - if (dev->dev_blocked == BST_WAITING_FOR_SYSOP) { - break; - } - /* - * Someone other than us blocked the device (probably the - * user via the Console program. - * So, we continue waiting. - */ - gettimeofday(&tv, &tz); - timeout.tv_nsec = 0; - timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */ - } - dev->dev_blocked = dev_blocked; - V(dev->mutex); + stat = device_wait(jcr, dev, wait_sec); /* wait on device */ if (stat == ETIMEDOUT) { wait_sec *= 2; /* double wait time */ @@ -495,3 +433,79 @@ int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev) Dmsg0(130, "leave dir_ask_sysop_to_mount_next_volume\n"); return 1; } + +#define HB_TIME 20*60 /* send a heatbeat once every 20 minutes while waiting */ + +static int device_wait(JCR *jcr, DEVICE *dev, int wait_sec) +{ + struct timeval tv; + struct timezone tz; + struct timespec timeout; + int dev_blocked; + time_t start = time(NULL); + time_t last_heartbeat = 0; + int stat = 0; + + /* + * Wait requested time (wait_sec). However, we also wake up every + * HB_TIME seconds and send a heartbeat to the FD and the Director + * to keep stateful firewalls from closing them down while waiting + * for the operator. + */ + gettimeofday(&tv, &tz); + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + (wait_sec > HB_TIME ? HB_TIME: wait_sec); + + P(dev->mutex); + dev_blocked = dev->dev_blocked; + dev->dev_blocked = BST_WAITING_FOR_SYSOP; /* indicate waiting for mount */ + + for ( ; !job_canceled(jcr); ) { + int add_wait; + + Dmsg1(190, "I'm going to sleep on device %s\n", dev->dev_name); + stat = pthread_cond_timedwait(&dev->wait_next_vol, &dev->mutex, &timeout); + + /* Note, this always triggers the first time. We want that. */ + time_t now = time(NULL); + if (now - last_heartbeat >= HB_TIME) { + /* send heartbeats */ + if (jcr->file_bsock) { + bnet_sig(jcr->file_bsock, BNET_HEARTBEAT); + } + if (jcr->dir_bsock) { + bnet_sig(jcr->dir_bsock, BNET_HEARTBEAT); + } + last_heartbeat = now; + } + + /* Check if we blocked the device */ + if (dev->dev_blocked == BST_WAITING_FOR_SYSOP) { + if (stat != ETIMEDOUT) { /* we blocked the device */ + break; /* on error return */ + } + if (now - start >= wait_sec) { /* on exceeding wait time return */ + break; + } + add_wait = wait_sec - (now - start); + if (add_wait > HB_TIME) { + add_wait = HB_TIME; + } + } else { /* Oops someone else has it blocked now */ + add_wait = 10; /* hang around until he releases it */ + } + /* + * Note, if dev_blocked is not BST_WAITING FOR_SYSOP, + * someone other than us has blocked the device (probably the + * user via the Console program), so we continue waiting + * until he releases the device back to us. + */ + gettimeofday(&tv, &tz); + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + add_wait; /* additional wait */ + } + + dev->dev_blocked = dev_blocked; + V(dev->mutex); + return stat; +}