]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/jcr.c
15Jan06
[bacula/bacula] / bacula / src / lib / jcr.c
1 /*
2  * Manipulation routines for Job Control Records and
3  *  handling of last_jobs_list.
4  *
5  *  Kern E. Sibbald, December 2000
6  *
7  *  Version $Id$
8  *
9  *  These routines are thread safe.
10  *
11  *  The job list routines were re-written in May 2005 to
12  *  eliminate the global lock while traversing the list, and
13  *  to use the dlist subroutines.  The locking is now done
14  *  on the list each time the list is modified or traversed.
15  *  That is it is "micro-locked" rather than globally locked.
16  *  The result is that there is one lock/unlock for each entry
17  *  in the list while traversing it rather than a single lock
18  *  at the beginning of a traversal and one at the end.  This
19  *  incurs slightly more overhead, but effectively eliminates 
20  *  the possibilty of race conditions.  In addition, with the
21  *  exception of the global locking of the list during the
22  *  re-reading of the config file, no recursion is needed.
23  *
24  */
25 /*
26    Copyright (C) 2000-2006 Kern Sibbald
27
28    This program is free software; you can redistribute it and/or
29    modify it under the terms of the GNU General Public License
30    version 2 as amended with additional clauses defined in the
31    file LICENSE in the main source directory.
32
33    This program is distributed in the hope that it will be useful,
34    but WITHOUT ANY WARRANTY; without even the implied warranty of
35    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
36    the file LICENSE for additional details.
37
38  */
39
40 #include "bacula.h"
41 #include "jcr.h"
42
43 /* External variables we reference */
44 extern time_t watchdog_time;
45
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
54 #else
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
57 #endif
58
59
60 int num_jobs_run;
61 dlist *last_jobs = NULL;
62 const int max_last_jobs = 10;
63  
64 static dlist *jcrs = NULL;            /* JCR chain */
65 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
68
69 void lock_jobs()
70 {
71    P(job_start_mutex);
72 }
73
74 void unlock_jobs()
75 {
76    V(job_start_mutex);
77 }
78
79 void init_last_jobs_list()
80 {
81    JCR *jcr = NULL;
82    struct s_last_job *job_entry = NULL;
83    if (!last_jobs) {
84       last_jobs = New(dlist(job_entry, &job_entry->link));
85    }
86    if (!jcrs) {
87       jcrs = New(dlist(jcr, &jcr->link));
88    }
89 }
90
91 void term_last_jobs_list()
92 {
93    if (last_jobs) {
94       while (!last_jobs->empty()) {
95          void *je = last_jobs->first();
96          last_jobs->remove(je);
97          free(je);
98       }
99       delete last_jobs;
100       last_jobs = NULL;
101    }
102    if (jcrs) {
103       delete jcrs;
104       jcrs = NULL;
105    }
106 }
107
108 bool read_last_jobs_list(int fd, uint64_t addr)
109 {
110    struct s_last_job *je, job;
111    uint32_t num;
112
113    Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
114    if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
115       return false;
116    }
117    if (read(fd, &num, sizeof(num)) != sizeof(num)) {
118       return false;
119    }
120    Dmsg1(100, "Read num_items=%d\n", num);
121    if (num > 4 * max_last_jobs) {  /* sanity check */
122       return false;
123    }
124    for ( ; num; num--) {
125       if (read(fd, &job, sizeof(job)) != sizeof(job)) {
126          Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
127          return false;
128       }
129       if (job.JobId > 0) {
130          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
131          memcpy((char *)je, (char *)&job, sizeof(job));
132          if (!last_jobs) {
133             init_last_jobs_list();
134          }
135          last_jobs->append(je);
136          if (last_jobs->size() > max_last_jobs) {
137             je = (struct s_last_job *)last_jobs->first();
138             last_jobs->remove(je);
139             free(je);
140          }
141       }
142    }
143    return true;
144 }
145
146 uint64_t write_last_jobs_list(int fd, uint64_t addr)
147 {
148    struct s_last_job *je;
149    uint32_t num;
150
151    Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
152    if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
153       return 0;
154    }
155    if (last_jobs) {
156       /* First record is number of entires */
157       num = last_jobs->size();
158       if (write(fd, &num, sizeof(num)) != sizeof(num)) {
159          Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
160          return 0;
161       }
162       foreach_dlist(je, last_jobs) {
163          if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
164             Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
165             return 0;
166          }
167       }
168    }
169    /* Return current address */
170    ssize_t stat = lseek(fd, 0, SEEK_CUR);
171    if (stat < 0) {
172       stat = 0;
173    }
174    return stat;
175
176 }
177
178 void lock_last_jobs_list()
179 {
180    /* Use jcr chain mutex */
181    lock_jcr_chain();
182 }
183
184 void unlock_last_jobs_list()
185 {
186    /* Use jcr chain mutex */
187    unlock_jcr_chain();
188 }
189
190 /*
191  * Push a subroutine address into the job end callback stack
192  */
193 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
194 {
195    jcr->job_end_push.append((void *)job_end_cb);
196    jcr->job_end_push.append(ctx);
197 }
198
199 /* Pop each job_end subroutine and call it */
200 static void job_end_pop(JCR *jcr)
201 {
202    void (*job_end_cb)(JCR *jcr, void *ctx);
203    void *ctx;
204    for (int i=jcr->job_end_push.size()-1; i > 0; ) {
205       ctx = jcr->job_end_push.get(i--);
206       job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
207       job_end_cb(jcr, ctx);
208    }
209 }
210
211 /*
212  * Create a Job Control Record and link it into JCR chain
213  * Returns newly allocated JCR
214  * Note, since each daemon has a different JCR, he passes
215  *  us the size.
216  */
217 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
218 {
219    JCR *jcr;
220    MQUEUE_ITEM *item = NULL;
221    struct sigaction sigtimer;
222
223    Dmsg0(3400, "Enter new_jcr\n");
224    jcr = (JCR *)malloc(size);
225    memset(jcr, 0, size);
226    jcr->my_thread_id = pthread_self();
227    jcr->msg_queue = New(dlist(item, &item->link));
228    jcr->job_end_push.init(1, false);
229    jcr->sched_time = time(NULL);
230    jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
231    jcr->use_count = 1;
232    pthread_mutex_init(&(jcr->mutex), NULL);
233    jcr->JobStatus = JS_Created;       /* ready to run */
234    jcr->VolumeName = get_pool_memory(PM_FNAME);
235    jcr->VolumeName[0] = 0;
236    jcr->errmsg = get_pool_memory(PM_MESSAGE);
237    jcr->errmsg[0] = 0;
238    /* Setup some dummy values */
239    bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
240    jcr->JobId = 0;
241    jcr->JobType = JT_SYSTEM;          /* internal job until defined */
242    jcr->JobLevel = L_NONE;
243    jcr->JobStatus = JS_Created;
244
245    sigtimer.sa_flags = 0;
246    sigtimer.sa_handler = timeout_handler;
247    sigfillset(&sigtimer.sa_mask);
248    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
249
250    /*
251     * Locking jobs is a global lock that is needed
252     * so that the Director can stop new jobs from being
253     * added to the jcr chain while it processes a new
254     * conf file and does the job_end_push().
255     */
256    lock_jobs();
257    lock_jcr_chain();
258    if (!jcrs) {
259       jcrs = New(dlist(jcr, &jcr->link));
260    }
261    jcrs->append(jcr);
262    unlock_jcr_chain();
263    unlock_jobs();
264
265    return jcr;
266 }
267
268
269 /*
270  * Remove a JCR from the chain
271  * NOTE! The chain must be locked prior to calling
272  *       this routine.
273  */
274 static void remove_jcr(JCR *jcr)
275 {
276    Dmsg0(3400, "Enter remove_jcr\n");
277    if (!jcr) {
278       Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
279    }
280    jcrs->remove(jcr);
281    Dmsg0(3400, "Leave remove_jcr\n");
282 }
283
284 /*
285  * Free stuff common to all JCRs.  N.B. Be careful to include only
286  *  generic stuff in the common part of the jcr.
287  */
288 static void free_common_jcr(JCR *jcr)
289 {
290    struct s_last_job *je, last_job;
291
292    /* Keep some statistics */
293    switch (jcr->JobType) {
294    case JT_BACKUP:
295    case JT_VERIFY:
296    case JT_RESTORE:
297    case JT_MIGRATE:
298    case JT_COPY:
299    case JT_ADMIN:
300       num_jobs_run++;
301       last_job.Errors = jcr->Errors;
302       last_job.JobType = jcr->JobType;
303       last_job.JobId = jcr->JobId;
304       last_job.VolSessionId = jcr->VolSessionId;
305       last_job.VolSessionTime = jcr->VolSessionTime;
306       bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
307       last_job.JobFiles = jcr->JobFiles;
308       last_job.JobBytes = jcr->JobBytes;
309       last_job.JobStatus = jcr->JobStatus;
310       last_job.JobLevel = jcr->JobLevel;
311       last_job.start_time = jcr->start_time;
312       last_job.end_time = time(NULL);
313       /* Keep list of last jobs, but not Console where JobId==0 */
314       if (last_job.JobId > 0) {
315          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
316          memcpy((char *)je, (char *)&last_job, sizeof(last_job));
317          if (!last_jobs) {
318             init_last_jobs_list();
319          }
320          last_jobs->append(je);
321          if (last_jobs->size() > max_last_jobs) {
322             je = (struct s_last_job *)last_jobs->first();
323             last_jobs->remove(je);
324             free(je);
325          }
326       }
327       break;
328    default:
329       break;
330    }
331    pthread_mutex_destroy(&jcr->mutex);
332
333    delete jcr->msg_queue;
334    close_msg(jcr);                    /* close messages for this job */
335
336    /* do this after closing messages */
337    if (jcr->client_name) {
338       free_pool_memory(jcr->client_name);
339       jcr->client_name = NULL;
340    }
341
342    if (jcr->attr) {
343       free_pool_memory(jcr->attr);
344       jcr->attr = NULL;
345    }
346
347    if (jcr->sd_auth_key) {
348       free(jcr->sd_auth_key);
349       jcr->sd_auth_key = NULL;
350    }
351    if (jcr->VolumeName) {
352       free_pool_memory(jcr->VolumeName);
353       jcr->VolumeName = NULL;
354    }
355
356    if (jcr->dir_bsock) {
357       bnet_close(jcr->dir_bsock);
358       jcr->dir_bsock = NULL;
359    }
360    if (jcr->errmsg) {
361       free_pool_memory(jcr->errmsg);
362       jcr->errmsg = NULL;
363    }
364    if (jcr->where) {
365       free(jcr->where);
366       jcr->where = NULL;
367    }
368    if (jcr->cached_path) {
369       free_pool_memory(jcr->cached_path);
370       jcr->cached_path = NULL;
371       jcr->cached_pnl = 0;
372    }
373    free_getuser_cache();
374    free_getgroup_cache();
375    free(jcr);
376 }
377
378 /*
379  * Global routine to free a jcr
380  */
381 #ifdef DEBUG
382 void b_free_jcr(const char *file, int line, JCR *jcr)
383 {
384    Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
385
386 #else
387
388 void free_jcr(JCR *jcr)
389 {
390
391    Dmsg2(3400, "Enter free_jcr 0x%x job=%d\n", jcr, jcr->JobId);
392
393 #endif
394
395    dequeue_messages(jcr);
396    lock_jcr_chain();
397    jcr->dec_use_count();              /* decrement use count */
398    if (jcr->use_count < 0) {
399       Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
400          jcr->use_count, jcr->JobId);
401    }
402    Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
403    if (jcr->use_count > 0) {          /* if in use */
404       unlock_jcr_chain();
405       Dmsg3(3400, "free_jcr 0x%x job=%d use_count=%d\n", jcr, jcr->JobId, jcr->use_count);
406       return;
407    }
408
409    remove_jcr(jcr);                   /* remove Jcr from chain */
410    unlock_jcr_chain();
411
412    job_end_pop(jcr);                  /* pop and call hooked routines */
413
414    Dmsg1(3400, "End job=%d\n", jcr->JobId);
415    if (jcr->daemon_free_jcr) {
416       jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
417    }
418    free_common_jcr(jcr);
419    close_msg(NULL);                   /* flush any daemon messages */
420    garbage_collect_memory_pool();
421    Dmsg0(3400, "Exit free_jcr\n");
422 }
423
424
425 /*
426  * Given a JobId, find the JCR
427  *   Returns: jcr on success
428  *            NULL on failure
429  */
430 JCR *get_jcr_by_id(uint32_t JobId)
431 {
432    JCR *jcr;
433
434    lock_jcr_chain();                    /* lock chain */
435    foreach_dlist(jcr, jcrs) {
436       if (jcr->JobId == JobId) {
437          jcr->inc_use_count();
438          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
439          break;
440       }
441    }
442    unlock_jcr_chain();
443    return jcr;
444 }
445
446 /*
447  * Given a SessionId and SessionTime, find the JCR
448  *   Returns: jcr on success
449  *            NULL on failure
450  */
451 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
452 {
453    JCR *jcr;
454
455    lock_jcr_chain();
456    foreach_dlist(jcr, jcrs) {
457       if (jcr->VolSessionId == SessionId &&
458           jcr->VolSessionTime == SessionTime) {
459          jcr->inc_use_count();
460          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
461          break;
462       }
463    }
464    unlock_jcr_chain();
465    return jcr;
466 }
467
468
469 /*
470  * Given a Job, find the JCR
471  *  compares on the number of characters in Job
472  *  thus allowing partial matches.
473  *   Returns: jcr on success
474  *            NULL on failure
475  */
476 JCR *get_jcr_by_partial_name(char *Job)
477 {
478    JCR *jcr;
479    int len;
480
481    if (!Job) {
482       return NULL;
483    }
484    lock_jcr_chain();
485    len = strlen(Job);
486    foreach_dlist(jcr, jcrs) {
487       if (strncmp(Job, jcr->Job, len) == 0) {
488          jcr->inc_use_count();
489          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
490          break;
491       }
492    }
493    unlock_jcr_chain();
494    return jcr;
495 }
496
497
498 /*
499  * Given a Job, find the JCR
500  *  requires an exact match of names.
501  *   Returns: jcr on success
502  *            NULL on failure
503  */
504 JCR *get_jcr_by_full_name(char *Job)
505 {
506    JCR *jcr;
507
508    if (!Job) {
509       return NULL;
510    }
511    lock_jcr_chain();
512    foreach_dlist(jcr, jcrs) {
513       if (strcmp(jcr->Job, Job) == 0) {
514          jcr->inc_use_count();
515          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
516          break;
517       }
518    }
519    unlock_jcr_chain();
520    return jcr;
521 }
522
523 void set_jcr_job_status(JCR *jcr, int JobStatus)
524 {
525    /*
526     * For a set of errors, ... keep the current status
527     *   so it isn't lost. For all others, set it.
528     */
529    switch (jcr->JobStatus) {
530    case JS_ErrorTerminated:
531    case JS_Error:
532    case JS_FatalError:
533    case JS_Differences:
534    case JS_Canceled:
535       break;
536    default:
537       jcr->JobStatus = JobStatus;
538    }
539 }
540
541 #ifdef TRACE_JCR_CHAIN
542 static int lock_count = 0;
543 #endif
544
545 /*
546  * Lock the chain
547  */
548 #ifdef TRACE_JCR_CHAIN
549 static void b_lock_jcr_chain(const char *fname, int line)
550 #else
551 static void lock_jcr_chain()
552 #endif
553 {
554 #ifdef TRACE_JCR_CHAIN
555    Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
556       fname, line);
557 #endif
558    P(jcr_lock);
559 }
560
561 /*
562  * Unlock the chain
563  */
564 #ifdef TRACE_JCR_CHAIN
565 static void b_unlock_jcr_chain(const char *fname, int line)
566 #else
567 static void unlock_jcr_chain()
568 #endif
569 {
570 #ifdef TRACE_JCR_CHAIN
571    Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
572       fname, line);
573 #endif
574    V(jcr_lock);
575 }
576
577
578 /*
579  * Start walk of jcr chain
580  * The proper way to walk the jcr chain is:
581  *    JCR *jcr;
582  *    foreach_jcr(jcr) {
583  *      ...
584  *    }
585  *    endeach_jcr(jcr);
586  *
587  *  It is possible to leave out the endeach_jcr(jcr), but
588  *   in that case, the last jcr referenced must be explicitly
589  *   released with:
590  *
591  *    free_jcr(jcr);
592  *  
593  */
594 JCR *jcr_walk_start() 
595 {
596    JCR *jcr;
597    lock_jcr_chain();
598    jcr = (JCR *)jcrs->first();
599    if (jcr) {
600       jcr->inc_use_count();
601       Dmsg3(3400, "Inc jcr_walk_start 0x%x job=%d use_count=%d\n", jcr, 
602             jcr->JobId, jcr->use_count);
603    }
604    unlock_jcr_chain();
605    return jcr;
606 }
607
608 /*
609  * Get next jcr from chain, and release current one
610  */
611 JCR *jcr_walk_next(JCR *prev_jcr)
612 {
613    JCR *jcr;
614
615    lock_jcr_chain();
616    jcr = (JCR *)jcrs->next(prev_jcr);
617    if (jcr) {
618       jcr->inc_use_count();
619       Dmsg3(3400, "Inc jcr_walk_next 0x%x job=%d use_count=%d\n", jcr, 
620          jcr->JobId, jcr->use_count);
621    }
622    unlock_jcr_chain();
623    if (prev_jcr) {
624       free_jcr(prev_jcr);
625    }
626    return jcr;
627 }
628
629 /*
630  * Release last jcr referenced
631  */
632 void jcr_walk_end(JCR *jcr)
633 {
634    if (jcr) {
635       free_jcr(jcr);
636    }
637 }
638
639
640 /*
641  * Setup to call the timeout check routine every 30 seconds
642  *  This routine will check any timers that have been enabled.
643  */
644 bool init_jcr_subsystem(void)
645 {
646    watchdog_t *wd = new_watchdog();
647
648    wd->one_shot = false;
649    wd->interval = 30;   /* FIXME: should be configurable somewhere, even
650                          if only with a #define */
651    wd->callback = jcr_timeout_check;
652
653    register_watchdog(wd);
654
655    return true;
656 }
657
658 static void jcr_timeout_check(watchdog_t *self)
659 {
660    JCR *jcr;
661    BSOCK *fd;
662    time_t timer_start;
663
664    Dmsg0(3400, "Start JCR timeout checks\n");
665
666    /* Walk through all JCRs checking if any one is
667     * blocked for more than specified max time.
668     */
669    foreach_jcr(jcr) {
670       if (jcr->JobId == 0) {
671          continue;
672       }
673       fd = jcr->store_bsock;
674       if (fd) {
675          timer_start = fd->timer_start;
676          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
677             fd->timer_start = 0;      /* turn off timer */
678             fd->timed_out = true;
679             Jmsg(jcr, M_ERROR, 0, _(
680 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
681                  watchdog_time - timer_start);
682             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
683          }
684       }
685       fd = jcr->file_bsock;
686       if (fd) {
687          timer_start = fd->timer_start;
688          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
689             fd->timer_start = 0;      /* turn off timer */
690             fd->timed_out = true;
691             Jmsg(jcr, M_ERROR, 0, _(
692 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
693                  watchdog_time - timer_start);
694             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
695          }
696       }
697       fd = jcr->dir_bsock;
698       if (fd) {
699          timer_start = fd->timer_start;
700          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
701             fd->timer_start = 0;      /* turn off timer */
702             fd->timed_out = true;
703             Jmsg(jcr, M_ERROR, 0, _(
704 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
705                  watchdog_time - timer_start);
706             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
707          }
708       }
709    }
710    endeach_jcr(jcr);
711
712    Dmsg0(3400, "Finished JCR timeout checks\n");
713 }
714
715 /*
716  * Timeout signal comes here
717  */
718 extern "C" void timeout_handler(int sig)
719 {
720    return;                            /* thus interrupting the function */
721 }