From 3dd259ca2a29730303bd43b99f6ba0dea1bcc356 Mon Sep 17 00:00:00 2001 From: Eric Bollengier Date: Fri, 23 Apr 2010 08:21:36 +0200 Subject: [PATCH] Fix cancel crash bug #1551 --- bacula/src/dird/job.c | 35 +++++++++++++-------------- bacula/src/dird/jobq.c | 2 +- bacula/src/dird/msgchan.c | 7 +++--- bacula/src/dird/protos.h | 1 + bacula/src/filed/job.c | 2 +- bacula/src/jcr.h | 3 ++- bacula/src/lib/jcr.c | 49 ++++++++++++++++++++++++++++++++++---- bacula/src/lib/protos.h | 2 +- bacula/src/stored/stored.c | 4 ++-- 9 files changed, 73 insertions(+), 32 deletions(-) diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index 77b3181d7b..48e58f68ff 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -365,6 +365,18 @@ static void *job_thread(void *arg) return NULL; } +void sd_msg_thread_send_signal(JCR *jcr, int sig) +{ + jcr->lock(); + if ( !jcr->sd_msg_thread_done + && jcr->SD_msg_chan + && !pthread_equal(jcr->SD_msg_chan, pthread_self())) + { + Dmsg1(800, "Send kill to SD msg chan jid=%d\n", jcr->JobId); + pthread_kill(jcr->SD_msg_chan, sig); + } + jcr->unlock(); +} /* * Cancel a job -- typically called by the UA (Console program), but may also @@ -412,10 +424,7 @@ bool cancel_job(UAContext *ua, JCR *jcr) fd->close(); ua->jcr->file_bsock = NULL; jcr->file_bsock->set_terminated(); - if (jcr->my_thread_id && !pthread_equal(jcr->my_thread_id, pthread_self())) { - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - Dmsg1(800, "Send kill to jid=%d\n", jcr->JobId); - } + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } /* Cancel Storage daemon */ @@ -451,13 +460,8 @@ bool cancel_job(UAContext *ua, JCR *jcr) ua->jcr->store_bsock = NULL; jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); - if (jcr->SD_msg_chan && !pthread_equal(jcr->SD_msg_chan, pthread_self())) { - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); - } - if (jcr->my_thread_id && !pthread_equal(jcr->my_thread_id, pthread_self())) { - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } break; } @@ -507,13 +511,8 @@ void cancel_storage_daemon_job(JCR *jcr) jcr->sd_canceled = true; jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); - if (jcr->SD_msg_chan && !pthread_equal(jcr->SD_msg_chan, pthread_self())) { - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); - } - if (jcr->my_thread_id && !pthread_equal(jcr->my_thread_id, pthread_self())) { - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } bail_out: free_jcr(control_jcr); diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index 68d67dc9cf..c324d45111 100644 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -450,7 +450,7 @@ void *jobq_server(void *arg) jq->engine(je->jcr); /* Job finished detach from thread */ - set_jcr_in_tsd(INVALID_JCR); + remove_jcr_from_tsd(je->jcr); Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId, jcr->use_count()); diff --git a/bacula/src/dird/msgchan.c b/bacula/src/dird/msgchan.c index 4b653d3790..f354fb3b45 100644 --- a/bacula/src/dird/msgchan.c +++ b/bacula/src/dird/msgchan.c @@ -354,8 +354,10 @@ extern "C" void msg_thread_cleanup(void *arg) { JCR *jcr = (JCR *)arg; db_end_transaction(jcr, jcr->db); /* terminate any open transaction */ + jcr->lock(); jcr->sd_msg_thread_done = true; jcr->SD_msg_chan = 0; + jcr->unlock(); pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */ Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count()); free_jcr(jcr); /* release jcr */ @@ -377,7 +379,7 @@ extern "C" void *msg_thread(void *arg) uint64_t JobBytes; pthread_detach(pthread_self()); - set_jcr_in_tsd(jcr); + set_jcr_in_tsd(jcr, false /* no thread update in jcr */); jcr->SD_msg_chan = pthread_self(); pthread_cleanup_push(msg_thread_cleanup, arg); sd = jcr->store_bsock; @@ -427,8 +429,7 @@ void wait_for_storage_daemon_termination(JCR *jcr) if (jcr->SD_msg_chan) { jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); - Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count()); - pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL); + sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); } cancel_count++; } diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index 6d99a859d8..33e12cfcd3 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -142,6 +142,7 @@ extern void dird_free_jcr(JCR *jcr); extern void dird_free_jcr_pointers(JCR *jcr); extern void cancel_storage_daemon_job(JCR *jcr); extern bool run_console_command(JCR *jcr, const char *cmd); +extern void sd_msg_thread_send_signal(JCR *jcr, int sig); /* migration.c */ extern bool do_migration(JCR *jcr); diff --git a/bacula/src/filed/job.c b/bacula/src/filed/job.c index 52ef73a648..af1ef40bbe 100644 --- a/bacula/src/filed/job.c +++ b/bacula/src/filed/job.c @@ -418,7 +418,7 @@ static int cancel_cmd(JCR *jcr) if (cjcr->store_bsock) { cjcr->store_bsock->set_timed_out(); cjcr->store_bsock->set_terminated(); - pthread_kill(cjcr->my_thread_id, TIMEOUT_SIGNAL); + cjcr->my_thread_send_signal(TIMEOUT_SIGNAL); } generate_plugin_event(cjcr, bEventCancelCommand, NULL); set_jcr_job_status(cjcr, JS_Canceled); diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 4513e800f6..fc9100f6c0 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -202,10 +202,11 @@ public: const char *get_ActionName(bool past); /* in lib/jcr.c */ void setJobStatus(int JobStatus); /* in lib/jcr.c */ bool JobReads(); /* in lib/jcr.c */ - + void my_thread_send_signal(int sig); /* in lib/jcr.c */ /* Global part of JCR common to all daemons */ dlink link; /* JCR chain link */ + bool my_thread_running; /* is the thread controlling jcr running*/ pthread_t my_thread_id; /* id of thread controlling jcr */ BSOCK *dir_bsock; /* Director bsock or NULL if we are him */ BSOCK *store_bsock; /* Storage connection socket */ diff --git a/bacula/src/lib/jcr.c b/bacula/src/lib/jcr.c index 4e5b4c06e8..408b9207cc 100644 --- a/bacula/src/lib/jcr.c +++ b/bacula/src/lib/jcr.c @@ -342,7 +342,6 @@ JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr) } jcr = (JCR *)malloc(size); memset(jcr, 0, size); - jcr->my_thread_id = pthread_self(); jcr->msg_queue = New(dlist(item, &item->link)); if ((status = pthread_mutex_init(&jcr->msg_queue_mutex, NULL)) != 0) { berrno be; @@ -410,6 +409,9 @@ static void remove_jcr(JCR *jcr) */ static void free_common_jcr(JCR *jcr) { + /* Uses jcr lock/unlock */ + remove_jcr_from_tsd(jcr); + jcr->destroy_mutex(); if (jcr->msg_queue) { @@ -469,7 +471,10 @@ static void free_common_jcr(JCR *jcr) free_guid_list(jcr->id_list); jcr->id_list = NULL; } - remove_jcr_from_tsd(jcr); + if (jcr->comment) { + free_pool_memory(jcr->comment); + jcr->comment = NULL; + } free(jcr); } @@ -581,20 +586,54 @@ void remove_jcr_from_tsd(JCR *jcr) { JCR *tjcr = get_jcr_from_tsd(); if (tjcr == jcr) { + jcr->lock(); + jcr->my_thread_running = false; + memset(&jcr->my_thread_id, 0, sizeof(jcr->my_thread_id)); + jcr->unlock(); set_jcr_in_tsd(INVALID_JCR); } } /* - * Put this jcr in the thread specifc data + * Put this jcr in the thread specifc data + * if update_thread_info is true and the jcr is valide, + * we update the my_thread_id in the JCR */ -void set_jcr_in_tsd(JCR *jcr) +void set_jcr_in_tsd(JCR *jcr, bool update_thread_info) { int status = pthread_setspecific(jcr_key, (void *)jcr); if (status != 0) { berrno be; - Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status)); + Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), + be.bstrerror(status)); + } + + /* We explicitly ask to set a jcr in tsd, we can update jcr->my_thread + */ + if (update_thread_info && jcr && jcr != INVALID_JCR) { + Dmsg2(100, "setting my_thread_stuffs 0x%p => 0x%p\n", + jcr->my_thread_id, pthread_self()); + jcr->lock(); + //ASSERT(jcr->my_thread_running == false); + jcr->my_thread_id = pthread_self(); + jcr->my_thread_running = true; + jcr->unlock(); + } +} + +void JCR::my_thread_send_signal(int sig) +{ + this->lock(); + if ( this->my_thread_running + && !pthread_equal(this->my_thread_id, pthread_self())) + { + Dmsg1(800, "Send kill to jid=%d\n", this->JobId); + pthread_kill(this->my_thread_id, sig); + + } else if (!this->my_thread_running) { + Dmsg1(0, "Warning, can't send kill to jid=%d\n", this->JobId); } + this->unlock(); } /* diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index 7e700bdad7..b5ff330a54 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -205,7 +205,7 @@ JCR *jcr_walk_start(); JCR *jcr_walk_next(JCR *prev_jcr); void jcr_walk_end(JCR *jcr); JCR *get_jcr_from_tsd(); -void set_jcr_in_tsd(JCR *jcr); +void set_jcr_in_tsd(JCR *jcr, bool update_thread_info=true); void remove_jcr_from_tsd(JCR *jcr); uint32_t get_jobid_from_tsd(); uint32_t get_jobid_from_tid(pthread_t tid); diff --git a/bacula/src/stored/stored.c b/bacula/src/stored/stored.c index 93735c76b4..7a79467389 100644 --- a/bacula/src/stored/stored.c +++ b/bacula/src/stored/stored.c @@ -615,8 +615,8 @@ void terminate_stored(int sig) fd = jcr->file_bsock; if (fd) { fd->set_timed_out(); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); Dmsg1(100, "term_stored killing JobId=%d\n", jcr->JobId); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); /* ***FIXME*** wiffle through all dcrs */ if (jcr->dcr && jcr->dcr->dev && jcr->dcr->dev->blocked()) { pthread_cond_broadcast(&jcr->dcr->dev->wait_next_vol); @@ -661,7 +661,7 @@ void terminate_stored(int sig) config->free_resources(); free(config); config = NULL; - } + } if (debug_level > 10) { print_memory_pool_stats(); -- 2.39.5