2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Manipulation routines for Job Control Records and
30 * handling of last_jobs_list.
32 * Kern E. Sibbald, December 2000
36 * These routines are thread safe.
38 * The job list routines were re-written in May 2005 to
39 * eliminate the global lock while traversing the list, and
40 * to use the dlist subroutines. The locking is now done
41 * on the list each time the list is modified or traversed.
42 * That is, it is "micro-locked" rather than globally locked.
43 * The result is that there is one lock/unlock for each entry
44 * in the list while traversing it rather than a single lock
45 * at the beginning of a traversal and one at the end. This
46 * incurs slightly more overhead, but effectively eliminates
47 * the possibility of race conditions. In addition, with the
48 * exception of the global locking of the list during the
49 * re-reading of the config file, no recursion is needed.
/* Debug level passed to most of the Dmsg trace calls in this file */
56 const int dbglvl = 3400;
58 /* External variables we reference */
59 extern time_t watchdog_time;
61 /* External referenced functions */
62 void free_bregexps(alist *bregexps);
64 /* Forward referenced functions */
65 extern "C" void timeout_handler(int sig);
66 static void jcr_timeout_check(watchdog_t *self);
/*
 * With TRACE_JCR_CHAIN defined, lock_jcr_chain() and unlock_jcr_chain()
 * are macros that forward the __FILE__ and __LINE__ of the call site to
 * the b_ variants, so that each lock and unlock can be traced.
 * NOTE(review): both macro bodies end in a semicolon, so a call that is
 * itself followed by a semicolon expands to the call plus an empty
 * statement.
 */
67 #ifdef TRACE_JCR_CHAIN
68 static void b_lock_jcr_chain(const char *filen, int line);
69 static void b_unlock_jcr_chain(const char *filen, int line);
70 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
71 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
73 static void lock_jcr_chain();
74 static void unlock_jcr_chain();
/*
 * List of s_last_job records. free_jcr() and read_last_jobs_list()
 * append to it, and write_last_jobs_list() writes it out.
 */
79 dlist *last_jobs = NULL;
/* Once last_jobs holds more entries than this, its first entry is removed */
80 const int max_last_jobs = 10;
82 static dlist *jcrs = NULL; /* JCR chain */
83 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
85 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
87 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
89 static pthread_key_t jcr_key; /* Pointer to jcr for each thread */
/* Passed to pthread_once() in new_jcr() so that create_jcr_key() runs once */
91 pthread_once_t key_once = PTHREAD_ONCE_INIT;
/*
 * Create the dlist objects behind last_jobs and the jcrs chain.
 * The NULL job_entry pointer is only used to form the address of its
 * link member for the dlist constructor.
 * NOTE(review): presumably dlist derives the link offset from that
 * address -- confirm against the dlist implementation.
 */
104 void init_last_jobs_list()
107 struct s_last_job *job_entry = NULL;
109 last_jobs = New(dlist(job_entry, &job_entry->link));
112 jcrs = New(dlist(jcr, &jcr->link));
/*
 * Empty the last_jobs list: between lock_last_jobs_list() and
 * unlock_last_jobs_list(), keep removing the first entry until the
 * list reports empty.
 */
116 void term_last_jobs_list()
119 lock_last_jobs_list();
120 while (!last_jobs->empty()) {
121 void *je = last_jobs->first();
122 last_jobs->remove(je);
127 unlock_last_jobs_list();
/*
 * Read saved last-job records from fd and append them to last_jobs.
 *
 *   fd    open file descriptor to read from
 *   addr  file offset of the saved list; 0 takes the same branch as a
 *         failed lseek
 *
 * The data is a record count followed by that many raw
 * struct s_last_job images, read with sizeof(job) bytes each, so the
 * stored format is the in-memory layout of the struct.
 */
135 bool read_last_jobs_list(int fd, uint64_t addr)
137 struct s_last_job *je, job;
/* NOTE(review): the 64-bit addr is cast to int for this trace message only */
141 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
142 if (addr == 0 || lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
145 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
148 Dmsg1(100, "Read num_items=%d\n", num);
/* A count above four times the list cap is treated as implausible */
149 if (num > 4 * max_last_jobs) { /* sanity check */
152 lock_last_jobs_list();
153 for ( ; num; num--) {
154 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
156 Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror());
/* NOTE(review): the malloc result is passed to memcpy without a NULL check */
161 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
162 memcpy((char *)je, (char *)&job, sizeof(job));
164 init_last_jobs_list();
166 last_jobs->append(je);
/* Keep the list bounded: over the cap, the first entry is removed */
167 if (last_jobs->size() > max_last_jobs) {
168 je = (struct s_last_job *)last_jobs->first();
169 last_jobs->remove(je);
174 unlock_last_jobs_list();
/*
 * Write the last_jobs list to fd starting at offset addr.
 *
 * Layout written: the entry count (num), then one raw
 * struct s_last_job image per list entry. After writing, the current
 * file offset is fetched with lseek(fd, 0, SEEK_CUR).
 */
178 uint64_t write_last_jobs_list(int fd, uint64_t addr)
180 struct s_last_job *je;
/* NOTE(review): the 64-bit addr is cast to int for this trace message only */
184 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
185 if (lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
189 lock_last_jobs_list();
190 /* First record is number of entries */
191 num = last_jobs->size();
192 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
194 Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror());
197 foreach_dlist(je, last_jobs) {
198 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
200 Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror());
204 unlock_last_jobs_list();
206 /* Return current address */
207 stat = lseek(fd, 0, SEEK_CUR);
/* NOTE(review): second unlock, presumably the error exit path -- confirm */
214 unlock_last_jobs_list();
218 void lock_last_jobs_list()
223 void unlock_last_jobs_list()
228 /* Get an ASCII representation of the operation being performed, as an
 * English noun. Every returned string goes through the _() translation
 * macro; the final fallback is the string for an unknown operation.
 */
229 const char *JCR::get_OperationName()
235 return _("Verifying");
237 return _("Restoring");
239 return _("Archiving");
243 return _("Migration");
245 return _("Scanning");
247 return _("Unknown operation");
251 /* Get an ASCII representation of the action being performed, either as
 * an English verb or as an adjective.
 *
 *   past  true selects the past form (for example verified),
 *         false selects the base verb (for example verify)
 *
 * Every returned string goes through the _() translation macro; the
 * final fallback is the string for an unknown action.
 */
252 const char *JCR::get_ActionName(bool past)
258 return (past == true) ? _("verified") : _("verify");
260 return (past == true) ? _("restored") : _("restore");
262 return (past == true) ? _("archived") : _("archive");
264 return (past == true) ? _("copied") : _("copy");
266 return (past == true) ? _("migrated") : _("migrate");
268 return (past == true) ? _("scanned") : _("scan");
270 return _("unknown action");
274 /* Set Job type in JCR and also set appropriate read flag */
275 void JCR::set_JobType(int32_t JobType)
280 /* Set Job level in JCR and also set appropriate read flag */
281 void JCR::set_JobLevel(int32_t JobLevel)
/* Record the level first; the check below reads the stored member */
283 m_JobLevel = JobLevel;
/* L_VIRTUAL_FULL is singled out by a dedicated check */
295 if (m_JobLevel == L_VIRTUAL_FULL) {
306 * Push a subroutine address into the job end callback stack
/*
 * Each registration takes two consecutive slots in jcr->job_end_push:
 * the callback pointer (stored as void *) first, then its ctx.
 * job_end_pop() depends on exactly this ordering when it reads the
 * slots back from the end.
 */
308 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
310 jcr->job_end_push.append((void *)job_end_cb);
311 jcr->job_end_push.append(ctx);
314 /* Pop each job_end subroutine and call it.
 * The slots are read from the highest index downwards, two per
 * iteration, so callbacks run in the reverse of the order in which
 * job_end_push() registered them.
 */
315 static void job_end_pop(JCR *jcr)
317 void (*job_end_cb)(JCR *jcr, void *ctx);
/* i starts on the last slot; each pass consumes a ctx slot and a callback slot */
319 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
/* ctx was appended second, so it sits at the higher index */
320 ctx = jcr->job_end_push.get(i--);
321 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
322 job_end_cb(jcr, ctx);
/*
 * Create the thread-specific-data key jcr_key. No destructor is
 * registered (second argument is NULL). new_jcr() passes this function
 * to pthread_once().
 * The returned status is handed to bstrerror() for the M_ABORT message.
 * NOTE(review): that message is presumably guarded by a non-zero
 * status test -- confirm.
 */
326 void create_jcr_key()
328 int status = pthread_key_create(&jcr_key, NULL);
331 Jmsg1(NULL, M_ABORT, 0, _("pthread key create failed: ERR=%s\n"),
332 be.bstrerror(status));
337 * Create a Job Control Record and link it into JCR chain
338 * Returns newly allocated JCR
339 * Note, since each daemon has a different JCR, he passes
/*
 * Parameters:
 *   size             number of bytes to allocate and zero for the JCR
 *   daemon_free_jcr  routine stored in the JCR; free_jcr() calls it
 *                    when it is non-NULL
 */
342 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
345 MQUEUE_ITEM *item = NULL;
346 struct sigaction sigtimer;
349 Dmsg0(dbglvl, "Enter new_jcr\n");
/* Make sure the thread-specific key exists; create_jcr_key runs only once */
350 status = pthread_once(&key_once, create_jcr_key);
353 Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status));
/* NOTE(review): the malloc result is passed to memset without a NULL check */
355 jcr = (JCR *)malloc(size);
356 memset(jcr, 0, size);
/* Remember the creating thread; jcr_timeout_check() signals this id */
357 jcr->my_thread_id = pthread_self();
358 jcr->msg_queue = New(dlist(item, &item->link));
359 jcr->job_end_push.init(1, false);
360 jcr->sched_time = time(NULL);
361 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
/* Reference held for the caller; free_jcr() decrements it */
363 jcr->inc_use_count();
364 jcr->VolumeName = get_pool_memory(PM_FNAME);
365 jcr->VolumeName[0] = 0;
366 jcr->errmsg = get_pool_memory(PM_MESSAGE);
368 /* Setup some dummy values */
369 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
371 jcr->set_JobType(JT_SYSTEM); /* internal job until defined */
372 jcr->set_JobLevel(L_NONE);
373 set_jcr_job_status(jcr, JS_Created); /* ready to run */
/*
 * Install timeout_handler for TIMEOUT_SIGNAL. sa_flags is 0, so
 * SA_RESTART is not set, and the filled sa_mask blocks all other
 * signals while the handler runs.
 */
375 sigtimer.sa_flags = 0;
376 sigtimer.sa_handler = timeout_handler;
377 sigfillset(&sigtimer.sa_mask);
378 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
381 * Locking jobs is a global lock that is needed
382 * so that the Director can stop new jobs from being
383 * added to the jcr chain while it processes a new
384 * conf file and does the job_end_push().
389 jcrs = New(dlist(jcr, &jcr->link));
400 * Remove a JCR from the chain
401 * NOTE! The chain must be locked prior to calling
/* Called from free_jcr(); entry and exit are traced at dbglvl. */
404 static void remove_jcr(JCR *jcr)
406 Dmsg0(dbglvl, "Enter remove_jcr\n");
/* NOTE(review): presumably issued only when jcr is NULL -- confirm */
408 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
411 Dmsg0(dbglvl, "Leave remove_jcr\n");
415 * Free stuff common to all JCRs. N.B. Be careful to include only
416 * generic stuff in the common part of the jcr.
/*
 * Release the resources held in the common part of the JCR. Members
 * are released with the routine that matches how they are held:
 * free_pool_memory() for pool buffers, free() for sd_auth_key and
 * RegexWhere, delete for msg_queue and where_bregexp, bnet_close() for
 * dir_bsock. Where a member is tested first, it is reset to NULL after
 * release.
 */
418 static void free_common_jcr(JCR *jcr)
420 jcr->destroy_mutex();
422 if (jcr->msg_queue) {
423 delete jcr->msg_queue;
424 jcr->msg_queue = NULL;
426 close_msg(jcr); /* close messages for this job */
428 /* do this after closing messages */
429 if (jcr->client_name) {
430 free_pool_memory(jcr->client_name);
431 jcr->client_name = NULL;
435 free_pool_memory(jcr->attr);
439 if (jcr->sd_auth_key) {
440 free(jcr->sd_auth_key);
441 jcr->sd_auth_key = NULL;
443 if (jcr->VolumeName) {
444 free_pool_memory(jcr->VolumeName);
445 jcr->VolumeName = NULL;
448 if (jcr->dir_bsock) {
449 bnet_close(jcr->dir_bsock);
450 jcr->dir_bsock = NULL;
453 free_pool_memory(jcr->errmsg);
460 if (jcr->RegexWhere) {
461 free(jcr->RegexWhere);
462 jcr->RegexWhere = NULL;
/* Free the contents of the list first, then the list object itself */
464 if (jcr->where_bregexp) {
465 free_bregexps(jcr->where_bregexp);
466 delete jcr->where_bregexp;
467 jcr->where_bregexp = NULL;
469 if (jcr->cached_path) {
470 free_pool_memory(jcr->cached_path);
471 jcr->cached_path = NULL;
475 free_guid_list(jcr->id_list);
478 /* Invalidate the tsd jcr data */
/*
 * set_jcr_in_tsd() uses pthread_setspecific(), so this updates the slot
 * of the thread that is running this function. get_jcr_from_tsd() tests
 * for INVALID_JCR when it reads the slot back.
 */
479 set_jcr_in_tsd(INVALID_JCR);
484 * Global routine to free a jcr
/*
 * Drop one reference to jcr. Two signatures appear: b_free_jcr also
 * receives the file and line of the caller for its trace message.
 *
 * Visible sequence: dequeue pending messages, decrement the use count,
 * then remove the jcr from the chain, run the job_end callbacks, record
 * a last_jobs entry for jobs with JobId > 0, call the daemon free
 * routine if one is set, and release the common resources.
 * NOTE(review): the body of the use_count() > 0 branch is not shown;
 * presumably it returns early so teardown runs only for the last
 * reference -- confirm.
 */
487 void b_free_jcr(const char *file, int line, JCR *jcr)
489 struct s_last_job *je;
491 Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line);
495 void free_jcr(JCR *jcr)
497 struct s_last_job *je;
499 Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n",
500 jcr->JobId, jcr->use_count(), jcr->Job);
504 dequeue_messages(jcr);
506 jcr->dec_use_count(); /* decrement use count */
/* A negative count means more releases than references; report it */
507 if (jcr->use_count() < 0) {
508 Jmsg2(jcr, M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
509 jcr->use_count(), jcr->JobId);
511 if (jcr->JobId > 0) {
512 Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n",
513 jcr->JobId, jcr->use_count(), jcr->Job);
515 if (jcr->use_count() > 0) { /* if in use */
519 if (jcr->JobId > 0) {
520 Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n",
521 jcr->JobId, jcr->use_count(), jcr->Job);
523 remove_jcr(jcr); /* remove Jcr from chain */
525 job_end_pop(jcr); /* pop and call hooked routines */
527 Dmsg1(dbglvl, "End job=%d\n", jcr->JobId);
529 /* Keep some statistics */
530 switch (jcr->get_JobType()) {
537 /* Keep list of last jobs, but not Console where JobId==0 */
538 if (jcr->JobId > 0) {
539 lock_last_jobs_list();
/* NOTE(review): the malloc result is passed to memset without a NULL check */
541 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
542 memset(je, 0, sizeof(struct s_last_job)); /* zero in case unset fields */
543 je->Errors = jcr->Errors;
544 je->JobType = jcr->get_JobType();
545 je->JobId = jcr->JobId;
546 je->VolSessionId = jcr->VolSessionId;
547 je->VolSessionTime = jcr->VolSessionTime;
548 bstrncpy(je->Job, jcr->Job, sizeof(je->Job));
549 je->JobFiles = jcr->JobFiles;
550 je->JobBytes = jcr->JobBytes;
551 je->JobStatus = jcr->JobStatus;
552 je->JobLevel = jcr->get_JobLevel();
553 je->start_time = jcr->start_time;
/* The end time is taken now, at free time */
554 je->end_time = time(NULL);
557 init_last_jobs_list();
559 last_jobs->append(je);
/* Keep the list bounded: over the cap, the first entry is removed */
560 if (last_jobs->size() > max_last_jobs) {
561 je = (struct s_last_job *)last_jobs->first();
562 last_jobs->remove(je);
565 unlock_last_jobs_list();
572 if (jcr->daemon_free_jcr) {
573 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
577 free_common_jcr(jcr);
578 close_msg(NULL); /* flush any daemon messages */
579 garbage_collect_memory_pool();
580 Dmsg0(dbglvl, "Exit free_jcr\n");
/*
 * Store jcr in the thread-specific slot of the calling thread, keyed by
 * jcr_key. The pthread_setspecific() status is handed to bstrerror()
 * for the M_ABORT message.
 * NOTE(review): that message is presumably guarded by a non-zero
 * status test -- confirm.
 */
583 void set_jcr_in_tsd(JCR *jcr)
585 int status = pthread_setspecific(jcr_key, (void *)jcr);
588 Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status));
/*
 * Fetch the JCR pointer stored for the calling thread under jcr_key.
 * free_common_jcr() stores INVALID_JCR in the slot, so that value is
 * tested for explicitly here.
 */
592 JCR *get_jcr_from_tsd()
594 JCR *jcr = (JCR *)pthread_getspecific(jcr_key);
595 // printf("get_jcr_from_tsd: jcr=%p\n", jcr);
596 /* set any INVALID_JCR to NULL which the rest of Bacula understands */
597 if (jcr == INVALID_JCR) {
605 * Find which JobId corresponds to the current thread
/* The JCR is looked up through get_jcr_from_tsd(); its JobId is cast to uint32_t. */
607 uint32_t get_jobid_from_tsd()
611 jcr = get_jcr_from_tsd();
612 // printf("get_jobid_from_tsr: jcr=%p\n", jcr);
/* NOTE(review): get_jcr_from_tsd() may yield NULL; presumably guarded -- confirm */
614 JobId = (uint32_t)jcr->JobId;
620 * Given a JobId, find the JCR
621 * Returns: jcr on success
624 JCR *get_jcr_by_id(uint32_t JobId)
629 if (jcr->JobId == JobId) {
/* The match carries an extra reference; free_jcr() decrements it again */
630 jcr->inc_use_count();
631 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
632 jcr->JobId, jcr->use_count(), jcr->Job);
641 * Given a SessionId and SessionTime, find the JCR
642 * Returns: jcr on success
645 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
/* Both the session id and the session time must match */
650 if (jcr->VolSessionId == SessionId &&
651 jcr->VolSessionTime == SessionTime) {
/* The match carries an extra reference; free_jcr() decrements it again */
652 jcr->inc_use_count();
653 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
654 jcr->JobId, jcr->use_count(), jcr->Job);
664 * Given a Job, find the JCR
665 * compares on the number of characters in Job
666 * thus allowing partial matches.
667 * Returns: jcr on success
670 JCR *get_jcr_by_partial_name(char *Job)
/* Only the first len characters are compared, so Job matches as a prefix */
680 if (strncmp(Job, jcr->Job, len) == 0) {
/* The match carries an extra reference; free_jcr() decrements it again */
681 jcr->inc_use_count();
682 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
683 jcr->JobId, jcr->use_count(), jcr->Job);
693 * Given a Job, find the JCR
694 * requires an exact match of names.
695 * Returns: jcr on success
698 JCR *get_jcr_by_full_name(char *Job)
/* Whole-string comparison: the names must be identical */
706 if (strcmp(jcr->Job, Job) == 0) {
/* The match carries an extra reference; free_jcr() decrements it again */
707 jcr->inc_use_count();
708 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
709 jcr->JobId, jcr->use_count(), jcr->Job);
/*
 * Update jcr->JobStatus from JobStatus and, for a newly entered wait
 * state, stamp jcr->wait_time with the current time. Status values are
 * printed with %c in the trace messages, so they are character codes.
 * The status on entry is kept in oldJobStatus so that a change can be
 * traced at the end.
 */
717 void set_jcr_job_status(JCR *jcr, int JobStatus)
719 bool set_waittime = false;
720 int oldJobStatus = jcr->JobStatus;
722 Dmsg2(800, "set_jcr_job_status(%s, %c)\n", jcr->Job, JobStatus);
723 /* if wait state is new, we keep current time for watchdog MaxWaitTime */
729 case JS_WaitStoreRes:
731 case JS_WaitClientRes:
733 case JS_WaitPriority:
740 * For a set of errors, ... keep the current status
741 * so it isn't lost. For all others, set it.
743 Dmsg3(300, "jid=%u OnEntry JobStatus=%c set=%c\n", (uint32_t)jcr->JobId,
744 jcr->JobStatus, JobStatus);
745 switch (jcr->JobStatus) {
746 case JS_ErrorTerminated:
753 case JS_ErrorTerminated:
756 /* Override more minor status */
757 jcr->JobStatus = JobStatus;
763 * For a set of Wait situation, keep old time.
769 case JS_WaitStoreRes:
771 case JS_WaitClientRes:
773 case JS_WaitPriority:
774 set_waittime = false; /* keep old time */
776 jcr->JobStatus = JobStatus;
/*
 * NOTE(review): the next comment says wait_time is set before
 * JobStatus, yet the JobStatus assignment appears above it -- confirm
 * which order is intended.
 */
778 /* set it before JobStatus */
779 Dmsg0(800, "Setting wait_time\n");
780 jcr->wait_time = time(NULL);
/* Trace only when the stored status actually changed */
783 if (oldJobStatus != jcr->JobStatus) {
784 Dmsg3(200, "jid=%u leave set_old_job_status=%c new_set=%c\n", (uint32_t)jcr->JobId,
785 oldJobStatus, JobStatus);
786 // generate_plugin_event(jcr, bEventStatusChange, NULL);
/*
 * Lock and unlock routines for the jcr chain. With TRACE_JCR_CHAIN
 * defined, the b_ variants take the file name and line of the caller
 * and trace every call together with lock_count.
 */
790 #ifdef TRACE_JCR_CHAIN
791 static int lock_count = 0;
797 #ifdef TRACE_JCR_CHAIN
798 static void b_lock_jcr_chain(const char *fname, int line)
800 static void lock_jcr_chain()
803 #ifdef TRACE_JCR_CHAIN
/* Pre-increment: the message shows the count after this lock is taken */
804 Dmsg3(dbglvl, "Lock jcr chain %d from %s:%d\n", ++lock_count, fname, line);
812 #ifdef TRACE_JCR_CHAIN
813 static void b_unlock_jcr_chain(const char *fname, int line)
815 static void unlock_jcr_chain()
818 #ifdef TRACE_JCR_CHAIN
/* Post-decrement: the message shows the same count as the matching lock */
819 Dmsg3(dbglvl, "Unlock jcr chain %d from %s:%d\n", lock_count--, fname, line);
826 * Start walk of jcr chain
827 * The proper way to walk the jcr chain is:
834 * It is possible to leave out the endeach_jcr(jcr), but
835 * in that case, the last jcr referenced must be explicitly
841 JCR *jcr_walk_start()
/* The walk begins at the first entry of the jcrs chain */
845 jcr = (JCR *)jcrs->first();
/* Reference for the walker; free_jcr() treats a positive count as in use */
847 jcr->inc_use_count();
/* Trace only entries with a non-zero JobId */
848 if (jcr->JobId > 0) {
849 Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n",
850 jcr->JobId, jcr->use_count(), jcr->Job);
858 * Get next jcr from chain, and release current one
860 JCR *jcr_walk_next(JCR *prev_jcr)
/* Successor of prev_jcr in the jcrs chain */
865 jcr = (JCR *)jcrs->next(prev_jcr);
/* Reference for the walker; free_jcr() treats a positive count as in use */
867 jcr->inc_use_count();
/* Trace only entries with a non-zero JobId */
868 if (jcr->JobId > 0) {
869 Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n",
870 jcr->JobId, jcr->use_count(), jcr->Job);
881 * Release last jcr referenced
883 void jcr_walk_end(JCR *jcr)
/* Trace only entries with a non-zero JobId */
886 if (jcr->JobId > 0) {
887 Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n",
888 jcr->JobId, jcr->use_count(), jcr->Job);
896 * Setup to call the timeout check routine every 30 seconds
897 * This routine will check any timers that have been enabled.
/*
 * Allocate a watchdog, configure it as repeating (one_shot is false)
 * with an interval of 30 and jcr_timeout_check as its callback, then
 * register it.
 */
899 bool init_jcr_subsystem(void)
901 watchdog_t *wd = new_watchdog();
903 wd->one_shot = false;
904 wd->interval = 30; /* FIXME: should be configurable somewhere, even
905 if only with a #define */
906 wd->callback = jcr_timeout_check;
908 register_watchdog(wd);
/*
 * Watchdog callback installed by init_jcr_subsystem().
 *
 * For each socket examined (store_bsock, file_bsock, and a third one
 * whose message names the Director) the same test is applied: when
 * timer_start is non-zero and watchdog_time - timer_start exceeds the
 * timeout of the socket, the timer is cleared, an M_ERROR message is
 * queued with Qmsg, and TIMEOUT_SIGNAL is sent with pthread_kill() to
 * the thread recorded in jcr->my_thread_id.
 *
 * NOTE(review): the trace message prints the jcr pointer with 0x%x, and
 * the Qmsg calls print watchdog_time - timer_start with %d; confirm
 * that these formats match the argument widths on all targets.
 */
913 static void jcr_timeout_check(watchdog_t *self)
919 Dmsg0(dbglvl, "Start JCR timeout checks\n");
921 /* Walk through all JCRs checking if any one is
922 * blocked for more than specified max time.
925 Dmsg2(dbglvl, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
926 if (jcr->JobId == 0) {
929 bs = jcr->store_bsock;
931 timer_start = bs->timer_start;
932 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
933 bs->timer_start = 0; /* turn off timer */
935 Qmsg(jcr, M_ERROR, 0, _(
936 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
937 watchdog_time - timer_start);
/* The signal goes to the thread that created the jcr */
938 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
/* Same check for the File daemon socket */
941 bs = jcr->file_bsock;
943 timer_start = bs->timer_start;
944 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
945 bs->timer_start = 0; /* turn off timer */
947 Qmsg(jcr, M_ERROR, 0, _(
948 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
949 watchdog_time - timer_start);
950 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
/* Same check once more; the message below names the Director */
955 timer_start = bs->timer_start;
956 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
957 bs->timer_start = 0; /* turn off timer */
959 Qmsg(jcr, M_ERROR, 0, _(
960 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
961 watchdog_time - timer_start);
962 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
968 Dmsg0(dbglvl, "Finished JCR timeout checks\n");
972 * Timeout signal comes here
/*
 * Deliberately empty handler for TIMEOUT_SIGNAL. new_jcr() installs it
 * with sigaction() using sa_flags of 0, and jcr_timeout_check() raises
 * the signal with pthread_kill(). The delivery itself is the effect:
 * it interrupts whatever the target thread was doing.
 */
974 extern "C" void timeout_handler(int sig)
976 return; /* thus interrupting the function */