]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/jcr.c
11Apr07
[bacula/bacula] / bacula / src / lib / jcr.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation plus additions
11    that are listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Manipulation routines for Job Control Records and
30  *  handling of last_jobs_list.
31  *
32  *  Kern E. Sibbald, December 2000
33  *
34  *  Version $Id$
35  *
36  *  These routines are thread safe.
37  *
38  *  The job list routines were re-written in May 2005 to
39  *  eliminate the global lock while traversing the list, and
40  *  to use the dlist subroutines.  The locking is now done
41  *  on the list each time the list is modified or traversed.
42  *  That is it is "micro-locked" rather than globally locked.
43  *  The result is that there is one lock/unlock for each entry
44  *  in the list while traversing it rather than a single lock
45  *  at the beginning of a traversal and one at the end.  This
46  *  incurs slightly more overhead, but effectively eliminates 
47  *  the possibilty of race conditions.  In addition, with the
48  *  exception of the global locking of the list during the
49  *  re-reading of the config file, no recursion is needed.
50  *
51  */
52
53 #include "bacula.h"
54 #include "jcr.h"
55
56 /* External variables we reference */
57 extern time_t watchdog_time;
58
59 /* Forward referenced functions */
60 extern "C" void timeout_handler(int sig);
61 static void jcr_timeout_check(watchdog_t *self);
62 #ifdef TRACE_JCR_CHAIN
63 static void b_lock_jcr_chain(const char *filen, int line);
64 static void b_unlock_jcr_chain(const char *filen, int line);
65 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
66 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
67 #else
68 static void lock_jcr_chain();
69 static void unlock_jcr_chain();
70 #endif
71
72
73 int num_jobs_run;
74 dlist *last_jobs = NULL;
75 const int max_last_jobs = 10;
76  
77 static dlist *jcrs = NULL;            /* JCR chain */
78 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
79
80 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
81
82 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
83
84
85 void lock_jobs()
86 {
87    P(job_start_mutex);
88 }
89
90 void unlock_jobs()
91 {
92    V(job_start_mutex);
93 }
94
95 void init_last_jobs_list()
96 {
97    JCR *jcr = NULL;
98    struct s_last_job *job_entry = NULL;
99    if (!last_jobs) {
100       last_jobs = New(dlist(job_entry, &job_entry->link));
101    }
102    if (!jcrs) {
103       jcrs = New(dlist(jcr, &jcr->link));
104    }
105 }
106
107 void term_last_jobs_list()
108 {
109    if (last_jobs) {
110       while (!last_jobs->empty()) {
111          void *je = last_jobs->first();
112          last_jobs->remove(je);
113          free(je);
114       }
115       delete last_jobs;
116       last_jobs = NULL;
117    }
118    if (jcrs) {
119       delete jcrs;
120       jcrs = NULL;
121    }
122 }
123
124 bool read_last_jobs_list(int fd, uint64_t addr)
125 {
126    struct s_last_job *je, job;
127    uint32_t num;
128
129    Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
130    if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
131       return false;
132    }
133    if (read(fd, &num, sizeof(num)) != sizeof(num)) {
134       return false;
135    }
136    Dmsg1(100, "Read num_items=%d\n", num);
137    if (num > 4 * max_last_jobs) {  /* sanity check */
138       return false;
139    }
140    for ( ; num; num--) {
141       if (read(fd, &job, sizeof(job)) != sizeof(job)) {
142          Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
143          return false;
144       }
145       if (job.JobId > 0) {
146          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
147          memcpy((char *)je, (char *)&job, sizeof(job));
148          if (!last_jobs) {
149             init_last_jobs_list();
150          }
151          last_jobs->append(je);
152          if (last_jobs->size() > max_last_jobs) {
153             je = (struct s_last_job *)last_jobs->first();
154             last_jobs->remove(je);
155             free(je);
156          }
157       }
158    }
159    return true;
160 }
161
162 uint64_t write_last_jobs_list(int fd, uint64_t addr)
163 {
164    struct s_last_job *je;
165    uint32_t num;
166
167    Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
168    if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
169       return 0;
170    }
171    if (last_jobs) {
172       /* First record is number of entires */
173       num = last_jobs->size();
174       if (write(fd, &num, sizeof(num)) != sizeof(num)) {
175          Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
176          return 0;
177       }
178       foreach_dlist(je, last_jobs) {
179          if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
180             Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
181             return 0;
182          }
183       }
184    }
185    /* Return current address */
186    ssize_t stat = lseek(fd, 0, SEEK_CUR);
187    if (stat < 0) {
188       stat = 0;
189    }
190    return stat;
191
192 }
193
194 void lock_last_jobs_list()
195 {
196    P(last_jobs_mutex);
197 }
198
199 void unlock_last_jobs_list()
200 {
201    V(last_jobs_mutex);
202 }
203
204 /*
205  * Push a subroutine address into the job end callback stack
206  */
207 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
208 {
209    jcr->job_end_push.append((void *)job_end_cb);
210    jcr->job_end_push.append(ctx);
211 }
212
213 /* Pop each job_end subroutine and call it */
214 static void job_end_pop(JCR *jcr)
215 {
216    void (*job_end_cb)(JCR *jcr, void *ctx);
217    void *ctx;
218    for (int i=jcr->job_end_push.size()-1; i > 0; ) {
219       ctx = jcr->job_end_push.get(i--);
220       job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
221       job_end_cb(jcr, ctx);
222    }
223 }
224
225 /*
226  * Create a Job Control Record and link it into JCR chain
227  * Returns newly allocated JCR
228  * Note, since each daemon has a different JCR, he passes
229  *  us the size.
230  */
231 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
232 {
233    JCR *jcr;
234    MQUEUE_ITEM *item = NULL;
235    struct sigaction sigtimer;
236
237    Dmsg0(3400, "Enter new_jcr\n");
238    jcr = (JCR *)malloc(size);
239    memset(jcr, 0, size);
240    jcr->my_thread_id = pthread_self();
241    jcr->msg_queue = New(dlist(item, &item->link));
242    jcr->job_end_push.init(1, false);
243    jcr->sched_time = time(NULL);
244    jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
245    jcr->init_mutex();
246    jcr->inc_use_count();   
247    jcr->VolumeName = get_pool_memory(PM_FNAME);
248    jcr->VolumeName[0] = 0;
249    jcr->errmsg = get_pool_memory(PM_MESSAGE);
250    jcr->errmsg[0] = 0;
251    /* Setup some dummy values */
252    bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
253    jcr->JobId = 0;
254    jcr->JobType = JT_SYSTEM;          /* internal job until defined */
255    jcr->JobLevel = L_NONE;
256    set_jcr_job_status(jcr, JS_Created);       /* ready to run */
257
258    sigtimer.sa_flags = 0;
259    sigtimer.sa_handler = timeout_handler;
260    sigfillset(&sigtimer.sa_mask);
261    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
262
263    /*
264     * Locking jobs is a global lock that is needed
265     * so that the Director can stop new jobs from being
266     * added to the jcr chain while it processes a new
267     * conf file and does the job_end_push().
268     */
269    lock_jobs();
270    lock_jcr_chain();
271    if (!jcrs) {
272       jcrs = New(dlist(jcr, &jcr->link));
273    }
274    jcrs->append(jcr);
275    unlock_jcr_chain();
276    unlock_jobs();
277
278    return jcr;
279 }
280
281
282 /*
283  * Remove a JCR from the chain
284  * NOTE! The chain must be locked prior to calling
285  *       this routine.
286  */
287 static void remove_jcr(JCR *jcr)
288 {
289    Dmsg0(3400, "Enter remove_jcr\n");
290    if (!jcr) {
291       Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
292    }
293    jcrs->remove(jcr);
294    Dmsg0(3400, "Leave remove_jcr\n");
295 }
296
297 /*
298  * Free stuff common to all JCRs.  N.B. Be careful to include only
299  *  generic stuff in the common part of the jcr.
300  */
301 static void free_common_jcr(JCR *jcr)
302 {
303    struct s_last_job *je, last_job;
304
305    /* Keep some statistics */
306    switch (jcr->JobType) {
307    case JT_BACKUP:
308    case JT_VERIFY:
309    case JT_RESTORE:
310    case JT_MIGRATE:
311    case JT_COPY:
312    case JT_ADMIN:
313       num_jobs_run++;
314       last_job.Errors = jcr->Errors;
315       last_job.JobType = jcr->JobType;
316       last_job.JobId = jcr->JobId;
317       last_job.VolSessionId = jcr->VolSessionId;
318       last_job.VolSessionTime = jcr->VolSessionTime;
319       bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
320       last_job.JobFiles = jcr->JobFiles;
321       last_job.JobBytes = jcr->JobBytes;
322       last_job.JobStatus = jcr->JobStatus;
323       last_job.JobLevel = jcr->JobLevel;
324       last_job.start_time = jcr->start_time;
325       last_job.end_time = time(NULL);
326       /* Keep list of last jobs, but not Console where JobId==0 */
327       if (last_job.JobId > 0) {
328          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
329          memcpy((char *)je, (char *)&last_job, sizeof(last_job));
330          if (!last_jobs) {
331             init_last_jobs_list();
332          }
333          last_jobs->append(je);
334          if (last_jobs->size() > max_last_jobs) {
335             je = (struct s_last_job *)last_jobs->first();
336             last_jobs->remove(je);
337             free(je);
338          }
339       }
340       break;
341    default:
342       break;
343    }
344    jcr->destroy_mutex();
345
346    if (jcr->msg_queue) {
347       delete jcr->msg_queue;
348       jcr->msg_queue = NULL;
349    }
350    close_msg(jcr);                    /* close messages for this job */
351
352    /* do this after closing messages */
353    if (jcr->client_name) {
354       free_pool_memory(jcr->client_name);
355       jcr->client_name = NULL;
356    }
357
358    if (jcr->attr) {
359       free_pool_memory(jcr->attr);
360       jcr->attr = NULL;
361    }
362
363    if (jcr->sd_auth_key) {
364       free(jcr->sd_auth_key);
365       jcr->sd_auth_key = NULL;
366    }
367    if (jcr->VolumeName) {
368       free_pool_memory(jcr->VolumeName);
369       jcr->VolumeName = NULL;
370    }
371
372    if (jcr->dir_bsock) {
373       bnet_close(jcr->dir_bsock);
374       jcr->dir_bsock = NULL;
375    }
376    if (jcr->errmsg) {
377       free_pool_memory(jcr->errmsg);
378       jcr->errmsg = NULL;
379    }
380    if (jcr->where) {
381       free(jcr->where);
382       jcr->where = NULL;
383    }
384    if (jcr->cached_path) {
385       free_pool_memory(jcr->cached_path);
386       jcr->cached_path = NULL;
387       jcr->cached_pnl = 0;
388    }
389    free_getuser_cache();
390    free_getgroup_cache();
391    free(jcr);
392 }
393
394 /*
395  * Global routine to free a jcr
396  */
397 #ifdef DEBUG
398 void b_free_jcr(const char *file, int line, JCR *jcr)
399 {
400    Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
401
402 #else
403
404 void free_jcr(JCR *jcr)
405 {
406
407    Dmsg2(3400, "Enter free_jcr 0x%x job=%d\n", jcr, jcr->JobId);
408
409 #endif
410
411    dequeue_messages(jcr);
412    lock_jcr_chain();
413    jcr->dec_use_count();              /* decrement use count */
414    if (jcr->use_count() < 0) {
415       Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
416          jcr->use_count(), jcr->JobId);
417    }
418    Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count(), jcr->JobId);
419    if (jcr->use_count() > 0) {          /* if in use */
420       unlock_jcr_chain();
421       Dmsg3(3400, "free_jcr 0x%x job=%d use_count=%d\n", jcr, jcr->JobId, jcr->use_count());
422       return;
423    }
424
425    remove_jcr(jcr);                   /* remove Jcr from chain */
426    unlock_jcr_chain();
427
428    job_end_pop(jcr);                  /* pop and call hooked routines */
429
430    Dmsg1(3400, "End job=%d\n", jcr->JobId);
431    if (jcr->daemon_free_jcr) {
432       jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
433    }
434    free_common_jcr(jcr);
435    close_msg(NULL);                   /* flush any daemon messages */
436    garbage_collect_memory_pool();
437    Dmsg0(3400, "Exit free_jcr\n");
438 }
439
440
441 /*
442  * Given a JobId, find the JCR
443  *   Returns: jcr on success
444  *            NULL on failure
445  */
446 JCR *get_jcr_by_id(uint32_t JobId)
447 {
448    JCR *jcr;
449
450    foreach_jcr(jcr) {
451       if (jcr->JobId == JobId) {
452          jcr->inc_use_count();
453          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
454          break;
455       }
456    }
457    endeach_jcr(jcr);
458    return jcr;
459 }
460
461 /*
462  * Given a SessionId and SessionTime, find the JCR
463  *   Returns: jcr on success
464  *            NULL on failure
465  */
466 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
467 {
468    JCR *jcr;
469
470    foreach_jcr(jcr) {
471       if (jcr->VolSessionId == SessionId &&
472           jcr->VolSessionTime == SessionTime) {
473          jcr->inc_use_count();
474          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
475          break;
476       }
477    }
478    endeach_jcr(jcr);
479    return jcr;
480 }
481
482
483 /*
484  * Given a Job, find the JCR
485  *  compares on the number of characters in Job
486  *  thus allowing partial matches.
487  *   Returns: jcr on success
488  *            NULL on failure
489  */
490 JCR *get_jcr_by_partial_name(char *Job)
491 {
492    JCR *jcr;
493    int len;
494
495    if (!Job) {
496       return NULL;
497    }
498    len = strlen(Job);
499    foreach_jcr(jcr) {
500       if (strncmp(Job, jcr->Job, len) == 0) {
501          jcr->inc_use_count();
502          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
503          break;
504       }
505    }
506    endeach_jcr(jcr);
507    return jcr;
508 }
509
510
511 /*
512  * Given a Job, find the JCR
513  *  requires an exact match of names.
514  *   Returns: jcr on success
515  *            NULL on failure
516  */
517 JCR *get_jcr_by_full_name(char *Job)
518 {
519    JCR *jcr;
520
521    if (!Job) {
522       return NULL;
523    }
524    foreach_jcr(jcr) {
525       if (strcmp(jcr->Job, Job) == 0) {
526          jcr->inc_use_count();
527          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
528          break;
529       }
530    }
531    endeach_jcr(jcr);
532    return jcr;
533 }
534
535 void set_jcr_job_status(JCR *jcr, int JobStatus)
536 {
537    /*
538     * For a set of errors, ... keep the current status
539     *   so it isn't lost. For all others, set it.
540     */
541    switch (jcr->JobStatus) {
542    case JS_ErrorTerminated:
543    case JS_Error:
544    case JS_FatalError:
545    case JS_Differences:
546    case JS_Canceled:
547       break;
548    default:
549       jcr->JobStatus = JobStatus;
550    }
551 }
552
553 #ifdef TRACE_JCR_CHAIN
554 static int lock_count = 0;
555 #endif
556
557 /*
558  * Lock the chain
559  */
560 #ifdef TRACE_JCR_CHAIN
561 static void b_lock_jcr_chain(const char *fname, int line)
562 #else
563 static void lock_jcr_chain()
564 #endif
565 {
566 #ifdef TRACE_JCR_CHAIN
567    Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
568       fname, line);
569 #endif
570    P(jcr_lock);
571 }
572
573 /*
574  * Unlock the chain
575  */
576 #ifdef TRACE_JCR_CHAIN
577 static void b_unlock_jcr_chain(const char *fname, int line)
578 #else
579 static void unlock_jcr_chain()
580 #endif
581 {
582 #ifdef TRACE_JCR_CHAIN
583    Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
584       fname, line);
585 #endif
586    V(jcr_lock);
587 }
588
589
590 /*
591  * Start walk of jcr chain
592  * The proper way to walk the jcr chain is:
593  *    JCR *jcr;
594  *    foreach_jcr(jcr) {
595  *      ...
596  *    }
597  *    endeach_jcr(jcr);
598  *
599  *  It is possible to leave out the endeach_jcr(jcr), but
600  *   in that case, the last jcr referenced must be explicitly
601  *   released with:
602  *
603  *    free_jcr(jcr);
604  *  
605  */
606 JCR *jcr_walk_start() 
607 {
608    JCR *jcr;
609    lock_jcr_chain();
610    jcr = (JCR *)jcrs->first();
611    if (jcr) {
612       jcr->inc_use_count();
613       Dmsg3(3400, "Inc jcr_walk_start 0x%x job=%d use_count=%d\n", jcr, 
614             jcr->JobId, jcr->use_count());
615    }
616    unlock_jcr_chain();
617    return jcr;
618 }
619
620 /*
621  * Get next jcr from chain, and release current one
622  */
623 JCR *jcr_walk_next(JCR *prev_jcr)
624 {
625    JCR *jcr;
626
627    lock_jcr_chain();
628    jcr = (JCR *)jcrs->next(prev_jcr);
629    if (jcr) {
630       jcr->inc_use_count();
631       Dmsg3(3400, "Inc jcr_walk_next 0x%x job=%d use_count=%d\n", jcr, 
632          jcr->JobId, jcr->use_count());
633    }
634    unlock_jcr_chain();
635    if (prev_jcr) {
636       free_jcr(prev_jcr);
637    }
638    return jcr;
639 }
640
641 /*
642  * Release last jcr referenced
643  */
644 void jcr_walk_end(JCR *jcr)
645 {
646    if (jcr) {
647       free_jcr(jcr);
648    }
649 }
650
651
652 /*
653  * Setup to call the timeout check routine every 30 seconds
654  *  This routine will check any timers that have been enabled.
655  */
656 bool init_jcr_subsystem(void)
657 {
658    watchdog_t *wd = new_watchdog();
659
660    wd->one_shot = false;
661    wd->interval = 30;   /* FIXME: should be configurable somewhere, even
662                          if only with a #define */
663    wd->callback = jcr_timeout_check;
664
665    register_watchdog(wd);
666
667    return true;
668 }
669
670 static void jcr_timeout_check(watchdog_t *self)
671 {
672    JCR *jcr;
673    BSOCK *fd;
674    time_t timer_start;
675
676    Dmsg0(3400, "Start JCR timeout checks\n");
677
678    /* Walk through all JCRs checking if any one is
679     * blocked for more than specified max time.
680     */
681    foreach_jcr(jcr) {
682       Dmsg2(3400, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
683       if (jcr->JobId == 0) {
684          continue;
685       }
686       fd = jcr->store_bsock;
687       if (fd) {
688          timer_start = fd->timer_start;
689          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
690             fd->timer_start = 0;      /* turn off timer */
691             fd->timed_out = true;
692             Jmsg(jcr, M_ERROR, 0, _(
693 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
694                  watchdog_time - timer_start);
695             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
696          }
697       }
698       fd = jcr->file_bsock;
699       if (fd) {
700          timer_start = fd->timer_start;
701          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
702             fd->timer_start = 0;      /* turn off timer */
703             fd->timed_out = true;
704             Jmsg(jcr, M_ERROR, 0, _(
705 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
706                  watchdog_time - timer_start);
707             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
708          }
709       }
710       fd = jcr->dir_bsock;
711       if (fd) {
712          timer_start = fd->timer_start;
713          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
714             fd->timer_start = 0;      /* turn off timer */
715             fd->timed_out = true;
716             Jmsg(jcr, M_ERROR, 0, _(
717 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
718                  watchdog_time - timer_start);
719             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
720          }
721       }
722    }
723    endeach_jcr(jcr);
724
725    Dmsg0(3400, "Finished JCR timeout checks\n");
726 }
727
728 /*
729  * Timeout signal comes here
730  */
731 extern "C" void timeout_handler(int sig)
732 {
733    return;                            /* thus interrupting the function */
734 }