2 * Manipulation routines for Job Control Records and
3 * handling of last_jobs_list.
5 * Kern E. Sibbald, December 2000
9 * These routines are thread safe.
11 * The job list routines were re-written in May 2005 to
12 * eliminate the global lock while traversing the list, and
13 * to use the dlist subroutines. The locking is now done
14 * on the list each time the list is modified or traversed.
15 * That is, it is "micro-locked" rather than globally locked.
16 * The result is that there is one lock/unlock for each entry
17 * in the list while traversing it rather than a single lock
18 * at the beginning of a traversal and one at the end. This
19 * incurs slightly more overhead, but effectively eliminates
20 * the possibility of race conditions. In addition, with the
21 * exception of the global locking of the list during the
22 * re-reading of the config file, no recursion is needed.
26 Bacula® - The Network Backup Solution
28 Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
30 The main author of Bacula is Kern Sibbald, with contributions from
31 many others, a complete list can be found in the file AUTHORS.
32 This program is Free Software; you can redistribute it and/or
33 modify it under the terms of version two of the GNU General Public
34 License as published by the Free Software Foundation plus additions
35 that are listed in the file LICENSE.
37 This program is distributed in the hope that it will be useful, but
38 WITHOUT ANY WARRANTY; without even the implied warranty of
39 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
40 General Public License for more details.
42 You should have received a copy of the GNU General Public License
43 along with this program; if not, write to the Free Software
44 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
47 Bacula® is a registered trademark of John Walker.
48 The licensor of Bacula is the Free Software Foundation Europe
49 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
50 Switzerland, email:ftf@fsfeurope.org.
56 /* External variables we reference */
/* watchdog_time is compared against each socket's timer_start in jcr_timeout_check(). */
57 extern time_t watchdog_time;
59 /* Forward referenced functions */
60 extern "C" void timeout_handler(int sig);
61 static void jcr_timeout_check(watchdog_t *self);
/*
 * With TRACE_JCR_CHAIN defined, lock_jcr_chain()/unlock_jcr_chain() are
 * macros that pass the caller's __FILE__/__LINE__ to the b_ variants.
 * NOTE(review): both macro bodies already end in ';', so a call written
 * as "lock_jcr_chain();" expands to two statements -- harmless in a plain
 * statement list, but unsafe as the sole body of an unbraced if/else.
 */
62 #ifdef TRACE_JCR_CHAIN
63 static void b_lock_jcr_chain(const char *filen, int line);
64 static void b_unlock_jcr_chain(const char *filen, int line);
65 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
66 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
/*
 * Untraced prototypes.
 * NOTE(review): the #else/#endif that separates these from the traced
 * variants is not visible in this listing.
 */
68 static void lock_jcr_chain();
69 static void unlock_jcr_chain();
/* Global list of s_last_job entries; created by init_last_jobs_list(). */
74 dlist *last_jobs = NULL;
/* Length cap checked against last_jobs->size() after every append in this file. */
75 const int max_last_jobs = 10;
77 static dlist *jcrs = NULL; /* JCR chain */
/*
 * NOTE(review): the code that takes these three mutexes is not visible in
 * this listing; presumably jcr_lock guards jcrs and last_jobs_mutex guards
 * last_jobs -- confirm against the lock/unlock routines.
 */
78 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
80 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
82 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Create the two global lists used by this file: last_jobs (entries are
 * chained through s_last_job's link member) and jcrs (entries are chained
 * through jcr->link).
 * NOTE(review): the conditions guarding the two New() calls are not
 * visible in this listing. read_last_jobs_list() and free_common_jcr()
 * also call this routine, so presumably each list is only created while
 * its pointer is still NULL -- confirm.
 */
95 void init_last_jobs_list()
98 struct s_last_job *job_entry = NULL;
/* job_entry is never dereferenced; it only supplies the link-member address to dlist */
100 last_jobs = New(dlist(job_entry, &job_entry->link));
103 jcrs = New(dlist(jcr, &jcr->link));
/*
 * Tear down last_jobs: repeatedly take the first entry and unlink it
 * until the list reports empty.
 * NOTE(review): the release of each unlinked entry and of the list object
 * itself is not visible in this listing -- confirm that the entries
 * (allocated with malloc elsewhere in this file) are freed.
 */
107 void term_last_jobs_list()
110 while (!last_jobs->empty()) {
111 void *je = last_jobs->first();
112 last_jobs->remove(je);
/*
 * Load saved last-job entries from an open file descriptor into last_jobs.
 *
 *   fd    open descriptor to read from
 *   addr  byte offset of the saved list; 0 is treated like a failed seek
 *
 * On-disk layout as read here: a count (num) followed by that many raw
 * struct s_last_job images.
 * NOTE(review): the return statements are not visible in this listing, so
 * the exact true/false contract cannot be confirmed from here.
 * NOTE(review): the malloc() result is used without a NULL check.
 */
124 bool read_last_jobs_list(int fd, uint64_t addr)
126 struct s_last_job *je, job;
129 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
/* addr == 0 or a failed seek both take the same bail-out branch */
130 if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
/* a short read of the count is treated as failure */
133 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
136 Dmsg1(100, "Read num_items=%d\n", num);
/* reject implausible counts before looping on them */
137 if (num > 4 * max_last_jobs) { /* sanity check */
140 for ( ; num; num--) {
/* each record is read into a stack buffer first, then copied to the heap */
141 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
142 Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
146 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
147 memcpy((char *)je, (char *)&job, sizeof(job));
149 init_last_jobs_list();
151 last_jobs->append(je);
/* keep the list bounded: once over the cap, unlink the entry at the head */
152 if (last_jobs->size() > max_last_jobs) {
153 je = (struct s_last_job *)last_jobs->first();
154 last_jobs->remove(je);
/*
 * Save last_jobs to an open file descriptor, starting at byte offset addr.
 *
 *   fd    open descriptor to write to
 *   addr  byte offset at which the list is written
 *
 * Layout written: the entry count, then each struct s_last_job as a raw
 * memory image, in list order. Afterwards the current file offset is
 * fetched with lseek(fd, 0, SEEK_CUR).
 * NOTE(review): the return statements are not visible in this listing;
 * per the "Return current address" comment the offset after the last
 * record is presumably the success value -- confirm the failure value.
 * NOTE(review): lseek() returns off_t but the result is stored in an
 * ssize_t, which can be narrower on platforms with a 64-bit off_t and a
 * 32-bit ssize_t.
 */
162 uint64_t write_last_jobs_list(int fd, uint64_t addr)
164 struct s_last_job *je;
167 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
168 if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
172 /* First record is number of entries */
173 num = last_jobs->size();
174 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
175 Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
178 foreach_dlist(je, last_jobs) {
/* a short write of any entry is reported as an error */
179 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
180 Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
185 /* Return current address */
186 ssize_t stat = lseek(fd, 0, SEEK_CUR);
/*
 * Public lock/unlock pair for the last_jobs list.
 * NOTE(review): neither body is visible in this listing; presumably they
 * acquire and release last_jobs_mutex -- confirm.
 */
194 void lock_last_jobs_list()
199 void unlock_last_jobs_list()
205 * Push a subroutine address into the job end callback stack
/*
 * Register a callback to be run by job_end_pop().
 *
 *   jcr         job whose callback stack receives the entry
 *   job_end_cb  routine later invoked as job_end_cb(jcr, ctx)
 *   ctx         opaque pointer handed back to the callback
 *
 * Each registration occupies two consecutive slots of jcr->job_end_push:
 * the callback pointer first, then ctx. job_end_pop() relies on that order.
 */
207 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
209 jcr->job_end_push.append((void *)job_end_cb);
210 jcr->job_end_push.append(ctx);
213 /* Pop each job_end subroutine and call it */
/*
 * Walks jcr->job_end_push from the last slot downwards, two slots per
 * step: ctx is read first (higher index), then the callback pointer
 * stored just below it, so callbacks run in reverse order of
 * registration. The loop stops once the index is no longer above 0.
 */
214 static void job_end_pop(JCR *jcr)
216 void (*job_end_cb)(JCR *jcr, void *ctx);
218 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
219 ctx = jcr->job_end_push.get(i--);
/* stored as void * by job_end_push(); cast back to the callback type */
220 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
221 job_end_cb(jcr, ctx);
226 * Create a Job Control Record and link it into JCR chain
227 * Returns newly allocated JCR
228 * Note, since each daemon has a different JCR, he passes
/*
 * Allocate and initialise a Job Control Record.
 *
 *   size             number of bytes to allocate for the JCR
 *   daemon_free_jcr  routine stored in the JCR; free_jcr() calls it when
 *                    the JCR is destroyed
 *
 * The new JCR is zero-filled, then given: the creating thread's id, an
 * empty message queue, an initialised job_end_push stack, sched_time set
 * to the current time, one use-count reference, pool buffers for
 * VolumeName (set to "") and errmsg, the placeholder job name "*System*",
 * JobType JT_SYSTEM, JobLevel L_NONE and status JS_Created.
 * timeout_handler is also installed for TIMEOUT_SIGNAL with all signals
 * blocked while it runs.
 * NOTE(review): the malloc() result is passed to memset() unchecked.
 * NOTE(review): the lock/append/unlock lines that link the JCR into jcrs
 * and the return statement are not visible in this listing.
 */
231 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
234 MQUEUE_ITEM *item = NULL;
235 struct sigaction sigtimer;
237 Dmsg0(3400, "Enter new_jcr\n");
238 jcr = (JCR *)malloc(size);
239 memset(jcr, 0, size);
240 jcr->my_thread_id = pthread_self();
/* item is never dereferenced; it only supplies the link-member address to dlist */
241 jcr->msg_queue = New(dlist(item, &item->link));
242 jcr->job_end_push.init(1, false);
243 jcr->sched_time = time(NULL);
244 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
/* the creator holds the first reference; free_jcr() drops it */
246 jcr->inc_use_count();
247 jcr->VolumeName = get_pool_memory(PM_FNAME);
248 jcr->VolumeName[0] = 0;
249 jcr->errmsg = get_pool_memory(PM_MESSAGE);
251 /* Setup some dummy values */
252 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
254 jcr->JobType = JT_SYSTEM; /* internal job until defined */
255 jcr->JobLevel = L_NONE;
256 set_jcr_job_status(jcr, JS_Created); /* ready to run */
/* install the (empty) timeout handler used by jcr_timeout_check()'s pthread_kill */
258 sigtimer.sa_flags = 0;
259 sigtimer.sa_handler = timeout_handler;
260 sigfillset(&sigtimer.sa_mask);
261 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
264 * Locking jobs is a global lock that is needed
265 * so that the Director can stop new jobs from being
266 * added to the jcr chain while it processes a new
267 * conf file and does the job_end_push().
272 jcrs = New(dlist(jcr, &jcr->link));
283 * Remove a JCR from the chain
284 * NOTE! The chain must be locked prior to calling
/*
 * Unlink a JCR from the chain.
 * NOTE(review): only the trace messages and the M_ABORT report are visible
 * in this listing; presumably the abort fires when jcr is NULL and the
 * entry is otherwise removed from jcrs -- confirm.
 */
287 static void remove_jcr(JCR *jcr)
289 Dmsg0(3400, "Enter remove_jcr\n");
291 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
294 Dmsg0(3400, "Leave remove_jcr\n");
298 * Free stuff common to all JCRs. N.B. Be careful to include only
299 * generic stuff in the common part of the jcr.
/*
 * Release the daemon-independent parts of a JCR.
 *
 * Inside the JobType switch a summary of the job (ids, counters, status,
 * level, start time, end time = now) is copied into a local s_last_job.
 * When JobId > 0 a heap copy is appended to last_jobs, and the entry at
 * the head of the list is unlinked once the list exceeds max_last_jobs.
 * After that the JCR's mutex, message queue, message channel, pool
 * buffers (client_name, attr, VolumeName, errmsg, cached_path),
 * sd_auth_key and dir_bsock are released, followed by calls to
 * free_getuser_cache() and free_getgroup_cache().
 * NOTE(review): the case labels of the switch, any locking around
 * last_jobs, the guards around the attr/errmsg frees and the final
 * release of jcr itself are not visible in this listing.
 * NOTE(review): the malloc() result is used without a NULL check.
 */
301 static void free_common_jcr(JCR *jcr)
303 struct s_last_job *je, last_job;
305 /* Keep some statistics */
306 switch (jcr->JobType) {
314 last_job.Errors = jcr->Errors;
315 last_job.JobType = jcr->JobType;
316 last_job.JobId = jcr->JobId;
317 last_job.VolSessionId = jcr->VolSessionId;
318 last_job.VolSessionTime = jcr->VolSessionTime;
319 bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
320 last_job.JobFiles = jcr->JobFiles;
321 last_job.JobBytes = jcr->JobBytes;
322 last_job.JobStatus = jcr->JobStatus;
323 last_job.JobLevel = jcr->JobLevel;
324 last_job.start_time = jcr->start_time;
/* the job is considered finished at the moment its JCR is freed */
325 last_job.end_time = time(NULL);
326 /* Keep list of last jobs, but not Console where JobId==0 */
327 if (last_job.JobId > 0) {
328 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
329 memcpy((char *)je, (char *)&last_job, sizeof(last_job));
331 init_last_jobs_list();
333 last_jobs->append(je);
/* keep the list bounded: once over the cap, unlink the entry at the head */
334 if (last_jobs->size() > max_last_jobs) {
335 je = (struct s_last_job *)last_jobs->first();
336 last_jobs->remove(je);
344 jcr->destroy_mutex();
346 if (jcr->msg_queue) {
347 delete jcr->msg_queue;
348 jcr->msg_queue = NULL;
350 close_msg(jcr); /* close messages for this job */
352 /* do this after closing messages */
353 if (jcr->client_name) {
354 free_pool_memory(jcr->client_name);
355 jcr->client_name = NULL;
359 free_pool_memory(jcr->attr);
363 if (jcr->sd_auth_key) {
/* sd_auth_key is released with free(), unlike the pool buffers around it */
364 free(jcr->sd_auth_key);
365 jcr->sd_auth_key = NULL;
367 if (jcr->VolumeName) {
368 free_pool_memory(jcr->VolumeName);
369 jcr->VolumeName = NULL;
372 if (jcr->dir_bsock) {
373 bnet_close(jcr->dir_bsock);
374 jcr->dir_bsock = NULL;
377 free_pool_memory(jcr->errmsg);
384 if (jcr->cached_path) {
385 free_pool_memory(jcr->cached_path);
386 jcr->cached_path = NULL;
389 free_getuser_cache();
390 free_getgroup_cache();
395 * Global routine to free a jcr
/*
 * Drop one reference to a JCR and destroy it when no references remain.
 *
 * Two signatures are visible: b_free_jcr() also logs the caller's
 * file/line, free_jcr() logs the JobId.
 * NOTE(review): the preprocessor conditional selecting one of the two is
 * not visible in this listing; presumably both share the body below.
 *
 * Steps shown: dequeue_messages(), decrement the use count (a negative
 * count is reported as M_ERROR), and while the count is still positive
 * only a trace message is emitted. Otherwise the JCR is removed from the
 * chain, the job-end callbacks run, the daemon-specific free routine is
 * called if one was set, the common parts are freed, daemon messages are
 * flushed with close_msg(NULL) and the memory pool is garbage-collected.
 * NOTE(review): the early return for the "still in use" case and the
 * chain lock/unlock calls are not visible in this listing.
 * NOTE(review): jcr is a pointer but is formatted with 0x%x in the trace
 * messages -- confirm the Dmsg formatter tolerates this on 64-bit targets.
 */
398 void b_free_jcr(const char *file, int line, JCR *jcr)
400 Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
404 void free_jcr(JCR *jcr)
407 Dmsg2(3400, "Enter free_jcr 0x%x job=%d\n", jcr, jcr->JobId);
411 dequeue_messages(jcr);
413 jcr->dec_use_count(); /* decrement use count */
/* a negative count means more frees than references were taken */
414 if (jcr->use_count() < 0) {
415 Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
416 jcr->use_count(), jcr->JobId);
418 Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count(), jcr->JobId);
419 if (jcr->use_count() > 0) { /* if in use */
421 Dmsg3(3400, "free_jcr 0x%x job=%d use_count=%d\n", jcr, jcr->JobId, jcr->use_count());
425 remove_jcr(jcr); /* remove Jcr from chain */
428 job_end_pop(jcr); /* pop and call hooked routines */
430 Dmsg1(3400, "End job=%d\n", jcr->JobId);
431 if (jcr->daemon_free_jcr) {
432 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
434 free_common_jcr(jcr);
435 close_msg(NULL); /* flush any daemon messages */
436 garbage_collect_memory_pool();
437 Dmsg0(3400, "Exit free_jcr\n");
442 * Given a JobId, find the JCR
443 * Returns: jcr on success
/*
 * Look up a JCR by JobId.
 * A matching JCR has its use count incremented before being handed back,
 * so the caller owns a reference; free_jcr() is the routine in this file
 * that decrements the count again.
 * NOTE(review): the chain traversal and the return statements are not
 * visible in this listing.
 */
446 JCR *get_jcr_by_id(uint32_t JobId)
451 if (jcr->JobId == JobId) {
452 jcr->inc_use_count();
453 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
462 * Given a SessionId and SessionTime, find the JCR
463 * Returns: jcr on success
/*
 * Look up a JCR whose VolSessionId and VolSessionTime both equal the
 * given values.
 * A matching JCR has its use count incremented before being handed back,
 * so the caller owns a reference; free_jcr() is the routine in this file
 * that decrements the count again.
 * NOTE(review): the chain traversal and the return statements are not
 * visible in this listing.
 */
466 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
471 if (jcr->VolSessionId == SessionId &&
472 jcr->VolSessionTime == SessionTime) {
473 jcr->inc_use_count();
474 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
484 * Given a Job, find the JCR
485 * compares on the number of characters in Job
486 * thus allowing partial matches.
487 * Returns: jcr on success
/*
 * Look up a JCR whose Job name starts with the given string: only the
 * first len characters are compared with strncmp().
 * A matching JCR has its use count incremented before being handed back,
 * so the caller owns a reference; free_jcr() is the routine in this file
 * that decrements the count again.
 * NOTE(review): the computation of len, any NULL check on Job, the chain
 * traversal and the return statements are not visible in this listing;
 * per the comment preceding this function len is the length of Job.
 */
490 JCR *get_jcr_by_partial_name(char *Job)
500 if (strncmp(Job, jcr->Job, len) == 0) {
501 jcr->inc_use_count();
502 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
512 * Given a Job, find the JCR
513 * requires an exact match of names.
514 * Returns: jcr on success
/*
 * Look up a JCR whose Job name equals the given string exactly (strcmp).
 * A matching JCR has its use count incremented before being handed back,
 * so the caller owns a reference; free_jcr() is the routine in this file
 * that decrements the count again.
 * NOTE(review): any NULL check on Job, the chain traversal and the return
 * statements are not visible in this listing.
 */
517 JCR *get_jcr_by_full_name(char *Job)
525 if (strcmp(jcr->Job, Job) == 0) {
526 jcr->inc_use_count();
527 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
/*
 * Update jcr->JobStatus, dispatching on the status the JCR currently has.
 * Per the comment inside the function, a set of error states is sticky:
 * if the JCR is already in one of them the existing status is kept,
 * otherwise the new JobStatus is stored.
 * NOTE(review): only the JS_ErrorTerminated label is visible in this
 * listing; the remaining sticky states are not shown.
 */
535 void set_jcr_job_status(JCR *jcr, int JobStatus)
538 * For a set of errors, ... keep the current status
539 * so it isn't lost. For all others, set it.
541 switch (jcr->JobStatus) {
542 case JS_ErrorTerminated:
549 jcr->JobStatus = JobStatus;
/*
 * Lock the JCR chain.
 * With TRACE_JCR_CHAIN defined the function is compiled as
 * b_lock_jcr_chain(fname, line) and emits a trace message carrying a
 * pre-incremented lock_count; otherwise it is the plain lock_jcr_chain().
 * NOTE(review): the #else/#endif lines, the remaining Dmsg3 arguments and
 * the actual mutex acquisition are not visible in this listing;
 * presumably jcr_lock is taken -- confirm.
 */
553 #ifdef TRACE_JCR_CHAIN
554 static int lock_count = 0;
560 #ifdef TRACE_JCR_CHAIN
561 static void b_lock_jcr_chain(const char *fname, int line)
563 static void lock_jcr_chain()
566 #ifdef TRACE_JCR_CHAIN
567 Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
/*
 * Unlock the JCR chain.
 * With TRACE_JCR_CHAIN defined the function is compiled as
 * b_unlock_jcr_chain(fname, line) and emits a trace message showing
 * lock_count before post-decrementing it; otherwise it is the plain
 * unlock_jcr_chain().
 * NOTE(review): the #else/#endif lines, the remaining Dmsg3 arguments and
 * the actual mutex release are not visible in this listing; presumably
 * jcr_lock is released -- confirm.
 */
576 #ifdef TRACE_JCR_CHAIN
577 static void b_unlock_jcr_chain(const char *fname, int line)
579 static void unlock_jcr_chain()
582 #ifdef TRACE_JCR_CHAIN
583 Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
591 * Start walk of jcr chain
592 * The proper way to walk the jcr chain is:
599 * It is possible to leave out the endeach_jcr(jcr), but
600 * in that case, the last jcr referenced must be explicitly
/*
 * Begin a walk of the JCR chain: fetch the first entry of jcrs and
 * increment its use count so the caller holds a reference while
 * examining it.
 * NOTE(review): the chain locking, the NULL check for an empty chain and
 * the return statement are not visible in this listing.
 */
606 JCR *jcr_walk_start()
610 jcr = (JCR *)jcrs->first();
612 jcr->inc_use_count();
613 Dmsg3(3400, "Inc jcr_walk_start 0x%x job=%d use_count=%d\n", jcr,
614 jcr->JobId, jcr->use_count());
621 * Get next jcr from chain, and release current one
/*
 * Continue a walk of the JCR chain: fetch the entry following prev_jcr
 * and increment its use count.
 * NOTE(review): per the comment preceding this function the previous JCR
 * is released as well, but that call, the chain locking, the
 * end-of-chain NULL check and the return statement are not visible in
 * this listing.
 */
623 JCR *jcr_walk_next(JCR *prev_jcr)
628 jcr = (JCR *)jcrs->next(prev_jcr);
630 jcr->inc_use_count();
631 Dmsg3(3400, "Inc jcr_walk_next 0x%x job=%d use_count=%d\n", jcr,
632 jcr->JobId, jcr->use_count());
642 * Release last jcr referenced
/*
 * End a walk of the JCR chain by releasing the reference held on jcr.
 * NOTE(review): the body is not visible in this listing; presumably it
 * calls free_jcr() when jcr is non-NULL -- confirm.
 */
644 void jcr_walk_end(JCR *jcr)
653 * Setup to call the timeout check routine every 30 seconds
654 * This routine will check any timers that have been enabled.
/*
 * Create and register the watchdog that drives JCR timeout checking:
 * a repeating (one_shot = false) watchdog with a 30 second interval whose
 * callback is jcr_timeout_check().
 * NOTE(review): the return statement is not visible in this listing.
 */
656 bool init_jcr_subsystem(void)
658 watchdog_t *wd = new_watchdog();
660 wd->one_shot = false;
661 wd->interval = 30; /* FIXME: should be configurable somewhere, even
662 if only with a #define */
663 wd->callback = jcr_timeout_check;
665 register_watchdog(wd);
/*
 * Watchdog callback installed by init_jcr_subsystem().
 *
 * For each JCR it inspects the job's sockets (store_bsock, file_bsock and
 * a third one reported as "Director"). When a socket has a running timer
 * (timer_start != 0) and more than fd->timeout seconds have passed since
 * then according to watchdog_time, the timer is switched off, the socket
 * is flagged timed_out, an M_ERROR job message is queued and
 * TIMEOUT_SIGNAL is sent to the thread recorded in jcr->my_thread_id so
 * that its blocking call is interrupted.
 *
 * The elapsed time is a time_t difference; it is cast to int before being
 * handed to Jmsg because the message uses a %d conversion and time_t may
 * be wider than int.
 *
 * NOTE(review): the local declarations, the JCR iteration, the handling
 * of JobId == 0, the NULL checks on each socket and the assignment that
 * selects the Director socket are not visible in this listing.
 */
670 static void jcr_timeout_check(watchdog_t *self)
676 Dmsg0(3400, "Start JCR timeout checks\n");
678 /* Walk through all JCRs checking if any one is
679 * blocked for more than specified max time.
682 Dmsg2(3400, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
683 if (jcr->JobId == 0) {
686 fd = jcr->store_bsock;
688 timer_start = fd->timer_start;
689 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
690 fd->timer_start = 0; /* turn off timer */
691 fd->timed_out = true;
692 Jmsg(jcr, M_ERROR, 0, _(
693 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
694 (int)(watchdog_time - timer_start));
695 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
698 fd = jcr->file_bsock;
700 timer_start = fd->timer_start;
701 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
702 fd->timer_start = 0; /* turn off timer */
703 fd->timed_out = true;
704 Jmsg(jcr, M_ERROR, 0, _(
705 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
706 (int)(watchdog_time - timer_start));
707 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
712 timer_start = fd->timer_start;
713 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
714 fd->timer_start = 0; /* turn off timer */
715 fd->timed_out = true;
716 Jmsg(jcr, M_ERROR, 0, _(
717 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
718 (int)(watchdog_time - timer_start));
719 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
725 Dmsg0(3400, "Finished JCR timeout checks\n");
729 * Timeout signal comes here
/*
 * Handler installed for TIMEOUT_SIGNAL by new_jcr().
 * It intentionally does no work: delivering the signal to the target
 * thread (see the pthread_kill calls in jcr_timeout_check) is what
 * interrupts the blocked call; the handler only has to exist and return.
 */
731 extern "C" void timeout_handler(int sig)
733 return; /* thus interrupting the function */