From: Kern Sibbald Date: Mon, 5 May 2003 09:11:30 +0000 (+0000) Subject: Add Dir heartbeat in FD X-Git-Tag: Release-1.31~165 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=84907e655c6902b2bbfd155208e20ed15e889aab;p=bacula%2Fbacula Add Dir heartbeat in FD git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@491 91ce42f0-d328-0410-95d8-f526ca767f89 --- diff --git a/bacula/src/dird/backup.c b/bacula/src/dird/backup.c index f1a4d7e286..358bb96506 100644 --- a/bacula/src/dird/backup.c +++ b/bacula/src/dird/backup.c @@ -273,7 +273,7 @@ static int wait_for_job_termination(JCR *jcr) set_jcr_job_status(jcr, JS_Running); /* Wait for Client to terminate */ - while ((n = bget_msg(fd, 0)) >= 0) { + while ((n = bget_dirmsg(fd)) >= 0) { if (sscanf(fd->msg, EndBackup, &jcr->FDJobStatus, &jcr->JobFiles, &jcr->ReadBytes, &jcr->JobBytes) == 4) { fd_ok = TRUE; diff --git a/bacula/src/dird/fd_cmds.c b/bacula/src/dird/fd_cmds.c index 31ac3208b7..7b2440e450 100644 --- a/bacula/src/dird/fd_cmds.c +++ b/bacula/src/dird/fd_cmds.c @@ -288,7 +288,7 @@ int get_attributes_and_put_in_catalog(JCR *jcr) Dmsg0(120, "bdird: waiting to receive file attributes\n"); /* Pickup file attributes and signature */ - while (!fd->errors && (n = bget_msg(fd, 0)) > 0) { + while (!fd->errors && (n = bget_dirmsg(fd)) > 0) { /*****FIXME****** improve error handling to stop only on * really fatal problems, or the number of errors is too diff --git a/bacula/src/dird/getmsg.c b/bacula/src/dird/getmsg.c index b53fad0346..37fd753b7f 100644 --- a/bacula/src/dird/getmsg.c +++ b/bacula/src/dird/getmsg.c @@ -64,7 +64,7 @@ static char OK_msg[] = "1000 OK\n"; * any message beginning with Jmsg will be processed. * */ -int32_t bget_msg(BSOCK *bs, int rtn) +int bget_dirmsg(BSOCK *bs) { int32_t n; char Job[MAX_NAME_LENGTH]; @@ -75,7 +75,7 @@ int32_t bget_msg(BSOCK *bs, int rtn) for (;;) { n = bnet_recv(bs); - Dmsg2(120, "bget_msg %d: %s\n", n, bs->msg); + Dmsg2(120, "bget_dirmsg %d: %s\n", n, bs->msg); if (is_bnet_stop(bs)) { return n; /* error or terminate */ @@ -105,7 +105,7 @@ int32_t bget_msg(BSOCK *bs, int rtn) bnet_sig(bs, BNET_EOD); break; default: - Emsg1(M_WARNING, 0, _("bget_msg: unknown bnet signal %d\n"), bs->msglen); + Emsg1(M_WARNING, 0, _("bget_dirmsg: unknown bnet signal %d\n"), bs->msglen); return n; } continue; @@ -216,7 +216,7 @@ int response(BSOCK *fd, char *resp, char *cmd, int prtmsg) if (is_bnet_error(fd)) { return 0; } - if ((n = bget_msg(fd, 0)) >= 0) { + if ((n = bget_dirmsg(fd)) >= 0) { Dmsg0(110, fd->msg); if (strcmp(fd->msg, resp) == 0) { return 1; diff --git a/bacula/src/dird/msgchan.c b/bacula/src/dird/msgchan.c index a17c8413b5..808b63c42b 100644 --- a/bacula/src/dird/msgchan.c +++ b/bacula/src/dird/msgchan.c @@ -226,7 +226,7 @@ static void *msg_thread(void *arg) /* Read the Storage daemon's output. */ Dmsg0(200, "Start msg_thread loop\n"); - while ((stat=bget_msg(sd, 0)) >= 0) { + while ((stat=bget_dirmsg(sd)) >= 0) { Dmsg1(200, "msg); if (sscanf(sd->msg, Job_start, &Job) == 1) { continue; diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index 5f808e4bf6..b1e1c68bce 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -48,13 +48,13 @@ extern char *level_to_str(int level); /* fd_cmds.c */ extern int connect_to_file_daemon(JCR *jcr, int retry_interval, - int max_retry_time, int verbose); + int max_retry_time, int verbose); extern int send_include_list(JCR *jcr); extern int send_exclude_list(JCR *jcr); extern int get_attributes_and_put_in_catalog(JCR *jcr); extern int get_attributes_and_compare_to_catalog(JCR *jcr, JobId_t JobId); extern int put_file_into_catalog(JCR *jcr, long file_index, char *fname, - char *link, char *attr, int stream); + char *link, char *attr, int stream); /* job.c */ extern void set_jcr_defaults(JCR *jcr, JOB *job); @@ -67,10 +67,10 @@ extern void mount_request(JCR *jcr, BSOCK *bs, char *buf); /* msgchan.c */ extern int connect_to_storage_daemon(JCR *jcr, int retry_interval, - int max_retry_time, int verbose); + int max_retry_time, int verbose); extern int start_storage_daemon_job(JCR *jcr); extern int start_storage_daemon_message_thread(JCR *jcr); -extern int32_t bget_msg(BSOCK *bs, int type); +extern int bget_dirmsg(BSOCK *bs); extern int response(BSOCK *fd, char *resp, char *cmd, int prtmsg); extern void wait_for_storage_daemon_termination(JCR *jcr); @@ -99,28 +99,28 @@ void prtit(void *ctx, char *msg); void bsendmsg(void *sock, char *fmt, ...); /* ua_select.c */ -STORE *select_storage_resource(UAContext *ua); -JOB *select_job_resource(UAContext *ua); -JOB *select_restore_job_resource(UAContext *ua); -CLIENT *select_client_resource(UAContext *ua); +STORE *select_storage_resource(UAContext *ua); +JOB *select_job_resource(UAContext *ua); +JOB *select_restore_job_resource(UAContext *ua); +CLIENT *select_client_resource(UAContext *ua); FILESET *select_fileset_resource(UAContext *ua); -int select_pool_and_media_dbr(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr); -int select_media_dbr(UAContext *ua, MEDIA_DBR *mr); -int select_pool_dbr(UAContext *ua, POOL_DBR *pr); -int select_client_dbr(UAContext *ua, CLIENT_DBR *cr); - -void start_prompt(UAContext *ua, char *msg); -void add_prompt(UAContext *ua, char *prompt); -int do_prompt(UAContext *ua, char *msg, char *prompt, int max_prompt); -CAT *get_catalog_resource(UAContext *ua); +int select_pool_and_media_dbr(UAContext *ua, POOL_DBR *pr, MEDIA_DBR *mr); +int select_media_dbr(UAContext *ua, MEDIA_DBR *mr); +int select_pool_dbr(UAContext *ua, POOL_DBR *pr); +int select_client_dbr(UAContext *ua, CLIENT_DBR *cr); + +void start_prompt(UAContext *ua, char *msg); +void add_prompt(UAContext *ua, char *prompt); +int do_prompt(UAContext *ua, char *msg, char *prompt, int max_prompt); +CAT *get_catalog_resource(UAContext *ua); STORE *get_storage_resource(UAContext *ua, int use_default); -int get_media_type(UAContext *ua, char *MediaType, int max_media); -int get_pool_dbr(UAContext *ua, POOL_DBR *pr); -int get_client_dbr(UAContext *ua, CLIENT_DBR *cr); +int get_media_type(UAContext *ua, char *MediaType, int max_media); +int get_pool_dbr(UAContext *ua, POOL_DBR *pr); +int get_client_dbr(UAContext *ua, CLIENT_DBR *cr); POOL *get_pool_resource(UAContext *ua); POOL *select_pool_resource(UAContext *ua); CLIENT *get_client_resource(UAContext *ua); -int get_job_dbr(UAContext *ua, JOB_DBR *jr); +int get_job_dbr(UAContext *ua, JOB_DBR *jr); int find_arg_keyword(UAContext *ua, char **list); int find_arg(UAContext *ua, char *keyword); diff --git a/bacula/src/dird/restore.c b/bacula/src/dird/restore.c index 513218ae25..0e7fb9d574 100644 --- a/bacula/src/dird/restore.c +++ b/bacula/src/dird/restore.c @@ -253,7 +253,7 @@ int do_restore(JCR *jcr) /* Wait for Job Termination */ Dmsg0(20, "wait for job termination\n"); - while (bget_msg(fd, 0) >= 0) { + while (bget_dirmsg(fd) >= 0) { Dmsg1(100, "dirdmsg); if (sscanf(fd->msg, EndRestore, &jcr->FDJobStatus, &jcr->JobFiles, &jcr->JobBytes) == 3) { diff --git a/bacula/src/dird/ua_label.c b/bacula/src/dird/ua_label.c index 9020445e4f..c424457071 100644 --- a/bacula/src/dird/ua_label.c +++ b/bacula/src/dird/ua_label.c @@ -443,7 +443,7 @@ static int send_label_request(UAContext *ua, MEDIA_DBR *mr, MEDIA_DBR *omr, dev_name, mr->VolumeName, pr->Name, mr->MediaType, mr->Slot); } - while (bget_msg(sd, 0) >= 0) { + while (bget_dirmsg(sd) >= 0) { bsendmsg(ua, "%s", sd->msg); if (strncmp(sd->msg, "3000 OK label.", 14) == 0) { ok = TRUE; @@ -491,7 +491,7 @@ static vol_list_t *get_slot_list_from_SD(UAContext *ua) bnet_fsend(sd, _("autochanger list %s \n"), dev_name); /* Read and organize list of Volumes */ - while (bget_msg(sd, 0) >= 0) { + while (bget_dirmsg(sd) >= 0) { char *p; int Slot; strip_trailing_junk(sd->msg); diff --git a/bacula/src/dird/verify.c b/bacula/src/dird/verify.c index de397ab80d..fbc0726a3f 100644 --- a/bacula/src/dird/verify.c +++ b/bacula/src/dird/verify.c @@ -396,7 +396,7 @@ int get_attributes_and_compare_to_catalog(JCR *jcr, JobId_t JobId) * Attributes * Link name ??? */ - while ((n=bget_msg(fd, 0)) >= 0 && !job_canceled(jcr)) { + while ((n=bget_dirmsg(fd)) >= 0 && !job_canceled(jcr)) { int stream; char *attr, *p, *fn; char Opts_SIG[MAXSTRING]; /* Verify Opts or MD5/SHA1 signature */ diff --git a/bacula/src/filed/filed.c b/bacula/src/filed/filed.c index 7091cf40cb..0ff2d3d5ec 100644 --- a/bacula/src/filed/filed.c +++ b/bacula/src/filed/filed.c @@ -36,6 +36,9 @@ extern void *handle_client_request(void *dir_sock); void terminate_filed(int sig); /* Exported variables */ +CLIENT *me; /* my resource */ +char OK_msg[] = "2000 OK\n"; +char TERM_msg[] = "2999 Terminate\n"; #ifdef HAVE_CYGWIN @@ -52,7 +55,6 @@ static int foreground = 0; static int inetd_request = 0; static workq_t dir_workq; /* queue of work from Director */ -CLIENT *me; /* my resource */ static void usage() { diff --git a/bacula/src/filed/heartbeat.c b/bacula/src/filed/heartbeat.c index ba7841f626..409b4013af 100644 --- a/bacula/src/filed/heartbeat.c +++ b/bacula/src/filed/heartbeat.c @@ -36,7 +36,7 @@ * Send heartbeats to the Director every HB_TIME * seconds. */ -static void *heartbeat_thread(void *arg) +static void *sd_heartbeat_thread(void *arg) { int32_t n; JCR *jcr = (JCR *)arg; @@ -50,7 +50,7 @@ static void *heartbeat_thread(void *arg) sd = dup_bsock(jcr->store_bsock); dir = dup_bsock(jcr->dir_bsock); - jcr->duped_sd = sd; + jcr->hb_bsock = sd; /* Hang reading the socket to the SD, and every time we get * a heartbeat, we simply send it on to the Director to @@ -72,43 +72,80 @@ static void *heartbeat_thread(void *arg) } bnet_close(sd); bnet_close(dir); - jcr->duped_sd = NULL; + jcr->hb_bsock = NULL; return NULL; } /* Startup the heartbeat thread -- see above */ void start_heartbeat_monitor(JCR *jcr) { - jcr->duped_sd = NULL; - pthread_create(&jcr->heartbeat_id, NULL, heartbeat_thread, (void *)jcr); + jcr->hb_bsock = NULL; + pthread_create(&jcr->heartbeat_id, NULL, sd_heartbeat_thread, (void *)jcr); } -/* Terminate the heartbeat thread */ +/* Terminate the heartbeat thread. Used for both SD and DIR */ void stop_heartbeat_monitor(JCR *jcr) { /* Wait for heartbeat thread to start */ - while (jcr->duped_sd == NULL) { + while (jcr->hb_bsock == NULL) { bmicrosleep(0, 50); /* avoid race */ } - jcr->duped_sd->timed_out = 1; /* set timed_out to terminate read */ - jcr->duped_sd->terminated = 1; /* set to terminate read */ + jcr->hb_bsock->timed_out = 1; /* set timed_out to terminate read */ + jcr->hb_bsock->terminated = 1; /* set to terminate read */ /* Wait for heartbeat thread to stop */ - while (jcr->duped_sd) { + while (jcr->hb_bsock) { pthread_kill(jcr->heartbeat_id, TIMEOUT_SIGNAL); /* make heartbeat thread go away */ - bmicrosleep(0, 20); + bmicrosleep(0, 500); } } +/* + * Thread for sending heartbeats to the Director when there + * is no SD monitoring needed -- e.g. restore and verify Vol + * both do their own read() on the SD socket. + */ +static void *dir_heartbeat_thread(void *arg) +{ + JCR *jcr = (JCR *)arg; + BSOCK *dir; + time_t last_heartbeat = time(NULL); + + pthread_detach(pthread_self()); + + /* Get our own local copy */ + dir = dup_bsock(jcr->dir_bsock); + + jcr->hb_bsock = dir; + + for ( ; !is_bnet_stop(dir); ) { + time_t now, next; + + now = time(NULL); + next = now - last_heartbeat; + if (next >= me->heartbeat_interval) { + bnet_sig(dir, BNET_HEARTBEAT); + last_heartbeat = now; + } + bmicrosleep(next, 0); + } + bnet_close(dir); + jcr->hb_bsock = NULL; + return NULL; +} /* * Same as above but we don't listen to the SD */ void start_dir_heartbeat(JCR *jcr) { - /* ***FIXME*** implement */ + if (me->heartbeat_interval) { + pthread_create(&jcr->heartbeat_id, NULL, dir_heartbeat_thread, (void *)jcr); + } } void stop_dir_heartbeat(JCR *jcr) { - /* ***FIXME*** implement */ + if (me->heartbeat_interval) { + stop_heartbeat_monitor(jcr); + } } diff --git a/bacula/src/filed/job.c b/bacula/src/filed/job.c index 020e49bac4..37e0d4b098 100644 --- a/bacula/src/filed/job.c +++ b/bacula/src/filed/job.c @@ -531,7 +531,7 @@ static int backup_cmd(JCR *jcr) /* * Expect to receive back the Ticket number */ - if (bnet_recv(sd) >= 0) { + if (bget_msg(sd) >= 0) { Dmsg1(110, "msg); if (sscanf(sd->msg, OK_open, &jcr->Ticket) != 1) { Jmsg(jcr, M_FATAL, 0, _("Bad response to append open: %s\n"), sd->msg); @@ -595,7 +595,7 @@ static int backup_cmd(JCR *jcr) * Send Append Close to Storage daemon */ bnet_fsend(sd, append_close, jcr->Ticket); - while (bnet_recv(sd) >= 0) { /* stop on signal or error */ + while (bget_msg(sd) >= 0) { /* stop on signal or error */ if (sscanf(sd->msg, OK_close, &SDJobStatus) == 1) { ok = 1; Dmsg2(200, "SDJobStatus = %d %c\n", SDJobStatus, (char)SDJobStatus); @@ -678,7 +678,7 @@ static int verify_cmd(JCR *jcr) Dmsg1(130, "bfiled>stored: %s", sd->msg); /* ****FIXME**** check response */ - bnet_recv(sd); /* get OK */ + bget_msg(sd); /* get OK */ /* Inform Storage daemon that we are done */ bnet_sig(sd, BNET_TERMINATE); @@ -761,7 +761,7 @@ static int restore_cmd(JCR *jcr) bnet_fsend(sd, read_close, jcr->Ticket); Dmsg1(130, "bfiled>stored: %s", sd->msg); - bnet_recv(sd); /* get OK */ + bget_msg(sd); /* get OK */ /* Inform Storage daemon that we are done */ bnet_sig(sd, BNET_TERMINATE); @@ -800,7 +800,7 @@ static int open_sd_read_session(JCR *jcr) /* * Get ticket number */ - if (bnet_recv(sd) >= 0) { + if (bget_msg(sd) >= 0) { Dmsg1(110, "bfiledmsg); if (sscanf(sd->msg, OK_open, &jcr->Ticket) != 1) { Jmsg(jcr, M_FATAL, 0, _("Bad response to SD read open: %s\n"), sd->msg); @@ -867,7 +867,7 @@ int response(JCR *jcr, BSOCK *sd, char *resp, char *cmd) if (sd->errors) { return 0; } - if ((n = bnet_recv(sd)) > 0) { + if ((n = bget_msg(sd)) > 0) { Dmsg0(110, sd->msg); if (strcmp(sd->msg, resp) == 0) { return 1; diff --git a/bacula/src/filed/restore.c b/bacula/src/filed/restore.c index 72f3d680a8..1df512a849 100644 --- a/bacula/src/filed/restore.c +++ b/bacula/src/filed/restore.c @@ -94,7 +94,7 @@ void do_restore(JCR *jcr) * or c. Possibly MD5 or SHA1 record * 3. Repeat step 1 */ - while (bnet_recv(sd) >= 0 && !job_canceled(jcr)) { + while (bget_msg(sd) >= 0 && !job_canceled(jcr)) { /* * First we expect a Stream Record Header */ @@ -108,7 +108,7 @@ void do_restore(JCR *jcr) /* * Now we expect the Stream Data */ - if (bnet_recv(sd) < 0) { + if (bget_msg(sd) < 0) { Jmsg1(jcr, M_FATAL, 0, _("Data record error. ERR=%s\n"), bnet_strerror(sd)); goto bail_out; } diff --git a/bacula/src/filed/verify_vol.c b/bacula/src/filed/verify_vol.c index ea12a055c1..306e4cf90e 100644 --- a/bacula/src/filed/verify_vol.c +++ b/bacula/src/filed/verify_vol.c @@ -75,7 +75,7 @@ void do_verify_volume(JCR *jcr) /* * Get a record from the Storage daemon */ - while (bnet_recv(sd) >= 0 && !job_canceled(jcr)) { + while (bget_msg(sd) >= 0 && !job_canceled(jcr)) { /* * First we expect a Stream Record Header */ @@ -89,7 +89,7 @@ void do_verify_volume(JCR *jcr) /* * Now we expect the Stream Data */ - if (bnet_recv(sd) < 0 && !job_canceled(jcr)) { + if (bget_msg(sd) < 0) { Jmsg1(jcr, M_FATAL, 0, _("Data record error. ERR=%s\n"), bnet_strerror(sd)); goto bail_out; } diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index dfd9db3d87..5a1931c737 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -179,7 +179,7 @@ struct s_jcr { uint32_t EndBlock; int use_win_backup_api; /* set to use native Win API */ pthread_t heartbeat_id; /* id of heartbeat thread */ - BSOCK *duped_sd; /* duped SD socket */ + BSOCK *hb_bsock; /* duped SD socket */ #endif /* FILE_DAEMON */ diff --git a/bacula/src/lib/Makefile.in b/bacula/src/lib/Makefile.in index 140fad3a9a..e15e0af142 100644 --- a/bacula/src/lib/Makefile.in +++ b/bacula/src/lib/Makefile.in @@ -31,7 +31,8 @@ GMP_INC=@GMP_INC@ first_rule: all dummy: -LIBSRCS = alloc.c base64.c bmisc.c bnet.c bnet_server.c \ +LIBSRCS = alloc.c base64.c bmisc.c bget_msg.c \ + bnet.c bnet_server.c \ bpipe.c bshm.c btime.c \ cram-md5.c crc32.c daemon.c edit.c fnmatch.c \ hmac.c idcache.c jcr.c lex.c \ @@ -42,7 +43,8 @@ LIBSRCS = alloc.c base64.c bmisc.c bnet.c bnet_server.c \ # immortal.c filesys.c -LIBOBJS = alloc.o base64.o bmisc.o bnet.o bnet_server.o \ +LIBOBJS = alloc.o base64.o bmisc.o bget_msg.o \ + bnet.o bnet_server.o \ bpipe.o bshm.o btime.o \ cram-md5.o crc32.o daemon.o edit.o fnmatch.o \ hmac.o idcache.o jcr.o lex.o \ diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index 93f4801715..d5b0e8fd12 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -24,140 +24,142 @@ */ /* base64.c */ -void base64_init (void); -int to_base64 (intmax_t value, char *where); -int from_base64 (intmax_t *value, char *where); -int bin_to_base64 (char *buf, char *bin, int len); +void base64_init (void); +int to_base64 (intmax_t value, char *where); +int from_base64 (intmax_t *value, char *where); +int bin_to_base64 (char *buf, char *bin, int len); /* bmisc.c */ -char *bstrncpy (char *dest, const char *src, int maxlen); -char *bstrncat (char *dest, const char *src, int maxlen); -void *b_malloc (char *file, int line, size_t size); +char *bstrncpy (char *dest, const char *src, int maxlen); +char *bstrncat (char *dest, const char *src, int maxlen); +void *b_malloc (char *file, int line, size_t size); #ifndef DEBUG -void *bmalloc (size_t size); +void *bmalloc (size_t size); #endif -void *brealloc (void *buf, size_t size); -void *bcalloc (size_t size1, size_t size2); -int bsnprintf (char *str, size_t size, const char *format, ...); -int bvsnprintf (char *str, size_t size, const char *format, va_list ap); -int pool_sprintf (char *pool_buf, char *fmt, ...); -void create_pid_file (char *dir, char *progname, int port); -int delete_pid_file (char *dir, char *progname, int port); -void drop (char *uid, char *gid); -int bmicrosleep (time_t sec, long msec); +void *brealloc (void *buf, size_t size); +void *bcalloc (size_t size1, size_t size2); +int bsnprintf (char *str, size_t size, const char *format, ...); +int bvsnprintf (char *str, size_t size, const char *format, va_list ap); +int pool_sprintf (char *pool_buf, char *fmt, ...); +void create_pid_file (char *dir, char *progname, int port); +int delete_pid_file (char *dir, char *progname, int port); +void drop (char *uid, char *gid); +int bmicrosleep (time_t sec, long msec); /* bnet.c */ -int32_t bnet_recv (BSOCK *bsock); -int bnet_send (BSOCK *bsock); -int bnet_fsend (BSOCK *bs, char *fmt, ...); -int bnet_set_buffer_size (BSOCK *bs, uint32_t size, int rw); -int bnet_sig (BSOCK *bs, int sig); -int bnet_ssl_server (BSOCK *bsock, char *password, int ssl_need, int ssl_has); -int bnet_ssl_client (BSOCK *bsock, char *password, int ssl_need); -BSOCK * bnet_connect (void *jcr, int retry_interval, - int max_retry_time, char *name, char *host, char *service, - int port, int verbose); -void bnet_close (BSOCK *bsock); -BSOCK * init_bsock (void *jcr, int sockfd, char *who, char *ip, int port); -BSOCK * dup_bsock (BSOCK *bsock); -void term_bsock (BSOCK *bsock); -char * bnet_strerror (BSOCK *bsock); -char * bnet_sig_to_ascii (BSOCK *bsock); -int bnet_wait_data (BSOCK *bsock, int sec); -int bnet_wait_data_intr (BSOCK *bsock, int sec); -int bnet_despool (BSOCK *bsock); -int is_bnet_stop (BSOCK *bsock); -int is_bnet_error (BSOCK *bsock); -void bnet_suppress_error_messages(BSOCK *bsock, int flag); - +int32_t bnet_recv (BSOCK *bsock); +int bnet_send (BSOCK *bsock); +int bnet_fsend (BSOCK *bs, char *fmt, ...); +int bnet_set_buffer_size (BSOCK *bs, uint32_t size, int rw); +int bnet_sig (BSOCK *bs, int sig); +int bnet_ssl_server (BSOCK *bsock, char *password, int ssl_need, int ssl_has); +int bnet_ssl_client (BSOCK *bsock, char *password, int ssl_need); +BSOCK * bnet_connect (void *jcr, int retry_interval, + int max_retry_time, char *name, char *host, char *service, + int port, int verbose); +void bnet_close (BSOCK *bsock); +BSOCK * init_bsock (void *jcr, int sockfd, char *who, char *ip, int port); +BSOCK * dup_bsock (BSOCK *bsock); +void term_bsock (BSOCK *bsock); +char * bnet_strerror (BSOCK *bsock); +char * bnet_sig_to_ascii (BSOCK *bsock); +int bnet_wait_data (BSOCK *bsock, int sec); +int bnet_wait_data_intr (BSOCK *bsock, int sec); +int bnet_despool (BSOCK *bsock); +int is_bnet_stop (BSOCK *bsock); +int is_bnet_error (BSOCK *bsock); +void bnet_suppress_error_messages(BSOCK *bsock, int flag); + +/* bget_msg.c */ +int bget_msg(BSOCK *sock); /* cram-md5.c */ int cram_md5_get_auth(BSOCK *bs, char *password, int ssl_need); int cram_md5_auth(BSOCK *bs, char *password, int ssl_need); void hmac_md5(uint8_t* text, int text_len, uint8_t* key, - int key_len, uint8_t *hmac); + int key_len, uint8_t *hmac); /* crc32.c */ uint32_t bcrc32(uint8_t *buf, int len); /* daemon.c */ -void daemon_start (); +void daemon_start (); /* edit.c */ -uint64_t str_to_uint64(char *str); -int64_t str_to_int64(char *str); -char * edit_uint64_with_commas (uint64_t val, char *buf); -char * add_commas (char *val, char *buf); -char * edit_uint64 (uint64_t val, char *buf); -int duration_to_utime (char *str, utime_t *value); -int size_to_uint64(char *str, int str_len, uint64_t *rtn_value); -char *edit_utime (utime_t val, char *buf); -int is_a_number (const char *num); -int is_an_integer (const char *n); +uint64_t str_to_uint64(char *str); +int64_t str_to_int64(char *str); +char * edit_uint64_with_commas (uint64_t val, char *buf); +char * add_commas (char *val, char *buf); +char * edit_uint64 (uint64_t val, char *buf); +int duration_to_utime (char *str, utime_t *value); +int size_to_uint64(char *str, int str_len, uint64_t *rtn_value); +char *edit_utime (utime_t val, char *buf); +int is_a_number (const char *num); +int is_an_integer (const char *n); /* lex.c */ -LEX * lex_close_file (LEX *lf); -LEX * lex_open_file (LEX *lf, char *fname, LEX_ERROR_HANDLER *scan_error); -int lex_get_char (LEX *lf); -void lex_unget_char (LEX *lf); -char * lex_tok_to_str (int token); -int lex_get_token (LEX *lf, int expect); +LEX * lex_close_file (LEX *lf); +LEX * lex_open_file (LEX *lf, char *fname, LEX_ERROR_HANDLER *scan_error); +int lex_get_char (LEX *lf); +void lex_unget_char (LEX *lf); +char * lex_tok_to_str (int token); +int lex_get_token (LEX *lf, int expect); /* message.c */ -void my_name_is (int argc, char *argv[], char *name); -void init_msg (void *jcr, MSGS *msg); -void term_msg (void); -void close_msg (void *jcr); -void add_msg_dest (MSGS *msg, int dest, int type, char *where, char *dest_code); -void rem_msg_dest (MSGS *msg, int dest, int type, char *where); -void Jmsg (void *jcr, int type, int level, char *fmt, ...); -void dispatch_message (void *jcr, int type, int level, char *buf); -void init_console_msg (char *wd); -void free_msgs_res (MSGS *msgs); -int open_spool_file (void *jcr, BSOCK *bs); -int close_spool_file (void *vjcr, BSOCK *bs); +void my_name_is (int argc, char *argv[], char *name); +void init_msg (void *jcr, MSGS *msg); +void term_msg (void); +void close_msg (void *jcr); +void add_msg_dest (MSGS *msg, int dest, int type, char *where, char *dest_code); +void rem_msg_dest (MSGS *msg, int dest, int type, char *where); +void Jmsg (void *jcr, int type, int level, char *fmt, ...); +void dispatch_message (void *jcr, int type, int level, char *buf); +void init_console_msg (char *wd); +void free_msgs_res (MSGS *msgs); +int open_spool_file (void *jcr, BSOCK *bs); +int close_spool_file (void *vjcr, BSOCK *bs); /* bnet_server.c */ -void bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq, - void *handle_client_request(void *bsock)); -void bnet_server (int port, void handle_client_request(BSOCK *bsock)); -int net_connect (int port); -BSOCK * bnet_bind (int port); -BSOCK * bnet_accept (BSOCK *bsock, char *who); +void bnet_thread_server(char *bind_addr, int port, int max_clients, workq_t *client_wq, + void *handle_client_request(void *bsock)); +void bnet_server (int port, void handle_client_request(BSOCK *bsock)); +int net_connect (int port); +BSOCK * bnet_bind (int port); +BSOCK * bnet_accept (BSOCK *bsock, char *who); /* signal.c */ -void init_signals (void terminate(int sig)); -void init_stack_dump (void); +void init_signals (void terminate(int sig)); +void init_stack_dump (void); /* util.c */ -void lcase (char *str); -void bash_spaces (char *str); -void unbash_spaces (char *str); -void strip_trailing_junk (char *str); -void strip_trailing_slashes (char *dir); -int skip_spaces (char **msg); -int skip_nonspaces (char **msg); -int fstrsch (char *a, char *b); -char * encode_time (time_t time, char *buf); -char * encode_mode (mode_t mode, char *buf); -int do_shell_expansion (char *name); -int is_buf_zero (char *buf, int len); -void jobstatus_to_ascii (int JobStatus, char *msg, int maxlen); -void pm_strcat (POOLMEM **pm, char *str); -void pm_strcpy (POOLMEM **pm, char *str); -int run_program (char *prog, int wait, POOLMEM *results); -char * job_type_to_str (int type); -char * job_status_to_str (int stat); -char * job_level_to_str (int level); -void makeSessionKey (char *key, char *seed, int mode); -BPIPE * open_bpipe(char *prog, int wait, char *mode); -int close_wpipe(BPIPE *bpipe); -int close_bpipe(BPIPE *bpipe); -POOLMEM *edit_job_codes(void *jcr, char *omsg, char *imsg, char *to); +void lcase (char *str); +void bash_spaces (char *str); +void unbash_spaces (char *str); +void strip_trailing_junk (char *str); +void strip_trailing_slashes (char *dir); +int skip_spaces (char **msg); +int skip_nonspaces (char **msg); +int fstrsch (char *a, char *b); +char * encode_time (time_t time, char *buf); +char * encode_mode (mode_t mode, char *buf); +int do_shell_expansion (char *name); +int is_buf_zero (char *buf, int len); +void jobstatus_to_ascii (int JobStatus, char *msg, int maxlen); +void pm_strcat (POOLMEM **pm, char *str); +void pm_strcpy (POOLMEM **pm, char *str); +int run_program (char *prog, int wait, POOLMEM *results); +char * job_type_to_str (int type); +char * job_status_to_str (int stat); +char * job_level_to_str (int level); +void makeSessionKey (char *key, char *seed, int mode); +BPIPE * open_bpipe(char *prog, int wait, char *mode); +int close_wpipe(BPIPE *bpipe); +int close_bpipe(BPIPE *bpipe); +POOLMEM *edit_job_codes(void *jcr, char *omsg, char *imsg, char *to); void parse_command_args(POOLMEM *cmd, POOLMEM *args, int *argc, - char **argk, char **argv); + char **argk, char **argv); char *next_arg(char **s); diff --git a/bacula/src/stored/Makefile.in b/bacula/src/stored/Makefile.in index cbb330fb9c..245fcddf7c 100644 --- a/bacula/src/stored/Makefile.in +++ b/bacula/src/stored/Makefile.in @@ -21,13 +21,13 @@ dummy: SVRSRCS = stored.c autochanger.c acquire.c append.c \ askdir.c authenticate.c \ block.c dev.c \ - device.c dircmd.c fd_cmds.c fdmsg.c job.c \ + device.c dircmd.c fd_cmds.c job.c \ label.c match_bsr.c parse_bsr.c \ read.c record.c stored_conf.c mount.c SVROBJS = stored.o autochanger.o acquire.o append.o \ askdir.o authenticate.o \ block.o dev.o \ - device.o dircmd.o fd_cmds.o fdmsg.o job.o \ + device.o dircmd.o fd_cmds.o job.o \ label.o match_bsr.o mount.o parse_bsr.o \ read.o record.o stored_conf.o diff --git a/bacula/src/stored/protos.h b/bacula/src/stored/protos.h index 693ce620c5..ccc0d9f165 100644 --- a/bacula/src/stored/protos.h +++ b/bacula/src/stored/protos.h @@ -28,82 +28,82 @@ uint32_t new_VolSessionId(); /* From acquire.c */ -DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int release_device(JCR *jcr, DEVICE *dev); +DEVICE *acquire_device_for_append(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int acquire_device_for_read(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int release_device(JCR *jcr, DEVICE *dev); /* From askdir.c */ -int dir_get_volume_info(JCR *jcr, int writing); -int dir_find_next_appendable_volume(JCR *jcr); -int dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel); -int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev); -int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev); -int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec); -int dir_send_job_status(JCR *jcr); -int dir_create_jobmedia_record(JCR *jcr); +int dir_get_volume_info(JCR *jcr, int writing); +int dir_find_next_appendable_volume(JCR *jcr); +int dir_update_volume_info(JCR *jcr, VOLUME_CAT_INFO *vol, int relabel); +int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev); +int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev); +int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec); +int dir_send_job_status(JCR *jcr); +int dir_create_jobmedia_record(JCR *jcr); /* authenticate.c */ -int authenticate_director(JCR *jcr); -int authenticate_filed(JCR *jcr); +int authenticate_director(JCR *jcr); +int authenticate_filed(JCR *jcr); /* From block.c */ -void dump_block(DEV_BLOCK *b, char *msg); +void dump_block(DEV_BLOCK *b, char *msg); DEV_BLOCK *new_block(DEVICE *dev); -void init_block_write(DEV_BLOCK *block); -void empty_block(DEV_BLOCK *block); -void free_block(DEV_BLOCK *block); -int write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -int read_block_from_device(DEVICE *dev, DEV_BLOCK *block); -int read_block_from_dev(DEVICE *dev, DEV_BLOCK *block); +void init_block_write(DEV_BLOCK *block); +void empty_block(DEV_BLOCK *block); +void free_block(DEV_BLOCK *block); +int write_block_to_device(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int write_block_to_dev(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int read_block_from_device(DEVICE *dev, DEV_BLOCK *block); +int read_block_from_dev(DEVICE *dev, DEV_BLOCK *block); /* From butil.c -- utilities for SD tool programs */ -void print_ls_output(char *fname, char *link, int type, struct stat *statp); +void print_ls_output(char *fname, char *link, int type, struct stat *statp); JCR *setup_jcr(char *name, char *device, BSR *bsr, char *VolumeName); DEVICE *setup_to_access_device(JCR *jcr, int read_access); -void display_error_status(DEVICE *dev); +void display_error_status(DEVICE *dev); DEVRES *find_device_res(char *device_name, int read_access); /* From dev.c */ -DEVICE *init_dev(DEVICE *dev, DEVRES *device); -int open_dev(DEVICE *dev, char *VolName, int mode); -void close_dev(DEVICE *dev); -void force_close_dev(DEVICE *dev); -int truncate_dev(DEVICE *dev); -void term_dev(DEVICE *dev); -char * strerror_dev(DEVICE *dev); -void clrerror_dev(DEVICE *dev, int func); -int update_pos_dev(DEVICE *dev); -int rewind_dev(DEVICE *dev); -int load_dev(DEVICE *dev); -int offline_dev(DEVICE *dev); -int flush_dev(DEVICE *dev); -int weof_dev(DEVICE *dev, int num); -int write_block(DEVICE *dev); -int write_dev(DEVICE *dev, char *buf, size_t len); -int read_dev(DEVICE *dev, char *buf, size_t len); -int status_dev(DEVICE *dev, uint32_t *status); -int eod_dev(DEVICE *dev); -int fsf_dev(DEVICE *dev, int num); -int fsr_dev(DEVICE *dev, int num); -int bsf_dev(DEVICE *dev, int num); -int bsr_dev(DEVICE *dev, int num); -void attach_jcr_to_device(DEVICE *dev, JCR *jcr); -void detach_jcr_from_device(DEVICE *dev, JCR *jcr); -JCR *next_attached_jcr(DEVICE *dev, JCR *jcr); +DEVICE *init_dev(DEVICE *dev, DEVRES *device); +int open_dev(DEVICE *dev, char *VolName, int mode); +void close_dev(DEVICE *dev); +void force_close_dev(DEVICE *dev); +int truncate_dev(DEVICE *dev); +void term_dev(DEVICE *dev); +char * strerror_dev(DEVICE *dev); +void clrerror_dev(DEVICE *dev, int func); +int update_pos_dev(DEVICE *dev); +int rewind_dev(DEVICE *dev); +int load_dev(DEVICE *dev); +int offline_dev(DEVICE *dev); +int flush_dev(DEVICE *dev); +int weof_dev(DEVICE *dev, int num); +int write_block(DEVICE *dev); +int write_dev(DEVICE *dev, char *buf, size_t len); +int read_dev(DEVICE *dev, char *buf, size_t len); +int status_dev(DEVICE *dev, uint32_t *status); +int eod_dev(DEVICE *dev); +int fsf_dev(DEVICE *dev, int num); +int fsr_dev(DEVICE *dev, int num); +int bsf_dev(DEVICE *dev, int num); +int bsr_dev(DEVICE *dev, int num); +void attach_jcr_to_device(DEVICE *dev, JCR *jcr); +void detach_jcr_from_device(DEVICE *dev, JCR *jcr); +JCR *next_attached_jcr(DEVICE *dev, JCR *jcr); /* Get info about device */ -char * dev_name(DEVICE *dev); -char * dev_vol_name(DEVICE *dev); +char * dev_name(DEVICE *dev); +char * dev_vol_name(DEVICE *dev); uint32_t dev_block(DEVICE *dev); uint32_t dev_file(DEVICE *dev); -int dev_is_tape(DEVICE *dev); +int dev_is_tape(DEVICE *dev); /* From device.c */ -int open_device(DEVICE *dev); -int fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int open_device(DEVICE *dev); +int fixup_device_block_write_error(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); void _lock_device(char *file, int line, DEVICE *dev); void _unlock_device(char *file, int line, DEVICE *dev); void _block_device(char *file, int line, DEVICE *dev, int state); @@ -119,43 +119,40 @@ void new_steal_device_lock(DEVICE *dev, brwsteal_t *hold, int state); void new_return_device_lock(DEVICE *dev, brwsteal_t *hold); /* From dircmd.c */ -void *connection_request(void *arg); +void *connection_request(void *arg); /* From fd_cmds.c */ -void run_job(JCR *jcr); - -/* From fdmsg.c */ -int bget_msg(BSOCK *sock); +void run_job(JCR *jcr); /* From job.c */ -void stored_free_jcr(JCR *jcr); -void connection_from_filed(void *arg); -void handle_filed_connection(BSOCK *fd, char *job_name); +void stored_free_jcr(JCR *jcr); +void connection_from_filed(void *arg); +void handle_filed_connection(BSOCK *fd, char *job_name); /* From label.c */ -int read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -void create_session_label(JCR *jcr, DEV_RECORD *rec, int label); -void create_volume_label(DEVICE *dev, char *VolName); -int write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName); -int write_session_label(JCR *jcr, DEV_BLOCK *block, int label); -int write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); -void dump_volume_label(DEVICE *dev); -void dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose); -int unser_volume_label(DEVICE *dev, DEV_RECORD *rec); -int unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec); +int read_dev_volume_label(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +void create_session_label(JCR *jcr, DEV_RECORD *rec, int label); +void create_volume_label(DEVICE *dev, char *VolName); +int write_volume_label_to_dev(JCR *jcr, DEVRES *device, char *VolName, char *PoolName); +int write_session_label(JCR *jcr, DEV_BLOCK *block, int label); +int write_volume_label_to_block(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +void dump_volume_label(DEVICE *dev); +void dump_label_record(DEVICE *dev, DEV_RECORD *rec, int verbose); +int unser_volume_label(DEVICE *dev, DEV_RECORD *rec); +int unser_session_label(SESSION_LABEL *label, DEV_RECORD *rec); /* From match_bsr.c */ int match_bsr(BSR *bsr, DEV_RECORD *rec, VOLUME_LABEL *volrec, - SESSION_LABEL *sesrec); + SESSION_LABEL *sesrec); /* From mount.c */ -int mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release); -int mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); +int mount_next_write_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block, int release); +int mount_next_read_volume(JCR *jcr, DEVICE *dev, DEV_BLOCK *block); /* From autochanger.c */ -int autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir); -int autochanger_list(JCR *jcr, DEVICE *dev, BSOCK *dir); +int autoload_device(JCR *jcr, DEVICE *dev, int writing, BSOCK *dir); +int autochanger_list(JCR *jcr, DEVICE *dev, BSOCK *dir); /* From parse_bsr.c */ @@ -170,11 +167,11 @@ extern void create_vol_list(JCR *jcr); /* From record.c */ char *FI_to_ascii(int fi); char *stream_to_ascii(int stream, int fi); -int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); -int can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); -int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); +int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); +int can_write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); +int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); DEV_RECORD *new_record(); -void free_record(DEV_RECORD *rec); +void free_record(DEV_RECORD *rec); /* From read_record.c */ int read_records(JCR *jcr, DEVICE *dev, diff --git a/bacula/src/stored/stored.c b/bacula/src/stored/stored.c index 1a32be9ac1..04840dbc16 100644 --- a/bacula/src/stored/stored.c +++ b/bacula/src/stored/stored.c @@ -44,21 +44,19 @@ static void *device_allocation(void *arg); /* Global variables exported */ +char OK_msg[] = "3000 OK\n"; +char TERM_msg[] = "3999 Terminate\n"; +STORES *me; /* our Global resource */ - -/* This is our own global resource */ -STORES *me; - -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static uint32_t VolSessionId = 0; uint32_t VolSessionTime; - char *configfile; -static int foreground = 0; +/* Global static variables */ +static int foreground = 0; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static workq_t dird_workq; /* queue for processing connections */ - static void usage() { fprintf(stderr, _( diff --git a/bacula/src/version.h b/bacula/src/version.h index abc03071fb..2e2802ec2f 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -1,8 +1,8 @@ /* */ #define VERSION "1.31" #define VSTRING "1" -#define BDATE "04 May 2003" -#define LSMDATE "04May03" +#define BDATE "05 May 2003" +#define LSMDATE "05May03" /* Debug flags */ #define DEBUG 1