X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fjcr.c;h=8aafd522d61aadd9aa2a70af44be3bc794c32603;hb=9acd9114f65ed90400fc7bb69426be4c4c3ba421;hp=276a29ea68d229e30cc719f4a4f79b2ddf3f011c;hpb=0fa1b7cc7034aa297392c251025fe20822619cef;p=bacula%2Fbacula diff --git a/bacula/src/lib/jcr.c b/bacula/src/lib/jcr.c index 276a29ea68..8aafd522d6 100644 --- a/bacula/src/lib/jcr.c +++ b/bacula/src/lib/jcr.c @@ -1,29 +1,20 @@ /* - Bacula® - The Network Backup Solution - - Copyright (C) 2000-2008 Free Software Foundation Europe e.V. - - The main author of Bacula is Kern Sibbald, with contributions from - many others, a complete list can be found in the file AUTHORS. - This program is Free Software; you can redistribute it and/or - modify it under the terms of version two of the GNU General Public - License as published by the Free Software Foundation and included - in the file LICENSE. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. - - Bacula® is a registered trademark of John Walker. - The licensor of Bacula is the Free Software Foundation Europe - (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, - Switzerland, email:ftf@fsfeurope.org. + Bacula(R) - The Network Backup Solution + + Copyright (C) 2000-2017 Kern Sibbald + + The original author of Bacula is Kern Sibbald, with contributions + from many others, a complete list can be found in the file AUTHORS. + + You may use this file and others of this release according to the + license defined in the LICENSE file, which includes the Affero General + Public License, v3.0 ("AGPLv3") and some additional permissions and + terms pursuant to its AGPLv3 Section 7. + + This notice must be preserved when any source code is + conveyed and/or propagated. + + Bacula(R) is a registered trademark of Kern Sibbald. */ /* * Manipulation routines for Job Control Records and @@ -31,8 +22,6 @@ * * Kern E. Sibbald, December 2000 * - * Version $Id$ - * * These routines are thread safe. * * The job list routines were re-written in May 2005 to @@ -43,7 +32,7 @@ * The result is that there is one lock/unlock for each entry * in the list while traversing it rather than a single lock * at the beginning of a traversal and one at the end. This - * incurs slightly more overhead, but effectively eliminates + * incurs slightly more overhead, but effectively eliminates * the possibilty of race conditions. In addition, with the * exception of the global locking of the list during the * re-reading of the config file, no recursion is needed. @@ -55,16 +44,7 @@ const int dbglvl = 3400; -/* - * Setting a NULL in tsd doesn't clear the tsd but instead tells - * pthreads not to call the tsd destructor. Consequently, we - * define this *invalid* jcr address and stuff it in the tsd - * when the jcr is no longer valid. - */ -#define INVALID_JCR ((JCR *)(-1)) - /* External variables we reference */ -extern time_t watchdog_time; /* External referenced functions */ void free_bregexps(alist *bregexps); @@ -86,7 +66,7 @@ static void unlock_jcr_chain(); int num_jobs_run; dlist *last_jobs = NULL; const int max_last_jobs = 10; - + static dlist *jcrs = NULL; /* JCR chain */ static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER; @@ -96,7 +76,9 @@ static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_key_t jcr_key; /* Pointer to jcr for each thread */ -pthread_once_t key_once = PTHREAD_ONCE_INIT; +pthread_once_t key_once = PTHREAD_ONCE_INIT; + +static char Job_status[] = "Status JobId=%ld JobStatus=%d\n"; void lock_jobs() @@ -124,6 +106,7 @@ void init_last_jobs_list() void term_last_jobs_list() { if (last_jobs) { + lock_last_jobs_list(); while (!last_jobs->empty()) { void *je = last_jobs->first(); last_jobs->remove(je); @@ -131,6 +114,7 @@ void term_last_jobs_list() } delete last_jobs; last_jobs = NULL; + unlock_last_jobs_list(); } if (jcrs) { delete jcrs; @@ -142,9 +126,10 @@ bool read_last_jobs_list(int fd, uint64_t addr) { struct s_last_job *je, job; uint32_t num; + bool ok = true; Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr); - if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) { + if (addr == 0 || lseek(fd, (boffset_t)addr, SEEK_SET) < 0) { return false; } if (read(fd, &num, sizeof(num)) != sizeof(num)) { @@ -154,11 +139,13 @@ bool read_last_jobs_list(int fd, uint64_t addr) if (num > 4 * max_last_jobs) { /* sanity check */ return false; } + lock_last_jobs_list(); for ( ; num; num--) { if (read(fd, &job, sizeof(job)) != sizeof(job)) { berrno be; Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror()); - return false; + ok = false; + break; } if (job.JobId > 0) { je = (struct s_last_job *)malloc(sizeof(struct s_last_job)); @@ -174,41 +161,48 @@ bool read_last_jobs_list(int fd, uint64_t addr) } } } - return true; + unlock_last_jobs_list(); + return ok; } uint64_t write_last_jobs_list(int fd, uint64_t addr) { struct s_last_job *je; uint32_t num; + ssize_t stat; Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr); - if (lseek(fd, (off_t)addr, SEEK_SET) < 0) { + if (lseek(fd, (boffset_t)addr, SEEK_SET) < 0) { return 0; } if (last_jobs) { + lock_last_jobs_list(); /* First record is number of entires */ num = last_jobs->size(); if (write(fd, &num, sizeof(num)) != sizeof(num)) { berrno be; Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror()); - return 0; + goto bail_out; } foreach_dlist(je, last_jobs) { if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) { berrno be; Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror()); - return 0; + goto bail_out; } } + unlock_last_jobs_list(); } /* Return current address */ - ssize_t stat = lseek(fd, 0, SEEK_CUR); + stat = lseek(fd, 0, SEEK_CUR); if (stat < 0) { stat = 0; } return stat; +bail_out: + unlock_last_jobs_list(); + return 0; } void lock_last_jobs_list() @@ -221,6 +215,88 @@ void unlock_last_jobs_list() V(last_jobs_mutex); } +/* Get an ASCII representation of the Operation being performed as an english Noun */ +const char *JCR::get_OperationName() +{ + switch(m_JobType) { + case JT_BACKUP: + return _("Backup"); + case JT_VERIFY: + return _("Verifying"); + case JT_RESTORE: + return _("Restoring"); + case JT_ARCHIVE: + return _("Archiving"); + case JT_COPY: + return _("Copying"); + case JT_MIGRATE: + return _("Migration"); + case JT_SCAN: + return _("Scanning"); + default: + return _("Unknown operation"); + } +} + +/* Get an ASCII representation of the Action being performed either an english Verb or Adjective */ +const char *JCR::get_ActionName(bool past) +{ + switch(m_JobType) { + case JT_BACKUP: + return _("backup"); + case JT_VERIFY: + return (past == true) ? _("verified") : _("verify"); + case JT_RESTORE: + return (past == true) ? _("restored") : _("restore"); + case JT_ARCHIVE: + return (past == true) ? _("archived") : _("archive"); + case JT_COPY: + return (past == true) ? _("copied") : _("copy"); + case JT_MIGRATE: + return (past == true) ? _("migrated") : _("migrate"); + case JT_SCAN: + return (past == true) ? _("scanned") : _("scan"); + default: + return _("unknown action"); + } +} + +bool JCR::JobReads() +{ + switch (m_JobType) { + case JT_VERIFY: + case JT_RESTORE: + case JT_COPY: + case JT_MIGRATE: + return true; + case JT_BACKUP: + if (m_JobLevel == L_VIRTUAL_FULL) { + return true; + } + break; + default: + break; + } + return false; +} + +/* We can stop only Backup jobs connected to a client. It doesn't make sens at + * this time to stop a copy, migraton, restore or a verify job. The specific + * code should be implemented first. + */ +bool JCR::can_be_stopped() +{ + bool ok=true; + if (getJobType() == JT_BACKUP) { /* Is a Backup */ + if (getJobLevel() == L_VIRTUAL_FULL) { /* Is a VirtualFull */ + ok = false; + } + } else { /* Is not a backup (so, copy, migration, admin, verify, ... */ + ok = false; + } + return ok; +} + /* * Push a subroutine address into the job end callback stack */ @@ -230,6 +306,9 @@ void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx) jcr->job_end_push.append(ctx); } +/* DELETE ME when bugs in MA1512, MA1632 MA1639 are fixed */ +void (*MA1512_reload_job_end_cb)(JCR *,void *) = NULL; + /* Pop each job_end subroutine and call it */ static void job_end_pop(JCR *jcr) { @@ -238,10 +317,26 @@ static void job_end_pop(JCR *jcr) for (int i=jcr->job_end_push.size()-1; i > 0; ) { ctx = jcr->job_end_push.get(i--); job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--); - job_end_cb(jcr, ctx); + /* check for bug MA1512, MA1632 MA1639, + * today, job_end_cb can only be reload_job_end_cb() from DIR */ + if (job_end_cb != MA1512_reload_job_end_cb && MA1512_reload_job_end_cb != NULL) { + Tmsg2(0, "Bug 'job_end_pop' detected, skip ! job_end_cb=0x%p ctx=0x%p\n", job_end_cb, ctx); + Tmsg0(0, "Display job_end_push list\n"); + for (int j=jcr->job_end_push.size()-1; j > 0; ) { + void *ctx2 = jcr->job_end_push.get(j--); + void *job_end_cb2 = jcr->job_end_push.get(j--); + Tmsg3(0, "Bug 'job_end_pop' entry[%d] job_end_cb=0x%p ctx=0x%p\n", j+1, job_end_cb2, ctx2); + } + } else + { + job_end_cb(jcr, ctx); + } } } +/* + * Create thread key for thread specific data + */ void create_jcr_key() { int status = pthread_key_create(&jcr_key, NULL); @@ -262,7 +357,6 @@ JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr) { JCR *jcr; MQUEUE_ITEM *item = NULL; - struct sigaction sigtimer; int status; Dmsg0(dbglvl, "Enter new_jcr\n"); @@ -272,29 +366,43 @@ JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr) Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status)); } jcr = (JCR *)malloc(size); - memset(jcr, 0, size); + bmemzero(jcr, size); + /* Note for the director, this value is changed in jobq.c */ jcr->my_thread_id = pthread_self(); jcr->msg_queue = New(dlist(item, &item->link)); + if ((status = pthread_mutex_init(&jcr->msg_queue_mutex, NULL)) != 0) { + berrno be; + Jmsg(NULL, M_ABORT, 0, _("Could not init msg_queue mutex. ERR=%s\n"), + be.bstrerror(status)); + } jcr->job_end_push.init(1, false); jcr->sched_time = time(NULL); + jcr->initial_sched_time = jcr->sched_time; jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */ jcr->init_mutex(); - jcr->inc_use_count(); + jcr->inc_use_count(); jcr->VolumeName = get_pool_memory(PM_FNAME); jcr->VolumeName[0] = 0; jcr->errmsg = get_pool_memory(PM_MESSAGE); jcr->errmsg[0] = 0; + jcr->comment = get_pool_memory(PM_FNAME); + jcr->comment[0] = 0; + jcr->StatusErrMsg = get_pool_memory(PM_FNAME); + jcr->StatusErrMsg[0] = 0; + jcr->job_uid = -1; /* Setup some dummy values */ bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job)); jcr->JobId = 0; - jcr->JobType = JT_SYSTEM; /* internal job until defined */ - jcr->JobLevel = L_NONE; - set_jcr_job_status(jcr, JS_Created); /* ready to run */ - set_jcr_in_tsd(jcr); + jcr->setJobType(JT_SYSTEM); /* internal job until defined */ + jcr->setJobLevel(L_NONE); + jcr->setJobStatus(JS_Created); /* ready to run */ +#ifndef HAVE_WIN32 + struct sigaction sigtimer; sigtimer.sa_flags = 0; sigtimer.sa_handler = timeout_handler; sigfillset(&sigtimer.sa_mask); sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL); +#endif /* * Locking jobs is a global lock that is needed @@ -336,42 +444,33 @@ static void remove_jcr(JCR *jcr) */ static void free_common_jcr(JCR *jcr) { + /* Uses jcr lock/unlock */ + remove_jcr_from_tsd(jcr); + jcr->set_killable(false); + jcr->destroy_mutex(); if (jcr->msg_queue) { delete jcr->msg_queue; jcr->msg_queue = NULL; + pthread_mutex_destroy(&jcr->msg_queue_mutex); } - close_msg(jcr); /* close messages for this job */ /* do this after closing messages */ - if (jcr->client_name) { - free_pool_memory(jcr->client_name); - jcr->client_name = NULL; - } - - if (jcr->attr) { - free_pool_memory(jcr->attr); - jcr->attr = NULL; - } + free_and_null_pool_memory(jcr->JobIds); + free_and_null_pool_memory(jcr->client_name); + free_and_null_pool_memory(jcr->attr); + free_and_null_pool_memory(jcr->VolumeName); + free_and_null_pool_memory(jcr->errmsg); + free_and_null_pool_memory(jcr->StatusErrMsg); if (jcr->sd_auth_key) { free(jcr->sd_auth_key); jcr->sd_auth_key = NULL; } - if (jcr->VolumeName) { - free_pool_memory(jcr->VolumeName); - jcr->VolumeName = NULL; - } - if (jcr->dir_bsock) { - bnet_close(jcr->dir_bsock); - jcr->dir_bsock = NULL; - } - if (jcr->errmsg) { - free_pool_memory(jcr->errmsg); - jcr->errmsg = NULL; - } + free_bsock(jcr->dir_bsock); + if (jcr->where) { free(jcr->where); jcr->where = NULL; @@ -394,8 +493,10 @@ static void free_common_jcr(JCR *jcr) free_guid_list(jcr->id_list); jcr->id_list = NULL; } - /* Invalidate the tsd jcr data */ - set_jcr_in_tsd(INVALID_JCR); + if (jcr->comment) { + free_pool_memory(jcr->comment); + jcr->comment = NULL; + } free(jcr); } @@ -405,7 +506,7 @@ static void free_common_jcr(JCR *jcr) #ifdef DEBUG void b_free_jcr(const char *file, int line, JCR *jcr) { - struct s_last_job *je, last_job; + struct s_last_job *je; Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line); @@ -413,22 +514,21 @@ void b_free_jcr(const char *file, int line, JCR *jcr) void free_jcr(JCR *jcr) { - struct s_last_job *je, last_job; + struct s_last_job *je; - Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); #endif - dequeue_messages(jcr); lock_jcr_chain(); jcr->dec_use_count(); /* decrement use count */ - if (jcr->use_count() < 0) { - Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"), - jcr->use_count(), jcr->JobId); - } + ASSERT2(jcr->use_count() >= 0, "JCR use_count < 0"); + // Jmsg2(jcr, M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"), + // jcr->use_count(), jcr->JobId); + //} if (jcr->JobId > 0) { - Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); } if (jcr->use_count() > 0) { /* if in use */ @@ -436,40 +536,46 @@ void free_jcr(JCR *jcr) return; } if (jcr->JobId > 0) { - Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); } + jcr->exiting = true; remove_jcr(jcr); /* remove Jcr from chain */ + unlock_jcr_chain(); + dequeue_messages(jcr); + close_msg(jcr); /* close messages for this job */ job_end_pop(jcr); /* pop and call hooked routines */ Dmsg1(dbglvl, "End job=%d\n", jcr->JobId); /* Keep some statistics */ - switch (jcr->JobType) { + switch (jcr->getJobType()) { case JT_BACKUP: case JT_VERIFY: case JT_RESTORE: case JT_MIGRATE: case JT_COPY: case JT_ADMIN: - num_jobs_run++; - last_job.Errors = jcr->Errors; - last_job.JobType = jcr->JobType; - last_job.JobId = jcr->JobId; - last_job.VolSessionId = jcr->VolSessionId; - last_job.VolSessionTime = jcr->VolSessionTime; - bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job)); - last_job.JobFiles = jcr->JobFiles; - last_job.JobBytes = jcr->JobBytes; - last_job.JobStatus = jcr->JobStatus; - last_job.JobLevel = jcr->JobLevel; - last_job.start_time = jcr->start_time; - last_job.end_time = time(NULL); /* Keep list of last jobs, but not Console where JobId==0 */ - if (last_job.JobId > 0) { + if (jcr->JobId > 0) { + lock_last_jobs_list(); + num_jobs_run++; je = (struct s_last_job *)malloc(sizeof(struct s_last_job)); - memcpy((char *)je, (char *)&last_job, sizeof(last_job)); + memset(je, 0, sizeof(struct s_last_job)); /* zero in case unset fields */ + je->Errors = jcr->JobErrors; + je->JobType = jcr->getJobType(); + je->JobId = jcr->JobId; + je->VolSessionId = jcr->VolSessionId; + je->VolSessionTime = jcr->VolSessionTime; + bstrncpy(je->Job, jcr->Job, sizeof(je->Job)); + je->JobFiles = jcr->JobFiles; + je->JobBytes = jcr->JobBytes; + je->JobStatus = jcr->JobStatus; + je->JobLevel = jcr->getJobLevel(); + je->start_time = jcr->start_time; + je->end_time = time(NULL); + if (!last_jobs) { init_last_jobs_list(); } @@ -479,32 +585,81 @@ void free_jcr(JCR *jcr) last_jobs->remove(je); free(je); } + unlock_last_jobs_list(); } break; default: break; - } + if (jcr->daemon_free_jcr) { jcr->daemon_free_jcr(jcr); /* call daemon free routine */ } - unlock_jcr_chain(); free_common_jcr(jcr); close_msg(NULL); /* flush any daemon messages */ - garbage_collect_memory_pool(); Dmsg0(dbglvl, "Exit free_jcr\n"); } +/* + * Remove jcr from thread specific data, but + * but make sure it is us who are attached. + */ +void remove_jcr_from_tsd(JCR *jcr) +{ + JCR *tjcr = get_jcr_from_tsd(); + if (tjcr == jcr) { + set_jcr_in_tsd(INVALID_JCR); + } +} + +void JCR::set_killable(bool killable) +{ + lock(); + my_thread_killable = killable; + unlock(); +} + +/* + * Put this jcr in the thread specifc data + * if update_thread_info is true and the jcr is valide, + * we update the my_thread_id in the JCR + */ void set_jcr_in_tsd(JCR *jcr) { int status = pthread_setspecific(jcr_key, (void *)jcr); if (status != 0) { berrno be; - Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status)); + Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), + be.bstrerror(status)); + } +} + +void JCR::my_thread_send_signal(int sig) +{ + lock_jcr_chain(); /* use global lock */ + this->lock(); + if (this->exiting) { + goto get_out; + } + if (this->is_killable() && + !pthread_equal(this->my_thread_id, pthread_self())) + { + Dmsg1(800, "Send kill to jid=%d\n", this->JobId); + pthread_kill(this->my_thread_id, sig); + this->exiting = true; + + } else if (!this->is_killable()) { + Dmsg1(10, "Warning, cannot send kill to jid=%d marked not killable.\n", this->JobId); } +get_out: + this->unlock(); + unlock_jcr_chain(); } +/* + * Give me the jcr that is attached to this thread + */ JCR *get_jcr_from_tsd() { JCR *jcr = (JCR *)pthread_getspecific(jcr_key); @@ -516,7 +671,7 @@ JCR *get_jcr_from_tsd() return jcr; } - + /* * Find which JobId corresponds to the current thread */ @@ -544,7 +699,7 @@ JCR *get_jcr_by_id(uint32_t JobId) foreach_jcr(jcr) { if (jcr->JobId == JobId) { jcr->inc_use_count(); - Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); break; } @@ -553,6 +708,30 @@ JCR *get_jcr_by_id(uint32_t JobId) return jcr; } +/* + * Given a thread id, find the JobId + * Returns: JobId on success + * 0 on failure + */ +uint32_t get_jobid_from_tid(pthread_t tid) +{ + JCR *jcr = NULL; + bool found = false; + + foreach_jcr(jcr) { + if (pthread_equal(jcr->my_thread_id, tid)) { + found = true; + break; + } + } + endeach_jcr(jcr); + if (found) { + return jcr->JobId; + } + return 0; +} + + /* * Given a SessionId and SessionTime, find the JCR * Returns: jcr on success @@ -566,7 +745,7 @@ JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime) if (jcr->VolSessionId == SessionId && jcr->VolSessionTime == SessionTime) { jcr->inc_use_count(); - Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); break; } @@ -595,7 +774,7 @@ JCR *get_jcr_by_partial_name(char *Job) foreach_jcr(jcr) { if (strncmp(Job, jcr->Job, len) == 0) { jcr->inc_use_count(); - Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); break; } @@ -621,7 +800,7 @@ JCR *get_jcr_by_full_name(char *Job) foreach_jcr(jcr) { if (strcmp(jcr->Job, Job) == 0) { jcr->inc_use_count(); - Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); break; } @@ -630,52 +809,34 @@ JCR *get_jcr_by_full_name(char *Job) return jcr; } -void set_jcr_job_status(JCR *jcr, int JobStatus) -{ - bool set_waittime=false; - Dmsg2(800, "set_jcr_job_status(%s, %c)\n", jcr->Job, JobStatus); - /* if wait state is new, we keep current time for watchdog MaxWaitTime */ - switch (JobStatus) { - case JS_WaitFD: - case JS_WaitSD: - case JS_WaitMedia: - case JS_WaitMount: - case JS_WaitStoreRes: - case JS_WaitJobRes: - case JS_WaitClientRes: - case JS_WaitMaxJobs: - case JS_WaitPriority: - set_waittime = true; - default: - break; - } - - /* - * For a set of errors, ... keep the current status - * so it isn't lost. For all others, set it. - */ - Dmsg3(300, "jid=%u OnEntry JobStatus=%c set=%c\n", (uint32_t)jcr->JobId, - jcr->JobStatus, JobStatus); - switch (jcr->JobStatus) { - case JS_ErrorTerminated: - case JS_FatalError: - case JS_Canceled: +static void update_wait_time(JCR *jcr, int newJobStatus) +{ + bool enter_in_waittime; + int oldJobStatus = jcr->JobStatus; + + switch (newJobStatus) { + case JS_WaitFD: + case JS_WaitSD: + case JS_WaitMedia: + case JS_WaitMount: + case JS_WaitStoreRes: + case JS_WaitJobRes: + case JS_WaitClientRes: + case JS_WaitMaxJobs: + case JS_WaitPriority: + enter_in_waittime = true; break; - case JS_Error: - case JS_Differences: - switch (JobStatus) { - case JS_ErrorTerminated: - case JS_FatalError: - case JS_Canceled: - /* Override more minor status */ - jcr->JobStatus = JobStatus; - break; - default: - break; - } + default: + enter_in_waittime = false; /* not a Wait situation */ + break; + } + /* - * For a set of Wait situation, keep old time. + * If we were previously waiting and are not any more + * we want to update the wait_time variable, which is + * the start of waiting. */ + switch (oldJobStatus) { case JS_WaitFD: case JS_WaitSD: case JS_WaitMedia: @@ -685,17 +846,117 @@ void set_jcr_job_status(JCR *jcr, int JobStatus) case JS_WaitClientRes: case JS_WaitMaxJobs: case JS_WaitPriority: - set_waittime = false; /* keep old time */ + if (!enter_in_waittime) { /* we get out the wait time */ + jcr->wait_time_sum += (time(NULL) - jcr->wait_time); + jcr->wait_time = 0; + } + break; + + /* if wait state is new, we keep current time for watchdog MaxWaitTime */ default: - jcr->JobStatus = JobStatus; - if (set_waittime) { - /* set it before JobStatus */ - Dmsg0(800, "Setting wait_time\n"); + if (enter_in_waittime) { jcr->wait_time = time(NULL); } + break; + } +} + +/* + * Priority runs from 0 (lowest) to 10 (highest) + */ +static int get_status_priority(int JobStatus) +{ + int priority = 0; + switch (JobStatus) { + case JS_Incomplete: + priority = 10; + break; + case JS_ErrorTerminated: + case JS_FatalError: + case JS_Canceled: + priority = 9; + break; + case JS_Error: + priority = 8; + break; + case JS_Differences: + priority = 7; + break; + } + return priority; +} + +/* + * Send Job status to Director + */ +bool JCR::sendJobStatus() +{ + if (dir_bsock) { + return dir_bsock->fsend(Job_status, JobId, JobStatus); + } + return true; +} + +/* + * Set and send Job status to Director + */ +bool JCR::sendJobStatus(int aJobStatus) +{ + if (!is_JobStatus(aJobStatus)) { + setJobStatus(aJobStatus); + if (dir_bsock) { + return dir_bsock->fsend(Job_status, JobId, JobStatus); + } + } + return true; +} + +void JCR::setJobStarted() +{ + job_started = true; + job_started_time = time(NULL); +} + +static pthread_mutex_t status_lock = PTHREAD_MUTEX_INITIALIZER; + +void JCR::setJobStatus(int newJobStatus) +{ + int priority, old_priority; + int oldJobStatus = JobStatus; + + P(status_lock); + priority = get_status_priority(newJobStatus); + old_priority = get_status_priority(oldJobStatus); + + Dmsg2(800, "set_jcr_job_status(%ld, %c)\n", JobId, newJobStatus); + + /* Update wait_time depending on newJobStatus and oldJobStatus */ + update_wait_time(this, newJobStatus); + + /* + * For a set of errors, ... keep the current status + * so it isn't lost. For all others, set it. + */ + Dmsg2(800, "OnEntry JobStatus=%c newJobstatus=%c\n", (oldJobStatus==0)?'0':oldJobStatus, newJobStatus); + /* + * If status priority is > than proposed new status, change it. + * If status priority == new priority and both are zero, take + * the new status. + * If it is not zero, then we keep the first non-zero "error" that + * occurred. + */ + if (priority > old_priority || ( + priority == 0 && old_priority == 0)) { + Dmsg4(800, "Set new stat. old: %c,%d new: %c,%d\n", + (oldJobStatus==0)?'0':oldJobStatus, old_priority, newJobStatus, priority); + JobStatus = newJobStatus; /* replace with new status */ + } + + if (oldJobStatus != JobStatus) { + Dmsg2(800, "leave setJobStatus old=%c new=%c\n", (oldJobStatus==0)?'0':oldJobStatus, newJobStatus); +// generate_plugin_event(this, bEventStatusChange, NULL); } - Dmsg3(100, "jid=%u OnExit JobStatus=%c set=%c\n", (uint32_t)jcr->JobId, - jcr->JobStatus, JobStatus); + V(status_lock); } #ifdef TRACE_JCR_CHAIN @@ -732,7 +993,6 @@ static void unlock_jcr_chain() V(jcr_lock); } - /* * Start walk of jcr chain * The proper way to walk the jcr chain is: @@ -747,9 +1007,9 @@ static void unlock_jcr_chain() * released with: * * free_jcr(jcr); - * + * */ -JCR *jcr_walk_start() +JCR *jcr_walk_start() { JCR *jcr; lock_jcr_chain(); @@ -757,7 +1017,7 @@ JCR *jcr_walk_start() if (jcr) { jcr->inc_use_count(); if (jcr->JobId > 0) { - Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); } } @@ -777,7 +1037,7 @@ JCR *jcr_walk_next(JCR *prev_jcr) if (jcr) { jcr->inc_use_count(); if (jcr->JobId > 0) { - Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); } } @@ -795,13 +1055,31 @@ void jcr_walk_end(JCR *jcr) { if (jcr) { if (jcr->JobId > 0) { - Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n", + Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n", jcr->JobId, jcr->use_count(), jcr->Job); } free_jcr(jcr); } } +/* + * Return number of Jobs + */ +int job_count() +{ + JCR *jcr; + int count = 0; + + lock_jcr_chain(); + for (jcr = (JCR *)jcrs->first(); (jcr = (JCR *)jcrs->next(jcr)); ) { + if (jcr->JobId > 0) { + count++; + } + } + unlock_jcr_chain(); + return count; +} + /* * Setup to call the timeout check routine every 30 seconds @@ -824,7 +1102,7 @@ bool init_jcr_subsystem(void) static void jcr_timeout_check(watchdog_t *self) { JCR *jcr; - BSOCK *fd; + BSOCK *bs; time_t timer_start; Dmsg0(dbglvl, "Start JCR timeout checks\n"); @@ -837,40 +1115,40 @@ static void jcr_timeout_check(watchdog_t *self) if (jcr->JobId == 0) { continue; } - fd = jcr->store_bsock; - if (fd) { - timer_start = fd->timer_start; - if (timer_start && (watchdog_time - timer_start) > fd->timeout) { - fd->timer_start = 0; /* turn off timer */ - fd->set_timed_out(); - Jmsg(jcr, M_ERROR, 0, _( + bs = jcr->store_bsock; + if (bs) { + timer_start = bs->timer_start; + if (timer_start && (watchdog_time - timer_start) > bs->timeout) { + bs->timer_start = 0; /* turn off timer */ + bs->set_timed_out(); + Qmsg(jcr, M_ERROR, 0, _( "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"), - watchdog_time - timer_start); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); + (int)(watchdog_time - timer_start)); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } } - fd = jcr->file_bsock; - if (fd) { - timer_start = fd->timer_start; - if (timer_start && (watchdog_time - timer_start) > fd->timeout) { - fd->timer_start = 0; /* turn off timer */ - fd->set_timed_out(); - Jmsg(jcr, M_ERROR, 0, _( + bs = jcr->file_bsock; + if (bs) { + timer_start = bs->timer_start; + if (timer_start && (watchdog_time - timer_start) > bs->timeout) { + bs->timer_start = 0; /* turn off timer */ + bs->set_timed_out(); + Qmsg(jcr, M_ERROR, 0, _( "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"), - watchdog_time - timer_start); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); + (int)(watchdog_time - timer_start)); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } } - fd = jcr->dir_bsock; - if (fd) { - timer_start = fd->timer_start; - if (timer_start && (watchdog_time - timer_start) > fd->timeout) { - fd->timer_start = 0; /* turn off timer */ - fd->set_timed_out(); - Jmsg(jcr, M_ERROR, 0, _( + bs = jcr->dir_bsock; + if (bs) { + timer_start = bs->timer_start; + if (timer_start && (watchdog_time - timer_start) > bs->timeout) { + bs->timer_start = 0; /* turn off timer */ + bs->set_timed_out(); + Qmsg(jcr, M_ERROR, 0, _( "Watchdog sending kill after %d secs to thread stalled reading Director.\n"), - watchdog_time - timer_start); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); + (int)(watchdog_time - timer_start)); + jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } } } @@ -879,6 +1157,41 @@ static void jcr_timeout_check(watchdog_t *self) Dmsg0(dbglvl, "Finished JCR timeout checks\n"); } +/* + * Return next JobId from comma separated list + * + * Returns: + * 1 if next JobId returned + * 0 if no more JobIds are in list + * -1 there is an error + */ +int get_next_jobid_from_list(char **p, uint32_t *JobId) +{ + const int maxlen = 30; + char jobid[maxlen+1]; + char *q = *p; + + jobid[0] = 0; + for (int i=0; isize()); + + for (JCR *jcr = (JCR *)jcrs->first(); jcr ; jcr = (JCR *)jcrs->next(jcr)) { + fprintf(fp, "threadid=%p JobId=%d JobStatus=%c jcr=%p name=%s\n", + get_threadid(jcr->my_thread_id), (int)jcr->JobId, jcr->JobStatus, jcr, jcr->Job); + fprintf(fp, "\tuse_count=%i killable=%d\n", + jcr->use_count(), jcr->is_killable()); + fprintf(fp, "\tJobType=%c JobLevel=%c\n", + jcr->getJobType(), jcr->getJobLevel()); + bstrftime(buf1, sizeof(buf1), jcr->sched_time); + bstrftime(buf2, sizeof(buf2), jcr->start_time); + bstrftime(buf3, sizeof(buf3), jcr->end_time); + bstrftime(buf4, sizeof(buf4), jcr->wait_time); + fprintf(fp, "\tsched_time=%s start_time=%s\n\tend_time=%s wait_time=%s\n", + buf1, buf2, buf3, buf4); + fprintf(fp, "\tdb=%p db_batch=%p batch_started=%i\n", + jcr->db, jcr->db_batch, jcr->batch_started); + + /* + * Call all the jcr debug hooks + */ + for(int i=0; i < dbg_jcr_handler_count; i++) { + dbg_jcr_hook_t *hook = dbg_jcr_hooks[i]; + hook(jcr, fp); + } + } +}