X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fbsock.c;h=1e15b03afb242e0b829f6cb84cfcf53fd1cc1497;hb=86da6147f63b29cb85d51620b55bae7266f1c890;hp=e0bf32b35c663e38d96eb7c590a6e53852b6d062;hpb=a00e029e3ea1f4fa1d98a3bf3402809844a493c9;p=bacula%2Fbacula diff --git a/bacula/src/lib/bsock.c b/bacula/src/lib/bsock.c index e0bf32b35c..1e15b03afb 100644 --- a/bacula/src/lib/bsock.c +++ b/bacula/src/lib/bsock.c @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2016 Kern Sibbald + Copyright (C) 2000-2017 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -11,7 +11,7 @@ Public License, v3.0 ("AGPLv3") and some additional permissions and terms pursuant to its AGPLv3 Section 7. - This notice must be preserved when any source code is + This notice must be preserved when any source code is conveyed and/or propagated. Bacula(R) is a registered trademark of Kern Sibbald. @@ -20,11 +20,11 @@ * Network Utility Routines * * Written by Kern Sibbald - * */ #include "bacula.h" #include "jcr.h" +#include "lz4.h" #include #include @@ -40,6 +40,37 @@ #define socketWrite(fd, buf, len) ::write(fd, buf, len) #define socketClose(fd) ::close(fd) +/* + * make a nice dump of a message + */ +void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen) +{ + char buf[54]; + bool is_ascii; + int dbglvl = DT_ASX; + + if (msglen<0) { + Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen)); + // data + smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii); + if (is_ascii) { + Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf); + } else { + Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf); + } + } +} + + +BSOCKCallback::BSOCKCallback() +{ +} + +BSOCKCallback::~BSOCKCallback() +{ +} + + /* * This is a non-class BSOCK "constructor" because we want to * call the Bacula smartalloc routines instead of new. @@ -54,11 +85,14 @@ BSOCK *new_bsock() void BSOCK::init() { memset(this, 0, sizeof(BSOCK)); + m_master = this; set_closed(); set_terminated(); m_blocking = 1; pout_msg_no = &out_msg_no; + uninstall_send_hook_cb(); msg = get_pool_memory(PM_BSOCK); + cmsg = get_pool_memory(PM_BSOCK); errmsg = get_pool_memory(PM_MESSAGE); timeout = BSOCK_TIMEOUT; } @@ -202,7 +236,7 @@ bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service, ipaddr->build_address_str(curbuf, sizeof(curbuf)), build_addresses_str(addr_list, allbuf, sizeof(allbuf))); /* Open a TCP socket */ - if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM, 0)) < 0) { + if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) { berrno be; save_errno = errno; switch (errno) { @@ -340,6 +374,12 @@ bool BSOCK::set_locking() be.bstrerror(stat)); return false; } + if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) { + berrno be; + Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"), + be.bstrerror(stat)); + return false; + } m_use_locking = true; return true; } @@ -352,12 +392,122 @@ void BSOCK::clear_locking() m_use_locking = false; pthread_mutex_destroy(pm_rmutex); pthread_mutex_destroy(pm_wmutex); + pthread_mutex_destroy(&m_mmutex); pm_rmutex = NULL; pm_wmutex = NULL; return; } /* + * Do comm line compression (LZ4) of a bsock message. + * Returns: true if the compression was done + * false if no compression was done + * The "offset" defines where to start compressing the message. This + * allows passing "header" information uncompressed and the actual + * data part compressed. + * + * Note, we don't compress lines less than 20 characters because we + * want to save at least 10 characters otherwise compression doesn't + * help enough to warrant doing the decompression. + */ +bool BSOCK::comm_compress() +{ + bool compress = false; + bool compressed = false; + int offset = m_flags & 0xFF; + + /* + * Enable compress if allowed and not spooling and the + * message is long enough (>20) to get some reasonable savings. + */ + if (msglen > 20) { + compress = can_compress() && !is_spooling(); + } + m_CommBytes += msglen; /* uncompressed bytes */ + Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n", + can_compress(), compress, m_CommBytes, m_CommCompressedBytes); + if (compress) { + int clen; + int need_size; + + ASSERT2(offset <= msglen, "Comm offset bigger than message\n"); + ASSERT2(offset < 255, "Offset greater than 254\n"); + need_size = LZ4_compressBound(msglen); + if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) { + cmsg = realloc_pool_memory(cmsg, need_size + 100); + } + msglen -= offset; + msg += offset; + cmsg += offset; + clen = LZ4_compress_limitedOutput(msg, cmsg, msglen, msglen); + //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen); + /* Compression should save at least 10 characters */ + if (clen > 0 && clen + 10 <= msglen) { + +#ifdef xxx_debug + /* Debug code -- decompress and compare */ + int blen, rlen, olen; + olen = msglen; + POOLMEM *rmsg = get_pool_memory(PM_BSOCK); + blen = sizeof_pool_memory(msg) * 2; + if (blen >= sizeof_pool_memory(rmsg)) { + rmsg = realloc_pool_memory(rmsg, blen); + } + rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen); + //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen); + ASSERT(olen == rlen); + ASSERT(memcmp(msg, rmsg, olen) == 0); + free_pool_memory(rmsg); + /* end Debug code */ +#endif + + msg = cmsg; + msglen = clen; + compressed = true; + } + msglen += offset; + msg -= offset; + cmsg -= offset; + } + m_CommCompressedBytes += msglen; + return compressed; +} + + +/* + * Send a message over the network. Everything is sent in one + * write request, but depending on the mode you are using + * there will be either two or three read requests done. + * Read 1: 32 bits, gets essentially the packet length, but may + * have the upper bits set to indicate compression or + * an extended header packet. + * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set. + * In this case the top 16 bits of this 32 bit word are reserved + * for flags and the lower 16 bits for data. This word will be + * stored in the field "flags" in the BSOCK packet. + * Read 2 or 3: depending on if Read 2 is done. This is the data. + * + * For normal comm line compression, the whole data packet is compressed + * but not the msglen (first read). + * To do data compression rather than full comm line compression, prior to + * call send(flags) where the lower 32 bits is the offset to the data to + * be compressed. The top 32 bits are reserved for flags that can be + * set. The are: + * BNET_IS_CMD We are sending a command + * BNET_OFFSET An offset is specified (this implies data compression) + * BNET_NOCOMPRESS Inhibit normal comm line compression + * BNET_DATACOMPRESSED The data using the specified offset was + * compressed, and normal comm line compression will + * not be done. + * If any of the above bits are set, then BNET_HDR_EXTEND will be set + * in the top bits of msglen, and the full set of flags + the offset + * will be passed as a 32 bit word just after the msglen, and then + * followed by any data that is either compressed or not. + * + * Note, neither comm line nor data compression is not + * guaranteed since it may result in more data, in which case, the + * record is sent uncompressed and there will be no offset. + * On the receive side, if BNET_OFFSET is set, then the data is compressed. * * Returns: false on failure * true on success @@ -367,10 +517,12 @@ bool BSOCK::send(int aflags) int32_t rc; int32_t pktsiz; int32_t *hdrptr; + int offset; int hdrsiz; bool ok = true; int32_t save_msglen; POOLMEM *save_msg; + bool compressed; bool locked = false; if (is_closed()) { @@ -403,6 +555,15 @@ bool BSOCK::send(int aflags) return false; } + if (send_hook_cb) { + if (!send_hook_cb->bsock_send_cb()) { + Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port); + Qmsg3(m_jcr, M_ERROR, 0, + _("Flowcontrol failure on %s:%s:%d\n"), + m_who, m_host, m_port); + return false; + } + } if (m_use_locking) { pP(pm_wmutex); locked = true; @@ -411,6 +572,24 @@ bool BSOCK::send(int aflags) save_msg = msg; m_flags = aflags; + offset = aflags & 0xFF; /* offset is 16 bits */ + if (offset) { + m_flags |= BNET_OFFSET; + } + if (m_flags & BNET_DATACOMPRESSED) { /* Check if already compressed */ + compressed = true; + } else if (m_flags & BNET_NOCOMPRESS) { + compressed = false; + } else { + compressed = comm_compress(); /* do requested compression */ + } + if (offset && compressed) { + m_flags |= BNET_DATACOMPRESSED; + } + if (!compressed) { + m_flags &= ~BNET_COMPRESSED; + } + /* Compute total packet length */ if (msglen <= 0) { hdrsiz = sizeof(pktsiz); @@ -423,6 +602,18 @@ bool BSOCK::send(int aflags) pktsiz = msglen + hdrsiz; } + /* Set special bits */ + if (m_flags & BNET_OFFSET) { /* if data compression on */ + compressed = false; /* no comm compression */ + } + if (compressed) { + msglen |= BNET_COMPRESSED; /* comm line compression */ + } + + if (m_flags) { + msglen |= BNET_HDR_EXTEND; /* extended header */ + } + /* * Store packet length at head of message -- note, we * have reserved an int32_t just before msg, so we can @@ -441,7 +632,7 @@ bool BSOCK::send(int aflags) clear_timed_out(); /* Full I/O done in one write */ rc = write_nbytes(this, (char *)hdrptr, pktsiz); -// if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen); + if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen); timer_start = 0; /* clear timer */ if (rc != pktsiz) { errors++; @@ -464,6 +655,8 @@ bool BSOCK::send(int aflags) } ok = false; } +// Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0, +// msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags); msglen = save_msglen; msg = save_msg; if (locked) pV(pm_wmutex); @@ -480,6 +673,9 @@ bool BSOCK::fsend(const char *fmt, ...) va_list arg_ptr; int maxlen; + if (is_null(this)) { + return false; /* do not seg fault */ + } if (errors || is_terminated() || is_closed()) { return false; } @@ -522,9 +718,12 @@ int32_t BSOCK::recv() { int32_t nbytes; int32_t pktsiz; + int32_t o_pktsiz = 0; + bool compressed = false; + bool command = false; bool locked = false; - msg[0] = 0; + cmsg[0] = msg[0] = 0; msglen = 0; m_flags = 0; if (errors || is_terminated() || is_closed()) { @@ -562,6 +761,47 @@ int32_t BSOCK::recv() } pktsiz = ntohl(pktsiz); /* decode no. of bytes that follow */ + o_pktsiz = pktsiz; + /* If extension, read it */ + if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) { + timer_start = watchdog_time; /* set start wait time */ + clear_timed_out(); + if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) { + timer_start = 0; /* clear timer */ + /* probably pipe broken because client died */ + if (errno == 0) { + b_errno = ENODATA; + } else { + b_errno = errno; + } + errors++; + nbytes = BNET_HARDEOF; /* assume hard EOF received */ + goto get_out; + } + timer_start = 0; /* clear timer */ + if (nbytes != sizeof(int32_t)) { + errors++; + b_errno = EIO; + Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"), + sizeof(int32_t), nbytes, m_who, m_host, m_port); + nbytes = BNET_ERROR; + goto get_out; + } + pktsiz &= ~BNET_HDR_EXTEND; + m_flags = ntohl(m_flags); + } + + if (pktsiz > 0 && (pktsiz & BNET_COMPRESSED)) { + compressed = true; + pktsiz &= ~BNET_COMPRESSED; + } + + if (m_flags & BNET_IS_CMD) { + command = true; + } + if (m_flags & BNET_OFFSET) { + compressed = true; + } if (pktsiz == 0) { /* No data transferred */ timer_start = 0; /* clear timer */ @@ -575,7 +815,7 @@ int32_t BSOCK::recv() if (pktsiz < 0 || pktsiz > 1000000) { if (pktsiz > 0) { /* if packet too big */ Qmsg4(m_jcr, M_FATAL, 0, - _("Packet size=%d too big from \"%s:%s:%d. Terminating connection.\n"), + _("Packet size=%d too big from \"%s:%s:%d\". Maximum permitted 1000000. Terminating connection.\n"), pktsiz, m_who, m_host, m_port); pktsiz = BNET_TERMINATE; /* hang up */ } @@ -621,6 +861,59 @@ int32_t BSOCK::recv() nbytes = BNET_ERROR; goto get_out; } + /* If compressed uncompress it */ + if (compressed) { + int offset = 0; + int psize = nbytes * 4; + if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) { + cmsg = realloc_pool_memory(cmsg, psize); + } + psize = sizeof_pool_memory(cmsg); + if (m_flags & BNET_OFFSET) { + offset = m_flags & 0xFF; + msg += offset; + msglen -= offset; + } + /* Grow buffer to max approx 4MB */ + for (int i=0; i < 7; i++) { + nbytes = LZ4_decompress_safe(msg, cmsg, msglen, psize); + if (nbytes >= 0) { + break; + } + if (psize < 65536) { + psize = 65536; + } else { + psize = psize * 2; + } + if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) { + cmsg = realloc_pool_memory(cmsg, psize + 100); + } + } + if (m_flags & BNET_OFFSET) { + msg -= offset; + msglen += offset; + } + if (nbytes < 0) { + Jmsg1(m_jcr, M_ERROR, 0, "Decompress error!!!! ERR=%d\n", nbytes); + Pmsg3(000, "Decompress error!! pktsiz=%d cmsgsiz=%d nbytes=%d\n", pktsiz, + psize, nbytes); + b_errno = EIO; + errors++; + Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"), + pktsiz, nbytes, m_who, m_host, m_port); + nbytes = BNET_ERROR; + goto get_out; + } + msglen = nbytes; + /* Make sure the buffer is big enough + one byte for EOS */ + if (msglen >= (int32_t)sizeof_pool_memory(msg)) { + msg = realloc_pool_memory(msg, msglen + 100); + } + /* If this is a data decompress, leave msg compressed */ + if (!(m_flags & BNET_OFFSET)) { + memcpy(msg, cmsg, msglen); + } + } /* always add a zero by to properly terminate any * string that was send to us. Note, we ensured above that the @@ -634,7 +927,10 @@ int32_t BSOCK::recv() Dsm_check(300); get_out: -// if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen); + if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen); + if (nbytes != BNET_ERROR && command) { + nbytes = BNET_COMMAND; + } if (locked) pV(pm_rmutex); return nbytes; /* return actual length of message */ @@ -725,6 +1021,7 @@ const char *BSOCK::bstrerror() int BSOCK::get_peer(char *buf, socklen_t buflen) { +#if !defined(HAVE_WIN32) if (peer_addr.sin_family == 0) { socklen_t salen = sizeof(peer_addr); int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen); @@ -734,6 +1031,9 @@ int BSOCK::get_peer(char *buf, socklen_t buflen) return -1; return 0; +#else + return -1; +#endif } /* @@ -886,17 +1186,10 @@ void BSOCK::restore_blocking (int flags) * 0 if timeout * -1 if error */ -int BSOCK::wait_data(int sec, int usec) +int BSOCK::wait_data(int sec, int msec) { - fd_set fdset; - struct timeval tv; - - FD_ZERO(&fdset); - FD_SET((unsigned)m_fd, &fdset); for (;;) { - tv.tv_sec = sec; - tv.tv_usec = usec; - switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) { + switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) { case 0: /* timeout */ b_errno = 0; return 0; @@ -921,16 +1214,9 @@ int BSOCK::wait_data(int sec, int usec) /* * As above, but returns on interrupt */ -int BSOCK::wait_data_intr(int sec, int usec) +int BSOCK::wait_data_intr(int sec, int msec) { - fd_set fdset; - struct timeval tv; - - FD_ZERO(&fdset); - FD_SET((unsigned)m_fd, &fdset); - tv.tv_sec = sec; - tv.tv_usec = usec; - switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) { + switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) { case 0: /* timeout */ b_errno = 0; return 0; @@ -960,14 +1246,30 @@ int BSOCK::wait_data_intr(int sec, int usec) #define SHUT_RDWR 2 #endif +/* + * The JCR is canceled, set terminate for chained BSOCKs starting from master + */ +void BSOCK::cancel() +{ + master_lock(); + for (BSOCK *next = m_master; next != NULL; next = next->m_next) { + if (!next->m_closed) { + next->m_terminated = true; + next->m_timed_out = true; + } + } + master_unlock(); +} + /* * Note, this routine closes the socket, but leaves the * bsock memory in place. + * every thread is responsible of closing and destroying its own duped or not + * duped BSOCK */ void BSOCK::close() { BSOCK *bsock = this; - BSOCK *next; if (bsock->is_closed()) { return; @@ -975,33 +1277,29 @@ void BSOCK::close() if (!m_duped) { clear_locking(); } - for (; bsock; bsock = next) { - next = bsock->m_next; /* get possible pointer to next before destoryed */ - bsock->set_closed(); - bsock->set_terminated(); - if (!bsock->m_duped) { - /* Shutdown tls cleanly. */ - if (bsock->tls) { - tls_bsock_shutdown(bsock); - free_tls_connection(bsock->tls); - bsock->tls = NULL; - } + bsock->set_closed(); + bsock->set_terminated(); + if (!bsock->m_duped) { + /* Shutdown tls cleanly. */ + if (bsock->tls) { + tls_bsock_shutdown(bsock); + free_tls_connection(bsock->tls); + bsock->tls = NULL; + } - if (bsock->is_timed_out()) { - shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */ - } - /* On Windows this discards data if we did not do a close_wait() */ - socketClose(bsock->m_fd); /* normal close */ + if (bsock->is_timed_out()) { + shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */ } + /* On Windows this discards data if we did not do a close_wait() */ + socketClose(bsock->m_fd); /* normal close */ } return; } /* * Destroy the socket (i.e. release all resources) - * including and duped sockets. */ -void BSOCK::destroy() +void BSOCK::_destroy() { this->close(); /* Ensure that socket is closed */ @@ -1011,6 +1309,10 @@ void BSOCK::destroy() } else { ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */ } + if (cmsg) { + free_pool_memory(cmsg); + cmsg = NULL; + } if (errmsg) { free_pool_memory(errmsg); errmsg = NULL; @@ -1027,12 +1329,30 @@ void BSOCK::destroy() free(src_addr); src_addr = NULL; } - if (m_next) { - m_next->destroy(); - } free(this); } +/* + * Destroy the socket (i.e. release all resources) + * including duped sockets. + * should not be called from duped BSOCK + */ +void BSOCK::destroy() +{ + ASSERTD(reinterpret_cast(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n") + ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n") + ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n") + /* I'm the master I must destroy() all the duped BSOCKs */ + master_lock(); + BSOCK *ahead; + for (BSOCK *next = m_next; next != NULL; next = ahead) { + ahead = next->m_next; + next->_destroy(); + } + master_unlock(); + _destroy(); +} + /* Commands sent to Director */ static char hello[] = "Hello %s calling\n"; @@ -1167,7 +1487,7 @@ void BSOCK::control_bwlimit(int bytes) /* What exceed should be converted in sleep time */ int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0)); if (usec_sleep > 100) { - bmicrosleep(0, usec_sleep); /* TODO: Check that bmicrosleep slept enough or sleep again */ + bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */ m_last_tick = get_current_btime(); m_nb_bytes = 0; } else {