]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/bsock.c
Restore win32 dir from Branch-5.2 and update it
[bacula/bacula] / bacula / src / lib / bsock.c
index 2aa9f095ed49cb99ecbfac2dfdce5d7d7265f2c7..42393153c2f09195169b9496dab2964cbd779351 100644 (file)
@@ -1,37 +1,40 @@
 /*
-   Bacula® - The Network Backup Solution
+   Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2007-2014 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2018 Kern Sibbald
 
-   The main author of Bacula is Kern Sibbald, with contributions from many
-   others, a complete list can be found in the file AUTHORS.
+   The original author of Bacula is Kern Sibbald, with contributions
+   from many others, a complete list can be found in the file AUTHORS.
 
    You may use this file and others of this release according to the
    license defined in the LICENSE file, which includes the Affero General
    Public License, v3.0 ("AGPLv3") and some additional permissions and
    terms pursuant to its AGPLv3 Section 7.
 
-   Bacula® is a registered trademark of Kern Sibbald.
+   This notice must be preserved when any source code is
+   conveyed and/or propagated.
+
+   Bacula(R) is a registered trademark of Kern Sibbald.
 */
 /*
  * Network Utility Routines
  *
  *  Written by Kern Sibbald
- *
  */
 
 #include "bacula.h"
 #include "jcr.h"
+#include "lz4.h"
 #include <netdb.h>
 #include <netinet/tcp.h>
 
-#ifndef ENODATA                    /* not defined on BSD systems */
-#define ENODATA EPIPE
-#endif
-
-#ifndef SOL_TCP
-#define SOL_TCP IPPROTO_TCP
-#endif
+#if !defined(ENODATA)              /* not defined on BSD systems */
+#define ENODATA  EPIPE
+#endif 
+#if !defined(SOL_TCP)              /* Not defined on some systems */
+#define SOL_TCP  IPPROTO_TCP
+#endif 
 
 #ifdef HAVE_WIN32
 #include <mswsock.h>
@@ -39,6 +42,9 @@
 #define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
 #define socketClose(fd)           ::closesocket(fd)
 static void win_close_wait(int fd);
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
 #else
 #define socketRead(fd, buf, len)  ::read(fd, buf, len)
 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
@@ -46,6 +52,37 @@ static void win_close_wait(int fd);
 #endif
 
 
+/*
+ * make a nice dump of a message
+ */
+void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen)
+{
+   char buf[54];
+   bool is_ascii;
+   int dbglvl = DT_ASX;
+
+   if (msglen<0) {
+      Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
+      // data
+      smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
+      if (is_ascii) {
+         Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
+      } else {
+         Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
+      }
+   }
+}
+
+
+BSOCKCallback::BSOCKCallback()
+{
+}
+
+BSOCKCallback::~BSOCKCallback()
+{
+}
+
+
 /*
  * This is a non-class BSOCK "constructor"  because we want to
  *   call the Bacula smartalloc routines instead of new.
@@ -60,10 +97,14 @@ BSOCK *new_bsock()
 void BSOCK::init()
 {
    memset(this, 0, sizeof(BSOCK));
+   m_master = this;
    set_closed();
    set_terminated();
    m_blocking = 1;
+   pout_msg_no = &out_msg_no;
+   uninstall_send_hook_cb();
    msg = get_pool_memory(PM_BSOCK);
+   cmsg = get_pool_memory(PM_BSOCK);
    errmsg = get_pool_memory(PM_MESSAGE);
    timeout = BSOCK_TIMEOUT;
 }
@@ -132,7 +173,7 @@ bail_out:
  * Finish initialization of the packet structure.
  */
 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
-                     struct sockaddr *lclient_addr)
+               struct sockaddr *lclient_addr)
 {
    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
    m_fd = sockfd;
@@ -174,7 +215,7 @@ void BSOCK::set_source_address(dlist *src_addr_list)
  * Returns BSOCK * pointer on success
  */
 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
-                 int port, utime_t heart_beat, int *fatal)
+               int port, utime_t heart_beat, int *fatal)
 {
    int sockfd = -1;
    dlist *addr_list;
@@ -199,7 +240,6 @@ bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
    }
 
    remove_duplicate_addresses(addr_list);
-
    foreach_dlist(ipaddr, addr_list) {
       ipaddr->set_port_net(htons(port));
       char allbuf[256 * 10];
@@ -208,7 +248,7 @@ bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
       /* Open a TCP socket */
-      if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM, 0)) < 0) {
+      if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
          berrno be;
          save_errno = errno;
          switch (errno) {
@@ -219,12 +259,22 @@ bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
              * we don't support. Suppress the error and try the next address.
              */
             break;
+#endif
+#ifdef EPROTONOSUPPORT
+         /* See above comments */
+         case EPROTONOSUPPORT:
+            break;
+#endif
+#ifdef EPROTOTYPE
+         /* See above comments */
+         case EPROTOTYPE:
+            break;
 #endif
          default:
             *fatal = 1;
             Qmsg3(jcr, M_ERROR, 0,  _("Socket open error. proto=%d port=%d. ERR=%s\n"),
                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
-            Pmsg3(000, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
+            Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
             break;
          }
@@ -322,9 +372,23 @@ bool BSOCK::set_locking()
    if (m_use_locking) {
       return true;                      /* already set */
    }
-   if ((stat = pthread_mutex_init(&m_mutex, NULL)) != 0) {
+   pm_rmutex = &m_rmutex;
+   pm_wmutex = &m_wmutex;
+   if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
+      berrno be;
+      Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"),
+         be.bstrerror(stat));
+      return false;
+   }
+   if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
       berrno be;
-      Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock mutex. ERR=%s\n"),
+      Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"),
+         be.bstrerror(stat));
+      return false;
+   }
+   if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
+      berrno be;
+      Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"),
          be.bstrerror(stat));
       return false;
    }
@@ -334,30 +398,144 @@ bool BSOCK::set_locking()
 
 void BSOCK::clear_locking()
 {
-   if (!m_use_locking) {
+   if (!m_use_locking || m_duped) {
       return;
    }
    m_use_locking = false;
-   pthread_mutex_destroy(&m_mutex);
+   pthread_mutex_destroy(pm_rmutex);
+   pthread_mutex_destroy(pm_wmutex);
+   pthread_mutex_destroy(&m_mmutex);
+   pm_rmutex = NULL;
+   pm_wmutex = NULL;
    return;
 }
 
 /*
- * Send a message over the network. The send consists of
- * two network packets. The first is sends a 32 bit integer containing
- * the length of the data packet which follows.
+ * Do comm line compression (LZ4) of a bsock message.
+ * Returns:  true if the compression was done
+ *           false if no compression was done
+ * The "offset" defines where to start compressing the message.  This
+ *   allows passing "header" information uncompressed and the actual
+ *   data part compressed.
+ *
+ * Note, we don't compress lines less than 20 characters because we
+ *  want to save at least 10 characters otherwise compression doesn't
+ *  help enough to warrant doing the decompression.
+ */
+bool BSOCK::comm_compress()
+{
+   bool compress = false;
+   bool compressed = false;
+   int offset = m_flags & 0xFF;
+
+   /*
+    * Enable compress if allowed and not spooling and the
+    *  message is long enough (>20) to get some reasonable savings.
+    */
+   if (msglen > 20) {
+      compress = can_compress() && !is_spooling();
+   }
+   m_CommBytes += msglen;                    /* uncompressed bytes */
+   Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
+         can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
+   if (compress) {
+      int clen;
+      int need_size;
+
+      ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
+      ASSERT2(offset < 255, "Offset greater than 254\n");
+      need_size = LZ4_compressBound(msglen);
+      if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
+         cmsg = realloc_pool_memory(cmsg, need_size + 100);
+      }
+      msglen -= offset;
+      msg += offset;
+      cmsg += offset;
+      clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
+      //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
+      /* Compression should save at least 10 characters */
+      if (clen > 0 && clen + 10 <= msglen) {
+
+#ifdef xxx_debug
+         /* Debug code -- decompress and compare */
+         int blen, rlen, olen;
+         olen = msglen;
+         POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
+         blen = sizeof_pool_memory(msg) * 2;
+         if (blen >= sizeof_pool_memory(rmsg)) {
+            rmsg = realloc_pool_memory(rmsg, blen);
+         }
+         rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
+         //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
+         ASSERT(olen == rlen);
+         ASSERT(memcmp(msg, rmsg, olen) == 0);
+         free_pool_memory(rmsg);
+         /* end Debug code */
+#endif
+
+         msg = cmsg;
+         msglen = clen;
+         compressed = true;
+      }
+      msglen += offset;
+      msg -= offset;
+      cmsg -= offset;
+   }
+   m_CommCompressedBytes += msglen;
+   return compressed;
+}
+
+
+/*
+ * Send a message over the network. Everything is sent in one
+ *   write request, but depending on the mode you are using
+ *   there will be either two or three read requests done.
+ * Read 1: 32 bits, gets essentially the packet length, but may
+ *         have the upper bits set to indicate compression or
+ *         an extended header packet.
+ * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set.
+ *         In this case the top 16 bits of this 32 bit word are reserved
+ *         for flags and the lower 16 bits for data. This word will be
+ *         stored in the field "flags" in the BSOCK packet.
+ * Read 2 or 3: depending on if Read 2 is done. This is the data.
+ *
+ * For normal comm line compression, the whole data packet is compressed
+ *   but not the msglen (first read).
+ * To do data compression rather than full comm line compression, prior to
+ *   call send(flags) where the lower 32 bits is the offset to the data to
+ *   be compressed.  The top 32 bits are reserved for flags that can be
+ *   set. The are:
+ *     BNET_IS_CMD   We are sending a command
+ *     BNET_OFFSET   An offset is specified (this implies data compression)
+ *     BNET_NOCOMPRESS Inhibit normal comm line compression
+ *     BNET_DATACOMPRESSED The data using the specified offset was
+ *                   compressed, and normal comm line compression will
+ *                   not be done.
+ *   If any of the above bits are set, then BNET_HDR_EXTEND will be set
+ *   in the top bits of msglen, and the full set of flags + the offset
+ *   will be passed as a 32 bit word just after the msglen, and then
+ *   followed by any data that is either compressed or not.
+ *
+ *   Note, neither comm line nor data compression is not
+ *   guaranteed since it may result in more data, in which case, the
+ *   record is sent uncompressed and there will be no offset.
+ *   On the receive side, if BNET_OFFSET is set, then the data is compressed.
  *
  * Returns: false on failure
  *          true  on success
  */
-bool BSOCK::send()
+bool BSOCK::send(int aflags)
 {
    int32_t rc;
    int32_t pktsiz;
-   int32_t *hdr;
+   int32_t *hdrptr;
+   int offset;
+   int hdrsiz;
    bool ok = true;
    int32_t save_msglen;
    POOLMEM *save_msg;
+   bool compressed;
+   bool locked = false;
 
    if (is_closed()) {
       if (!m_suppress_error_msgs) {
@@ -374,11 +552,12 @@ bool BSOCK::send()
    }
    if (is_terminated()) {
       if (!m_suppress_error_msgs) {
-         Qmsg4(m_jcr, M_ERROR, 0,  _("Socket is terminated=%d on call to %s:%s:%d\n"),
+         Qmsg4(m_jcr, M_ERROR, 0,  _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
              is_terminated(), m_who, m_host, m_port);
       }
       return false;
    }
+
    if (msglen > 4000000) {
       if (!m_suppress_error_msgs) {
          Qmsg4(m_jcr, M_ERROR, 0,
@@ -388,30 +567,84 @@ bool BSOCK::send()
       return false;
    }
 
-   if (m_use_locking) P(m_mutex);
+   if (send_hook_cb) {
+      if (!send_hook_cb->bsock_send_cb()) {
+         Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
+         Qmsg3(m_jcr, M_ERROR, 0,
+            _("Flowcontrol failure on %s:%s:%d\n"),
+                  m_who, m_host, m_port);
+         return false;
+      }
+   }
+   if (m_use_locking) {
+      pP(pm_wmutex);
+      locked = true;
+   }
    save_msglen = msglen;
    save_msg = msg;
+   m_flags = aflags;
+
+   offset = aflags & 0xFF;              /* offset is 16 bits */
+   if (offset) {
+      m_flags |= BNET_OFFSET;
+   }
+   if (m_flags & BNET_DATACOMPRESSED) {   /* Check if already compressed */
+      compressed = true;
+   } else if (m_flags & BNET_NOCOMPRESS) {
+      compressed = false;
+   } else {
+      compressed = comm_compress();       /* do requested compression */
+   }
+   if (offset && compressed) {
+      m_flags |= BNET_DATACOMPRESSED;
+   }
+   if (!compressed) {
+      m_flags &= ~BNET_COMPRESSED;
+   }
+
    /* Compute total packet length */
    if (msglen <= 0) {
-      pktsiz = sizeof(pktsiz);               /* signal, no data */
+      hdrsiz = sizeof(pktsiz);
+      pktsiz = hdrsiz;                     /* signal, no data */
+   } else if (m_flags) {
+      hdrsiz = 2 * sizeof(pktsiz);         /* have 64 bit header */
+      pktsiz = msglen + hdrsiz;
    } else {
-      pktsiz = msglen + sizeof(pktsiz);      /* data */
+      hdrsiz = sizeof(pktsiz);             /* have 32 bit header */
+      pktsiz = msglen + hdrsiz;
+   }
+
+   /* Set special bits */
+   if (m_flags & BNET_OFFSET) {            /* if data compression on */
+      compressed = false;                  /*   no comm compression */
+   }
+   if (compressed) {
+      msglen |= BNET_COMPRESSED;           /* comm line compression */
+   }
+
+   if (m_flags) {
+      msglen |= BNET_HDR_EXTEND;           /* extended header */
    }
+
    /*
     * Store packet length at head of message -- note, we
     *  have reserved an int32_t just before msg, so we can
     *  store there
     */
-   hdr = (int32_t *)(msg - (int)sizeof(pktsiz));
-   *hdr = htonl(msglen);              /* store signal/length */
+   hdrptr = (int32_t *)(msg - hdrsiz);
+   *hdrptr = htonl(msglen);             /* store signal/length */
+   if (m_flags) {
+      *(hdrptr+1) = htonl(m_flags);     /* store flags */
+   }
 
-   out_msg_no++;            /* increment message number */
+   (*pout_msg_no)++;        /* increment message number */
 
    /* send data packet */
    timer_start = watchdog_time;  /* start timer */
    clear_timed_out();
    /* Full I/O done in one write */
-   rc = write_nbytes(this, (char *)hdr, pktsiz);
+   rc = write_nbytes(this, (char *)hdrptr, pktsiz);
+   if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen);
    timer_start = 0;         /* clear timer */
    if (rc != pktsiz) {
       errors++;
@@ -434,9 +667,11 @@ bool BSOCK::send()
       }
       ok = false;
    }
+//   Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
+//      msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
    msglen = save_msglen;
    msg = save_msg;
-   if (m_use_locking) V(m_mutex);
+   if (locked) pV(pm_wmutex);
    return ok;
 }
 
@@ -450,6 +685,9 @@ bool BSOCK::fsend(const char *fmt, ...)
    va_list arg_ptr;
    int maxlen;
 
+   if (is_null(this)) {
+      return false;                /* do not seg fault */
+   }
    if (errors || is_terminated() || is_closed()) {
       return false;
    }
@@ -463,7 +701,7 @@ bool BSOCK::fsend(const char *fmt, ...)
       va_start(arg_ptr, fmt);
       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
       va_end(arg_ptr);
-      if (msglen > 0 && msglen < (maxlen - 5)) {
+      if (msglen >= 0 && msglen < (maxlen - 5)) {
          break;
       }
       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
@@ -479,7 +717,7 @@ bool BSOCK::fsend(const char *fmt, ...)
  * Returns -1 on signal (BNET_SIGNAL)
  * Returns -2 on hard end of file (BNET_HARDEOF)
  * Returns -3 on error  (BNET_ERROR)
- *
+ * Returns -4 on COMMAND (BNET_COMMAND)
  *  Unfortunately, it is a bit complicated because we have these
  *    four return types:
  *    1. Normal data
@@ -492,18 +730,22 @@ int32_t BSOCK::recv()
 {
    int32_t nbytes;
    int32_t pktsiz;
+   int32_t o_pktsiz = 0;
+   bool compressed = false;
+   bool command = false;
    bool locked = false;
 
-   msg[0] = 0;
+   cmsg[0] = msg[0] = 0;
    msglen = 0;
+   m_flags = 0;
    if (errors || is_terminated() || is_closed()) {
       return BNET_HARDEOF;
    }
-
    if (m_use_locking) {
-      P(m_mutex);
+      pP(pm_rmutex);
       locked = true;
    }
+
    read_seqno++;            /* bump sequence number */
    timer_start = watchdog_time;  /* set start wait time */
    clear_timed_out();
@@ -531,6 +773,47 @@ int32_t BSOCK::recv()
    }
 
    pktsiz = ntohl(pktsiz);         /* decode no. of bytes that follow */
+   o_pktsiz = pktsiz;
+   /* If extension, read it */
+   if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) {
+      timer_start = watchdog_time;  /* set start wait time */
+      clear_timed_out();
+      if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) {
+         timer_start = 0;      /* clear timer */
+         /* probably pipe broken because client died */
+         if (errno == 0) {
+            b_errno = ENODATA;
+         } else {
+            b_errno = errno;
+         }
+         errors++;
+         nbytes = BNET_HARDEOF;        /* assume hard EOF received */
+         goto get_out;
+      }
+      timer_start = 0;         /* clear timer */
+      if (nbytes != sizeof(int32_t)) {
+         errors++;
+         b_errno = EIO;
+         Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
+               sizeof(int32_t), nbytes, m_who, m_host, m_port);
+         nbytes = BNET_ERROR;
+         goto get_out;
+      }
+      pktsiz &= ~BNET_HDR_EXTEND;
+      m_flags = ntohl(m_flags);
+   }
+
+   if (pktsiz > 0 && (pktsiz & BNET_COMPRESSED)) {
+      compressed = true;
+      pktsiz &= ~BNET_COMPRESSED;
+   }
+
+   if (m_flags & BNET_IS_CMD) {
+       command = true;
+   }
+   if (m_flags & BNET_OFFSET) {
+      compressed = true;
+   }
 
    if (pktsiz == 0) {              /* No data transferred */
       timer_start = 0;             /* clear timer */
@@ -544,7 +827,7 @@ int32_t BSOCK::recv()
    if (pktsiz < 0 || pktsiz > 1000000) {
       if (pktsiz > 0) {            /* if packet too big */
          Qmsg4(m_jcr, M_FATAL, 0,
-               _("Packet size=%d too big from \"%s:%s:%d. Terminating connection.\n"),
+               _("Packet size=%d too big from \"%s:%s:%d\". Maximum permitted 1000000. Terminating connection.\n"),
                pktsiz, m_who, m_host, m_port);
          pktsiz = BNET_TERMINATE;  /* hang up */
       }
@@ -590,6 +873,59 @@ int32_t BSOCK::recv()
       nbytes = BNET_ERROR;
       goto get_out;
    }
+   /* If compressed uncompress it */
+   if (compressed) {
+      int offset = 0;
+      int psize = nbytes * 4;
+      if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
+         cmsg = realloc_pool_memory(cmsg, psize);
+      }
+      psize = sizeof_pool_memory(cmsg);
+      if (m_flags & BNET_OFFSET) {
+         offset = m_flags & 0xFF;
+         msg += offset;
+         msglen -= offset;
+      }
+      /* Grow buffer to max approx 4MB */
+      for (int i=0; i < 7; i++) {
+         nbytes = LZ4_decompress_safe(msg, cmsg, msglen, psize);
+         if (nbytes >=  0) {
+            break;
+         }
+         if (psize < 65536) {
+            psize = 65536;
+         } else {
+            psize = psize * 2;
+         }
+         if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
+            cmsg = realloc_pool_memory(cmsg, psize + 100);
+         }
+      }
+      if (m_flags & BNET_OFFSET) {
+         msg -= offset;
+         msglen += offset;
+      }
+      if (nbytes < 0) {
+         Jmsg1(m_jcr, M_ERROR, 0, "Decompress error!!!! ERR=%d\n", nbytes);
+         Pmsg3(000, "Decompress error!! pktsiz=%d cmsgsiz=%d nbytes=%d\n", pktsiz,
+           psize, nbytes);
+         b_errno = EIO;
+         errors++;
+          Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
+               pktsiz, nbytes, m_who, m_host, m_port);
+         nbytes = BNET_ERROR;
+         goto get_out;
+      }
+      msglen = nbytes;
+      /* Make sure the buffer is big enough + one byte for EOS */
+      if (msglen >= (int32_t)sizeof_pool_memory(msg)) {
+         msg = realloc_pool_memory(msg, msglen + 100);
+      }
+      /* If this is a data decompress, leave msg compressed */
+      if (!(m_flags & BNET_OFFSET)) {
+         memcpy(msg, cmsg, msglen);
+      }
+   }
 
    /* always add a zero by to properly terminate any
     * string that was send to us. Note, we ensured above that the
@@ -603,7 +939,12 @@ int32_t BSOCK::recv()
    Dsm_check(300);
 
 get_out:
-   if (locked) V(m_mutex);
+   if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen);
+   if (nbytes != BNET_ERROR && command) {
+      nbytes = BNET_COMMAND;
+   }
+
+   if (locked) pV(pm_rmutex);
    return nbytes;                  /* return actual length of message */
 }
 
@@ -648,8 +989,8 @@ bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
          if (nbytes != (size_t)msglen) {
             berrno be;
             Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
-            Qmsg3(get_jcr(), M_FATAL, 0, _("fread attr spool error. Wanted=%d got=%d bytes. ERR=%s\n"),
-                  msglen, nbytes, be.bstrerror());
+            Qmsg2(get_jcr(), M_FATAL, 0, _("fread attr spool error. Wanted=%d got=%d bytes.\n"),
+                  msglen, nbytes);
             update_attr_spool_size(tsize - last);
             return false;
          }
@@ -682,7 +1023,11 @@ const char *BSOCK::bstrerror()
    if (errmsg == NULL) {
       errmsg = get_pool_memory(PM_MESSAGE);
    }
-   pm_strcpy(errmsg, be.bstrerror(b_errno));
+   if (b_errno == 0) {
+      pm_strcpy(errmsg, "I/O Error");
+   } else {
+      pm_strcpy(errmsg, be.bstrerror(b_errno));
+   }
    return errmsg;
 }
 
@@ -882,17 +1227,10 @@ void BSOCK::restore_blocking (int flags)
  *            0 if timeout
  *           -1 if error
  */
-int BSOCK::wait_data(int sec, int usec)
+int BSOCK::wait_data(int sec, int msec)
 {
-   fd_set fdset;
-   struct timeval tv;
-
-   FD_ZERO(&fdset);
-   FD_SET((unsigned)m_fd, &fdset);
    for (;;) {
-      tv.tv_sec = sec;
-      tv.tv_usec = usec;
-      switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
+      switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
       case 0:                      /* timeout */
          b_errno = 0;
          return 0;
@@ -904,6 +1242,11 @@ int BSOCK::wait_data(int sec, int usec)
          return -1;                /* error return */
       default:
          b_errno = 0;
+#ifdef HAVE_TLS
+         if (this->tls && !tls_bsock_probe(this)) {
+            continue; /* false alarm, maybe a session key negotiation in progress on the socket */
+         }
+#endif
          return 1;
       }
    }
@@ -912,19 +1255,9 @@ int BSOCK::wait_data(int sec, int usec)
 /*
  * As above, but returns on interrupt
  */
-int BSOCK::wait_data_intr(int sec, int usec)
+int BSOCK::wait_data_intr(int sec, int msec)
 {
-   fd_set fdset;
-   struct timeval tv;
-
-   if (this == NULL) {
-      return -1;
-   }
-   FD_ZERO(&fdset);
-   FD_SET((unsigned)m_fd, &fdset);
-   tv.tv_sec = sec;
-   tv.tv_usec = usec;
-   switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
+   switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
    case 0:                      /* timeout */
       b_errno = 0;
       return 0;
@@ -933,6 +1266,12 @@ int BSOCK::wait_data_intr(int sec, int usec)
       return -1;                /* error return */
    default:
       b_errno = 0;
+#ifdef HAVE_TLS
+      if (this->tls && !tls_bsock_probe(this)) {
+         /* maybe a session key negotiation waked up the socket */
+         return 0;
+      }
+#endif
       break;
    }
    return 1;
@@ -948,14 +1287,30 @@ int BSOCK::wait_data_intr(int sec, int usec)
 #define SHUT_RDWR 2
 #endif
 
+/*
+ * The JCR is canceled, set terminate for chained BSOCKs starting from master
+ */
+void BSOCK::cancel()
+{
+   master_lock();
+   for (BSOCK *next = m_master; next != NULL; next = next->m_next) {
+      if (!next->m_closed) {
+         next->m_terminated = true;
+         next->m_timed_out = true;
+      }
+   }
+   master_unlock();
+}
+
 /*
  * Note, this routine closes the socket, but leaves the
  *   bsock memory in place.
+ *   every thread is responsible of closing and destroying its own duped or not
+ *   duped BSOCK
  */
 void BSOCK::close()
 {
    BSOCK *bsock = this;
-   BSOCK *next;
 
    if (bsock->is_closed()) {
       return;
@@ -963,39 +1318,35 @@ void BSOCK::close()
    if (!m_duped) {
       clear_locking();
    }
-   for (; bsock; bsock = next) {
-      next = bsock->m_next;           /* get possible pointer to next before destoryed */
-      bsock->set_closed();
-      bsock->set_terminated();
-      if (!bsock->m_duped) {
-         /* Shutdown tls cleanly. */
-         if (bsock->tls) {
-            tls_bsock_shutdown(bsock);
-            free_tls_connection(bsock->tls);
-            bsock->tls = NULL;
-         }
+   bsock->set_closed();
+   bsock->set_terminated();
+   if (!bsock->m_duped) {
+      /* Shutdown tls cleanly. */
+      if (bsock->tls) {
+         tls_bsock_shutdown(bsock);
+         free_tls_connection(bsock->tls);
+         bsock->tls = NULL;
+      }
 
 #ifdef HAVE_WIN32
-         if (!bsock->is_timed_out()) {
-            win_close_wait(bsock->m_fd);  /* Ensure that data is not discarded */
-         }
+      if (!bsock->is_timed_out()) {
+         win_close_wait(bsock->m_fd);  /* Ensure that data is not discarded */
+      }
 #else
-         if (bsock->is_timed_out()) {
-            shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
-         }
-#endif
-         /* On Windows this discards data if we did not do a close_wait() */
-         socketClose(bsock->m_fd);      /* normal close */
+      if (bsock->is_timed_out()) {
+         shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
       }
+#endif
+      /* On Windows this discards data if we did not do a close_wait() */
+      socketClose(bsock->m_fd);      /* normal close */
    }
    return;
 }
 
 /*
  * Destroy the socket (i.e. release all resources)
- *  including and duped sockets.
  */
-void BSOCK::destroy()
+void BSOCK::_destroy()
 {
    this->close();                  /* Ensure that socket is closed */
 
@@ -1005,6 +1356,10 @@ void BSOCK::destroy()
    } else {
       ASSERT2(1 == 0, "Two calls to destroy socket");  /* double destroy */
    }
+   if (cmsg) {
+      free_pool_memory(cmsg);
+      cmsg = NULL;
+   }
    if (errmsg) {
       free_pool_memory(errmsg);
       errmsg = NULL;
@@ -1021,12 +1376,30 @@ void BSOCK::destroy()
       free(src_addr);
       src_addr = NULL;
    }
-   if (m_next) {
-      m_next->destroy();
-   }
    free(this);
 }
 
+/*
+ * Destroy the socket (i.e. release all resources)
+ * including duped sockets.
+ * should not be called from duped BSOCK
+ */
+void BSOCK::destroy()
+{
+   ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n")
+   ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n")
+   ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n")
+   /* I'm the master I must destroy() all the duped BSOCKs */
+   master_lock();
+   BSOCK *ahead;
+   for (BSOCK *next = m_next; next != NULL; next = ahead) {
+      ahead = next->m_next;
+      next->_destroy();
+   }
+   master_unlock();
+   _destroy();
+}
+
 /* Commands sent to Director */
 static char hello[]    = "Hello %s calling\n";
 
@@ -1037,7 +1410,7 @@ static char OKhello[]   = "1000 OK:";
  * Authenticate Director
  */
 bool BSOCK::authenticate_director(const char *name, const char *password,
-                                  TLS_CONTEXT *tls_ctx, char *response, int response_len)
+               TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
 {
    int tls_local_need = BNET_TLS_NONE;
    int tls_remote_need = BNET_TLS_NONE;
@@ -1045,7 +1418,7 @@ bool BSOCK::authenticate_director(const char *name, const char *password,
    char bashed_name[MAX_NAME_LENGTH];
    BSOCK *dir = this;        /* for readability */
 
-   response[0] = 0;
+   *errmsg = 0;
    /*
     * Send my name to the Director then do authentication
     */
@@ -1062,14 +1435,14 @@ bool BSOCK::authenticate_director(const char *name, const char *password,
    if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
        /* Now challenge dir */
        !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
-      bsnprintf(response, response_len, _("Director authorization problem at \"%s:%d\"\n"),
+      bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
          dir->host(), dir->port());
       goto bail_out;
    }
 
    /* Verify that the remote host is willing to meet our TLS requirements */
    if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
-      bsnprintf(response, response_len, _("Authorization problem:"
+      bsnprintf(errmsg, errmsg_len, _("Authorization error:"
              " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
              dir->host(), dir->port());
       goto bail_out;
@@ -1077,7 +1450,7 @@ bool BSOCK::authenticate_director(const char *name, const char *password,
 
    /* Verify that we are willing to meet the remote host's requirements */
    if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
-      bsnprintf(response, response_len, _("Authorization problem with Director at \"%s:%d\":"
+      bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
                      " Remote server requires TLS.\n"),
                      dir->host(), dir->port());
 
@@ -1089,7 +1462,7 @@ bool BSOCK::authenticate_director(const char *name, const char *password,
       if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
          /* Engage TLS! Full Speed Ahead! */
          if (!bnet_tls_client(tls_ctx, dir, NULL)) {
-            bsnprintf(response, response_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
+            bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
                dir->host(), dir->port());
             goto bail_out;
          }
@@ -1099,8 +1472,8 @@ bool BSOCK::authenticate_director(const char *name, const char *password,
    Dmsg1(6, ">dird: %s", dir->msg);
    if (dir->recv() <= 0) {
       dir->stop_timer();
-      bsnprintf(response, response_len, _("Bad response to Hello command: ERR=%s\n"
-                      "The Director at \"%s:%d\" is probably not running.\n"),
+      bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
+                      "The Director at \"%s:%d\" may not be running.\n"),
                     dir->bstrerror(), dir->host(), dir->port());
       return false;
    }
@@ -1108,20 +1481,20 @@ bool BSOCK::authenticate_director(const char *name, const char *password,
    dir->stop_timer();
    Dmsg1(10, "<dird: %s", dir->msg);
    if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
-      bsnprintf(response, response_len, _("Director at \"%s:%d\" rejected Hello command\n"),
+      bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
          dir->host(), dir->port());
       return false;
    } else {
-      bsnprintf(response, response_len, "%s", dir->msg);
+      bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
    }
    return true;
 
 bail_out:
    dir->stop_timer();
-   bsnprintf(response, response_len, _("Authorization problem with Director at \"%s:%d\"\n"
+   bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
              "Most likely the passwords do not agree.\n"
              "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
-             "Please see " MANUAL_AUTH_URL " for help.\n"),
+             "For help, please see: " MANUAL_AUTH_URL "\n"),
              dir->host(), dir->port());
    return false;
 }
@@ -1140,14 +1513,14 @@ void BSOCK::control_bwlimit(int bytes)
 
    m_nb_bytes += bytes;
 
-   /* Less than 0.1ms since the last call, see the next time */
-   if (temp < 100) {
+   if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
+      m_nb_bytes = bytes;
+      m_last_tick = now;
       return;
    }
 
-   if (temp > 10000000) { /* Take care of clock problems (>10s) */
-      m_nb_bytes = bytes;
-      m_last_tick = now;
+   /* Less than 0.1ms since the last call, see the next time */
+   if (temp < 100) {
       return;
    }
 
@@ -1161,7 +1534,7 @@ void BSOCK::control_bwlimit(int bytes)
    /* What exceed should be converted in sleep time */
    int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
    if (usec_sleep > 100) {
-      bmicrosleep(0, usec_sleep); /* TODO: Check that bmicrosleep slept enough or sleep again */
+      bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
       m_last_tick = get_current_btime();
       m_nb_bytes = 0;
    } else {