X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fstored%2Fstored.c;h=957738336fb02483cd4b80d8cb9a2031098582fb;hb=b9265a6898ae36e450755721acfbf4a52342bc2f;hp=1d4279b804e3140af1e5d28743fdc09fe91dc487;hpb=83a15f9cedbcb8f9e00baa7f48fd3010a1355994;p=bacula%2Fbacula diff --git a/bacula/src/stored/stored.c b/bacula/src/stored/stored.c index 1d4279b804..957738336f 100644 --- a/bacula/src/stored/stored.c +++ b/bacula/src/stored/stored.c @@ -10,7 +10,7 @@ * */ /* - Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker + Copyright (C) 2000-2004 Kern Sibbald and John Walker This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -38,42 +38,42 @@ /* Forward referenced functions */ void terminate_stored(int sig); static void check_config(); -static void *director_thread(void *arg); + +extern "C" void *device_allocation(void *arg); #define CONFIG_FILE "bacula-sd.conf" /* Default config file */ /* Global variables exported */ +char OK_msg[] = "3000 OK\n"; +char TERM_msg[] = "3999 Terminate\n"; +STORES *me = NULL; /* our Global resource */ +bool forge_on = false; /* proceed inspite of I/O errors */ - -struct s_shm *shm; /* memory shared with children */ -BSHM bshm; /* shared memory control packet */ - - -/* This is our own global resource */ -STORES *me; - -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static uint32_t VolSessionId = 0; uint32_t VolSessionTime; +char *configfile; -static char *configfile; +/* Global static variables */ static int foreground = 0; - -static workq_t dird_workq; /* queue for processing connections */ -static workq_t filed_workq; /* queue for processing connections */ +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static workq_t dird_workq; /* queue for processing connections */ static void usage() { fprintf(stderr, _( -"\nVersion: " VERSION " (" DATE ")\n\n" -"Usage: stored [-s -f ] [-c config_file] [-d debug_level] [config_file]\n" +"\nVersion: " VERSION " (" BDATE ")\n\n" +"Usage: stored [options] [-c config_file] [config_file]\n" " -c use as configuration file\n" " -dnn set debug level to nn\n" " -f run in foreground (for debugging)\n" +" -g set groupid to group\n" +" -p proceed despite I/O errors\n" " -s no signals (for debugging)\n" " -t test - read config and exit\n" +" -u userid to \n" +" -v verbose user messages\n" " -? print this message.\n" "\n")); exit(1); @@ -86,60 +86,76 @@ static void usage() */ int main (int argc, char *argv[]) { - int ch, i; + int ch; int no_signals = FALSE; int test_config = FALSE; - DEVRES *device; - pthread_t dirid; - int status; + pthread_t thid; + char *uid = NULL; + char *gid = NULL; init_stack_dump(); - my_name_is(argc, argv, "stored"); + my_name_is(argc, argv, "bacula-sd"); + textdomain("bacula"); init_msg(NULL, NULL); daemon_start_time = time(NULL); - memset(&last_job, 0, sizeof(last_job)); /* Sanity checks */ if (TAPE_BSIZE % DEV_BSIZE != 0 || TAPE_BSIZE / DEV_BSIZE == 0) { Emsg2(M_ABORT, 0, "Tape block size (%d) not multiple of system size (%d)\n", - TAPE_BSIZE, DEV_BSIZE); + TAPE_BSIZE, DEV_BSIZE); } if (TAPE_BSIZE != (1 << (ffs(TAPE_BSIZE)-1))) { Emsg1(M_ABORT, 0, "Tape block size (%d) is not a power of 2\n", TAPE_BSIZE); } - while ((ch = getopt(argc, argv, "c:d:fst?")) != -1) { + while ((ch = getopt(argc, argv, "c:d:fg:pstu:v?")) != -1) { switch (ch) { - case 'c': /* configuration file */ - if (configfile != NULL) { - free(configfile); - } - configfile = bstrdup(optarg); - break; - - case 'd': /* debug level */ - debug_level = atoi(optarg); - if (debug_level <= 0) { - debug_level = 1; - } - break; - - case 'f': /* run in foreground */ - foreground = TRUE; - break; - - case 's': /* no signals */ - no_signals = TRUE; - break; - - case 't': - test_config = TRUE; - break; - - case '?': - default: - usage(); - + case 'c': /* configuration file */ + if (configfile != NULL) { + free(configfile); + } + configfile = bstrdup(optarg); + break; + + case 'd': /* debug level */ + debug_level = atoi(optarg); + if (debug_level <= 0) { + debug_level = 1; + } + break; + + case 'f': /* run in foreground */ + foreground = TRUE; + break; + + case 'g': /* set group id */ + gid = optarg; + break; + + case 'p': /* proceed in spite of I/O errors */ + forge_on = true; + break; + + case 's': /* no signals */ + no_signals = TRUE; + break; + + case 't': + test_config = TRUE; + break; + + case 'u': /* set uid */ + uid = optarg; + break; + + case 'v': /* verbose */ + verbose++; + break; + + case '?': + default: + usage(); + break; } } argc -= optind; @@ -147,7 +163,7 @@ int main (int argc, char *argv[]) if (argc) { if (configfile != NULL) { - free(configfile); + free(configfile); } configfile = bstrdup(*argv); argc--; @@ -168,27 +184,19 @@ int main (int argc, char *argv[]) parse_config(configfile); check_config(); - bshm.size = 0; if (test_config) { terminate_stored(0); } if (!foreground) { - daemon_start(); /* become daemon */ - init_stack_dump(); /* pick up new pid */ + daemon_start(); /* become daemon */ + init_stack_dump(); /* pick up new pid */ } create_pid_file(me->pid_directory, "bacula-sd", me->SDport); + read_state_file(me->working_directory, "bacula-sd", me->SDport); - /* ****FIXME**** clean this up */ - /* Create and attach to shared memory. This is a - * hold over from the days of child processes. - * Note, in reality all memory is shared. This - * is just a global buffer for the device packets. - */ - shm = (s_shm *) malloc(sizeof(struct s_shm)); - /* Zero shared memory */ - memset(shm, 0, sizeof(struct s_shm)); + drop(uid, gid); /* Ensure that Volume Session Time and Id are both * set and are both non-zero. @@ -198,90 +206,30 @@ int main (int argc, char *argv[]) Emsg0(M_ABORT, 0, _("Volume Session Time is ZERO!\n")); } - LockRes(); - for (device=NULL,i=0; (device=(DEVRES *)GetNextRes(R_DEVICE, (RES *)device)); i++) { - if (i >= MAX_DEVICES) { - UnlockRes(); - Emsg1(M_ABORT, 0, _("Too many Device Resources. Max=%d\n"), MAX_DEVICES); - } - Dmsg1(90, "calling init_dev %s\n", device->device_name); - device->dev = init_dev(&shm->dev[i], device->device_name); - /* Copy some attributes from the Device Resource to the DEV structure */ - if (device->dev) { - device->dev->capabilities = device->cap_bits; - device->dev->min_block_size = device->min_block_size; - device->dev->max_block_size = device->max_block_size; - device->dev->max_volume_jobs = device->max_volume_jobs; - device->dev->max_volume_files = device->max_volume_files; - device->dev->max_volume_size = device->max_volume_size; - device->dev->max_file_size = device->max_file_size; - device->dev->volume_capacity = device->volume_capacity; - device->dev->max_rewind_wait = device->max_rewind_wait; - device->dev->max_open_wait = device->max_open_wait; - device->dev->device = device; - } - Dmsg1(10, "Init done %s\n", device->device_name); - if (!device->dev) { - Emsg1(M_ERROR, 0, _("Could not initialize %s\n"), device->device_name); - } - if (device->cap_bits & CAP_ALWAYSOPEN) { - Dmsg1(20, "calling open_device %s\n", device->device_name); - if (!open_device(device->dev)) { - Emsg1(M_ERROR, 0, _("Could not open device %s\n"), device->device_name); - } - } - if (device->cap_bits & CAP_AUTOMOUNT && device->dev && - device->dev->state & ST_OPENED) { - DEV_BLOCK *block; - JCR *jcr; - block = new_block(device->dev); - jcr = new_jcr(sizeof(JCR), stored_free_jcr); - switch (read_dev_volume_label(jcr, device->dev, block)) { - case VOL_OK: - break; - default: - Emsg1(M_WARNING, 0, _("Could not mount device %s\n"), device->device_name); - break; - } - free_jcr(jcr); - free_block(block); - } - } - UnlockRes(); - device = NULL; - - set_thread_concurrency(me->max_concurrent_jobs * 2 + - 4 /* watch dog + servers + misc */); + /* Make sure on Solaris we can run concurrent, watch dog + servers + misc */ + set_thread_concurrency(me->max_concurrent_jobs * 2 + 4); - start_watchdog(); /* start watchdog thread */ - - /* - * Here we support either listening on one port or on two ports - */ - if (me->SDDport == 0 || me->SDDport == me->SDport) { - /* Single server used for Director and File daemon */ - bnet_thread_server(me->SDport, me->max_concurrent_jobs * 2, - &dird_workq, connection_request); - } else { - /* Start the Director server */ - if ((status=pthread_create(&dirid, NULL, director_thread, - (void *)me->SDport)) != 0) { - Emsg1(M_ABORT, 0, _("Cannot create Director thread: %s\n"), strerror(status)); - } - /* Start File daemon server */ - bnet_thread_server(me->SDDport, 10, &filed_workq, connection_from_filed); - /* never returns */ + /* + * Start the device allocation thread + */ + if (pthread_create(&thid, NULL, device_allocation, NULL) != 0) { + Emsg1(M_ABORT, 0, _("Unable to create thread. ERR=%s\n"), strerror(errno)); } - exit(1); /* to keep compiler quiet */ -} + start_watchdog(); /* start watchdog thread */ -static void *director_thread(void *arg) -{ - int dir_port = (int)arg; - pthread_detach(pthread_self()); - bnet_thread_server(dir_port, 10, &dird_workq, connection_request); - return NULL; + init_jcr_subsystem(); /* start JCR watchdogs etc. */ + + /* + * Sleep a bit to give device thread a chance to lock the resource + * chain before we start the server. + */ + bmicrosleep(1, 0); + + /* Single server used for Director and File daemon */ + bnet_thread_server(me->SDaddr, me->SDport, me->max_concurrent_jobs * 2 + 1, + &dird_workq, handle_connection_request); + exit(1); /* to keep compiler quiet */ } /* Return a new Session Id */ @@ -299,80 +247,157 @@ uint32_t newVolSessionId() /* Check Configuration file for necessary info */ static void check_config() { - struct stat stat_buf; - LockRes(); me = (STORES *)GetNextRes(R_STORAGE, NULL); if (!me) { UnlockRes(); - Emsg1(M_ABORT, 0, _("No Storage resource defined in %s. Cannot continue.\n"), - configfile); + Emsg1(M_ERROR_TERM, 0, _("No Storage resource defined in %s. Cannot continue.\n"), + configfile); } - my_name_is(0, (char **)NULL, me->hdr.name); /* Set our real name */ + my_name_is(0, (char **)NULL, me->hdr.name); /* Set our real name */ if (GetNextRes(R_STORAGE, (RES *)me) != NULL) { UnlockRes(); - Emsg1(M_ABORT, 0, _("Only one Storage resource permitted in %s\n"), - configfile); + Emsg1(M_ERROR_TERM, 0, _("Only one Storage resource permitted in %s\n"), + configfile); } if (GetNextRes(R_DIRECTOR, NULL) == NULL) { UnlockRes(); - Emsg1(M_ABORT, 0, _("No Director resource defined in %s. Cannot continue.\n"), - configfile); + Emsg1(M_ERROR_TERM, 0, _("No Director resource defined in %s. Cannot continue.\n"), + configfile); } if (GetNextRes(R_DEVICE, NULL) == NULL){ UnlockRes(); - Emsg1(M_ABORT, 0, _("No Device resource defined in %s. Cannot continue.\n"), - configfile); + Emsg1(M_ERROR_TERM, 0, _("No Device resource defined in %s. Cannot continue.\n"), + configfile); } if (!me->messages) { me->messages = (MSGS *)GetNextRes(R_MSGS, NULL); if (!me->messages) { - Emsg1(M_ABORT, 0, _("No Messages resource defined in %s. Cannot continue.\n"), - configfile); + Emsg1(M_ERROR_TERM, 0, _("No Messages resource defined in %s. Cannot continue.\n"), + configfile); } } - close_msg(NULL); /* close temp message handler */ + close_msg(NULL); /* close temp message handler */ init_msg(NULL, me->messages); /* open daemon message handler */ UnlockRes(); if (!me->working_directory) { - Emsg1(M_ABORT, 0, _("No Working Directory defined in %s. Cannot continue.\n"), - configfile); - } - if (stat(me->working_directory, &stat_buf) != 0) { - Emsg1(M_ABORT, 0, _("Working Directory: %s not found. Cannot continue.\n"), - me->working_directory); - } - if (!S_ISDIR(stat_buf.st_mode)) { - Emsg1(M_ABORT, 0, _("Working Directory: %s is not a directory. Cannot continue.\n"), - me->working_directory); + Emsg1(M_ERROR_TERM, 0, _("No Working Directory defined in %s. Cannot continue.\n"), + configfile); } - working_directory = me->working_directory; + + set_working_directory(me->working_directory); +} + +/* + * We are started as a separate thread. The + * resources are alread locked. + */ +extern "C" +void *device_allocation(void *arg) +{ + DEVRES *device; + + LockRes(); + pthread_detach(pthread_self()); + + foreach_res(device, R_DEVICE) { + Dmsg1(90, "calling init_dev %s\n", device->device_name); + device->dev = init_dev(NULL, device); + Dmsg1(10, "SD init done %s\n", device->device_name); + if (!device->dev) { + Emsg1(M_ERROR, 0, _("Could not initialize %s\n"), device->device_name); + continue; + } + + if (device->cap_bits & CAP_ALWAYSOPEN) { + Dmsg1(20, "calling first_open_device %s\n", device->device_name); + if (!first_open_device(device->dev)) { + Emsg1(M_ERROR, 0, _("Could not open device %s\n"), device->device_name); + } + } + if (device->cap_bits & CAP_AUTOMOUNT && device->dev && + device->dev->state & ST_OPENED) { + JCR *jcr; + DCR *dcr; + jcr = new_jcr(sizeof(JCR), stored_free_jcr); + jcr->JobType = JT_SYSTEM; + /* Initialize FD start condition variable */ + int errstat = pthread_cond_init(&jcr->job_start_wait, NULL); + if (errstat != 0) { + Jmsg1(jcr, M_ABORT, 0, _("Unable to init job cond variable: ERR=%s\n"), strerror(errstat)); + } + jcr->device = device; + dcr = new_dcr(jcr, device->dev); + switch (read_dev_volume_label(dcr, dcr->block)) { + case VOL_OK: + break; + default: + Emsg1(M_WARNING, 0, _("Could not mount device %s\n"), device->device_name); + break; + } + free_jcr(jcr); + } + } + UnlockRes(); + return NULL; } + /* Clean up and then exit */ void terminate_stored(int sig) { static int in_here = FALSE; DEVRES *device; + JCR *jcr; - if (in_here) { /* prevent loops */ + if (in_here) { /* prevent loops */ exit(1); } in_here = TRUE; + if (sig == SIGTERM) { /* normal shutdown request? */ + /* + * This is a normal shutdown request. We wiffle through + * all open jobs canceling them and trying to wake + * them up so that they will report back the correct + * volume status. + */ + lock_jcr_chain(); + foreach_jcr(jcr) { + BSOCK *fd; + free_locked_jcr(jcr); + if (jcr->JobId == 0) { + continue; /* ignore console */ + } + set_jcr_job_status(jcr, JS_Canceled); + fd = jcr->file_bsock; + if (fd) { + fd->timed_out = TRUE; + Dmsg1(100, "killing JobId=%d\n", jcr->JobId); + pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); + if (jcr->device && jcr->device->dev && jcr->device->dev->dev_blocked) { + pthread_cond_signal(&jcr->device->dev->wait_next_vol); + } + bmicrosleep(0, 50000); + } + } + unlock_jcr_chain(); + bmicrosleep(0, 500000); /* give them 1/2 sec to clean up */ + } + + write_state_file(me->working_directory, "bacula-sd", me->SDport); delete_pid_file(me->pid_directory, "bacula-sd", me->SDport); - stop_watchdog(); - Dmsg0(200, "In terminate_stored()\n"); + Dmsg1(200, "In terminate_stored() sig=%d\n", sig); LockRes(); - for (device=NULL; (device=(DEVRES *)GetNextRes(R_DEVICE, (RES *)device)); ) { + foreach_res(device, R_DEVICE) { if (device->dev) { - term_dev(device->dev); + term_dev(device->dev); } } UnlockRes(); @@ -385,12 +410,9 @@ void terminate_stored(int sig) print_memory_pool_stats(); } term_msg(); + stop_watchdog(); close_memory_pool(); - if (shm) { - free(shm); - } - - sm_dump(False); /* dump orphaned buffers */ - exit(1); + sm_dump(false); /* dump orphaned buffers */ + exit(sig); }