]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/bsock.c
d2a521b0cd346f55c1e13c6e8b0d8aa55235a11a
[bacula/bacula] / bacula / src / lib / bsock.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Network Utility Routines
21  *
22  *  Written by Kern Sibbald
23  */
24
25 #include "bacula.h"
26 #include "jcr.h"
27 #include "lz4.h"
28 #include <netdb.h>
29 #include <netinet/tcp.h>
30
31 #if !defined(ENODATA)              /* not defined on BSD systems */
32 #define ENODATA  EPIPE
33 #endif 
34  
35 #if !defined(SOL_TCP)              /* Not defined on some systems */
36 #define SOL_TCP  IPPROTO_TCP
37 #endif 
38  
39 #define socketRead(fd, buf, len)  ::read(fd, buf, len)
40 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
41 #define socketClose(fd)           ::close(fd)
42
43 /*
44  * make a nice dump of a message
45  */
46 void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen)
47 {
48    char buf[54];
49    bool is_ascii;
50    int dbglvl = DT_ASX;
51
52    if (msglen<0) {
53       Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
54       // data
55       smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
56       if (is_ascii) {
57          Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
58       } else {
59          Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
60       }
61    }
62 }
63
64
65 BSOCKCallback::BSOCKCallback()
66 {
67 }
68
69 BSOCKCallback::~BSOCKCallback()
70 {
71 }
72
73
74 /*
75  * This is a non-class BSOCK "constructor"  because we want to
76  *   call the Bacula smartalloc routines instead of new.
77  */
78 BSOCK *new_bsock()
79 {
80    BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
81    bsock->init();
82    return bsock;
83 }
84
85 void BSOCK::init()
86 {
87    memset(this, 0, sizeof(BSOCK));
88    m_master = this;
89    set_closed();
90    set_terminated();
91    m_blocking = 1;
92    pout_msg_no = &out_msg_no;
93    uninstall_send_hook_cb();
94    msg = get_pool_memory(PM_BSOCK);
95    cmsg = get_pool_memory(PM_BSOCK);
96    errmsg = get_pool_memory(PM_MESSAGE);
97    timeout = BSOCK_TIMEOUT;
98 }
99
100 void BSOCK::free_tls()
101 {
102    free_tls_connection(this->tls);
103    this->tls = NULL;
104 }
105
106 /*
107  * Try to connect to host for max_retry_time at retry_time intervals.
108  *   Note, you must have called the constructor prior to calling
109  *   this routine.
110  */
111 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
112                     utime_t heart_beat,
113                     const char *name, char *host, char *service, int port,
114                     int verbose)
115 {
116    bool ok = false;
117    int i;
118    int fatal = 0;
119    time_t begin_time = time(NULL);
120    time_t now;
121    btimer_t *tid = NULL;
122
123    /* Try to trap out of OS call when time expires */
124    if (max_retry_time) {
125       tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
126    }
127
128    for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
129         i -= retry_interval) {
130       berrno be;
131       if (fatal || (jcr && job_canceled(jcr))) {
132          goto bail_out;
133       }
134       Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
135             name, host, port, be.bstrerror());
136       if (i < 0) {
137          i = 60 * 5;               /* complain again in 5 minutes */
138          if (verbose)
139             Qmsg4(jcr, M_WARNING, 0, _(
140                "Could not connect to %s on %s:%d. ERR=%s\n"
141                "Retrying ...\n"), name, host, port, be.bstrerror());
142       }
143       bmicrosleep(retry_interval, 0);
144       now = time(NULL);
145       if (begin_time + max_retry_time <= now) {
146          Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
147                name, host, port, be.bstrerror());
148          goto bail_out;
149       }
150    }
151    ok = true;
152
153 bail_out:
154    if (tid) {
155       stop_thread_timer(tid);
156    }
157    return ok;
158 }
159
160 /*
161  * Finish initialization of the packet structure.
162  */
163 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
164                struct sockaddr *lclient_addr)
165 {
166    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
167    m_fd = sockfd;
168    if (m_who) {
169       free(m_who);
170    }
171    if (m_host) {
172       free(m_host);
173    }
174    set_who(bstrdup(who));
175    set_host(bstrdup(host));
176    set_port(port);
177    memcpy(&client_addr, lclient_addr, sizeof(client_addr));
178    set_jcr(jcr);
179 }
180
181 /*
182  * Copy the address from the configuration dlist that gets passed in
183  */
184 void BSOCK::set_source_address(dlist *src_addr_list)
185 {
186    IPADDR *addr = NULL;
187
188    // delete the object we already have, if it's allocated
189    if (src_addr) {
190      free( src_addr);
191      src_addr = NULL;
192    }
193
194    if (src_addr_list) {
195      addr = (IPADDR*) src_addr_list->first();
196      src_addr = New( IPADDR(*addr));
197    }
198 }
199
200 /*
201  * Open a TCP connection to the server
202  * Returns NULL
203  * Returns BSOCK * pointer on success
204  */
205 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
206                int port, utime_t heart_beat, int *fatal)
207 {
208    int sockfd = -1;
209    dlist *addr_list;
210    IPADDR *ipaddr;
211    bool connected = false;
212    int turnon = 1;
213    const char *errstr;
214    int save_errno = 0;
215
216    /*
217     * Fill in the structure serv_addr with the address of
218     * the server that we want to connect with.
219     */
220    if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
221       /* Note errstr is not malloc'ed */
222       Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
223             host, errstr);
224       Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
225             host, errstr);
226       *fatal = 1;
227       return false;
228    }
229
230    remove_duplicate_addresses(addr_list);
231    foreach_dlist(ipaddr, addr_list) {
232       ipaddr->set_port_net(htons(port));
233       char allbuf[256 * 10];
234       char curbuf[256];
235       Dmsg2(100, "Current %sAll %s\n",
236                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
237                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
238       /* Open a TCP socket */
239       if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
240          berrno be;
241          save_errno = errno;
242          switch (errno) {
243 #ifdef EAFNOSUPPORT
244          case EAFNOSUPPORT:
245             /*
246              * The name lookup of the host returned an address in a protocol family
247              * we don't support. Suppress the error and try the next address.
248              */
249             break;
250 #endif
251 #ifdef EPROTONOSUPPORT
252          /* See above comments */
253          case EPROTONOSUPPORT:
254             break;
255 #endif
256 #ifdef EPROTOTYPE
257          /* See above comments */
258          case EPROTOTYPE:
259             break;
260 #endif
261          default:
262             *fatal = 1;
263             Qmsg3(jcr, M_ERROR, 0,  _("Socket open error. proto=%d port=%d. ERR=%s\n"),
264                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
265             Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
266                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
267             break;
268          }
269          continue;
270       }
271
272       /* Bind to the source address if it is set */
273       if (src_addr) {
274          if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
275             berrno be;
276             save_errno = errno;
277             *fatal = 1;
278             Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
279                   src_addr->get_family(), be.bstrerror() );
280             Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
281                   src_addr->get_family(), be.bstrerror() );
282             if (sockfd >= 0) socketClose(sockfd);
283             continue;
284          }
285       }
286
287       /*
288        * Keep socket from timing out from inactivity
289        */
290       if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
291          berrno be;
292          Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
293                be.bstrerror());
294       }
295 #if defined(TCP_KEEPIDLE)
296       if (heart_beat) {
297          int opt = heart_beat;
298          if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
299             berrno be;
300             Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
301                   be.bstrerror());
302          }
303       }
304 #endif
305
306       /* connect to server */
307       if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
308          save_errno = errno;
309          if (sockfd >= 0) socketClose(sockfd);
310          continue;
311       }
312       *fatal = 0;
313       connected = true;
314       break;
315    }
316
317    if (!connected) {
318       berrno be;
319       free_addresses(addr_list);
320       errno = save_errno | b_errno_win32;
321       Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
322             name, host, port, be.bstrerror());
323       return false;
324    }
325    /*
326     * Keep socket from timing out from inactivity
327     *   Do this a second time out of paranoia
328     */
329    if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
330       berrno be;
331       Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
332             be.bstrerror());
333    }
334    fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
335    free_addresses(addr_list);
336
337    /* Clean the packet a bit */
338    m_closed = false;
339    m_duped = false;
340    m_spool = false;
341    m_use_locking = false;
342    m_timed_out = false;
343    m_terminated = false;
344    m_suppress_error_msgs = false;
345    errors = 0;
346    m_blocking = 0;
347
348    Dmsg3(50, "OK connected to server  %s %s:%d.\n",
349          name, host, port);
350
351    return true;
352 }
353
354 /*
355  * Force read/write to use locking
356  */
357 bool BSOCK::set_locking()
358 {
359    int stat;
360    if (m_use_locking) {
361       return true;                      /* already set */
362    }
363    pm_rmutex = &m_rmutex;
364    pm_wmutex = &m_wmutex;
365    if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
366       berrno be;
367       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"),
368          be.bstrerror(stat));
369       return false;
370    }
371    if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
372       berrno be;
373       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"),
374          be.bstrerror(stat));
375       return false;
376    }
377    if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
378       berrno be;
379       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"),
380          be.bstrerror(stat));
381       return false;
382    }
383    m_use_locking = true;
384    return true;
385 }
386
387 void BSOCK::clear_locking()
388 {
389    if (!m_use_locking || m_duped) {
390       return;
391    }
392    m_use_locking = false;
393    pthread_mutex_destroy(pm_rmutex);
394    pthread_mutex_destroy(pm_wmutex);
395    pthread_mutex_destroy(&m_mmutex);
396    pm_rmutex = NULL;
397    pm_wmutex = NULL;
398    return;
399 }
400
401 /*
402  * Do comm line compression (LZ4) of a bsock message.
403  * Returns:  true if the compression was done
404  *           false if no compression was done
405  * The "offset" defines where to start compressing the message.  This
406  *   allows passing "header" information uncompressed and the actual
407  *   data part compressed.
408  *
409  * Note, we don't compress lines less than 20 characters because we
410  *  want to save at least 10 characters otherwise compression doesn't
411  *  help enough to warrant doing the decompression.
412  */
413 bool BSOCK::comm_compress()
414 {
415    bool compress = false;
416    bool compressed = false;
417    int offset = m_flags & 0xFF;
418
419    /*
420     * Enable compress if allowed and not spooling and the
421     *  message is long enough (>20) to get some reasonable savings.
422     */
423    if (msglen > 20) {
424       compress = can_compress() && !is_spooling();
425    }
426    m_CommBytes += msglen;                    /* uncompressed bytes */
427    Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
428          can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
429    if (compress) {
430       int clen;
431       int need_size;
432
433       ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
434       ASSERT2(offset < 255, "Offset greater than 254\n");
435       need_size = LZ4_compressBound(msglen);
436       if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
437          cmsg = realloc_pool_memory(cmsg, need_size + 100);
438       }
439       msglen -= offset;
440       msg += offset;
441       cmsg += offset;
442       clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
443       //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
444       /* Compression should save at least 10 characters */
445       if (clen > 0 && clen + 10 <= msglen) {
446
447 #ifdef xxx_debug
448          /* Debug code -- decompress and compare */
449          int blen, rlen, olen;
450          olen = msglen;
451          POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
452          blen = sizeof_pool_memory(msg) * 2;
453          if (blen >= sizeof_pool_memory(rmsg)) {
454             rmsg = realloc_pool_memory(rmsg, blen);
455          }
456          rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
457          //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
458          ASSERT(olen == rlen);
459          ASSERT(memcmp(msg, rmsg, olen) == 0);
460          free_pool_memory(rmsg);
461          /* end Debug code */
462 #endif
463
464          msg = cmsg;
465          msglen = clen;
466          compressed = true;
467       }
468       msglen += offset;
469       msg -= offset;
470       cmsg -= offset;
471    }
472    m_CommCompressedBytes += msglen;
473    return compressed;
474 }
475
476
477 /*
478  * Send a message over the network. Everything is sent in one
479  *   write request, but depending on the mode you are using
480  *   there will be either two or three read requests done.
481  * Read 1: 32 bits, gets essentially the packet length, but may
482  *         have the upper bits set to indicate compression or
483  *         an extended header packet.
484  * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set.
485  *         In this case the top 16 bits of this 32 bit word are reserved
486  *         for flags and the lower 16 bits for data. This word will be
487  *         stored in the field "flags" in the BSOCK packet.
488  * Read 2 or 3: depending on if Read 2 is done. This is the data.
489  *
490  * For normal comm line compression, the whole data packet is compressed
491  *   but not the msglen (first read).
492  * To do data compression rather than full comm line compression, prior to
493  *   call send(flags) where the lower 32 bits is the offset to the data to
494  *   be compressed.  The top 32 bits are reserved for flags that can be
495  *   set. The are:
496  *     BNET_IS_CMD   We are sending a command
497  *     BNET_OFFSET   An offset is specified (this implies data compression)
498  *     BNET_NOCOMPRESS Inhibit normal comm line compression
499  *     BNET_DATACOMPRESSED The data using the specified offset was
500  *                   compressed, and normal comm line compression will
501  *                   not be done.
502  *   If any of the above bits are set, then BNET_HDR_EXTEND will be set
503  *   in the top bits of msglen, and the full set of flags + the offset
504  *   will be passed as a 32 bit word just after the msglen, and then
505  *   followed by any data that is either compressed or not.
506  *
507  *   Note, neither comm line nor data compression is not
508  *   guaranteed since it may result in more data, in which case, the
509  *   record is sent uncompressed and there will be no offset.
510  *   On the receive side, if BNET_OFFSET is set, then the data is compressed.
511  *
512  * Returns: false on failure
513  *          true  on success
514  */
515 bool BSOCK::send(int aflags)
516 {
517    int32_t rc;
518    int32_t pktsiz;
519    int32_t *hdrptr;
520    int offset;
521    int hdrsiz;
522    bool ok = true;
523    int32_t save_msglen;
524    POOLMEM *save_msg;
525    bool compressed;
526    bool locked = false;
527
528    if (is_closed()) {
529       if (!m_suppress_error_msgs) {
530          Qmsg0(m_jcr, M_ERROR, 0,  _("Socket is closed\n"));
531       }
532       return false;
533    }
534    if (errors) {
535       if (!m_suppress_error_msgs) {
536          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket has errors=%d on call to %s:%s:%d\n"),
537              errors, m_who, m_host, m_port);
538       }
539       return false;
540    }
541    if (is_terminated()) {
542       if (!m_suppress_error_msgs) {
543          Qmsg4(m_jcr, M_ERROR, 0,  _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
544              is_terminated(), m_who, m_host, m_port);
545       }
546       return false;
547    }
548
549    if (msglen > 4000000) {
550       if (!m_suppress_error_msgs) {
551          Qmsg4(m_jcr, M_ERROR, 0,
552             _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
553              msglen, m_who, m_host, m_port);
554       }
555       return false;
556    }
557
558    if (send_hook_cb) {
559       if (!send_hook_cb->bsock_send_cb()) {
560          Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
561          Qmsg3(m_jcr, M_ERROR, 0,
562             _("Flowcontrol failure on %s:%s:%d\n"),
563                   m_who, m_host, m_port);
564          return false;
565       }
566    }
567    if (m_use_locking) {
568       pP(pm_wmutex);
569       locked = true;
570    }
571    save_msglen = msglen;
572    save_msg = msg;
573    m_flags = aflags;
574
575    offset = aflags & 0xFF;              /* offset is 16 bits */
576    if (offset) {
577       m_flags |= BNET_OFFSET;
578    }
579    if (m_flags & BNET_DATACOMPRESSED) {   /* Check if already compressed */
580       compressed = true;
581    } else if (m_flags & BNET_NOCOMPRESS) {
582       compressed = false;
583    } else {
584       compressed = comm_compress();       /* do requested compression */
585    }
586    if (offset && compressed) {
587       m_flags |= BNET_DATACOMPRESSED;
588    }
589    if (!compressed) {
590       m_flags &= ~BNET_COMPRESSED;
591    }
592
593    /* Compute total packet length */
594    if (msglen <= 0) {
595       hdrsiz = sizeof(pktsiz);
596       pktsiz = hdrsiz;                     /* signal, no data */
597    } else if (m_flags) {
598       hdrsiz = 2 * sizeof(pktsiz);         /* have 64 bit header */
599       pktsiz = msglen + hdrsiz;
600    } else {
601       hdrsiz = sizeof(pktsiz);             /* have 32 bit header */
602       pktsiz = msglen + hdrsiz;
603    }
604
605    /* Set special bits */
606    if (m_flags & BNET_OFFSET) {            /* if data compression on */
607       compressed = false;                  /*   no comm compression */
608    }
609    if (compressed) {
610       msglen |= BNET_COMPRESSED;           /* comm line compression */
611    }
612
613    if (m_flags) {
614       msglen |= BNET_HDR_EXTEND;           /* extended header */
615    }
616
617    /*
618     * Store packet length at head of message -- note, we
619     *  have reserved an int32_t just before msg, so we can
620     *  store there
621     */
622    hdrptr = (int32_t *)(msg - hdrsiz);
623    *hdrptr = htonl(msglen);             /* store signal/length */
624    if (m_flags) {
625       *(hdrptr+1) = htonl(m_flags);     /* store flags */
626    }
627
628    (*pout_msg_no)++;        /* increment message number */
629
630    /* send data packet */
631    timer_start = watchdog_time;  /* start timer */
632    clear_timed_out();
633    /* Full I/O done in one write */
634    rc = write_nbytes(this, (char *)hdrptr, pktsiz);
635    if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen);
636    timer_start = 0;         /* clear timer */
637    if (rc != pktsiz) {
638       errors++;
639       if (errno == 0) {
640          b_errno = EIO;
641       } else {
642          b_errno = errno;
643       }
644       if (rc < 0) {
645          if (!m_suppress_error_msgs) {
646             Qmsg5(m_jcr, M_ERROR, 0,
647                   _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
648                   pktsiz, m_who,
649                   m_host, m_port, this->bstrerror());
650          }
651       } else {
652          Qmsg5(m_jcr, M_ERROR, 0,
653                _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
654                pktsiz, m_who, m_host, m_port, rc);
655       }
656       ok = false;
657    }
658 //   Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
659 //      msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
660    msglen = save_msglen;
661    msg = save_msg;
662    if (locked) pV(pm_wmutex);
663    return ok;
664 }
665
666 /*
667  * Format and send a message
668  *  Returns: false on error
669  *           true  on success
670  */
671 bool BSOCK::fsend(const char *fmt, ...)
672 {
673    va_list arg_ptr;
674    int maxlen;
675
676    if (is_null(this)) {
677       return false;                /* do not seg fault */
678    }
679    if (errors || is_terminated() || is_closed()) {
680       return false;
681    }
682    /* This probably won't work, but we vsnprintf, then if we
683     * get a negative length or a length greater than our buffer
684     * (depending on which library is used), the printf was truncated, so
685     * get a bigger buffer and try again.
686     */
687    for (;;) {
688       maxlen = sizeof_pool_memory(msg) - 1;
689       va_start(arg_ptr, fmt);
690       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
691       va_end(arg_ptr);
692       if (msglen >= 0 && msglen < (maxlen - 5)) {
693          break;
694       }
695       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
696    }
697    return send();
698 }
699
700 /*
701  * Receive a message from the other end. Each message consists of
702  * two packets. The first is a header that contains the size
703  * of the data that follows in the second packet.
704  * Returns number of bytes read (may return zero)
705  * Returns -1 on signal (BNET_SIGNAL)
706  * Returns -2 on hard end of file (BNET_HARDEOF)
707  * Returns -3 on error  (BNET_ERROR)
708  * Returns -4 on COMMAND (BNET_COMMAND)
709  *  Unfortunately, it is a bit complicated because we have these
710  *    four return types:
711  *    1. Normal data
712  *    2. Signal including end of data stream
713  *    3. Hard end of file
714  *    4. Error
715  *  Using bsock->is_stop() and bsock->is_error() you can figure this all out.
716  */
717 int32_t BSOCK::recv()
718 {
719    int32_t nbytes;
720    int32_t pktsiz;
721    int32_t o_pktsiz = 0;
722    bool compressed = false;
723    bool command = false;
724    bool locked = false;
725
726    cmsg[0] = msg[0] = 0;
727    msglen = 0;
728    m_flags = 0;
729    if (errors || is_terminated() || is_closed()) {
730       return BNET_HARDEOF;
731    }
732    if (m_use_locking) {
733       pP(pm_rmutex);
734       locked = true;
735    }
736
737    read_seqno++;            /* bump sequence number */
738    timer_start = watchdog_time;  /* set start wait time */
739    clear_timed_out();
740    /* get data size -- in int32_t */
741    if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
742       timer_start = 0;      /* clear timer */
743       /* probably pipe broken because client died */
744       if (errno == 0) {
745          b_errno = ENODATA;
746       } else {
747          b_errno = errno;
748       }
749       errors++;
750       nbytes = BNET_HARDEOF;        /* assume hard EOF received */
751       goto get_out;
752    }
753    timer_start = 0;         /* clear timer */
754    if (nbytes != sizeof(int32_t)) {
755       errors++;
756       b_errno = EIO;
757       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
758             sizeof(int32_t), nbytes, m_who, m_host, m_port);
759       nbytes = BNET_ERROR;
760       goto get_out;
761    }
762
763    pktsiz = ntohl(pktsiz);         /* decode no. of bytes that follow */
764    o_pktsiz = pktsiz;
765    /* If extension, read it */
766    if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) {
767       timer_start = watchdog_time;  /* set start wait time */
768       clear_timed_out();
769       if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) {
770          timer_start = 0;      /* clear timer */
771          /* probably pipe broken because client died */
772          if (errno == 0) {
773             b_errno = ENODATA;
774          } else {
775             b_errno = errno;
776          }
777          errors++;
778          nbytes = BNET_HARDEOF;        /* assume hard EOF received */
779          goto get_out;
780       }
781       timer_start = 0;         /* clear timer */
782       if (nbytes != sizeof(int32_t)) {
783          errors++;
784          b_errno = EIO;
785          Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
786                sizeof(int32_t), nbytes, m_who, m_host, m_port);
787          nbytes = BNET_ERROR;
788          goto get_out;
789       }
790       pktsiz &= ~BNET_HDR_EXTEND;
791       m_flags = ntohl(m_flags);
792    }
793
794    if (pktsiz > 0 && (pktsiz & BNET_COMPRESSED)) {
795       compressed = true;
796       pktsiz &= ~BNET_COMPRESSED;
797    }
798
799    if (m_flags & BNET_IS_CMD) {
800        command = true;
801    }
802    if (m_flags & BNET_OFFSET) {
803       compressed = true;
804    }
805
806    if (pktsiz == 0) {              /* No data transferred */
807       timer_start = 0;             /* clear timer */
808       in_msg_no++;
809       msglen = 0;
810       nbytes = 0;                  /* zero bytes read */
811       goto get_out;
812    }
813
814    /* If signal or packet size too big */
815    if (pktsiz < 0 || pktsiz > 1000000) {
816       if (pktsiz > 0) {            /* if packet too big */
817          Qmsg4(m_jcr, M_FATAL, 0,
818                _("Packet size=%d too big from \"%s:%s:%d\". Maximum permitted 1000000. Terminating connection.\n"),
819                pktsiz, m_who, m_host, m_port);
820          pktsiz = BNET_TERMINATE;  /* hang up */
821       }
822       if (pktsiz == BNET_TERMINATE) {
823          set_terminated();
824       }
825       timer_start = 0;                /* clear timer */
826       b_errno = ENODATA;
827       msglen = pktsiz;                /* signal code */
828       nbytes =  BNET_SIGNAL;          /* signal */
829       goto get_out;
830    }
831
832    /* Make sure the buffer is big enough + one byte for EOS */
833    if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
834       msg = realloc_pool_memory(msg, pktsiz + 100);
835    }
836
837    timer_start = watchdog_time;  /* set start wait time */
838    clear_timed_out();
839    /* now read the actual data */
840    if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
841       timer_start = 0;      /* clear timer */
842       if (errno == 0) {
843          b_errno = ENODATA;
844       } else {
845          b_errno = errno;
846       }
847       errors++;
848       Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
849             m_who, m_host, m_port, this->bstrerror());
850       nbytes = BNET_ERROR;
851       goto get_out;
852    }
853    timer_start = 0;         /* clear timer */
854    in_msg_no++;
855    msglen = nbytes;
856    if (nbytes != pktsiz) {
857       b_errno = EIO;
858       errors++;
859       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
860             pktsiz, nbytes, m_who, m_host, m_port);
861       nbytes = BNET_ERROR;
862       goto get_out;
863    }
864    /* If compressed uncompress it */
865    if (compressed) {
866       int offset = 0;
867       int psize = nbytes * 4;
868       if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
869          cmsg = realloc_pool_memory(cmsg, psize);
870       }
871       psize = sizeof_pool_memory(cmsg);
872       if (m_flags & BNET_OFFSET) {
873          offset = m_flags & 0xFF;
874          msg += offset;
875          msglen -= offset;
876       }
877       /* Grow buffer to max approx 4MB */
878       for (int i=0; i < 7; i++) {
879          nbytes = LZ4_decompress_safe(msg, cmsg, msglen, psize);
880          if (nbytes >=  0) {
881             break;
882          }
883          if (psize < 65536) {
884             psize = 65536;
885          } else {
886             psize = psize * 2;
887          }
888          if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
889             cmsg = realloc_pool_memory(cmsg, psize + 100);
890          }
891       }
892       if (m_flags & BNET_OFFSET) {
893          msg -= offset;
894          msglen += offset;
895       }
896       if (nbytes < 0) {
897          Jmsg1(m_jcr, M_ERROR, 0, "Decompress error!!!! ERR=%d\n", nbytes);
898          Pmsg3(000, "Decompress error!! pktsiz=%d cmsgsiz=%d nbytes=%d\n", pktsiz,
899            psize, nbytes);
900          b_errno = EIO;
901          errors++;
902           Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
903                pktsiz, nbytes, m_who, m_host, m_port);
904          nbytes = BNET_ERROR;
905          goto get_out;
906       }
907       msglen = nbytes;
908       /* Make sure the buffer is big enough + one byte for EOS */
909       if (msglen >= (int32_t)sizeof_pool_memory(msg)) {
910          msg = realloc_pool_memory(msg, msglen + 100);
911       }
912       /* If this is a data decompress, leave msg compressed */
913       if (!(m_flags & BNET_OFFSET)) {
914          memcpy(msg, cmsg, msglen);
915       }
916    }
917
918    /* always add a zero by to properly terminate any
919     * string that was send to us. Note, we ensured above that the
920     * buffer is at least one byte longer than the message length.
921     */
922    msg[nbytes] = 0; /* terminate in case it is a string */
923    /*
924     * The following uses *lots* of resources so turn it on only for
925     * serious debugging.
926     */
927    Dsm_check(300);
928
929 get_out:
930    if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen);
931    if (nbytes != BNET_ERROR && command) {
932       nbytes = BNET_COMMAND;
933    }
934
935    if (locked) pV(pm_rmutex);
936    return nbytes;                  /* return actual length of message */
937 }
938
939 /*
940  * Send a signal
941  */
942 bool BSOCK::signal(int signal)
943 {
944    msglen = signal;
945    if (signal == BNET_TERMINATE) {
946       m_suppress_error_msgs = true;
947    }
948    return send();
949 }
950
951 /*
952  * Despool spooled attributes
953  */
954 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
955 {
956    int32_t pktsiz;
957    size_t nbytes;
958    ssize_t last = 0, size = 0;
959    int count = 0;
960    JCR *jcr = get_jcr();
961
962    rewind(m_spool_fd);
963
964 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
965    posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
966 #endif
967
968    while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
969           sizeof(int32_t)) {
970       size += sizeof(int32_t);
971       msglen = ntohl(pktsiz);
972       if (msglen > 0) {
973          if (msglen > (int32_t)sizeof_pool_memory(msg)) {
974             msg = realloc_pool_memory(msg, msglen + 1);
975          }
976          nbytes = fread(msg, 1, msglen, m_spool_fd);
977          if (nbytes != (size_t)msglen) {
978             berrno be;
979             Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
980             Qmsg2(get_jcr(), M_FATAL, 0, _("fread attr spool error. Wanted=%d got=%d bytes.\n"),
981                   msglen, nbytes);
982             update_attr_spool_size(tsize - last);
983             return false;
984          }
985          size += nbytes;
986          if ((++count & 0x3F) == 0) {
987             update_attr_spool_size(size - last);
988             last = size;
989          }
990       }
991       send();
992       if (jcr && job_canceled(jcr)) {
993          return false;
994       }
995    }
996    update_attr_spool_size(tsize - last);
997    if (ferror(m_spool_fd)) {
998       Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
999       return false;
1000    }
1001    return true;
1002 }
1003
1004 /*
1005  * Return the string for the error that occurred
1006  * on the socket. Only the first error is retained.
1007  */
1008 const char *BSOCK::bstrerror()
1009 {
1010    berrno be;
1011    if (errmsg == NULL) {
1012       errmsg = get_pool_memory(PM_MESSAGE);
1013    }
1014    if (b_errno == 0) {
1015       pm_strcpy(errmsg, "I/O Error");
1016    } else {
1017       pm_strcpy(errmsg, be.bstrerror(b_errno));
1018    }
1019    return errmsg;
1020 }
1021
1022 int BSOCK::get_peer(char *buf, socklen_t buflen)
1023 {
1024 #if !defined(HAVE_WIN32)
1025     if (peer_addr.sin_family == 0) {
1026         socklen_t salen = sizeof(peer_addr);
1027         int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
1028         if (rval < 0) return rval;
1029     }
1030     if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
1031         return -1;
1032
1033     return 0;
1034 #else
1035     return -1;
1036 #endif
1037 }
1038
1039 /*
1040  * Set the network buffer size, suggested size is in size.
1041  *  Actual size obtained is returned in bs->msglen
1042  *
1043  *  Returns: false on failure
1044  *           true  on success
1045  */
1046 bool BSOCK::set_buffer_size(uint32_t size, int rw)
1047 {
1048    uint32_t dbuf_size, start_size;
1049
1050 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
1051    int opt;
1052    opt = IPTOS_THROUGHPUT;
1053    setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
1054 #endif
1055
1056    if (size != 0) {
1057       dbuf_size = size;
1058    } else {
1059       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1060    }
1061    start_size = dbuf_size;
1062    if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
1063       Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
1064       return false;
1065    }
1066
1067    /*
1068     * If user has not set the size, use the OS default -- i.e. do not
1069     *   try to set it.  This allows sys admins to set the size they
1070     *   want in the OS, and Bacula will comply. See bug #1493
1071     */
1072    if (size == 0) {
1073       msglen = dbuf_size;
1074       return true;
1075    }
1076
1077    if (rw & BNET_SETBUF_READ) {
1078       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1079               SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1080          berrno be;
1081          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1082          dbuf_size -= TAPE_BSIZE;
1083       }
1084       Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
1085       if (dbuf_size != start_size) {
1086          Qmsg1(get_jcr(), M_WARNING, 0,
1087                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1088       }
1089    }
1090    if (size != 0) {
1091       dbuf_size = size;
1092    } else {
1093       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1094    }
1095    start_size = dbuf_size;
1096    if (rw & BNET_SETBUF_WRITE) {
1097       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1098               SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1099          berrno be;
1100          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1101          dbuf_size -= TAPE_BSIZE;
1102       }
1103       Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
1104       if (dbuf_size != start_size) {
1105          Qmsg1(get_jcr(), M_WARNING, 0,
1106                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1107       }
1108    }
1109
1110    msglen = dbuf_size;
1111    return true;
1112 }
1113
1114 /*
1115  * Set socket non-blocking
1116  * Returns previous socket flag
1117  */
1118 int BSOCK::set_nonblocking()
1119 {
1120    int oflags;
1121
1122    /* Get current flags */
1123    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1124       berrno be;
1125       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1126    }
1127
1128    /* Set O_NONBLOCK flag */
1129    if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
1130       berrno be;
1131       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1132    }
1133
1134    m_blocking = 0;
1135    return oflags;
1136 }
1137
1138 /*
1139  * Set socket blocking
1140  * Returns previous socket flags
1141  */
1142 int BSOCK::set_blocking()
1143 {
1144    int oflags;
1145    /* Get current flags */
1146    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1147       berrno be;
1148       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1149    }
1150
1151    /* Set O_NONBLOCK flag */
1152    if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
1153       berrno be;
1154       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1155    }
1156
1157    m_blocking = 1;
1158    return oflags;
1159 }
1160
1161 void BSOCK::set_killable(bool killable)
1162 {
1163    if (m_jcr) {
1164       m_jcr->set_killable(killable);
1165    }
1166 }
1167
1168 /*
1169  * Restores socket flags
1170  */
1171 void BSOCK::restore_blocking (int flags)
1172 {
1173    if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
1174       berrno be;
1175       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1176    }
1177
1178    m_blocking = (flags & O_NONBLOCK) ? true : false;
1179 }
1180
1181 /*
1182  * Wait for a specified time for data to appear on
1183  * the BSOCK connection.
1184  *
1185  *   Returns: 1 if data available
1186  *            0 if timeout
1187  *           -1 if error
1188  */
1189 int BSOCK::wait_data(int sec, int msec)
1190 {
1191    for (;;) {
1192       switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1193       case 0:                      /* timeout */
1194          b_errno = 0;
1195          return 0;
1196       case -1:
1197          b_errno = errno;
1198          if (errno == EINTR) {
1199             continue;
1200          }
1201          return -1;                /* error return */
1202       default:
1203          b_errno = 0;
1204 #ifdef HAVE_TLS
1205          if (this->tls && !tls_bsock_probe(this)) {
1206             continue; /* false alarm, maybe a session key negotiation in progress on the socket */
1207          }
1208 #endif
1209          return 1;
1210       }
1211    }
1212 }
1213
1214 /*
1215  * As above, but returns on interrupt
1216  */
1217 int BSOCK::wait_data_intr(int sec, int msec)
1218 {
1219    switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1220    case 0:                      /* timeout */
1221       b_errno = 0;
1222       return 0;
1223    case -1:
1224       b_errno = errno;
1225       return -1;                /* error return */
1226    default:
1227       b_errno = 0;
1228 #ifdef HAVE_TLS
1229       if (this->tls && !tls_bsock_probe(this)) {
1230          /* maybe a session key negotiation waked up the socket */
1231          return 0;
1232       }
1233 #endif
1234       break;
1235    }
1236    return 1;
1237 }
1238
1239 /*
1240  *  This routine closes the current BSOCK.
1241  *   It does not delete the socket packet
1242  *   resources, which are released int
1243  *   bsock->destroy().
1244  */
1245 #ifndef SHUT_RDWR
1246 #define SHUT_RDWR 2
1247 #endif
1248
1249 /*
1250  * The JCR is canceled, set terminate for chained BSOCKs starting from master
1251  */
1252 void BSOCK::cancel()
1253 {
1254    master_lock();
1255    for (BSOCK *next = m_master; next != NULL; next = next->m_next) {
1256       if (!next->m_closed) {
1257          next->m_terminated = true;
1258          next->m_timed_out = true;
1259       }
1260    }
1261    master_unlock();
1262 }
1263
1264 /*
1265  * Note, this routine closes the socket, but leaves the
1266  *   bsock memory in place.
1267  *   every thread is responsible of closing and destroying its own duped or not
1268  *   duped BSOCK
1269  */
1270 void BSOCK::close()
1271 {
1272    BSOCK *bsock = this;
1273
1274    if (bsock->is_closed()) {
1275       return;
1276    }
1277    if (!m_duped) {
1278       clear_locking();
1279    }
1280    bsock->set_closed();
1281    bsock->set_terminated();
1282    if (!bsock->m_duped) {
1283       /* Shutdown tls cleanly. */
1284       if (bsock->tls) {
1285          tls_bsock_shutdown(bsock);
1286          free_tls_connection(bsock->tls);
1287          bsock->tls = NULL;
1288       }
1289
1290       if (bsock->is_timed_out()) {
1291          shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
1292       }
1293       /* On Windows this discards data if we did not do a close_wait() */
1294       socketClose(bsock->m_fd);      /* normal close */
1295    }
1296    return;
1297 }
1298
1299 /*
1300  * Destroy the socket (i.e. release all resources)
1301  */
1302 void BSOCK::_destroy()
1303 {
1304    this->close();                  /* Ensure that socket is closed */
1305
1306    if (msg) {
1307       free_pool_memory(msg);
1308       msg = NULL;
1309    } else {
1310       ASSERT2(1 == 0, "Two calls to destroy socket");  /* double destroy */
1311    }
1312    if (cmsg) {
1313       free_pool_memory(cmsg);
1314       cmsg = NULL;
1315    }
1316    if (errmsg) {
1317       free_pool_memory(errmsg);
1318       errmsg = NULL;
1319    }
1320    if (m_who) {
1321       free(m_who);
1322       m_who = NULL;
1323    }
1324    if (m_host) {
1325       free(m_host);
1326       m_host = NULL;
1327    }
1328    if (src_addr) {
1329       free(src_addr);
1330       src_addr = NULL;
1331    }
1332    free(this);
1333 }
1334
1335 /*
1336  * Destroy the socket (i.e. release all resources)
1337  * including duped sockets.
1338  * should not be called from duped BSOCK
1339  */
1340 void BSOCK::destroy()
1341 {
1342    ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n")
1343    ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n")
1344    ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n")
1345    /* I'm the master I must destroy() all the duped BSOCKs */
1346    master_lock();
1347    BSOCK *ahead;
1348    for (BSOCK *next = m_next; next != NULL; next = ahead) {
1349       ahead = next->m_next;
1350       next->_destroy();
1351    }
1352    master_unlock();
1353    _destroy();
1354 }
1355
1356 /* Commands sent to Director */
1357 static char hello[]    = "Hello %s calling\n";
1358
1359 /* Response from Director */
1360 static char OKhello[]   = "1000 OK:";
1361
1362 /*
1363  * Authenticate Director
1364  */
1365 bool BSOCK::authenticate_director(const char *name, const char *password,
1366                TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
1367 {
1368    int tls_local_need = BNET_TLS_NONE;
1369    int tls_remote_need = BNET_TLS_NONE;
1370    int compatible = true;
1371    char bashed_name[MAX_NAME_LENGTH];
1372    BSOCK *dir = this;        /* for readability */
1373
1374    *errmsg = 0;
1375    /*
1376     * Send my name to the Director then do authentication
1377     */
1378
1379    /* Timeout Hello after 15 secs */
1380    dir->start_timer(15);
1381    dir->fsend(hello, bashed_name);
1382
1383    if (get_tls_enable(tls_ctx)) {
1384       tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
1385    }
1386
1387    /* respond to Dir challenge */
1388    if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
1389        /* Now challenge dir */
1390        !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
1391       bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
1392          dir->host(), dir->port());
1393       goto bail_out;
1394    }
1395
1396    /* Verify that the remote host is willing to meet our TLS requirements */
1397    if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1398       bsnprintf(errmsg, errmsg_len, _("Authorization error:"
1399              " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
1400              dir->host(), dir->port());
1401       goto bail_out;
1402    }
1403
1404    /* Verify that we are willing to meet the remote host's requirements */
1405    if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1406       bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
1407                      " Remote server requires TLS.\n"),
1408                      dir->host(), dir->port());
1409
1410       goto bail_out;
1411    }
1412
1413    /* Is TLS Enabled? */
1414    if (have_tls) {
1415       if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1416          /* Engage TLS! Full Speed Ahead! */
1417          if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1418             bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1419                dir->host(), dir->port());
1420             goto bail_out;
1421          }
1422       }
1423    }
1424
1425    Dmsg1(6, ">dird: %s", dir->msg);
1426    if (dir->recv() <= 0) {
1427       dir->stop_timer();
1428       bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
1429                       "The Director at \"%s:%d\" may not be running.\n"),
1430                     dir->bstrerror(), dir->host(), dir->port());
1431       return false;
1432    }
1433
1434    dir->stop_timer();
1435    Dmsg1(10, "<dird: %s", dir->msg);
1436    if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1437       bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
1438          dir->host(), dir->port());
1439       return false;
1440    } else {
1441       bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
1442    }
1443    return true;
1444
1445 bail_out:
1446    dir->stop_timer();
1447    bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
1448              "Most likely the passwords do not agree.\n"
1449              "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1450              "For help, please see: " MANUAL_AUTH_URL "\n"),
1451              dir->host(), dir->port());
1452    return false;
1453 }
1454
1455 /* Try to limit the bandwidth of a network connection
1456  */
1457 void BSOCK::control_bwlimit(int bytes)
1458 {
1459    btime_t now, temp;
1460    if (bytes == 0) {
1461       return;
1462    }
1463
1464    now = get_current_btime();          /* microseconds */
1465    temp = now - m_last_tick;           /* microseconds */
1466
1467    m_nb_bytes += bytes;
1468
1469    if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
1470       m_nb_bytes = bytes;
1471       m_last_tick = now;
1472       return;
1473    }
1474
1475    /* Less than 0.1ms since the last call, see the next time */
1476    if (temp < 100) {
1477       return;
1478    }
1479
1480    /* Remove what was authorised to be written in temp us */
1481    m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
1482
1483    if (m_nb_bytes < 0) {
1484       m_nb_bytes = 0;
1485    }
1486
1487    /* What exceed should be converted in sleep time */
1488    int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
1489    if (usec_sleep > 100) {
1490       bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
1491       m_last_tick = get_current_btime();
1492       m_nb_bytes = 0;
1493    } else {
1494       m_last_tick = now;
1495    }
1496 }