2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Manipulation routines for Job Control Records and
30 * handling of last_jobs_list.
32 * Kern E. Sibbald, December 2000
36 * These routines are thread safe.
38 * The job list routines were re-written in May 2005 to
39 * eliminate the global lock while traversing the list, and
40 * to use the dlist subroutines. The locking is now done
41 * on the list each time the list is modified or traversed.
42 * That is, it is "micro-locked" rather than globally locked.
43 * The result is that there is one lock/unlock for each entry
44 * in the list while traversing it rather than a single lock
45 * at the beginning of a traversal and one at the end. This
46 * incurs slightly more overhead, but effectively eliminates
47 * the possibility of race conditions. In addition, with the
48 * exception of the global locking of the list during the
49 * re-reading of the config file, no recursion is needed.
56 const int dbglvl = 3400;
58 /* External variables we reference */
59 extern time_t watchdog_time;
61 /* External referenced functions */
62 void free_bregexps(alist *bregexps);
64 /* Forward referenced functions */
65 extern "C" void timeout_handler(int sig);
66 static void jcr_timeout_check(watchdog_t *self);
67 #ifdef TRACE_JCR_CHAIN
68 static void b_lock_jcr_chain(const char *filen, int line);
69 static void b_unlock_jcr_chain(const char *filen, int line);
70 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
71 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
73 static void lock_jcr_chain();
74 static void unlock_jcr_chain();
79 dlist *last_jobs = NULL;
80 const int max_last_jobs = 10;
82 static dlist *jcrs = NULL; /* JCR chain */
83 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
85 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
87 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
89 static pthread_key_t jcr_key; /* Pointer to jcr for each thread */
91 pthread_once_t key_once = PTHREAD_ONCE_INIT;
104 void init_last_jobs_list()
107 struct s_last_job *job_entry = NULL;
109 last_jobs = New(dlist(job_entry, &job_entry->link));
112 jcrs = New(dlist(jcr, &jcr->link));
116 void term_last_jobs_list()
119 lock_last_jobs_list();
120 while (!last_jobs->empty()) {
121 void *je = last_jobs->first();
122 last_jobs->remove(je);
127 unlock_last_jobs_list();
135 bool read_last_jobs_list(int fd, uint64_t addr)
137 struct s_last_job *je, job;
141 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
142 if (addr == 0 || lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
145 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
148 Dmsg1(100, "Read num_items=%d\n", num);
149 if (num > 4 * max_last_jobs) { /* sanity check */
152 lock_last_jobs_list();
153 for ( ; num; num--) {
154 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
156 Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror());
161 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
162 memcpy((char *)je, (char *)&job, sizeof(job));
164 init_last_jobs_list();
166 last_jobs->append(je);
167 if (last_jobs->size() > max_last_jobs) {
168 je = (struct s_last_job *)last_jobs->first();
169 last_jobs->remove(je);
174 unlock_last_jobs_list();
178 uint64_t write_last_jobs_list(int fd, uint64_t addr)
180 struct s_last_job *je;
184 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
185 if (lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
189 lock_last_jobs_list();
190 /* First record is number of entries */
191 num = last_jobs->size();
192 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
194 Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror());
197 foreach_dlist(je, last_jobs) {
198 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
200 Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror());
204 unlock_last_jobs_list();
206 /* Return current address */
207 stat = lseek(fd, 0, SEEK_CUR);
214 unlock_last_jobs_list();
218 void lock_last_jobs_list()
223 void unlock_last_jobs_list()
228 /* Set Job type in JCR and also set appropriate read flag */
229 void JCR::set_JobType(int32_t JobType)
234 /* Set Job level in JCR and also set appropriate read flag */
235 void JCR::set_JobLevel(int32_t JobLevel)
237 m_JobLevel = JobLevel;
249 if (m_JobLevel == L_VIRTUAL_FULL) {
260 * Push a subroutine address into the job end callback stack
262 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
264 jcr->job_end_push.append((void *)job_end_cb);
265 jcr->job_end_push.append(ctx);
268 /* Pop each job_end subroutine and call it */
269 static void job_end_pop(JCR *jcr)
271 void (*job_end_cb)(JCR *jcr, void *ctx);
273 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
274 ctx = jcr->job_end_push.get(i--);
275 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
276 job_end_cb(jcr, ctx);
280 void create_jcr_key()
282 int status = pthread_key_create(&jcr_key, NULL);
285 Jmsg1(NULL, M_ABORT, 0, _("pthread key create failed: ERR=%s\n"),
286 be.bstrerror(status));
291 * Create a Job Control Record and link it into JCR chain
292 * Returns newly allocated JCR
293 * Note, since each daemon has a different JCR, he passes
296 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
299 MQUEUE_ITEM *item = NULL;
300 struct sigaction sigtimer;
303 Dmsg0(dbglvl, "Enter new_jcr\n");
304 status = pthread_once(&key_once, create_jcr_key);
307 Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status));
309 jcr = (JCR *)malloc(size);
310 memset(jcr, 0, size);
311 jcr->my_thread_id = pthread_self();
312 jcr->msg_queue = New(dlist(item, &item->link));
313 jcr->job_end_push.init(1, false);
314 jcr->sched_time = time(NULL);
315 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
317 jcr->inc_use_count();
318 jcr->VolumeName = get_pool_memory(PM_FNAME);
319 jcr->VolumeName[0] = 0;
320 jcr->errmsg = get_pool_memory(PM_MESSAGE);
322 /* Setup some dummy values */
323 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
325 jcr->set_JobType(JT_SYSTEM); /* internal job until defined */
326 jcr->set_JobLevel(L_NONE);
327 set_jcr_job_status(jcr, JS_Created); /* ready to run */
329 sigtimer.sa_flags = 0;
330 sigtimer.sa_handler = timeout_handler;
331 sigfillset(&sigtimer.sa_mask);
332 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
335 * Locking jobs is a global lock that is needed
336 * so that the Director can stop new jobs from being
337 * added to the jcr chain while it processes a new
338 * conf file and does the job_end_push().
343 jcrs = New(dlist(jcr, &jcr->link));
354 * Remove a JCR from the chain
355 * NOTE! The chain must be locked prior to calling
358 static void remove_jcr(JCR *jcr)
360 Dmsg0(dbglvl, "Enter remove_jcr\n");
362 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
365 Dmsg0(dbglvl, "Leave remove_jcr\n");
369 * Free stuff common to all JCRs. N.B. Be careful to include only
370 * generic stuff in the common part of the jcr.
372 static void free_common_jcr(JCR *jcr)
374 jcr->destroy_mutex();
376 if (jcr->msg_queue) {
377 delete jcr->msg_queue;
378 jcr->msg_queue = NULL;
380 close_msg(jcr); /* close messages for this job */
382 /* do this after closing messages */
383 if (jcr->client_name) {
384 free_pool_memory(jcr->client_name);
385 jcr->client_name = NULL;
389 free_pool_memory(jcr->attr);
393 if (jcr->sd_auth_key) {
394 free(jcr->sd_auth_key);
395 jcr->sd_auth_key = NULL;
397 if (jcr->VolumeName) {
398 free_pool_memory(jcr->VolumeName);
399 jcr->VolumeName = NULL;
402 if (jcr->dir_bsock) {
403 bnet_close(jcr->dir_bsock);
404 jcr->dir_bsock = NULL;
407 free_pool_memory(jcr->errmsg);
414 if (jcr->RegexWhere) {
415 free(jcr->RegexWhere);
416 jcr->RegexWhere = NULL;
418 if (jcr->where_bregexp) {
419 free_bregexps(jcr->where_bregexp);
420 delete jcr->where_bregexp;
421 jcr->where_bregexp = NULL;
423 if (jcr->cached_path) {
424 free_pool_memory(jcr->cached_path);
425 jcr->cached_path = NULL;
429 free_guid_list(jcr->id_list);
432 /* Invalidate the tsd jcr data */
433 set_jcr_in_tsd(INVALID_JCR);
438 * Global routine to free a jcr
441 void b_free_jcr(const char *file, int line, JCR *jcr)
443 struct s_last_job *je;
445 Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line);
449 void free_jcr(JCR *jcr)
451 struct s_last_job *je;
453 Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n",
454 jcr->JobId, jcr->use_count(), jcr->Job);
458 dequeue_messages(jcr);
460 jcr->dec_use_count(); /* decrement use count */
461 if (jcr->use_count() < 0) {
462 Jmsg2(jcr, M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
463 jcr->use_count(), jcr->JobId);
465 if (jcr->JobId > 0) {
466 Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n",
467 jcr->JobId, jcr->use_count(), jcr->Job);
469 if (jcr->use_count() > 0) { /* if in use */
473 if (jcr->JobId > 0) {
474 Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n",
475 jcr->JobId, jcr->use_count(), jcr->Job);
477 remove_jcr(jcr); /* remove Jcr from chain */
479 job_end_pop(jcr); /* pop and call hooked routines */
481 Dmsg1(dbglvl, "End job=%d\n", jcr->JobId);
483 /* Keep some statistics */
484 switch (jcr->get_JobType()) {
491 /* Keep list of last jobs, but not Console where JobId==0 */
492 if (jcr->JobId > 0) {
493 lock_last_jobs_list();
495 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
496 memset(je, 0, sizeof(struct s_last_job)); /* zero in case unset fields */
497 je->Errors = jcr->Errors;
498 je->JobType = jcr->get_JobType();
499 je->JobId = jcr->JobId;
500 je->VolSessionId = jcr->VolSessionId;
501 je->VolSessionTime = jcr->VolSessionTime;
502 bstrncpy(je->Job, jcr->Job, sizeof(je->Job));
503 je->JobFiles = jcr->JobFiles;
504 je->JobBytes = jcr->JobBytes;
505 je->JobStatus = jcr->JobStatus;
506 je->JobLevel = jcr->get_JobLevel();
507 je->start_time = jcr->start_time;
508 je->end_time = time(NULL);
511 init_last_jobs_list();
513 last_jobs->append(je);
514 if (last_jobs->size() > max_last_jobs) {
515 je = (struct s_last_job *)last_jobs->first();
516 last_jobs->remove(je);
519 unlock_last_jobs_list();
526 if (jcr->daemon_free_jcr) {
527 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
531 free_common_jcr(jcr);
532 close_msg(NULL); /* flush any daemon messages */
533 garbage_collect_memory_pool();
534 Dmsg0(dbglvl, "Exit free_jcr\n");
537 void set_jcr_in_tsd(JCR *jcr)
539 int status = pthread_setspecific(jcr_key, (void *)jcr);
542 Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status));
546 JCR *get_jcr_from_tsd()
548 JCR *jcr = (JCR *)pthread_getspecific(jcr_key);
549 // printf("get_jcr_from_tsd: jcr=%p\n", jcr);
550 /* set any INVALID_JCR to NULL which the rest of Bacula understands */
551 if (jcr == INVALID_JCR) {
559 * Find which JobId corresponds to the current thread
561 uint32_t get_jobid_from_tsd()
565 jcr = get_jcr_from_tsd();
566 // printf("get_jobid_from_tsr: jcr=%p\n", jcr);
568 JobId = (uint32_t)jcr->JobId;
574 * Given a JobId, find the JCR
575 * Returns: jcr on success
578 JCR *get_jcr_by_id(uint32_t JobId)
583 if (jcr->JobId == JobId) {
584 jcr->inc_use_count();
585 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
586 jcr->JobId, jcr->use_count(), jcr->Job);
595 * Given a SessionId and SessionTime, find the JCR
596 * Returns: jcr on success
599 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
604 if (jcr->VolSessionId == SessionId &&
605 jcr->VolSessionTime == SessionTime) {
606 jcr->inc_use_count();
607 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
608 jcr->JobId, jcr->use_count(), jcr->Job);
618 * Given a Job, find the JCR
619 * compares on the number of characters in Job
620 * thus allowing partial matches.
621 * Returns: jcr on success
624 JCR *get_jcr_by_partial_name(char *Job)
634 if (strncmp(Job, jcr->Job, len) == 0) {
635 jcr->inc_use_count();
636 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
637 jcr->JobId, jcr->use_count(), jcr->Job);
647 * Given a Job, find the JCR
648 * requires an exact match of names.
649 * Returns: jcr on success
652 JCR *get_jcr_by_full_name(char *Job)
660 if (strcmp(jcr->Job, Job) == 0) {
661 jcr->inc_use_count();
662 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
663 jcr->JobId, jcr->use_count(), jcr->Job);
671 void set_jcr_job_status(JCR *jcr, int JobStatus)
673 bool set_waittime = false;
674 int oldJobStatus = jcr->JobStatus;
676 Dmsg2(800, "set_jcr_job_status(%s, %c)\n", jcr->Job, JobStatus);
677 /* if wait state is new, we keep current time for watchdog MaxWaitTime */
683 case JS_WaitStoreRes:
685 case JS_WaitClientRes:
687 case JS_WaitPriority:
694 * For a set of errors, ... keep the current status
695 * so it isn't lost. For all others, set it.
697 Dmsg3(300, "jid=%u OnEntry JobStatus=%c set=%c\n", (uint32_t)jcr->JobId,
698 jcr->JobStatus, JobStatus);
699 switch (jcr->JobStatus) {
700 case JS_ErrorTerminated:
707 case JS_ErrorTerminated:
710 /* Override more minor status */
711 jcr->JobStatus = JobStatus;
717 * For a set of Wait situations, keep the old time.
723 case JS_WaitStoreRes:
725 case JS_WaitClientRes:
727 case JS_WaitPriority:
728 set_waittime = false; /* keep old time */
730 jcr->JobStatus = JobStatus;
732 /* set it before JobStatus */
733 Dmsg0(800, "Setting wait_time\n");
734 jcr->wait_time = time(NULL);
737 if (oldJobStatus != jcr->JobStatus) {
738 Dmsg3(200, "jid=%u leave set_old_job_status=%c new_set=%c\n", (uint32_t)jcr->JobId,
739 oldJobStatus, JobStatus);
740 // generate_plugin_event(jcr, bEventStatusChange, NULL);
744 #ifdef TRACE_JCR_CHAIN
745 static int lock_count = 0;
751 #ifdef TRACE_JCR_CHAIN
752 static void b_lock_jcr_chain(const char *fname, int line)
754 static void lock_jcr_chain()
757 #ifdef TRACE_JCR_CHAIN
758 Dmsg3(dbglvl, "Lock jcr chain %d from %s:%d\n", ++lock_count, fname, line);
766 #ifdef TRACE_JCR_CHAIN
767 static void b_unlock_jcr_chain(const char *fname, int line)
769 static void unlock_jcr_chain()
772 #ifdef TRACE_JCR_CHAIN
773 Dmsg3(dbglvl, "Unlock jcr chain %d from %s:%d\n", lock_count--, fname, line);
780 * Start walk of jcr chain
781 * The proper way to walk the jcr chain is:
788 * It is possible to leave out the endeach_jcr(jcr), but
789 * in that case, the last jcr referenced must be explicitly
795 JCR *jcr_walk_start()
799 jcr = (JCR *)jcrs->first();
801 jcr->inc_use_count();
802 if (jcr->JobId > 0) {
803 Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n",
804 jcr->JobId, jcr->use_count(), jcr->Job);
812 * Get next jcr from chain, and release current one
814 JCR *jcr_walk_next(JCR *prev_jcr)
819 jcr = (JCR *)jcrs->next(prev_jcr);
821 jcr->inc_use_count();
822 if (jcr->JobId > 0) {
823 Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n",
824 jcr->JobId, jcr->use_count(), jcr->Job);
835 * Release last jcr referenced
837 void jcr_walk_end(JCR *jcr)
840 if (jcr->JobId > 0) {
841 Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n",
842 jcr->JobId, jcr->use_count(), jcr->Job);
850 * Setup to call the timeout check routine every 30 seconds
851 * This routine will check any timers that have been enabled.
853 bool init_jcr_subsystem(void)
855 watchdog_t *wd = new_watchdog();
857 wd->one_shot = false;
858 wd->interval = 30; /* FIXME: should be configurable somewhere, even
859 if only with a #define */
860 wd->callback = jcr_timeout_check;
862 register_watchdog(wd);
867 static void jcr_timeout_check(watchdog_t *self)
873 Dmsg0(dbglvl, "Start JCR timeout checks\n");
875 /* Walk through all JCRs checking if any one is
876 * blocked for more than the specified max time.
879 Dmsg2(dbglvl, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
880 if (jcr->JobId == 0) {
883 bs = jcr->store_bsock;
885 timer_start = bs->timer_start;
886 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
887 bs->timer_start = 0; /* turn off timer */
889 Qmsg(jcr, M_ERROR, 0, _(
890 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
891 watchdog_time - timer_start);
892 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
895 bs = jcr->file_bsock;
897 timer_start = bs->timer_start;
898 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
899 bs->timer_start = 0; /* turn off timer */
901 Qmsg(jcr, M_ERROR, 0, _(
902 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
903 watchdog_time - timer_start);
904 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
909 timer_start = bs->timer_start;
910 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
911 bs->timer_start = 0; /* turn off timer */
913 Qmsg(jcr, M_ERROR, 0, _(
914 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
915 watchdog_time - timer_start);
916 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
922 Dmsg0(dbglvl, "Finished JCR timeout checks\n");
926 * Timeout signal comes here
928 extern "C" void timeout_handler(int sig)
930 return; /* thus interrupting the function */