]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/bsock.c
Restore win32 dir from Branch-5.2 and update it
[bacula/bacula] / bacula / src / lib / bsock.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2018 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Network Utility Routines
21  *
22  *  Written by Kern Sibbald
23  */
24
25 #include "bacula.h"
26 #include "jcr.h"
27 #include "lz4.h"
28 #include <netdb.h>
29 #include <netinet/tcp.h>
30
31 #if !defined(ENODATA)              /* not defined on BSD systems */
32 #define ENODATA  EPIPE
33 #endif 
34  
35 #if !defined(SOL_TCP)              /* Not defined on some systems */
36 #define SOL_TCP  IPPROTO_TCP
37 #endif 
38
39 #ifdef HAVE_WIN32
40 #include <mswsock.h>
41 #define socketRead(fd, buf, len)  ::recv(fd, buf, len, 0)
42 #define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
43 #define socketClose(fd)           ::closesocket(fd)
44 static void win_close_wait(int fd);
45 #ifndef SOCK_CLOEXEC
46 #define SOCK_CLOEXEC 0
47 #endif
48 #else
49 #define socketRead(fd, buf, len)  ::read(fd, buf, len)
50 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
51 #define socketClose(fd)           ::close(fd)
52 #endif
53
54
55 /*
56  * make a nice dump of a message
57  */
58 void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen)
59 {
60    char buf[54];
61    bool is_ascii;
62    int dbglvl = DT_ASX;
63
64    if (msglen<0) {
65       Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
66       // data
67       smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
68       if (is_ascii) {
69          Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
70       } else {
71          Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
72       }
73    }
74 }
75
76
77 BSOCKCallback::BSOCKCallback()
78 {
79 }
80
81 BSOCKCallback::~BSOCKCallback()
82 {
83 }
84
85
86 /*
87  * This is a non-class BSOCK "constructor"  because we want to
88  *   call the Bacula smartalloc routines instead of new.
89  */
90 BSOCK *new_bsock()
91 {
92    BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
93    bsock->init();
94    return bsock;
95 }
96
97 void BSOCK::init()
98 {
99    memset(this, 0, sizeof(BSOCK));
100    m_master = this;
101    set_closed();
102    set_terminated();
103    m_blocking = 1;
104    pout_msg_no = &out_msg_no;
105    uninstall_send_hook_cb();
106    msg = get_pool_memory(PM_BSOCK);
107    cmsg = get_pool_memory(PM_BSOCK);
108    errmsg = get_pool_memory(PM_MESSAGE);
109    timeout = BSOCK_TIMEOUT;
110 }
111
112 void BSOCK::free_tls()
113 {
114    free_tls_connection(this->tls);
115    this->tls = NULL;
116 }
117
118 /*
119  * Try to connect to host for max_retry_time at retry_time intervals.
120  *   Note, you must have called the constructor prior to calling
121  *   this routine.
122  */
123 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
124                     utime_t heart_beat,
125                     const char *name, char *host, char *service, int port,
126                     int verbose)
127 {
128    bool ok = false;
129    int i;
130    int fatal = 0;
131    time_t begin_time = time(NULL);
132    time_t now;
133    btimer_t *tid = NULL;
134
135    /* Try to trap out of OS call when time expires */
136    if (max_retry_time) {
137       tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
138    }
139
140    for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
141         i -= retry_interval) {
142       berrno be;
143       if (fatal || (jcr && job_canceled(jcr))) {
144          goto bail_out;
145       }
146       Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
147             name, host, port, be.bstrerror());
148       if (i < 0) {
149          i = 60 * 5;               /* complain again in 5 minutes */
150          if (verbose)
151             Qmsg4(jcr, M_WARNING, 0, _(
152                "Could not connect to %s on %s:%d. ERR=%s\n"
153                "Retrying ...\n"), name, host, port, be.bstrerror());
154       }
155       bmicrosleep(retry_interval, 0);
156       now = time(NULL);
157       if (begin_time + max_retry_time <= now) {
158          Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
159                name, host, port, be.bstrerror());
160          goto bail_out;
161       }
162    }
163    ok = true;
164
165 bail_out:
166    if (tid) {
167       stop_thread_timer(tid);
168    }
169    return ok;
170 }
171
172 /*
173  * Finish initialization of the packet structure.
174  */
175 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
176                struct sockaddr *lclient_addr)
177 {
178    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
179    m_fd = sockfd;
180    if (m_who) {
181       free(m_who);
182    }
183    if (m_host) {
184       free(m_host);
185    }
186    set_who(bstrdup(who));
187    set_host(bstrdup(host));
188    set_port(port);
189    memcpy(&client_addr, lclient_addr, sizeof(client_addr));
190    set_jcr(jcr);
191 }
192
193 /*
194  * Copy the address from the configuration dlist that gets passed in
195  */
196 void BSOCK::set_source_address(dlist *src_addr_list)
197 {
198    IPADDR *addr = NULL;
199
200    // delete the object we already have, if it's allocated
201    if (src_addr) {
202      free( src_addr);
203      src_addr = NULL;
204    }
205
206    if (src_addr_list) {
207      addr = (IPADDR*) src_addr_list->first();
208      src_addr = New( IPADDR(*addr));
209    }
210 }
211
212 /*
213  * Open a TCP connection to the server
214  * Returns NULL
215  * Returns BSOCK * pointer on success
216  */
217 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
218                int port, utime_t heart_beat, int *fatal)
219 {
220    int sockfd = -1;
221    dlist *addr_list;
222    IPADDR *ipaddr;
223    bool connected = false;
224    int turnon = 1;
225    const char *errstr;
226    int save_errno = 0;
227
228    /*
229     * Fill in the structure serv_addr with the address of
230     * the server that we want to connect with.
231     */
232    if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
233       /* Note errstr is not malloc'ed */
234       Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
235             host, errstr);
236       Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
237             host, errstr);
238       *fatal = 1;
239       return false;
240    }
241
242    remove_duplicate_addresses(addr_list);
243    foreach_dlist(ipaddr, addr_list) {
244       ipaddr->set_port_net(htons(port));
245       char allbuf[256 * 10];
246       char curbuf[256];
247       Dmsg2(100, "Current %sAll %s\n",
248                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
249                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
250       /* Open a TCP socket */
251       if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
252          berrno be;
253          save_errno = errno;
254          switch (errno) {
255 #ifdef EAFNOSUPPORT
256          case EAFNOSUPPORT:
257             /*
258              * The name lookup of the host returned an address in a protocol family
259              * we don't support. Suppress the error and try the next address.
260              */
261             break;
262 #endif
263 #ifdef EPROTONOSUPPORT
264          /* See above comments */
265          case EPROTONOSUPPORT:
266             break;
267 #endif
268 #ifdef EPROTOTYPE
269          /* See above comments */
270          case EPROTOTYPE:
271             break;
272 #endif
273          default:
274             *fatal = 1;
275             Qmsg3(jcr, M_ERROR, 0,  _("Socket open error. proto=%d port=%d. ERR=%s\n"),
276                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
277             Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
278                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
279             break;
280          }
281          continue;
282       }
283
284       /* Bind to the source address if it is set */
285       if (src_addr) {
286          if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
287             berrno be;
288             save_errno = errno;
289             *fatal = 1;
290             Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
291                   src_addr->get_family(), be.bstrerror() );
292             Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
293                   src_addr->get_family(), be.bstrerror() );
294             if (sockfd >= 0) socketClose(sockfd);
295             continue;
296          }
297       }
298
299       /*
300        * Keep socket from timing out from inactivity
301        */
302       if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
303          berrno be;
304          Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
305                be.bstrerror());
306       }
307 #if defined(TCP_KEEPIDLE)
308       if (heart_beat) {
309          int opt = heart_beat;
310          if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
311             berrno be;
312             Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
313                   be.bstrerror());
314          }
315       }
316 #endif
317
318       /* connect to server */
319       if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
320          save_errno = errno;
321          if (sockfd >= 0) socketClose(sockfd);
322          continue;
323       }
324       *fatal = 0;
325       connected = true;
326       break;
327    }
328
329    if (!connected) {
330       berrno be;
331       free_addresses(addr_list);
332       errno = save_errno | b_errno_win32;
333       Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
334             name, host, port, be.bstrerror());
335       return false;
336    }
337    /*
338     * Keep socket from timing out from inactivity
339     *   Do this a second time out of paranoia
340     */
341    if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
342       berrno be;
343       Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
344             be.bstrerror());
345    }
346    fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
347    free_addresses(addr_list);
348
349    /* Clean the packet a bit */
350    m_closed = false;
351    m_duped = false;
352    m_spool = false;
353    m_use_locking = false;
354    m_timed_out = false;
355    m_terminated = false;
356    m_suppress_error_msgs = false;
357    errors = 0;
358    m_blocking = 0;
359
360    Dmsg3(50, "OK connected to server  %s %s:%d.\n",
361          name, host, port);
362
363    return true;
364 }
365
366 /*
367  * Force read/write to use locking
368  */
369 bool BSOCK::set_locking()
370 {
371    int stat;
372    if (m_use_locking) {
373       return true;                      /* already set */
374    }
375    pm_rmutex = &m_rmutex;
376    pm_wmutex = &m_wmutex;
377    if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
378       berrno be;
379       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"),
380          be.bstrerror(stat));
381       return false;
382    }
383    if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
384       berrno be;
385       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"),
386          be.bstrerror(stat));
387       return false;
388    }
389    if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
390       berrno be;
391       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"),
392          be.bstrerror(stat));
393       return false;
394    }
395    m_use_locking = true;
396    return true;
397 }
398
399 void BSOCK::clear_locking()
400 {
401    if (!m_use_locking || m_duped) {
402       return;
403    }
404    m_use_locking = false;
405    pthread_mutex_destroy(pm_rmutex);
406    pthread_mutex_destroy(pm_wmutex);
407    pthread_mutex_destroy(&m_mmutex);
408    pm_rmutex = NULL;
409    pm_wmutex = NULL;
410    return;
411 }
412
413 /*
414  * Do comm line compression (LZ4) of a bsock message.
415  * Returns:  true if the compression was done
416  *           false if no compression was done
417  * The "offset" defines where to start compressing the message.  This
418  *   allows passing "header" information uncompressed and the actual
419  *   data part compressed.
420  *
421  * Note, we don't compress lines less than 20 characters because we
422  *  want to save at least 10 characters otherwise compression doesn't
423  *  help enough to warrant doing the decompression.
424  */
425 bool BSOCK::comm_compress()
426 {
427    bool compress = false;
428    bool compressed = false;
429    int offset = m_flags & 0xFF;
430
431    /*
432     * Enable compress if allowed and not spooling and the
433     *  message is long enough (>20) to get some reasonable savings.
434     */
435    if (msglen > 20) {
436       compress = can_compress() && !is_spooling();
437    }
438    m_CommBytes += msglen;                    /* uncompressed bytes */
439    Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
440          can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
441    if (compress) {
442       int clen;
443       int need_size;
444
445       ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
446       ASSERT2(offset < 255, "Offset greater than 254\n");
447       need_size = LZ4_compressBound(msglen);
448       if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
449          cmsg = realloc_pool_memory(cmsg, need_size + 100);
450       }
451       msglen -= offset;
452       msg += offset;
453       cmsg += offset;
454       clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
455       //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
456       /* Compression should save at least 10 characters */
457       if (clen > 0 && clen + 10 <= msglen) {
458
459 #ifdef xxx_debug
460          /* Debug code -- decompress and compare */
461          int blen, rlen, olen;
462          olen = msglen;
463          POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
464          blen = sizeof_pool_memory(msg) * 2;
465          if (blen >= sizeof_pool_memory(rmsg)) {
466             rmsg = realloc_pool_memory(rmsg, blen);
467          }
468          rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
469          //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
470          ASSERT(olen == rlen);
471          ASSERT(memcmp(msg, rmsg, olen) == 0);
472          free_pool_memory(rmsg);
473          /* end Debug code */
474 #endif
475
476          msg = cmsg;
477          msglen = clen;
478          compressed = true;
479       }
480       msglen += offset;
481       msg -= offset;
482       cmsg -= offset;
483    }
484    m_CommCompressedBytes += msglen;
485    return compressed;
486 }
487
488
489 /*
490  * Send a message over the network. Everything is sent in one
491  *   write request, but depending on the mode you are using
492  *   there will be either two or three read requests done.
493  * Read 1: 32 bits, gets essentially the packet length, but may
494  *         have the upper bits set to indicate compression or
495  *         an extended header packet.
496  * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set.
497  *         In this case the top 16 bits of this 32 bit word are reserved
498  *         for flags and the lower 16 bits for data. This word will be
499  *         stored in the field "flags" in the BSOCK packet.
500  * Read 2 or 3: depending on if Read 2 is done. This is the data.
501  *
502  * For normal comm line compression, the whole data packet is compressed
503  *   but not the msglen (first read).
504  * To do data compression rather than full comm line compression, prior to
505  *   call send(flags) where the lower 32 bits is the offset to the data to
506  *   be compressed.  The top 32 bits are reserved for flags that can be
507  *   set. The are:
508  *     BNET_IS_CMD   We are sending a command
509  *     BNET_OFFSET   An offset is specified (this implies data compression)
510  *     BNET_NOCOMPRESS Inhibit normal comm line compression
511  *     BNET_DATACOMPRESSED The data using the specified offset was
512  *                   compressed, and normal comm line compression will
513  *                   not be done.
514  *   If any of the above bits are set, then BNET_HDR_EXTEND will be set
515  *   in the top bits of msglen, and the full set of flags + the offset
516  *   will be passed as a 32 bit word just after the msglen, and then
517  *   followed by any data that is either compressed or not.
518  *
519  *   Note, neither comm line nor data compression is not
520  *   guaranteed since it may result in more data, in which case, the
521  *   record is sent uncompressed and there will be no offset.
522  *   On the receive side, if BNET_OFFSET is set, then the data is compressed.
523  *
524  * Returns: false on failure
525  *          true  on success
526  */
527 bool BSOCK::send(int aflags)
528 {
529    int32_t rc;
530    int32_t pktsiz;
531    int32_t *hdrptr;
532    int offset;
533    int hdrsiz;
534    bool ok = true;
535    int32_t save_msglen;
536    POOLMEM *save_msg;
537    bool compressed;
538    bool locked = false;
539
540    if (is_closed()) {
541       if (!m_suppress_error_msgs) {
542          Qmsg0(m_jcr, M_ERROR, 0,  _("Socket is closed\n"));
543       }
544       return false;
545    }
546    if (errors) {
547       if (!m_suppress_error_msgs) {
548          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket has errors=%d on call to %s:%s:%d\n"),
549              errors, m_who, m_host, m_port);
550       }
551       return false;
552    }
553    if (is_terminated()) {
554       if (!m_suppress_error_msgs) {
555          Qmsg4(m_jcr, M_ERROR, 0,  _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
556              is_terminated(), m_who, m_host, m_port);
557       }
558       return false;
559    }
560
561    if (msglen > 4000000) {
562       if (!m_suppress_error_msgs) {
563          Qmsg4(m_jcr, M_ERROR, 0,
564             _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
565              msglen, m_who, m_host, m_port);
566       }
567       return false;
568    }
569
570    if (send_hook_cb) {
571       if (!send_hook_cb->bsock_send_cb()) {
572          Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
573          Qmsg3(m_jcr, M_ERROR, 0,
574             _("Flowcontrol failure on %s:%s:%d\n"),
575                   m_who, m_host, m_port);
576          return false;
577       }
578    }
579    if (m_use_locking) {
580       pP(pm_wmutex);
581       locked = true;
582    }
583    save_msglen = msglen;
584    save_msg = msg;
585    m_flags = aflags;
586
587    offset = aflags & 0xFF;              /* offset is 16 bits */
588    if (offset) {
589       m_flags |= BNET_OFFSET;
590    }
591    if (m_flags & BNET_DATACOMPRESSED) {   /* Check if already compressed */
592       compressed = true;
593    } else if (m_flags & BNET_NOCOMPRESS) {
594       compressed = false;
595    } else {
596       compressed = comm_compress();       /* do requested compression */
597    }
598    if (offset && compressed) {
599       m_flags |= BNET_DATACOMPRESSED;
600    }
601    if (!compressed) {
602       m_flags &= ~BNET_COMPRESSED;
603    }
604
605    /* Compute total packet length */
606    if (msglen <= 0) {
607       hdrsiz = sizeof(pktsiz);
608       pktsiz = hdrsiz;                     /* signal, no data */
609    } else if (m_flags) {
610       hdrsiz = 2 * sizeof(pktsiz);         /* have 64 bit header */
611       pktsiz = msglen + hdrsiz;
612    } else {
613       hdrsiz = sizeof(pktsiz);             /* have 32 bit header */
614       pktsiz = msglen + hdrsiz;
615    }
616
617    /* Set special bits */
618    if (m_flags & BNET_OFFSET) {            /* if data compression on */
619       compressed = false;                  /*   no comm compression */
620    }
621    if (compressed) {
622       msglen |= BNET_COMPRESSED;           /* comm line compression */
623    }
624
625    if (m_flags) {
626       msglen |= BNET_HDR_EXTEND;           /* extended header */
627    }
628
629    /*
630     * Store packet length at head of message -- note, we
631     *  have reserved an int32_t just before msg, so we can
632     *  store there
633     */
634    hdrptr = (int32_t *)(msg - hdrsiz);
635    *hdrptr = htonl(msglen);             /* store signal/length */
636    if (m_flags) {
637       *(hdrptr+1) = htonl(m_flags);     /* store flags */
638    }
639
640    (*pout_msg_no)++;        /* increment message number */
641
642    /* send data packet */
643    timer_start = watchdog_time;  /* start timer */
644    clear_timed_out();
645    /* Full I/O done in one write */
646    rc = write_nbytes(this, (char *)hdrptr, pktsiz);
647    if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen);
648    timer_start = 0;         /* clear timer */
649    if (rc != pktsiz) {
650       errors++;
651       if (errno == 0) {
652          b_errno = EIO;
653       } else {
654          b_errno = errno;
655       }
656       if (rc < 0) {
657          if (!m_suppress_error_msgs) {
658             Qmsg5(m_jcr, M_ERROR, 0,
659                   _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
660                   pktsiz, m_who,
661                   m_host, m_port, this->bstrerror());
662          }
663       } else {
664          Qmsg5(m_jcr, M_ERROR, 0,
665                _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
666                pktsiz, m_who, m_host, m_port, rc);
667       }
668       ok = false;
669    }
670 //   Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
671 //      msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
672    msglen = save_msglen;
673    msg = save_msg;
674    if (locked) pV(pm_wmutex);
675    return ok;
676 }
677
678 /*
679  * Format and send a message
680  *  Returns: false on error
681  *           true  on success
682  */
683 bool BSOCK::fsend(const char *fmt, ...)
684 {
685    va_list arg_ptr;
686    int maxlen;
687
688    if (is_null(this)) {
689       return false;                /* do not seg fault */
690    }
691    if (errors || is_terminated() || is_closed()) {
692       return false;
693    }
694    /* This probably won't work, but we vsnprintf, then if we
695     * get a negative length or a length greater than our buffer
696     * (depending on which library is used), the printf was truncated, so
697     * get a bigger buffer and try again.
698     */
699    for (;;) {
700       maxlen = sizeof_pool_memory(msg) - 1;
701       va_start(arg_ptr, fmt);
702       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
703       va_end(arg_ptr);
704       if (msglen >= 0 && msglen < (maxlen - 5)) {
705          break;
706       }
707       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
708    }
709    return send();
710 }
711
712 /*
713  * Receive a message from the other end. Each message consists of
714  * two packets. The first is a header that contains the size
715  * of the data that follows in the second packet.
716  * Returns number of bytes read (may return zero)
717  * Returns -1 on signal (BNET_SIGNAL)
718  * Returns -2 on hard end of file (BNET_HARDEOF)
719  * Returns -3 on error  (BNET_ERROR)
720  * Returns -4 on COMMAND (BNET_COMMAND)
721  *  Unfortunately, it is a bit complicated because we have these
722  *    four return types:
723  *    1. Normal data
724  *    2. Signal including end of data stream
725  *    3. Hard end of file
726  *    4. Error
727  *  Using bsock->is_stop() and bsock->is_error() you can figure this all out.
728  */
729 int32_t BSOCK::recv()
730 {
731    int32_t nbytes;
732    int32_t pktsiz;
733    int32_t o_pktsiz = 0;
734    bool compressed = false;
735    bool command = false;
736    bool locked = false;
737
738    cmsg[0] = msg[0] = 0;
739    msglen = 0;
740    m_flags = 0;
741    if (errors || is_terminated() || is_closed()) {
742       return BNET_HARDEOF;
743    }
744    if (m_use_locking) {
745       pP(pm_rmutex);
746       locked = true;
747    }
748
749    read_seqno++;            /* bump sequence number */
750    timer_start = watchdog_time;  /* set start wait time */
751    clear_timed_out();
752    /* get data size -- in int32_t */
753    if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
754       timer_start = 0;      /* clear timer */
755       /* probably pipe broken because client died */
756       if (errno == 0) {
757          b_errno = ENODATA;
758       } else {
759          b_errno = errno;
760       }
761       errors++;
762       nbytes = BNET_HARDEOF;        /* assume hard EOF received */
763       goto get_out;
764    }
765    timer_start = 0;         /* clear timer */
766    if (nbytes != sizeof(int32_t)) {
767       errors++;
768       b_errno = EIO;
769       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
770             sizeof(int32_t), nbytes, m_who, m_host, m_port);
771       nbytes = BNET_ERROR;
772       goto get_out;
773    }
774
775    pktsiz = ntohl(pktsiz);         /* decode no. of bytes that follow */
776    o_pktsiz = pktsiz;
777    /* If extension, read it */
778    if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) {
779       timer_start = watchdog_time;  /* set start wait time */
780       clear_timed_out();
781       if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) {
782          timer_start = 0;      /* clear timer */
783          /* probably pipe broken because client died */
784          if (errno == 0) {
785             b_errno = ENODATA;
786          } else {
787             b_errno = errno;
788          }
789          errors++;
790          nbytes = BNET_HARDEOF;        /* assume hard EOF received */
791          goto get_out;
792       }
793       timer_start = 0;         /* clear timer */
794       if (nbytes != sizeof(int32_t)) {
795          errors++;
796          b_errno = EIO;
797          Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
798                sizeof(int32_t), nbytes, m_who, m_host, m_port);
799          nbytes = BNET_ERROR;
800          goto get_out;
801       }
802       pktsiz &= ~BNET_HDR_EXTEND;
803       m_flags = ntohl(m_flags);
804    }
805
806    if (pktsiz > 0 && (pktsiz & BNET_COMPRESSED)) {
807       compressed = true;
808       pktsiz &= ~BNET_COMPRESSED;
809    }
810
811    if (m_flags & BNET_IS_CMD) {
812        command = true;
813    }
814    if (m_flags & BNET_OFFSET) {
815       compressed = true;
816    }
817
818    if (pktsiz == 0) {              /* No data transferred */
819       timer_start = 0;             /* clear timer */
820       in_msg_no++;
821       msglen = 0;
822       nbytes = 0;                  /* zero bytes read */
823       goto get_out;
824    }
825
826    /* If signal or packet size too big */
827    if (pktsiz < 0 || pktsiz > 1000000) {
828       if (pktsiz > 0) {            /* if packet too big */
829          Qmsg4(m_jcr, M_FATAL, 0,
830                _("Packet size=%d too big from \"%s:%s:%d\". Maximum permitted 1000000. Terminating connection.\n"),
831                pktsiz, m_who, m_host, m_port);
832          pktsiz = BNET_TERMINATE;  /* hang up */
833       }
834       if (pktsiz == BNET_TERMINATE) {
835          set_terminated();
836       }
837       timer_start = 0;                /* clear timer */
838       b_errno = ENODATA;
839       msglen = pktsiz;                /* signal code */
840       nbytes =  BNET_SIGNAL;          /* signal */
841       goto get_out;
842    }
843
844    /* Make sure the buffer is big enough + one byte for EOS */
845    if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
846       msg = realloc_pool_memory(msg, pktsiz + 100);
847    }
848
849    timer_start = watchdog_time;  /* set start wait time */
850    clear_timed_out();
851    /* now read the actual data */
852    if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
853       timer_start = 0;      /* clear timer */
854       if (errno == 0) {
855          b_errno = ENODATA;
856       } else {
857          b_errno = errno;
858       }
859       errors++;
860       Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
861             m_who, m_host, m_port, this->bstrerror());
862       nbytes = BNET_ERROR;
863       goto get_out;
864    }
865    timer_start = 0;         /* clear timer */
866    in_msg_no++;
867    msglen = nbytes;
868    if (nbytes != pktsiz) {
869       b_errno = EIO;
870       errors++;
871       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
872             pktsiz, nbytes, m_who, m_host, m_port);
873       nbytes = BNET_ERROR;
874       goto get_out;
875    }
876    /* If compressed uncompress it */
877    if (compressed) {
878       int offset = 0;
879       int psize = nbytes * 4;
880       if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
881          cmsg = realloc_pool_memory(cmsg, psize);
882       }
883       psize = sizeof_pool_memory(cmsg);
884       if (m_flags & BNET_OFFSET) {
885          offset = m_flags & 0xFF;
886          msg += offset;
887          msglen -= offset;
888       }
889       /* Grow buffer to max approx 4MB */
890       for (int i=0; i < 7; i++) {
891          nbytes = LZ4_decompress_safe(msg, cmsg, msglen, psize);
892          if (nbytes >=  0) {
893             break;
894          }
895          if (psize < 65536) {
896             psize = 65536;
897          } else {
898             psize = psize * 2;
899          }
900          if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
901             cmsg = realloc_pool_memory(cmsg, psize + 100);
902          }
903       }
904       if (m_flags & BNET_OFFSET) {
905          msg -= offset;
906          msglen += offset;
907       }
908       if (nbytes < 0) {
909          Jmsg1(m_jcr, M_ERROR, 0, "Decompress error!!!! ERR=%d\n", nbytes);
910          Pmsg3(000, "Decompress error!! pktsiz=%d cmsgsiz=%d nbytes=%d\n", pktsiz,
911            psize, nbytes);
912          b_errno = EIO;
913          errors++;
914           Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
915                pktsiz, nbytes, m_who, m_host, m_port);
916          nbytes = BNET_ERROR;
917          goto get_out;
918       }
919       msglen = nbytes;
920       /* Make sure the buffer is big enough + one byte for EOS */
921       if (msglen >= (int32_t)sizeof_pool_memory(msg)) {
922          msg = realloc_pool_memory(msg, msglen + 100);
923       }
924       /* If this is a data decompress, leave msg compressed */
925       if (!(m_flags & BNET_OFFSET)) {
926          memcpy(msg, cmsg, msglen);
927       }
928    }
929
930    /* always add a zero by to properly terminate any
931     * string that was send to us. Note, we ensured above that the
932     * buffer is at least one byte longer than the message length.
933     */
934    msg[nbytes] = 0; /* terminate in case it is a string */
935    /*
936     * The following uses *lots* of resources so turn it on only for
937     * serious debugging.
938     */
939    Dsm_check(300);
940
941 get_out:
942    if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen);
943    if (nbytes != BNET_ERROR && command) {
944       nbytes = BNET_COMMAND;
945    }
946
947    if (locked) pV(pm_rmutex);
948    return nbytes;                  /* return actual length of message */
949 }
950
951 /*
952  * Send a signal
953  */
954 bool BSOCK::signal(int signal)
955 {
956    msglen = signal;
957    if (signal == BNET_TERMINATE) {
958       m_suppress_error_msgs = true;
959    }
960    return send();
961 }
962
963 /*
964  * Despool spooled attributes
965  */
966 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
967 {
968    int32_t pktsiz;
969    size_t nbytes;
970    ssize_t last = 0, size = 0;
971    int count = 0;
972    JCR *jcr = get_jcr();
973
974    rewind(m_spool_fd);
975
976 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
977    posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
978 #endif
979
980    while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
981           sizeof(int32_t)) {
982       size += sizeof(int32_t);
983       msglen = ntohl(pktsiz);
984       if (msglen > 0) {
985          if (msglen > (int32_t)sizeof_pool_memory(msg)) {
986             msg = realloc_pool_memory(msg, msglen + 1);
987          }
988          nbytes = fread(msg, 1, msglen, m_spool_fd);
989          if (nbytes != (size_t)msglen) {
990             berrno be;
991             Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
992             Qmsg2(get_jcr(), M_FATAL, 0, _("fread attr spool error. Wanted=%d got=%d bytes.\n"),
993                   msglen, nbytes);
994             update_attr_spool_size(tsize - last);
995             return false;
996          }
997          size += nbytes;
998          if ((++count & 0x3F) == 0) {
999             update_attr_spool_size(size - last);
1000             last = size;
1001          }
1002       }
1003       send();
1004       if (jcr && job_canceled(jcr)) {
1005          return false;
1006       }
1007    }
1008    update_attr_spool_size(tsize - last);
1009    if (ferror(m_spool_fd)) {
1010       Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
1011       return false;
1012    }
1013    return true;
1014 }
1015
1016 /*
1017  * Return the string for the error that occurred
1018  * on the socket. Only the first error is retained.
1019  */
1020 const char *BSOCK::bstrerror()
1021 {
1022    berrno be;
1023    if (errmsg == NULL) {
1024       errmsg = get_pool_memory(PM_MESSAGE);
1025    }
1026    if (b_errno == 0) {
1027       pm_strcpy(errmsg, "I/O Error");
1028    } else {
1029       pm_strcpy(errmsg, be.bstrerror(b_errno));
1030    }
1031    return errmsg;
1032 }
1033
1034 int BSOCK::get_peer(char *buf, socklen_t buflen)
1035 {
1036 #if !defined(HAVE_WIN32)
1037     if (peer_addr.sin_family == 0) {
1038         socklen_t salen = sizeof(peer_addr);
1039         int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
1040         if (rval < 0) return rval;
1041     }
1042     if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
1043         return -1;
1044
1045     return 0;
1046 #else
1047     return -1;
1048 #endif
1049 }
1050
1051 /*
1052  * Set the network buffer size, suggested size is in size.
1053  *  Actual size obtained is returned in bs->msglen
1054  *
1055  *  Returns: false on failure
1056  *           true  on success
1057  */
1058 bool BSOCK::set_buffer_size(uint32_t size, int rw)
1059 {
1060    uint32_t dbuf_size, start_size;
1061
1062 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
1063    int opt;
1064    opt = IPTOS_THROUGHPUT;
1065    setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
1066 #endif
1067
1068    if (size != 0) {
1069       dbuf_size = size;
1070    } else {
1071       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1072    }
1073    start_size = dbuf_size;
1074    if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
1075       Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
1076       return false;
1077    }
1078
1079    /*
1080     * If user has not set the size, use the OS default -- i.e. do not
1081     *   try to set it.  This allows sys admins to set the size they
1082     *   want in the OS, and Bacula will comply. See bug #1493
1083     */
1084    if (size == 0) {
1085       msglen = dbuf_size;
1086       return true;
1087    }
1088
1089    if (rw & BNET_SETBUF_READ) {
1090       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1091               SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1092          berrno be;
1093          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1094          dbuf_size -= TAPE_BSIZE;
1095       }
1096       Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
1097       if (dbuf_size != start_size) {
1098          Qmsg1(get_jcr(), M_WARNING, 0,
1099                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1100       }
1101    }
1102    if (size != 0) {
1103       dbuf_size = size;
1104    } else {
1105       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1106    }
1107    start_size = dbuf_size;
1108    if (rw & BNET_SETBUF_WRITE) {
1109       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1110               SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1111          berrno be;
1112          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1113          dbuf_size -= TAPE_BSIZE;
1114       }
1115       Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
1116       if (dbuf_size != start_size) {
1117          Qmsg1(get_jcr(), M_WARNING, 0,
1118                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1119       }
1120    }
1121
1122    msglen = dbuf_size;
1123    return true;
1124 }
1125
1126 /*
1127  * Set socket non-blocking
1128  * Returns previous socket flag
1129  */
1130 int BSOCK::set_nonblocking()
1131 {
1132 #ifndef HAVE_WIN32
1133    int oflags;
1134
1135    /* Get current flags */
1136    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1137       berrno be;
1138       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1139    }
1140
1141    /* Set O_NONBLOCK flag */
1142    if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
1143       berrno be;
1144       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1145    }
1146
1147    m_blocking = 0;
1148    return oflags;
1149 #else
1150    int flags;
1151    u_long ioctlArg = 1;
1152
1153    flags = m_blocking;
1154    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
1155    m_blocking = 0;
1156
1157    return flags;
1158 #endif
1159 }
1160
1161 /*
1162  * Set socket blocking
1163  * Returns previous socket flags
1164  */
1165 int BSOCK::set_blocking()
1166 {
1167 #ifndef HAVE_WIN32
1168    int oflags;
1169    /* Get current flags */
1170    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1171       berrno be;
1172       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1173    }
1174
1175    /* Set O_NONBLOCK flag */
1176    if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
1177       berrno be;
1178       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1179    }
1180
1181    m_blocking = 1;
1182    return oflags;
1183 #else
1184    int flags;
1185    u_long ioctlArg = 0;
1186
1187    flags = m_blocking;
1188    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
1189    m_blocking = 1;
1190
1191    return flags;
1192 #endif
1193 }
1194
1195 void BSOCK::set_killable(bool killable)
1196 {
1197    if (m_jcr) {
1198       m_jcr->set_killable(killable);
1199    }
1200 }
1201
1202 /*
1203  * Restores socket flags
1204  */
1205 void BSOCK::restore_blocking (int flags)
1206 {
1207 #ifndef HAVE_WIN32
1208    if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
1209       berrno be;
1210       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1211    }
1212
1213    m_blocking = (flags & O_NONBLOCK) ? true : false;
1214 #else
1215    u_long ioctlArg = flags;
1216
1217    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
1218    m_blocking = 1;
1219 #endif
1220 }
1221
1222 /*
1223  * Wait for a specified time for data to appear on
1224  * the BSOCK connection.
1225  *
1226  *   Returns: 1 if data available
1227  *            0 if timeout
1228  *           -1 if error
1229  */
1230 int BSOCK::wait_data(int sec, int msec)
1231 {
1232    for (;;) {
1233       switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1234       case 0:                      /* timeout */
1235          b_errno = 0;
1236          return 0;
1237       case -1:
1238          b_errno = errno;
1239          if (errno == EINTR) {
1240             continue;
1241          }
1242          return -1;                /* error return */
1243       default:
1244          b_errno = 0;
1245 #ifdef HAVE_TLS
1246          if (this->tls && !tls_bsock_probe(this)) {
1247             continue; /* false alarm, maybe a session key negotiation in progress on the socket */
1248          }
1249 #endif
1250          return 1;
1251       }
1252    }
1253 }
1254
1255 /*
1256  * As above, but returns on interrupt
1257  */
1258 int BSOCK::wait_data_intr(int sec, int msec)
1259 {
1260    switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1261    case 0:                      /* timeout */
1262       b_errno = 0;
1263       return 0;
1264    case -1:
1265       b_errno = errno;
1266       return -1;                /* error return */
1267    default:
1268       b_errno = 0;
1269 #ifdef HAVE_TLS
1270       if (this->tls && !tls_bsock_probe(this)) {
1271          /* maybe a session key negotiation waked up the socket */
1272          return 0;
1273       }
1274 #endif
1275       break;
1276    }
1277    return 1;
1278 }
1279
1280 /*
1281  *  This routine closes the current BSOCK.
1282  *   It does not delete the socket packet
1283  *   resources, which are released int
1284  *   bsock->destroy().
1285  */
1286 #ifndef SHUT_RDWR
1287 #define SHUT_RDWR 2
1288 #endif
1289
1290 /*
1291  * The JCR is canceled, set terminate for chained BSOCKs starting from master
1292  */
1293 void BSOCK::cancel()
1294 {
1295    master_lock();
1296    for (BSOCK *next = m_master; next != NULL; next = next->m_next) {
1297       if (!next->m_closed) {
1298          next->m_terminated = true;
1299          next->m_timed_out = true;
1300       }
1301    }
1302    master_unlock();
1303 }
1304
1305 /*
1306  * Note, this routine closes the socket, but leaves the
1307  *   bsock memory in place.
1308  *   every thread is responsible of closing and destroying its own duped or not
1309  *   duped BSOCK
1310  */
1311 void BSOCK::close()
1312 {
1313    BSOCK *bsock = this;
1314
1315    if (bsock->is_closed()) {
1316       return;
1317    }
1318    if (!m_duped) {
1319       clear_locking();
1320    }
1321    bsock->set_closed();
1322    bsock->set_terminated();
1323    if (!bsock->m_duped) {
1324       /* Shutdown tls cleanly. */
1325       if (bsock->tls) {
1326          tls_bsock_shutdown(bsock);
1327          free_tls_connection(bsock->tls);
1328          bsock->tls = NULL;
1329       }
1330
1331 #ifdef HAVE_WIN32
1332       if (!bsock->is_timed_out()) {
1333          win_close_wait(bsock->m_fd);  /* Ensure that data is not discarded */
1334       }
1335 #else
1336       if (bsock->is_timed_out()) {
1337          shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
1338       }
1339 #endif
1340       /* On Windows this discards data if we did not do a close_wait() */
1341       socketClose(bsock->m_fd);      /* normal close */
1342    }
1343    return;
1344 }
1345
1346 /*
1347  * Destroy the socket (i.e. release all resources)
1348  */
1349 void BSOCK::_destroy()
1350 {
1351    this->close();                  /* Ensure that socket is closed */
1352
1353    if (msg) {
1354       free_pool_memory(msg);
1355       msg = NULL;
1356    } else {
1357       ASSERT2(1 == 0, "Two calls to destroy socket");  /* double destroy */
1358    }
1359    if (cmsg) {
1360       free_pool_memory(cmsg);
1361       cmsg = NULL;
1362    }
1363    if (errmsg) {
1364       free_pool_memory(errmsg);
1365       errmsg = NULL;
1366    }
1367    if (m_who) {
1368       free(m_who);
1369       m_who = NULL;
1370    }
1371    if (m_host) {
1372       free(m_host);
1373       m_host = NULL;
1374    }
1375    if (src_addr) {
1376       free(src_addr);
1377       src_addr = NULL;
1378    }
1379    free(this);
1380 }
1381
1382 /*
1383  * Destroy the socket (i.e. release all resources)
1384  * including duped sockets.
1385  * should not be called from duped BSOCK
1386  */
1387 void BSOCK::destroy()
1388 {
1389    ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n")
1390    ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n")
1391    ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n")
1392    /* I'm the master I must destroy() all the duped BSOCKs */
1393    master_lock();
1394    BSOCK *ahead;
1395    for (BSOCK *next = m_next; next != NULL; next = ahead) {
1396       ahead = next->m_next;
1397       next->_destroy();
1398    }
1399    master_unlock();
1400    _destroy();
1401 }
1402
1403 /* Commands sent to Director */
1404 static char hello[]    = "Hello %s calling\n";
1405
1406 /* Response from Director */
1407 static char OKhello[]   = "1000 OK:";
1408
1409 /*
1410  * Authenticate Director
1411  */
1412 bool BSOCK::authenticate_director(const char *name, const char *password,
1413                TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
1414 {
1415    int tls_local_need = BNET_TLS_NONE;
1416    int tls_remote_need = BNET_TLS_NONE;
1417    int compatible = true;
1418    char bashed_name[MAX_NAME_LENGTH];
1419    BSOCK *dir = this;        /* for readability */
1420
1421    *errmsg = 0;
1422    /*
1423     * Send my name to the Director then do authentication
1424     */
1425
1426    /* Timeout Hello after 15 secs */
1427    dir->start_timer(15);
1428    dir->fsend(hello, bashed_name);
1429
1430    if (get_tls_enable(tls_ctx)) {
1431       tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
1432    }
1433
1434    /* respond to Dir challenge */
1435    if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
1436        /* Now challenge dir */
1437        !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
1438       bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
1439          dir->host(), dir->port());
1440       goto bail_out;
1441    }
1442
1443    /* Verify that the remote host is willing to meet our TLS requirements */
1444    if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1445       bsnprintf(errmsg, errmsg_len, _("Authorization error:"
1446              " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
1447              dir->host(), dir->port());
1448       goto bail_out;
1449    }
1450
1451    /* Verify that we are willing to meet the remote host's requirements */
1452    if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1453       bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
1454                      " Remote server requires TLS.\n"),
1455                      dir->host(), dir->port());
1456
1457       goto bail_out;
1458    }
1459
1460    /* Is TLS Enabled? */
1461    if (have_tls) {
1462       if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1463          /* Engage TLS! Full Speed Ahead! */
1464          if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1465             bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1466                dir->host(), dir->port());
1467             goto bail_out;
1468          }
1469       }
1470    }
1471
1472    Dmsg1(6, ">dird: %s", dir->msg);
1473    if (dir->recv() <= 0) {
1474       dir->stop_timer();
1475       bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
1476                       "The Director at \"%s:%d\" may not be running.\n"),
1477                     dir->bstrerror(), dir->host(), dir->port());
1478       return false;
1479    }
1480
1481    dir->stop_timer();
1482    Dmsg1(10, "<dird: %s", dir->msg);
1483    if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1484       bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
1485          dir->host(), dir->port());
1486       return false;
1487    } else {
1488       bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
1489    }
1490    return true;
1491
1492 bail_out:
1493    dir->stop_timer();
1494    bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
1495              "Most likely the passwords do not agree.\n"
1496              "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1497              "For help, please see: " MANUAL_AUTH_URL "\n"),
1498              dir->host(), dir->port());
1499    return false;
1500 }
1501
1502 /* Try to limit the bandwidth of a network connection
1503  */
1504 void BSOCK::control_bwlimit(int bytes)
1505 {
1506    btime_t now, temp;
1507    if (bytes == 0) {
1508       return;
1509    }
1510
1511    now = get_current_btime();          /* microseconds */
1512    temp = now - m_last_tick;           /* microseconds */
1513
1514    m_nb_bytes += bytes;
1515
1516    if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
1517       m_nb_bytes = bytes;
1518       m_last_tick = now;
1519       return;
1520    }
1521
1522    /* Less than 0.1ms since the last call, see the next time */
1523    if (temp < 100) {
1524       return;
1525    }
1526
1527    /* Remove what was authorised to be written in temp us */
1528    m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
1529
1530    if (m_nb_bytes < 0) {
1531       m_nb_bytes = 0;
1532    }
1533
1534    /* What exceed should be converted in sleep time */
1535    int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
1536    if (usec_sleep > 100) {
1537       bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
1538       m_last_tick = get_current_btime();
1539       m_nb_bytes = 0;
1540    } else {
1541       m_last_tick = now;
1542    }
1543 }
1544
1545 #ifdef HAVE_WIN32
1546 /*
1547  * closesocket is supposed to do a graceful disconnect under Window
1548  *   but it doesn't. Comments on http://msdn.microsoft.com/en-us/li
1549  *   confirm this behaviour. DisconnectEx is required instead, but
1550  *   that function needs to be retrieved via WS IOCTL
1551  */
1552 static void
1553 win_close_wait(int fd)
1554 {
1555    int ret;
1556    GUID disconnectex_guid = WSAID_DISCONNECTEX;
1557    DWORD bytes_returned;
1558    LPFN_DISCONNECTEX DisconnectEx;
1559    ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL);
1560    Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret);
1561    if (!ret) {
1562       DisconnectEx(fd, NULL, 0, 0);
1563    }
1564 }
1565 #endif