From b435bf6e3527af42c954cf1b6be0207078741d61 Mon Sep 17 00:00:00 2001 From: Eric Bollengier Date: Fri, 23 Apr 2010 08:15:51 +0200 Subject: [PATCH] Fix cancel crash bug #1551 --- bacula/src/dird/job.c | 35 ++++++++++++++--------------- bacula/src/dird/jobq.c | 2 +- bacula/src/dird/msgchan.c | 7 +++--- bacula/src/dird/protos.h | 1 + bacula/src/filed/job.c | 2 +- bacula/src/jcr.h | 3 ++- bacula/src/lib/jcr.c | 45 +++++++++++++++++++++++++++++++++----- bacula/src/lib/protos.h | 2 +- bacula/src/stored/stored.c | 4 ++-- 9 files changed, 69 insertions(+), 32 deletions(-) diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index 1595f56c18..88ac5b84e4 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -364,6 +364,18 @@ static void *job_thread(void *arg) return NULL; } +void sd_msg_thread_send_signal(JCR *jcr, int sig) +{ + jcr->lock(); + if ( !jcr->sd_msg_thread_done + && jcr->SD_msg_chan + && !pthread_equal(jcr->SD_msg_chan, pthread_self())) + { + Dmsg1(800, "Send kill to SD msg chan jid=%d\n", jcr->JobId); + pthread_kill(jcr->SD_msg_chan, sig); + } + jcr->unlock(); +} /* * Cancel a job -- typically called by the UA (Console program), but may also @@ -411,10 +423,7 @@ bool cancel_job(UAContext *ua, JCR *jcr) fd->close(); ua->jcr->file_bsock = NULL; jcr->file_bsock->set_terminated(); - if (jcr->my_thread_id && !pthread_equal(jcr->my_thread_id, pthread_self())) { - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - Dmsg1(800, "Send kill to jid=%d\n", jcr->JobId); - } + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } /* Cancel Storage daemon */ @@ -450,13 +459,8 @@ bool cancel_job(UAContext *ua, JCR *jcr) ua->jcr->store_bsock = NULL; jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); - if (jcr->SD_msg_chan && !pthread_equal(jcr->SD_msg_chan, pthread_self())) { - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); - } - if (jcr->my_thread_id && !pthread_equal(jcr->my_thread_id, pthread_self())) { - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } break; } @@ -506,13 +510,8 @@ void cancel_storage_daemon_job(JCR *jcr) jcr->sd_canceled = true; jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); - if (jcr->SD_msg_chan && !pthread_equal(jcr->SD_msg_chan, pthread_self())) { - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); - } - if (jcr->my_thread_id && !pthread_equal(jcr->my_thread_id, pthread_self())) { - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } bail_out: free_jcr(control_jcr); diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index 68d67dc9cf..c324d45111 100644 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -450,7 +450,7 @@ void *jobq_server(void *arg) jq->engine(je->jcr); /* Job finished detach from thread */ - set_jcr_in_tsd(INVALID_JCR); + remove_jcr_from_tsd(je->jcr); Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId, jcr->use_count()); diff --git a/bacula/src/dird/msgchan.c b/bacula/src/dird/msgchan.c index 4b653d3790..f354fb3b45 100644 --- a/bacula/src/dird/msgchan.c +++ b/bacula/src/dird/msgchan.c @@ -354,8 +354,10 @@ extern "C" void msg_thread_cleanup(void *arg) { JCR *jcr = (JCR *)arg; db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ + jcr->lock(); jcr->sd_msg_thread_done = true; jcr->SD_msg_chan = 0; + jcr->unlock(); pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count()); free_jcr(jcr); /* release jcr */ @@ -377,7 +379,7 @@ extern "C" void *msg_thread(void *arg) uint64_t JobBytes; pthread_detach(pthread_self()); - set_jcr_in_tsd(jcr); + set_jcr_in_tsd(jcr, false /* no thread update in jcr */); jcr->SD_msg_chan = pthread_self(); pthread_cleanup_push(msg_thread_cleanup, arg); sd = jcr->store_bsock; @@ -427,8 +429,7 @@ void wait_for_storage_daemon_termination(JCR *jcr) if (jcr->SD_msg_chan) { jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); } cancel_count++; } diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index ee7bef42de..25882479ac 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -141,6 +141,7 @@ extern void dird_free_jcr(JCR *jcr); extern void dird_free_jcr_pointers(JCR *jcr); extern void cancel_storage_daemon_job(JCR *jcr); extern bool run_console_command(JCR *jcr, const char *cmd); +extern void sd_msg_thread_send_signal(JCR *jcr, int sig); /* migration.c */ extern bool do_migration(JCR *jcr); diff --git a/bacula/src/filed/job.c b/bacula/src/filed/job.c index 2a9dca806f..b130670e78 100644 --- a/bacula/src/filed/job.c +++ b/bacula/src/filed/job.c @@ -428,7 +428,7 @@ static int cancel_cmd(JCR *jcr) if (cjcr->store_bsock) { cjcr->store_bsock->set_timed_out(); cjcr->store_bsock->set_terminated(); - pthread_kill(cjcr->my_thread_id, TIMEOUT_SIGNAL); + cjcr->my_thread_send_signal(TIMEOUT_SIGNAL); } generate_plugin_event(cjcr, bEventCancelCommand, NULL); set_jcr_job_status(cjcr, JS_Canceled); diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 9db901ad67..2d38956798 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -202,10 +202,11 @@ public: const char *get_ActionName(bool past); /* in lib/jcr.c */ void setJobStatus(int JobStatus); /* in lib/jcr.c */ bool JobReads(); /* in lib/jcr.c */ - + void my_thread_send_signal(int sig); /* in lib/jcr.c */ /* Global part of JCR common to all daemons */ dlink link; /* JCR chain link */ + bool my_thread_running; /* is the thread controlling jcr running*/ pthread_t my_thread_id; /* id of thread controlling jcr */ BSOCK *dir_bsock; /* Director bsock or NULL if we are him */ BSOCK *store_bsock; /* Storage connection socket */ diff --git a/bacula/src/lib/jcr.c b/bacula/src/lib/jcr.c index 988566ecfd..441f5a9f98 100644 --- a/bacula/src/lib/jcr.c +++ b/bacula/src/lib/jcr.c @@ -342,7 +342,6 @@ JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr) } jcr = (JCR *)malloc(size); memset(jcr, 0, size); - jcr->my_thread_id = pthread_self(); jcr->msg_queue = New(dlist(item, &item->link)); if ((status = pthread_mutex_init(&jcr->msg_queue_mutex, NULL)) != 0) { berrno be; @@ -412,6 +411,9 @@ static void remove_jcr(JCR *jcr) */ static void free_common_jcr(JCR *jcr) { + /* Uses jcr lock/unlock */ + remove_jcr_from_tsd(jcr); + jcr->destroy_mutex(); if (jcr->msg_queue) { @@ -475,7 +477,6 @@ static void free_common_jcr(JCR *jcr) free_pool_memory(jcr->comment); jcr->comment = NULL; } - remove_jcr_from_tsd(jcr); free(jcr); } @@ -587,20 +588,54 @@ void remove_jcr_from_tsd(JCR *jcr) { JCR *tjcr = get_jcr_from_tsd(); if (tjcr == jcr) { + jcr->lock(); + jcr->my_thread_running = false; + memset(&jcr->my_thread_id, 0, sizeof(jcr->my_thread_id)); + jcr->unlock(); set_jcr_in_tsd(INVALID_JCR); } } /* - * Put this jcr in the thread specifc data + * Put this jcr in the thread specifc data + * if update_thread_info is true and the jcr is valide, + * we update the my_thread_id in the JCR */ -void set_jcr_in_tsd(JCR *jcr) +void set_jcr_in_tsd(JCR *jcr, bool update_thread_info) { int status = pthread_setspecific(jcr_key, (void *)jcr); if (status != 0) { berrno be; - Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status)); + Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), + be.bstrerror(status)); + } + + /* We explicitly ask to set a jcr in tsd, we can update jcr->my_thread + */ + if (update_thread_info && jcr && jcr != INVALID_JCR) { + Dmsg2(100, "setting my_thread_stuffs 0x%p => 0x%p\n", + jcr->my_thread_id, pthread_self()); + jcr->lock(); + //ASSERT(jcr->my_thread_running == false); + jcr->my_thread_id = pthread_self(); + jcr->my_thread_running = true; + jcr->unlock(); + } +} + +void JCR::my_thread_send_signal(int sig) +{ + this->lock(); + if ( this->my_thread_running + && !pthread_equal(this->my_thread_id, pthread_self())) + { + Dmsg1(800, "Send kill to jid=%d\n", this->JobId); + pthread_kill(this->my_thread_id, sig); + + } else if (!this->my_thread_running) { + Dmsg1(0, "Warning, can't send kill to jid=%d\n", this->JobId); } + this->unlock(); } /* diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index 48196a0bf4..e24291cce4 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -206,7 +206,7 @@ JCR *jcr_walk_next(JCR *prev_jcr); void jcr_walk_end(JCR *jcr); int job_count(); JCR *get_jcr_from_tsd(); -void set_jcr_in_tsd(JCR *jcr); +void set_jcr_in_tsd(JCR *jcr, bool update_thread_info=true); void remove_jcr_from_tsd(JCR *jcr); uint32_t get_jobid_from_tsd(); uint32_t get_jobid_from_tid(pthread_t tid); diff --git a/bacula/src/stored/stored.c b/bacula/src/stored/stored.c index 71f6f9d2a7..8f19c7834f 100644 --- a/bacula/src/stored/stored.c +++ b/bacula/src/stored/stored.c @@ -616,8 +616,8 @@ void terminate_stored(int sig) fd = jcr->file_bsock; if (fd) { fd->set_timed_out(); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); Dmsg1(100, "term_stored killing JobId=%d\n", jcr->JobId); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); /* ***FIXME*** wiffle through all dcrs */ if (jcr->dcr && jcr->dcr->dev && jcr->dcr->dev->blocked()) { pthread_cond_broadcast(&jcr->dcr->dev->wait_next_vol); @@ -662,7 +662,7 @@ void terminate_stored(int sig) config->free_resources(); free(config); config = NULL; - } + } if (debug_level > 10) { print_memory_pool_stats(); -- 2.39.5