X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fbsock.c;h=94efd6bfe407d5cbe88a4b789fc93343b78840f9;hb=0fa12977628002cc239bb6dbadb473189f71229c;hp=88841f322f23adb65325a16472ada5bcb7bcd59c;hpb=450d6875ecc22baa6012452dc332ba607fdb5917;p=bacula%2Fbacula diff --git a/bacula/src/lib/bsock.c b/bacula/src/lib/bsock.c index 88841f322f..94efd6bfe4 100644 --- a/bacula/src/lib/bsock.c +++ b/bacula/src/lib/bsock.c @@ -1,12 +1,12 @@ /* Bacula® - The Network Backup Solution - Copyright (C) 2007-2007 Free Software Foundation Europe e.V. + Copyright (C) 2007-2010 Free Software Foundation Europe e.V. The main author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. This program is Free Software; you can redistribute it and/or - modify it under the terms of version two of the GNU General Public + modify it under the terms of version three of the GNU Affero General Public License as published by the Free Software Foundation and included in the file LICENSE. @@ -15,12 +15,12 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - You should have received a copy of the GNU General Public License + You should have received a copy of the GNU Affero General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - Bacula® is a registered trademark of John Walker. + Bacula® is a registered trademark of Kern Sibbald. The licensor of Bacula is the Free Software Foundation Europe (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, Switzerland, email:ftf@fsfeurope.org. @@ -30,7 +30,6 @@ * * by Kern Sibbald * - * Version $Id: $ */ @@ -85,6 +84,12 @@ void BSOCK::free_bsock() destroy(); } +void BSOCK::free_tls() +{ + free_tls_connection(this->tls); + this->tls = NULL; +} + /* * Try to connect to host for max_retry_time at retry_time intervals. * Note, you must have called the constructor prior to calling @@ -104,7 +109,7 @@ bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time, /* Try to trap out of OS call when time expires */ if (max_retry_time) { - tid = start_thread_timer(pthread_self(), (uint32_t)max_retry_time); + tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time); } for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal); @@ -155,6 +160,26 @@ void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, i set_jcr(jcr); } +/* + * Copy the address from the configuration dlist that gets passed in + */ +void BSOCK::set_source_address(dlist *src_addr_list) +{ + IPADDR *addr = NULL; + + // delete the object we already have, if it's allocated + if (src_addr) { + free( src_addr); + src_addr = NULL; + } + + if (src_addr_list) { + addr = (IPADDR*) src_addr_list->first(); + src_addr = New( IPADDR(*addr)); + } +} + + /* * Open a TCP connection to the server * Returns NULL @@ -202,6 +227,19 @@ bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service, ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror()); continue; } + + /* Bind to the source address if it is set */ + if (src_addr) { + if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) { + berrno be; + save_errno = errno; + *fatal = 1; + Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"), + src_addr->get_family(), be.bstrerror() ); + continue; + } + } + /* * Keep socket from timing out from inactivity */ @@ -251,7 +289,34 @@ bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service, return true; } +/* + * Force read/write to use locking + */ +bool BSOCK::set_locking() +{ + int stat; + if (m_use_locking) { + return true; /* already set */ + } + if ((stat = pthread_mutex_init(&m_mutex, NULL)) != 0) { + berrno be; + Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock mutex. ERR=%s\n"), + be.bstrerror(stat)); + return false; + } + m_use_locking = true; + return true; +} +void BSOCK::clear_locking() +{ + if (!m_use_locking) { + return; + } + m_use_locking = false; + pthread_mutex_destroy(&m_mutex); + return; +} /* * Send a message over the network. The send consists of @@ -266,10 +331,32 @@ bool BSOCK::send() int32_t rc; int32_t pktsiz; int32_t *hdr; + bool ok = true; - if (errors || is_terminated() || msglen > 1000000) { + if (errors) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"), + errors, m_who, m_host, m_port); + } + return false; + } + if (is_terminated()) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, _("Socket is terminated=%d on call to %s:%s:%d\n"), + is_terminated(), m_who, m_host, m_port); + } + return false; + } + if (msglen > 4000000) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, + _("Socket has insane msglen=%d on call to %s:%s:%d\n"), + msglen, m_who, m_host, m_port); + } return false; } + + if (m_use_locking) P(m_mutex); /* Compute total packet length */ if (msglen <= 0) { pktsiz = sizeof(pktsiz); /* signal, no data */ @@ -310,9 +397,10 @@ bool BSOCK::send() _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"), msglen, m_who, m_host, m_port, rc); } - return false; + ok = false; } - return true; + if (m_use_locking) V(m_mutex); + return ok; } /* @@ -374,6 +462,7 @@ int32_t BSOCK::recv() return BNET_HARDEOF; } + if (m_use_locking) P(m_mutex); read_seqno++; /* bump sequence number */ timer_start = watchdog_time; /* set start wait time */ clear_timed_out(); @@ -387,7 +476,8 @@ int32_t BSOCK::recv() b_errno = errno; } errors++; - return BNET_HARDEOF; /* assume hard EOF received */ + nbytes = BNET_HARDEOF; /* assume hard EOF received */ + goto get_out; } timer_start = 0; /* clear timer */ if (nbytes != sizeof(int32_t)) { @@ -395,16 +485,18 @@ int32_t BSOCK::recv() b_errno = EIO; Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"), sizeof(int32_t), nbytes, m_who, m_host, m_port); - return BNET_ERROR; + nbytes = BNET_ERROR; + goto get_out; } pktsiz = ntohl(pktsiz); /* decode no. of bytes that follow */ if (pktsiz == 0) { /* No data transferred */ - timer_start = 0; /* clear timer */ + timer_start = 0; /* clear timer */ in_msg_no++; msglen = 0; - return 0; /* zero bytes read */ + nbytes = 0; /* zero bytes read */ + goto get_out; } /* If signal or packet size too big */ @@ -418,10 +510,11 @@ int32_t BSOCK::recv() if (pktsiz == BNET_TERMINATE) { set_terminated(); } - timer_start = 0; /* clear timer */ + timer_start = 0; /* clear timer */ b_errno = ENODATA; - msglen = pktsiz; /* signal code */ - return BNET_SIGNAL; /* signal */ + msglen = pktsiz; /* signal code */ + nbytes = BNET_SIGNAL; /* signal */ + goto get_out; } /* Make sure the buffer is big enough + one byte for EOS */ @@ -442,7 +535,8 @@ int32_t BSOCK::recv() errors++; Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"), m_who, m_host, m_port, this->bstrerror()); - return BNET_ERROR; + nbytes = BNET_ERROR; + goto get_out; } timer_start = 0; /* clear timer */ in_msg_no++; @@ -452,7 +546,8 @@ int32_t BSOCK::recv() errors++; Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"), pktsiz, nbytes, m_who, m_host, m_port); - return BNET_ERROR; + nbytes = BNET_ERROR; + goto get_out; } /* always add a zero by to properly terminate any * string that was send to us. Note, we ensured above that the @@ -460,6 +555,9 @@ int32_t BSOCK::recv() */ msg[nbytes] = 0; /* terminate in case it is a string */ sm_check(__FILE__, __LINE__, false); + +get_out: + if (m_use_locking) V(m_mutex); return nbytes; /* return actual length of message */ } @@ -485,6 +583,7 @@ bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize) size_t nbytes; ssize_t last = 0, size = 0; int count = 0; + JCR *jcr = get_jcr(); rewind(m_spool_fd); @@ -497,11 +596,11 @@ bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize) size += sizeof(int32_t); msglen = ntohl(pktsiz); if (msglen > 0) { - if (msglen > (int32_t) sizeof_pool_memory(msg)) { + if (msglen > (int32_t)sizeof_pool_memory(msg)) { msg = realloc_pool_memory(msg, msglen + 1); } nbytes = fread(msg, 1, msglen, m_spool_fd); - if (nbytes != (size_t) msglen) { + if (nbytes != (size_t)msglen) { berrno be; Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen); Qmsg1(get_jcr(), M_FATAL, 0, _("fread attr spool error. ERR=%s\n"), @@ -516,12 +615,13 @@ bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize) } } send(); + if (jcr && job_canceled(jcr)) { + return false; + } } update_attr_spool_size(tsize - last); if (ferror(m_spool_fd)) { - berrno be; - Qmsg1(get_jcr(), M_FATAL, 0, _("fread attr spool error. ERR=%s\n"), - be.bstrerror()); + Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n")); return false; } return true; @@ -568,6 +668,7 @@ int BSOCK::get_peer(char *buf, socklen_t buflen) bool BSOCK::set_buffer_size(uint32_t size, int rw) { uint32_t dbuf_size, start_size; + #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT) int opt; opt = IPTOS_THROUGHPUT; @@ -584,6 +685,17 @@ bool BSOCK::set_buffer_size(uint32_t size, int rw) Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n")); return false; } + + /* + * If user has not set the size, use the OS default -- i.e. do not + * try to set it. This allows sys admins to set the size they + * want in the OS, and Bacula will comply. See bug #1493 + */ + if (size == 0) { + msglen = dbuf_size; + return true; + } + if (rw & BNET_SETBUF_READ) { while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET, SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) { @@ -643,13 +755,13 @@ int BSOCK::set_nonblocking() /* Get current flags */ if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) { berrno be; - Jmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); } /* Set O_NONBLOCK flag */ if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) { berrno be; - Jmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); } m_blocking = 0; @@ -677,13 +789,13 @@ int BSOCK::set_blocking() /* Get current flags */ if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) { berrno be; - Jmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); } /* Set O_NONBLOCK flag */ if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) { berrno be; - Jmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); } m_blocking = 1; @@ -708,7 +820,7 @@ void BSOCK::restore_blocking (int flags) #ifndef HAVE_WIN32 if ((fcntl(m_fd, F_SETFL, flags)) < 0) { berrno be; - Jmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); } m_blocking = (flags & O_NONBLOCK) ? true : false; @@ -728,7 +840,7 @@ void BSOCK::restore_blocking (int flags) * 0 if timeout * -1 if error */ -int BSOCK::wait_data(int sec) +int BSOCK::wait_data(int sec, int usec) { fd_set fdset; struct timeval tv; @@ -737,7 +849,7 @@ int BSOCK::wait_data(int sec) FD_SET((unsigned)m_fd, &fdset); for (;;) { tv.tv_sec = sec; - tv.tv_usec = 0; + tv.tv_usec = usec; switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) { case 0: /* timeout */ b_errno = 0; @@ -758,15 +870,18 @@ int BSOCK::wait_data(int sec) /* * As above, but returns on interrupt */ -int BSOCK::wait_data_intr(int sec) +int BSOCK::wait_data_intr(int sec, int usec) { fd_set fdset; struct timeval tv; + if (this == NULL) { + return -1; + } FD_ZERO(&fdset); FD_SET((unsigned)m_fd, &fdset); tv.tv_sec = sec; - tv.tv_usec = 0; + tv.tv_usec = usec; switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) { case 0: /* timeout */ b_errno = 0; @@ -776,6 +891,7 @@ int BSOCK::wait_data_intr(int sec) return -1; /* error return */ default: b_errno = 0; + break; } return 1; } @@ -793,6 +909,9 @@ void BSOCK::close() BSOCK *bsock = this; BSOCK *next; + if (!m_duped) { + clear_locking(); + } for (; bsock; bsock = next) { next = bsock->m_next; /* get possible pointer to next before destoryed */ if (!bsock->m_duped) { @@ -802,7 +921,7 @@ void BSOCK::close() free_tls_connection(bsock->tls); bsock->tls = NULL; } - if (bsock->is_timed_out() || bsock->is_terminated()) { + if (bsock->is_timed_out()) { shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */ } socketClose(bsock->m_fd); /* normal close */ @@ -832,6 +951,10 @@ void BSOCK::destroy() free(m_host); m_host = NULL; } + if (src_addr) { + free(src_addr); + src_addr = NULL; + } free(this); } @@ -929,7 +1052,7 @@ bail_out: bsnprintf(msg, msglen, _("Authorization problem with Director at \"%s:%d\"\n" "Most likely the passwords do not agree.\n" "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n" - "Please see http://www.bacula.org/rel-manual/faq.html#AuthorizationErrors for help.\n"), + "Please see http://www.bacula.org/en/rel-manual/Bacula_Freque_Asked_Questi.html#SECTION003760000000000000000 for help.\n"), dir->host(), dir->port()); return false; }