]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/jcr.c
747ebc7fc9632701809cca7685684069685c4b5c
[bacula/bacula] / bacula / src / lib / jcr.c
1 /*
2  * Manipulation routines for Job Control Records and
3  *  handling of last_jobs_list.
4  *
5  *  Kern E. Sibbald, December 2000
6  *
7  *  Version $Id$
8  *
9  *  These routines are thread safe.
10  *
11  *  The job list routines were re-written in May 2005 to
12  *  eliminate the global lock while traversing the list, and
13  *  to use the dlist subroutines.  The locking is now done
14  *  on the list each time the list is modified or traversed.
15  *  That is it is "micro-locked" rather than globally locked.
16  *  The result is that there is one lock/unlock for each entry
17  *  in the list while traversing it rather than a single lock
18  *  at the beginning of a traversal and one at the end.  This
19  *  incurs slightly more overhead, but effectively eliminates 
20  *  the possibilty of race conditions.  In addition, with the
21  *  exception of the global locking of the list during the
22  *  re-reading of the config file, no recursion is needed.
23  *
24  */
25 /*
26    Copyright (C) 2000-2005 Kern Sibbald
27
28    This program is free software; you can redistribute it and/or
29    modify it under the terms of the GNU General Public License
30    version 2 as amended with additional clauses defined in the
31    file LICENSE in the main source directory.
32
33    This program is distributed in the hope that it will be useful,
34    but WITHOUT ANY WARRANTY; without even the implied warranty of
35    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
36    the file LICENSE for additional details.
37
38  */
39
40 #include "bacula.h"
41 #include "jcr.h"
42
43 /* External variables we reference */
44 extern time_t watchdog_time;
45
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
54 #else
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
57 #endif
58
59
60 int num_jobs_run;
61 dlist *last_jobs = NULL;
62 const int max_last_jobs = 10;
63  
64 static dlist *jcrs = NULL;            /* JCR chain */
65 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
68
69 void lock_jobs()
70 {
71    P(job_start_mutex);
72 }
73
74 void unlock_jobs()
75 {
76    V(job_start_mutex);
77 }
78
79 void init_last_jobs_list()
80 {
81    JCR *jcr = NULL;
82    struct s_last_job *job_entry = NULL;
83    if (!last_jobs) {
84       last_jobs = New(dlist(job_entry, &job_entry->link));
85    }
86    if (!jcrs) {
87       jcrs = New(dlist(jcr, &jcr->link));
88    }
89 }
90
91 void term_last_jobs_list()
92 {
93    if (last_jobs) {
94       while (!last_jobs->empty()) {
95          void *je = last_jobs->first();
96          last_jobs->remove(je);
97          free(je);
98       }
99       delete last_jobs;
100       last_jobs = NULL;
101    }
102    if (jcrs) {
103       delete jcrs;
104       jcrs = NULL;
105    }
106 }
107
108 void read_last_jobs_list(int fd, uint64_t addr)
109 {
110    struct s_last_job *je, job;
111    uint32_t num;
112
113    Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
114    if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
115       return;
116    }
117    if (read(fd, &num, sizeof(num)) != sizeof(num)) {
118       return;
119    }
120    Dmsg1(100, "Read num_items=%d\n", num);
121    if (num > 4 * max_last_jobs) {  /* sanity check */
122       return;
123    }
124    for ( ; num; num--) {
125       if (read(fd, &job, sizeof(job)) != sizeof(job)) {
126          Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
127          return;
128       }
129       if (job.JobId > 0) {
130          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
131          memcpy((char *)je, (char *)&job, sizeof(job));
132          if (!last_jobs) {
133             init_last_jobs_list();
134          }
135          last_jobs->append(je);
136          if (last_jobs->size() > max_last_jobs) {
137             je = (struct s_last_job *)last_jobs->first();
138             last_jobs->remove(je);
139             free(je);
140          }
141       }
142    }
143 }
144
145 uint64_t write_last_jobs_list(int fd, uint64_t addr)
146 {
147    struct s_last_job *je;
148    uint32_t num;
149
150    Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
151    if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
152       return 0;
153    }
154    if (last_jobs) {
155       /* First record is number of entires */
156       num = last_jobs->size();
157       if (write(fd, &num, sizeof(num)) != sizeof(num)) {
158          Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
159          return 0;
160       }
161       foreach_dlist(je, last_jobs) {
162          if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
163             Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
164             return 0;
165          }
166       }
167    }
168    /* Return current address */
169    ssize_t stat = lseek(fd, 0, SEEK_CUR);
170    if (stat < 0) {
171       stat = 0;
172    }
173    return stat;
174
175 }
176
177 void lock_last_jobs_list()
178 {
179    /* Use jcr chain mutex */
180    lock_jcr_chain();
181 }
182
183 void unlock_last_jobs_list()
184 {
185    /* Use jcr chain mutex */
186    unlock_jcr_chain();
187 }
188
189 /*
190  * Push a subroutine address into the job end callback stack
191  */
192 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
193 {
194    jcr->job_end_push.append((void *)job_end_cb);
195    jcr->job_end_push.append(ctx);
196 }
197
198 /* Pop each job_end subroutine and call it */
199 static void job_end_pop(JCR *jcr)
200 {
201    void (*job_end_cb)(JCR *jcr, void *ctx);
202    void *ctx;
203    for (int i=jcr->job_end_push.size()-1; i > 0; ) {
204       ctx = jcr->job_end_push.get(i--);
205       job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
206       job_end_cb(jcr, ctx);
207    }
208 }
209
210 /*
211  * Create a Job Control Record and link it into JCR chain
212  * Returns newly allocated JCR
213  * Note, since each daemon has a different JCR, he passes
214  *  us the size.
215  */
216 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
217 {
218    JCR *jcr;
219    MQUEUE_ITEM *item = NULL;
220    struct sigaction sigtimer;
221
222    Dmsg0(3400, "Enter new_jcr\n");
223    jcr = (JCR *)malloc(size);
224    memset(jcr, 0, size);
225    jcr->my_thread_id = pthread_self();
226    jcr->msg_queue = New(dlist(item, &item->link));
227    jcr->job_end_push.init(1, false);
228    jcr->sched_time = time(NULL);
229    jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
230    jcr->use_count = 1;
231    pthread_mutex_init(&(jcr->mutex), NULL);
232    jcr->JobStatus = JS_Created;       /* ready to run */
233    jcr->VolumeName = get_pool_memory(PM_FNAME);
234    jcr->VolumeName[0] = 0;
235    jcr->errmsg = get_pool_memory(PM_MESSAGE);
236    jcr->errmsg[0] = 0;
237    /* Setup some dummy values */
238    bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
239    jcr->JobId = 0;
240    jcr->JobType = JT_SYSTEM;          /* internal job until defined */
241    jcr->JobLevel = L_NONE;
242    jcr->JobStatus = JS_Created;
243
244    sigtimer.sa_flags = 0;
245    sigtimer.sa_handler = timeout_handler;
246    sigfillset(&sigtimer.sa_mask);
247    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
248
249    /*
250     * Locking jobs is a global lock that is needed
251     * so that the Director can stop new jobs from being
252     * added to the jcr chain while it processes a new
253     * conf file and does the job_end_push().
254     */
255    lock_jobs();
256    lock_jcr_chain();
257    if (!jcrs) {
258       jcrs = New(dlist(jcr, &jcr->link));
259    }
260    jcrs->append(jcr);
261    unlock_jcr_chain();
262    unlock_jobs();
263
264    return jcr;
265 }
266
267
268 /*
269  * Remove a JCR from the chain
270  * NOTE! The chain must be locked prior to calling
271  *       this routine.
272  */
273 static void remove_jcr(JCR *jcr)
274 {
275    Dmsg0(3400, "Enter remove_jcr\n");
276    if (!jcr) {
277       Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
278    }
279    jcrs->remove(jcr);
280    Dmsg0(3400, "Leave remove_jcr\n");
281 }
282
283 /*
284  * Free stuff common to all JCRs.  N.B. Be careful to include only
285  *  generic stuff in the common part of the jcr.
286  */
287 static void free_common_jcr(JCR *jcr)
288 {
289    struct s_last_job *je, last_job;
290
291    /* Keep some statistics */
292    switch (jcr->JobType) {
293    case JT_BACKUP:
294    case JT_VERIFY:
295    case JT_RESTORE:
296    case JT_MIGRATE:
297    case JT_COPY:
298    case JT_ADMIN:
299       num_jobs_run++;
300       last_job.Errors = jcr->Errors;
301       last_job.JobType = jcr->JobType;
302       last_job.JobId = jcr->JobId;
303       last_job.VolSessionId = jcr->VolSessionId;
304       last_job.VolSessionTime = jcr->VolSessionTime;
305       bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
306       last_job.JobFiles = jcr->JobFiles;
307       last_job.JobBytes = jcr->JobBytes;
308       last_job.JobStatus = jcr->JobStatus;
309       last_job.JobLevel = jcr->JobLevel;
310       last_job.start_time = jcr->start_time;
311       last_job.end_time = time(NULL);
312       /* Keep list of last jobs, but not Console where JobId==0 */
313       if (last_job.JobId > 0) {
314          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
315          memcpy((char *)je, (char *)&last_job, sizeof(last_job));
316          if (!last_jobs) {
317             init_last_jobs_list();
318          }
319          last_jobs->append(je);
320          if (last_jobs->size() > max_last_jobs) {
321             je = (struct s_last_job *)last_jobs->first();
322             last_jobs->remove(je);
323             free(je);
324          }
325       }
326       break;
327    default:
328       break;
329    }
330    pthread_mutex_destroy(&jcr->mutex);
331
332    delete jcr->msg_queue;
333    close_msg(jcr);                    /* close messages for this job */
334
335    /* do this after closing messages */
336    if (jcr->client_name) {
337       free_pool_memory(jcr->client_name);
338       jcr->client_name = NULL;
339    }
340
341    if (jcr->attr) {
342       free_pool_memory(jcr->attr);
343       jcr->attr = NULL;
344    }
345
346    if (jcr->sd_auth_key) {
347       free(jcr->sd_auth_key);
348       jcr->sd_auth_key = NULL;
349    }
350    if (jcr->VolumeName) {
351       free_pool_memory(jcr->VolumeName);
352       jcr->VolumeName = NULL;
353    }
354
355    if (jcr->dir_bsock) {
356       bnet_close(jcr->dir_bsock);
357       jcr->dir_bsock = NULL;
358    }
359    if (jcr->errmsg) {
360       free_pool_memory(jcr->errmsg);
361       jcr->errmsg = NULL;
362    }
363    if (jcr->where) {
364       free(jcr->where);
365       jcr->where = NULL;
366    }
367    if (jcr->cached_path) {
368       free_pool_memory(jcr->cached_path);
369       jcr->cached_path = NULL;
370       jcr->cached_pnl = 0;
371    }
372    free_getuser_cache();
373    free_getgroup_cache();
374    free(jcr);
375 }
376
377 /*
378  * Global routine to free a jcr
379  */
380 #ifdef DEBUG
381 void b_free_jcr(const char *file, int line, JCR *jcr)
382 {
383    Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
384
385 #else
386
387 void free_jcr(JCR *jcr)
388 {
389
390    Dmsg1(3400, "Enter free_jcr 0x%x\n", jcr);
391
392 #endif
393
394    dequeue_messages(jcr);
395    lock_jcr_chain();
396    jcr->dec_use_count();              /* decrement use count */
397    if (jcr->use_count < 0) {
398       Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
399          jcr->use_count, jcr->JobId);
400    }
401    Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
402    if (jcr->use_count > 0) {          /* if in use */
403       unlock_jcr_chain();
404       Dmsg2(3400, "free_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
405       return;
406    }
407
408    remove_jcr(jcr);                   /* remove Jcr from chain */
409    unlock_jcr_chain();
410
411    job_end_pop(jcr);                  /* pop and call hooked routines */
412
413    Dmsg1(3400, "End job=%d\n", jcr->JobId);
414    if (jcr->daemon_free_jcr) {
415       jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
416    }
417    free_common_jcr(jcr);
418    close_msg(NULL);                   /* flush any daemon messages */
419    Dmsg0(3400, "Exit free_jcr\n");
420 }
421
422
423 /*
424  * Given a JobId, find the JCR
425  *   Returns: jcr on success
426  *            NULL on failure
427  */
428 JCR *get_jcr_by_id(uint32_t JobId)
429 {
430    JCR *jcr;
431
432    lock_jcr_chain();                    /* lock chain */
433    foreach_dlist(jcr, jcrs) {
434       if (jcr->JobId == JobId) {
435          jcr->inc_use_count();
436          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
437          break;
438       }
439    }
440    unlock_jcr_chain();
441    return jcr;
442 }
443
444 /*
445  * Given a SessionId and SessionTime, find the JCR
446  *   Returns: jcr on success
447  *            NULL on failure
448  */
449 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
450 {
451    JCR *jcr;
452
453    lock_jcr_chain();
454    foreach_dlist(jcr, jcrs) {
455       if (jcr->VolSessionId == SessionId &&
456           jcr->VolSessionTime == SessionTime) {
457          jcr->inc_use_count();
458          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
459          break;
460       }
461    }
462    unlock_jcr_chain();
463    return jcr;
464 }
465
466
467 /*
468  * Given a Job, find the JCR
469  *  compares on the number of characters in Job
470  *  thus allowing partial matches.
471  *   Returns: jcr on success
472  *            NULL on failure
473  */
474 JCR *get_jcr_by_partial_name(char *Job)
475 {
476    JCR *jcr;
477    int len;
478
479    if (!Job) {
480       return NULL;
481    }
482    lock_jcr_chain();
483    len = strlen(Job);
484    foreach_dlist(jcr, jcrs) {
485       if (strncmp(Job, jcr->Job, len) == 0) {
486          jcr->inc_use_count();
487          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
488          break;
489       }
490    }
491    unlock_jcr_chain();
492    return jcr;
493 }
494
495
496 /*
497  * Given a Job, find the JCR
498  *  requires an exact match of names.
499  *   Returns: jcr on success
500  *            NULL on failure
501  */
502 JCR *get_jcr_by_full_name(char *Job)
503 {
504    JCR *jcr;
505
506    if (!Job) {
507       return NULL;
508    }
509    lock_jcr_chain();
510    foreach_dlist(jcr, jcrs) {
511       if (strcmp(jcr->Job, Job) == 0) {
512          jcr->inc_use_count();
513          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
514          break;
515       }
516    }
517    unlock_jcr_chain();
518    return jcr;
519 }
520
521 void set_jcr_job_status(JCR *jcr, int JobStatus)
522 {
523    /*
524     * For a set of errors, ... keep the current status
525     *   so it isn't lost. For all others, set it.
526     */
527    switch (jcr->JobStatus) {
528    case JS_ErrorTerminated:
529    case JS_Error:
530    case JS_FatalError:
531    case JS_Differences:
532    case JS_Canceled:
533       break;
534    default:
535       jcr->JobStatus = JobStatus;
536    }
537 }
538
539 #ifdef TRACE_JCR_CHAIN
540 static int lock_count = 0;
541 #endif
542
543 /*
544  * Lock the chain
545  */
546 #ifdef TRACE_JCR_CHAIN
547 static void b_lock_jcr_chain(const char *fname, int line)
548 #else
549 static void lock_jcr_chain()
550 #endif
551 {
552 #ifdef TRACE_JCR_CHAIN
553    Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
554       fname, line);
555 #endif
556    P(jcr_lock);
557 }
558
559 /*
560  * Unlock the chain
561  */
562 #ifdef TRACE_JCR_CHAIN
563 static void b_unlock_jcr_chain(const char *fname, int line)
564 #else
565 static void unlock_jcr_chain()
566 #endif
567 {
568 #ifdef TRACE_JCR_CHAIN
569    Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
570       fname, line);
571 #endif
572    V(jcr_lock);
573 }
574
575
576 JCR *get_next_jcr(JCR *prev_jcr)
577 {
578    JCR *jcr;
579
580    lock_jcr_chain();
581    jcr = (JCR *)jcrs->next(prev_jcr);
582    if (jcr) {
583       jcr->inc_use_count();
584       Dmsg2(3400, "Inc get_next_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
585    }
586    unlock_jcr_chain();
587    return jcr;
588 }
589
590 bool init_jcr_subsystem(void)
591 {
592    watchdog_t *wd = new_watchdog();
593
594    wd->one_shot = false;
595    wd->interval = 30;   /* FIXME: should be configurable somewhere, even
596                          if only with a #define */
597    wd->callback = jcr_timeout_check;
598
599    register_watchdog(wd);
600
601    return true;
602 }
603
604 static void jcr_timeout_check(watchdog_t *self)
605 {
606    JCR *jcr;
607    BSOCK *fd;
608    time_t timer_start;
609
610    Dmsg0(3400, "Start JCR timeout checks\n");
611
612    /* Walk through all JCRs checking if any one is
613     * blocked for more than specified max time.
614     */
615    foreach_jcr(jcr) {
616       if (jcr->JobId == 0) {
617          free_jcr(jcr);
618          continue;
619       }
620       fd = jcr->store_bsock;
621       if (fd) {
622          timer_start = fd->timer_start;
623          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
624             fd->timer_start = 0;      /* turn off timer */
625             fd->timed_out = TRUE;
626             Jmsg(jcr, M_ERROR, 0, _(
627 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
628                  watchdog_time - timer_start);
629             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
630          }
631       }
632       fd = jcr->file_bsock;
633       if (fd) {
634          timer_start = fd->timer_start;
635          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
636             fd->timer_start = 0;      /* turn off timer */
637             fd->timed_out = TRUE;
638             Jmsg(jcr, M_ERROR, 0, _(
639 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
640                  watchdog_time - timer_start);
641             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
642          }
643       }
644       fd = jcr->dir_bsock;
645       if (fd) {
646          timer_start = fd->timer_start;
647          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
648             fd->timer_start = 0;      /* turn off timer */
649             fd->timed_out = TRUE;
650             Jmsg(jcr, M_ERROR, 0, _(
651 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
652                  watchdog_time - timer_start);
653             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
654          }
655       }
656       free_jcr(jcr);
657    }
658
659    Dmsg0(3400, "Finished JCR timeout checks\n");
660 }
661
662 /*
663  * Timeout signal comes here
664  */
665 extern "C" void timeout_handler(int sig)
666 {
667    return;                            /* thus interrupting the function */
668 }