]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/job.c
d5481b5e96750ac44a432e380d648c7545a26822
[bacula/bacula] / bacula / src / dird / job.c
1 /*
2  *
3  *   Bacula Director Job processing routines
4  *
5  *     Kern Sibbald, October MM
6  *
7  *    Version $Id$
8  */
9 /*
10    Copyright (C) 2000-2003 Kern Sibbald and John Walker
11
12    This program is free software; you can redistribute it and/or
13    modify it under the terms of the GNU General Public License as
14    published by the Free Software Foundation; either version 2 of
15    the License, or (at your option) any later version.
16
17    This program is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20    General Public License for more details.
21
22    You should have received a copy of the GNU General Public
23    License along with this program; if not, write to the Free
24    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
25    MA 02111-1307, USA.
26
27  */
28
29 #include "bacula.h"
30 #include "dird.h"
31
32 /* Forward referenced subroutines */
33 static void *job_thread(void *arg);
34 static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg);
35 static void release_resource_locks(JCR *jcr);
36 static int acquire_resource_locks(JCR *jcr);
37 #ifdef USE_SEMAPHORE
38 static void backoff_resource_locks(JCR *jcr, int count);
39 #endif
40
41 /* Exported subroutines */
42 void run_job(JCR *jcr);
43
44
45 /* Imported subroutines */
46 extern void term_scheduler();
47 extern void term_ua_server();
48 extern int do_backup(JCR *jcr);
49 extern int do_restore(JCR *jcr);
50 extern int do_verify(JCR *jcr);
51 extern void backup_cleanup(void);
52
53 #ifdef USE_SEMAPHORE
54 static semlock_t job_lock;
55 static pthread_mutex_t mutex;
56 static pthread_cond_t  resource_wait;
57 static int waiting = 0;               /* count of waiting threads */
58 #else
59 /* Queue of jobs to be run */
60 workq_t job_wq;                   /* our job work queue */
61 #endif
62
63 void init_job_server(int max_workers)
64 {
65    int stat;
66 #ifdef USE_SEMAPHORE
67    if ((stat = sem_init(&job_lock, max_workers)) != 0) {
68       Emsg1(M_ABORT, 0, _("Could not init job lock: ERR=%s\n"), strerror(stat));
69    }
70    if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
71       Emsg1(M_ABORT, 0, _("Could not init resource mutex: ERR=%s\n"), strerror(stat));
72    }
73    if ((stat = pthread_cond_init(&resource_wait, NULL)) != 0) {
74       Emsg1(M_ABORT, 0, _("Could not init resource wait: ERR=%s\n"), strerror(stat));
75    }
76
77 #else
78    if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
79       Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat));
80    }
81 #endif
82    return;
83 }
84
85 /*
86  * Run a job -- typically called by the scheduler, but may also
87  *              be called by the UA (Console program).
88  *
89  */
90 void run_job(JCR *jcr)
91 {
92    int stat, errstat;
93 #ifdef USE_SEMAPHORE
94    pthread_t tid;
95 #else
96    workq_ele_t *work_item;
97 #endif
98
99    sm_check(__FILE__, __LINE__, True);
100    init_msg(jcr, jcr->messages);
101    create_unique_job_name(jcr, jcr->job->hdr.name);
102    set_jcr_job_status(jcr, JS_Created);
103    jcr->jr.SchedTime = jcr->sched_time;
104    jcr->jr.StartTime = jcr->start_time;
105    jcr->jr.Type = jcr->JobType;
106    jcr->jr.Level = jcr->JobLevel;
107    jcr->jr.JobStatus = jcr->JobStatus;
108    bstrncpy(jcr->jr.Name, jcr->job->hdr.name, sizeof(jcr->jr.Name));
109    bstrncpy(jcr->jr.Job, jcr->Job, sizeof(jcr->jr.Job));
110
111    /* Initialize termination condition variable */
112    if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
113       Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), strerror(errstat));
114       set_jcr_job_status(jcr, JS_ErrorTerminated);
115       free_jcr(jcr);
116       return;
117    }
118
119    /*
120     * Open database
121     */
122    Dmsg0(50, "Open database\n");
123    jcr->db=db_init_database(jcr, jcr->catalog->db_name, jcr->catalog->db_user,
124                             jcr->catalog->db_password, jcr->catalog->db_address,
125                             jcr->catalog->db_port, jcr->catalog->db_socket);
126    if (!db_open_database(jcr, jcr->db)) {
127       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
128       db_close_database(jcr, jcr->db);
129       set_jcr_job_status(jcr, JS_ErrorTerminated);
130       free_jcr(jcr);
131       return;
132    }
133    Dmsg0(50, "DB opened\n");
134
135    /*
136     * Create Job record  
137     */
138    jcr->jr.JobStatus = jcr->JobStatus;
139    if (!db_create_job_record(jcr, jcr->db, &jcr->jr)) {
140       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
141       db_close_database(jcr, jcr->db);
142       set_jcr_job_status(jcr, JS_ErrorTerminated);
143       free_jcr(jcr);
144       return;
145    }
146    jcr->JobId = jcr->jr.JobId;
147    ASSERT(jcr->jr.JobId > 0);
148
149    Dmsg4(30, "Created job record JobId=%d Name=%s Type=%c Level=%c\n", 
150        jcr->JobId, jcr->Job, jcr->jr.Type, jcr->jr.Level);
151    Dmsg0(200, "Add jrc to work queue\n");
152
153 #ifdef USE_SEMAPHORE
154   if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) {
155       Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat));
156    }
157 #else
158    /* Queue the job to be run */
159    if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) {
160       Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat));
161    }
162    jcr->work_item = work_item;
163 #endif
164    Dmsg0(200, "Done run_job()\n");
165 }
166
167 /* 
168  * This is the engine called by workq_add() when we were pulled                
169  *  from the work queue.
170  *  At this point, we are running in our own thread 
171  */
172 static void *job_thread(void *arg)
173 {
174    JCR *jcr = (JCR *)arg;
175
176    pthread_detach(pthread_self());
177    sm_check(__FILE__, __LINE__, True);
178
179    if (!acquire_resource_locks(jcr)) {
180       set_jcr_job_status(jcr, JS_Canceled);
181    }
182
183    Dmsg0(200, "=====Start Job=========\n");
184    jcr->start_time = time(NULL);      /* set the real start time */
185    set_jcr_job_status(jcr, JS_Running);
186
187    if (job_canceled(jcr)) {
188       update_job_end_record(jcr);
189    } else if (jcr->job->MaxStartDelay != 0 && jcr->job->MaxStartDelay <
190        (utime_t)(jcr->start_time - jcr->sched_time)) {
191       Jmsg(jcr, M_FATAL, 0, _("Job canceled because max start delay time exceeded.\n"));
192       set_jcr_job_status(jcr, JS_Canceled);
193       update_job_end_record(jcr);
194    } else {
195
196       /* Run Job */
197       if (jcr->job->RunBeforeJob) {
198          POOLMEM *before = get_pool_memory(PM_FNAME);
199          int status;
200          
201          before = edit_run_codes(jcr, before, jcr->job->RunBeforeJob);
202          status = run_program(before, 0, NULL);
203          if (status != 0) {
204             Jmsg(jcr, M_FATAL, 0, _("RunBeforeJob returned non-zero status=%d\n"),
205                status);
206             set_jcr_job_status(jcr, JS_FatalError);
207             update_job_end_record(jcr);
208             free_pool_memory(before);
209             goto bail_out;
210          }
211          free_pool_memory(before);
212       }
213       switch (jcr->JobType) {
214          case JT_BACKUP:
215             do_backup(jcr);
216             if (jcr->JobStatus == JS_Terminated) {
217                do_autoprune(jcr);
218             }
219             break;
220          case JT_VERIFY:
221             do_verify(jcr);
222             if (jcr->JobStatus == JS_Terminated) {
223                do_autoprune(jcr);
224             }
225             break;
226          case JT_RESTORE:
227             do_restore(jcr);
228             if (jcr->JobStatus == JS_Terminated) {
229                do_autoprune(jcr);
230             }
231             break;
232          case JT_ADMIN:
233             /* No actual job */
234             do_autoprune(jcr);
235             set_jcr_job_status(jcr, JS_Terminated);
236             break;
237          default:
238             Pmsg1(0, "Unimplemented job type: %d\n", jcr->JobType);
239             break;
240          }
241       if (jcr->job->RunAfterJob) {
242          POOLMEM *after = get_pool_memory(PM_FNAME);
243          int status;
244       
245          after = edit_run_codes(jcr, after, jcr->job->RunAfterJob);
246          status = run_program(after, 0, NULL);
247          if (status != 0) {
248             Jmsg(jcr, M_FATAL, 0, _("RunAfterJob returned non-zero status=%d\n"),
249                status);
250             set_jcr_job_status(jcr, JS_FatalError);
251             update_job_end_record(jcr);
252          }
253          free_pool_memory(after);
254       }
255    }
256 bail_out:
257    release_resource_locks(jcr);
258    Dmsg0(50, "Before free jcr\n");
259    free_jcr(jcr);
260    Dmsg0(50, "======== End Job ==========\n");
261    sm_check(__FILE__, __LINE__, True);
262    return NULL;
263 }
264
265 /*
266  * Acquire the resources needed. These locks limit the
267  *  number of jobs by each resource. We have limits on
268  *  Jobs, Clients, Storage, and total jobs.
269  */
270 static int acquire_resource_locks(JCR *jcr)
271 {
272    time_t now = time(NULL);
273
274    /* Wait until scheduled time arrives */
275    if (jcr->sched_time > now && verbose) {
276       Jmsg(jcr, M_INFO, 0, _("Waiting %d seconds for sched time.\n"), 
277            jcr->sched_time - now);
278    }
279    while (jcr->sched_time > now) {
280       Dmsg2(100, "Waiting on sched time, jobid=%d secs=%d\n", jcr->JobId,
281             jcr->sched_time - now);
282       bmicrosleep(jcr->sched_time - now, 0);
283       now = time(NULL);
284    }
285
286
287 #ifdef USE_SEMAPHORE
288    int stat;
289
290    /* Initialize semaphores */
291    if (jcr->store->sem.valid != SEMLOCK_VALID) {
292       if ((stat = sem_init(&jcr->store->sem, jcr->store->MaxConcurrentJobs)) != 0) {
293          Emsg1(M_ABORT, 0, _("Could not init Storage semaphore: ERR=%s\n"), strerror(stat));
294       }
295    }
296    if (jcr->client->sem.valid != SEMLOCK_VALID) {
297       if ((stat = sem_init(&jcr->client->sem, jcr->client->MaxConcurrentJobs)) != 0) {
298          Emsg1(M_ABORT, 0, _("Could not init Client semaphore: ERR=%s\n"), strerror(stat));
299       }
300    }
301    if (jcr->job->sem.valid != SEMLOCK_VALID) {
302       if ((stat = sem_init(&jcr->job->sem, jcr->job->MaxConcurrentJobs)) != 0) {
303          Emsg1(M_ABORT, 0, _("Could not init Job semaphore: ERR=%s\n"), strerror(stat));
304       }
305    }
306
307    for ( ;; ) {
308       /* Acquire semaphore */
309       set_jcr_job_status(jcr, JS_WaitJobRes);
310       if ((stat = sem_lock(&jcr->job->sem)) != 0) {
311          Emsg1(M_ABORT, 0, _("Could not acquire Job max jobs lock: ERR=%s\n"), strerror(stat));
312       }
313       set_jcr_job_status(jcr, JS_WaitClientRes);
314       if ((stat = sem_trylock(&jcr->client->sem)) != 0) {
315          if (stat == EBUSY) {
316             backoff_resource_locks(jcr, 1);
317             goto wait;
318          } else {
319             Emsg1(M_ABORT, 0, _("Could not acquire Client max jobs lock: ERR=%s\n"), strerror(stat));
320          }
321       }
322       set_jcr_job_status(jcr, JS_WaitStoreRes);
323       if ((stat = sem_trylock(&jcr->store->sem)) != 0) {
324          if (stat == EBUSY) {
325             backoff_resource_locks(jcr, 2);
326             goto wait;
327          } else {
328             Emsg1(M_ABORT, 0, _("Could not acquire Storage max jobs lock: ERR=%s\n"), strerror(stat));
329          }
330       }
331       set_jcr_job_status(jcr, JS_WaitMaxJobs);
332       if ((stat = sem_trylock(&job_lock)) != 0) {
333          if (stat == EBUSY) {
334             backoff_resource_locks(jcr, 3);
335             goto wait;
336          } else {
337             Emsg1(M_ABORT, 0, _("Could not acquire max jobs lock: ERR=%s\n"), strerror(stat));
338          }
339       }
340       break;
341
342 wait:
343       P(mutex);
344       /*
345        * Wait for a resource to be released either by backoff or
346        *  by a job terminating.
347        */
348       waiting++;
349       pthread_cond_wait(&resource_wait, &mutex);
350       waiting--;
351       V(mutex);
352       /* Try again */
353    }
354 #endif
355    return 1;
356 }
357
358 #ifdef USE_SEMAPHORE
359 /*
360  * We could not get all the resource locks because 
361  *  too many jobs are running, so release any locks
362  *  we did acquire, giving others a chance to use them
363  *  while we wait.
364  */
365 static void backoff_resource_locks(JCR *jcr, int count)
366 {
367    P(mutex);
368    switch (count) {
369    case 3:
370       sem_unlock(&jcr->store->sem);
371       /* Fall through wanted */
372    case 2:
373       sem_unlock(&jcr->client->sem);
374       /* Fall through wanted */
375    case 1:
376       sem_unlock(&jcr->job->sem);
377       break;
378    }
379    /*
380     * Since we released a lock, if there are any threads
381     *  waiting, wake them up so that they can try again.
382     */
383    if (waiting > 0) {
384       pthread_cond_broadcast(&resource_wait);
385    }
386    V(mutex);
387 }
388 #endif
389
390 /*
391  * This is called at the end of the job to release
392  *   any resource limits on the number of jobs. If
393  *   there are any other jobs waiting, we wake them
394  *   up so that they can try again.
395  */
396 static void release_resource_locks(JCR *jcr)
397 {
398 #ifdef USE_SEMAPHORE
399    P(mutex);
400    sem_unlock(&jcr->store->sem);
401    sem_unlock(&jcr->client->sem);
402    sem_unlock(&jcr->job->sem);
403    sem_unlock(&job_lock);
404    if (waiting > 0) {
405       pthread_cond_broadcast(&resource_wait);
406    }
407    V(mutex);
408 #endif
409 }
410
411 /*
412  * Get or create a Client record for this Job
413  */
414 int get_or_create_client_record(JCR *jcr)
415 {
416    CLIENT_DBR cr;
417
418    memset(&cr, 0, sizeof(cr));
419    bstrncpy(cr.Name, jcr->client->hdr.name, sizeof(cr.Name));
420    cr.AutoPrune = jcr->client->AutoPrune;
421    cr.FileRetention = jcr->client->FileRetention;
422    cr.JobRetention = jcr->client->JobRetention;
423    if (jcr->client_name) {
424       free_pool_memory(jcr->client_name);
425    }
426    jcr->client_name = get_memory(strlen(jcr->client->hdr.name) + 1);
427    strcpy(jcr->client_name, jcr->client->hdr.name);
428    if (!db_create_client_record(jcr, jcr->db, &cr)) {
429       Jmsg(jcr, M_FATAL, 0, _("Could not create Client record. ERR=%s\n"), 
430          db_strerror(jcr->db));
431       return 0;
432    }
433    jcr->jr.ClientId = cr.ClientId;
434    if (cr.Uname[0]) {
435       if (jcr->client_uname) {
436          free_pool_memory(jcr->client_uname);
437       }
438       jcr->client_uname = get_memory(strlen(cr.Uname) + 1);
439       strcpy(jcr->client_uname, cr.Uname);
440    }
441    Dmsg2(100, "Created Client %s record %d\n", jcr->client->hdr.name, 
442       jcr->jr.ClientId);
443    return 1;
444 }
445
446
447 /*
448  * Write status and such in DB
449  */
450 void update_job_end_record(JCR *jcr)
451 {
452    if (jcr->jr.EndTime == 0) {
453       jcr->jr.EndTime = time(NULL);
454    }
455    jcr->end_time = jcr->jr.EndTime;
456    jcr->jr.JobId = jcr->JobId;
457    jcr->jr.JobStatus = jcr->JobStatus;
458    jcr->jr.JobFiles = jcr->JobFiles;
459    jcr->jr.JobBytes = jcr->JobBytes;
460    jcr->jr.VolSessionId = jcr->VolSessionId;
461    jcr->jr.VolSessionTime = jcr->VolSessionTime;
462    if (!db_update_job_end_record(jcr, jcr->db, &jcr->jr)) {
463       Jmsg(jcr, M_WARNING, 0, _("Error updating job record. %s"), 
464          db_strerror(jcr->db));
465    }
466 }
467
468 /*
469  * Takes base_name and appends (unique) current
470  *   date and time to form unique job name.
471  *
472  *  Returns: unique job name in jcr->Job
473  *    date/time in jcr->start_time
474  */
475 void create_unique_job_name(JCR *jcr, char *base_name)
476 {
477    /* Job start mutex */
478    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
479    static time_t last_start_time = 0;
480    time_t now;
481    struct tm tm;
482    char dt[MAX_TIME_LENGTH];
483    char name[MAX_NAME_LENGTH];
484    char *p;
485
486    /* Guarantee unique start time -- maximum one per second, and
487     * thus unique Job Name 
488     */
489    P(mutex);                          /* lock creation of jobs */
490    now = time(NULL);
491    while (now == last_start_time) {
492       bmicrosleep(0, 500000);
493       now = time(NULL);
494    }
495    last_start_time = now;
496    V(mutex);                          /* allow creation of jobs */
497    jcr->start_time = now;
498    /* Form Unique JobName */
499    localtime_r(&now, &tm);
500    /* Use only characters that are permitted in Windows filenames */
501    strftime(dt, sizeof(dt), "%Y-%m-%d_%H.%M.%S", &tm); 
502    bstrncpy(name, base_name, sizeof(name));
503    name[sizeof(name)-22] = 0;          /* truncate if too long */
504    sprintf(jcr->Job, "%s.%s", name, dt); /* add date & time */
505    /* Convert spaces into underscores */
506    for (p=jcr->Job; *p; p++) {
507       if (*p == ' ') {
508          *p = '_';
509       }
510    }
511 }
512
513 /*
514  * Free the Job Control Record if no one is still using it.
515  *  Called from main free_jcr() routine in src/lib/jcr.c so
516  *  that we can do our Director specific cleanup of the jcr.
517  */
518 void dird_free_jcr(JCR *jcr)
519 {
520    Dmsg0(200, "Start dird free_jcr\n");
521
522    if (jcr->file_bsock) {
523       Dmsg0(200, "Close File bsock\n");
524       bnet_close(jcr->file_bsock);
525    }
526    if (jcr->store_bsock) {
527       Dmsg0(200, "Close Store bsock\n");
528       bnet_close(jcr->store_bsock);
529    }
530    if (jcr->fname) {  
531       Dmsg0(200, "Free JCR fname\n");
532       free_pool_memory(jcr->fname);
533    }
534    if (jcr->stime) {
535       Dmsg0(200, "Free JCR stime\n");
536       free_pool_memory(jcr->stime);
537    }
538    if (jcr->db) {
539       Dmsg0(200, "Close DB\n");
540       db_close_database(jcr, jcr->db);
541    }
542    if (jcr->RestoreWhere) {
543       free(jcr->RestoreWhere);
544    }
545    if (jcr->RestoreBootstrap) {
546       free(jcr->RestoreBootstrap);
547    }
548    if (jcr->client_uname) {
549       free_pool_memory(jcr->client_uname);
550    }
551    Dmsg0(200, "End dird free_jcr\n");
552 }
553
554 /*
555  * Set some defaults in the JCR necessary to
556  * run. These items are pulled from the job
557  * definition as defaults, but can be overridden
558  * later either by the Run record in the Schedule resource,
559  * or by the Console program.
560  */
561 void set_jcr_defaults(JCR *jcr, JOB *job)
562 {
563    jcr->job = job;
564    jcr->JobType = job->JobType;
565    jcr->JobLevel = job->level;
566    jcr->store = job->storage;
567    jcr->client = job->client;
568    if (jcr->client_name) {
569       free_pool_memory(jcr->client_name);
570    }
571    jcr->client_name = get_memory(strlen(jcr->client->hdr.name) + 1);
572    strcpy(jcr->client_name, jcr->client->hdr.name);
573    jcr->pool = job->pool;
574    jcr->catalog = job->client->catalog;
575    jcr->fileset = job->fileset;
576    jcr->messages = job->messages; 
577    if (jcr->RestoreBootstrap) {
578       free(jcr->RestoreBootstrap);
579    }
580    /* This can be overridden by Console program */
581    if (job->RestoreBootstrap) {
582       jcr->RestoreBootstrap = bstrdup(job->RestoreBootstrap);
583    }
584    /* If no default level given, set one */
585    if (jcr->JobLevel == 0) {
586       switch (jcr->JobType) {
587       case JT_VERIFY:
588          jcr->JobLevel = L_VERIFY_CATALOG;
589          break;
590       case JT_BACKUP:
591          jcr->JobLevel = L_INCREMENTAL;
592          break;
593       default:
594          break;
595       }
596    }
597 }
598
599 /*
600  * Edit codes into Run command
601  *  %% = %
602  *  %c = Client's name
603  *  %d = Director's name
604  *  %i = JobId
605  *  %e = Job Exit
606  *  %j = Job
607  *  %l = Job Level
608  *  %n = Job name
609  *  %t = Job type
610  *
611  *  omsg = edited output message
612  *  imsg = input string containing edit codes (%x)
613  *
614  */
615 static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg) 
616 {
617    char *p;
618    const char *str;
619    char add[20];
620
621    *omsg = 0;
622    Dmsg1(200, "edit_run_codes: %s\n", imsg);
623    for (p=imsg; *p; p++) {
624       if (*p == '%') {
625          switch (*++p) {
626          case '%':
627             str = "%";
628             break;
629          case 'c':
630             str = jcr->client_name;
631             if (!str) {
632                str = "";
633             }
634             break;
635          case 'd':
636             str = my_name;
637             break;
638          case 'e':
639             str = job_status_to_str(jcr->JobStatus);
640             break;
641          case 'i':
642             sprintf(add, "%d", jcr->JobId);
643             str = add;
644             break;
645          case 'j':                    /* Job */
646             str = jcr->Job;
647             break;
648          case 'l':
649             str = job_level_to_str(jcr->JobLevel);
650             break;
651          case 'n':
652             str = jcr->job->hdr.name;
653             break;
654          case 't':
655             str = job_type_to_str(jcr->JobType);
656             break;
657          default:
658             add[0] = '%';
659             add[1] = *p;
660             add[2] = 0;
661             str = add;
662             break;
663          }
664       } else {
665          add[0] = *p;
666          add[1] = 0;
667          str = add;
668       }
669       Dmsg1(200, "add_str %s\n", str);
670       pm_strcat(&omsg, (char *)str);
671       Dmsg1(200, "omsg=%s\n", omsg);
672    }
673    return omsg;
674 }