2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2018 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Network Utility Routines
22 * Written by Kern Sibbald
29 #include <netinet/tcp.h>
31 #if !defined(ENODATA) /* not defined on BSD systems */
35 #if !defined(SOL_TCP) /* Not defined on some systems */
36 #define SOL_TCP IPPROTO_TCP
41 #define socketRead(fd, buf, len) ::recv(fd, buf, len, 0)
42 #define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
43 #define socketClose(fd) ::closesocket(fd)
44 static void win_close_wait(int fd);
46 #define SOCK_CLOEXEC 0
49 #define socketRead(fd, buf, len) ::read(fd, buf, len)
50 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
51 #define socketClose(fd) ::close(fd)
56 * make a nice dump of a message
58 void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen)
65 Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
67 smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
69 Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
71 Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
77 BSOCKCallback::BSOCKCallback()
81 BSOCKCallback::~BSOCKCallback()
87 * This is a non-class BSOCK "constructor" because we want to
88 * call the Bacula smartalloc routines instead of new.
92 BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
99 memset(this, 0, sizeof(BSOCK));
104 pout_msg_no = &out_msg_no;
105 uninstall_send_hook_cb();
106 msg = get_pool_memory(PM_BSOCK);
107 cmsg = get_pool_memory(PM_BSOCK);
108 errmsg = get_pool_memory(PM_MESSAGE);
109 timeout = BSOCK_TIMEOUT;
112 void BSOCK::free_tls()
114 free_tls_connection(this->tls);
119 * Try to connect to host for max_retry_time at retry_time intervals.
120 * Note, you must have called the constructor prior to calling
123 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
125 const char *name, char *host, char *service, int port,
131 time_t begin_time = time(NULL);
133 btimer_t *tid = NULL;
135 /* Try to trap out of OS call when time expires */
136 if (max_retry_time) {
137 tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
140 for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
141 i -= retry_interval) {
143 if (fatal || (jcr && job_canceled(jcr))) {
146 Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
147 name, host, port, be.bstrerror());
149 i = 60 * 5; /* complain again in 5 minutes */
151 Qmsg4(jcr, M_WARNING, 0, _(
152 "Could not connect to %s on %s:%d. ERR=%s\n"
153 "Retrying ...\n"), name, host, port, be.bstrerror());
155 bmicrosleep(retry_interval, 0);
157 if (begin_time + max_retry_time <= now) {
158 Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
159 name, host, port, be.bstrerror());
167 stop_thread_timer(tid);
173 * Finish initialization of the packet structure.
175 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
176 struct sockaddr *lclient_addr)
178 Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
186 set_who(bstrdup(who));
187 set_host(bstrdup(host));
189 memcpy(&client_addr, lclient_addr, sizeof(client_addr));
194 * Copy the address from the configuration dlist that gets passed in
196 void BSOCK::set_source_address(dlist *src_addr_list)
200 // delete the object we already have, if it's allocated
207 addr = (IPADDR*) src_addr_list->first();
208 src_addr = New( IPADDR(*addr));
213 * Open a TCP connection to the server
215 * Returns BSOCK * pointer on success
217 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
218 int port, utime_t heart_beat, int *fatal)
223 bool connected = false;
229 * Fill in the structure serv_addr with the address of
230 * the server that we want to connect with.
232 if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
233 /* Note errstr is not malloc'ed */
234 Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
236 Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
242 remove_duplicate_addresses(addr_list);
243 foreach_dlist(ipaddr, addr_list) {
244 ipaddr->set_port_net(htons(port));
245 char allbuf[256 * 10];
247 Dmsg2(100, "Current %sAll %s\n",
248 ipaddr->build_address_str(curbuf, sizeof(curbuf)),
249 build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
250 /* Open a TCP socket */
251 if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
258 * The name lookup of the host returned an address in a protocol family
259 * we don't support. Suppress the error and try the next address.
263 #ifdef EPROTONOSUPPORT
264 /* See above comments */
265 case EPROTONOSUPPORT:
269 /* See above comments */
275 Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
276 ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
277 Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
278 ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
284 /* Bind to the source address if it is set */
286 if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
290 Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
291 src_addr->get_family(), be.bstrerror() );
292 Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
293 src_addr->get_family(), be.bstrerror() );
294 if (sockfd >= 0) socketClose(sockfd);
300 * Keep socket from timing out from inactivity
302 if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
304 Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
307 #if defined(TCP_KEEPIDLE)
309 int opt = heart_beat;
310 if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
312 Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
318 /* connect to server */
319 if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
321 if (sockfd >= 0) socketClose(sockfd);
331 free_addresses(addr_list);
332 errno = save_errno | b_errno_win32;
333 Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
334 name, host, port, be.bstrerror());
338 * Keep socket from timing out from inactivity
339 * Do this a second time out of paranoia
341 if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
343 Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
346 fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
347 free_addresses(addr_list);
349 /* Clean the packet a bit */
353 m_use_locking = false;
355 m_terminated = false;
356 m_suppress_error_msgs = false;
360 Dmsg3(50, "OK connected to server %s %s:%d.\n",
367 * Force read/write to use locking
369 bool BSOCK::set_locking()
373 return true; /* already set */
375 pm_rmutex = &m_rmutex;
376 pm_wmutex = &m_wmutex;
377 if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
379 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"),
383 if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
385 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"),
389 if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
391 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"),
395 m_use_locking = true;
399 void BSOCK::clear_locking()
401 if (!m_use_locking || m_duped) {
404 m_use_locking = false;
405 pthread_mutex_destroy(pm_rmutex);
406 pthread_mutex_destroy(pm_wmutex);
407 pthread_mutex_destroy(&m_mmutex);
414 * Do comm line compression (LZ4) of a bsock message.
415 * Returns: true if the compression was done
416 * false if no compression was done
417 * The "offset" defines where to start compressing the message. This
418 * allows passing "header" information uncompressed and the actual
419 * data part compressed.
421 * Note, we don't compress lines less than 20 characters because we
422 * want to save at least 10 characters otherwise compression doesn't
423 * help enough to warrant doing the decompression.
425 bool BSOCK::comm_compress()
427 bool compress = false;
428 bool compressed = false;
429 int offset = m_flags & 0xFF;
432 * Enable compress if allowed and not spooling and the
433 * message is long enough (>20) to get some reasonable savings.
436 compress = can_compress() && !is_spooling();
438 m_CommBytes += msglen; /* uncompressed bytes */
439 Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
440 can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
445 ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
446 ASSERT2(offset < 255, "Offset greater than 254\n");
447 need_size = LZ4_compressBound(msglen);
448 if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
449 cmsg = realloc_pool_memory(cmsg, need_size + 100);
454 clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
455 //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
456 /* Compression should save at least 10 characters */
457 if (clen > 0 && clen + 10 <= msglen) {
460 /* Debug code -- decompress and compare */
461 int blen, rlen, olen;
463 POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
464 blen = sizeof_pool_memory(msg) * 2;
465 if (blen >= sizeof_pool_memory(rmsg)) {
466 rmsg = realloc_pool_memory(rmsg, blen);
468 rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
469 //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
470 ASSERT(olen == rlen);
471 ASSERT(memcmp(msg, rmsg, olen) == 0);
472 free_pool_memory(rmsg);
484 m_CommCompressedBytes += msglen;
490 * Send a message over the network. Everything is sent in one
491 * write request, but depending on the mode you are using
492 * there will be either two or three read requests done.
493 * Read 1: 32 bits, gets essentially the packet length, but may
494 * have the upper bits set to indicate compression or
495 * an extended header packet.
496 * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set.
497 * In this case the top 16 bits of this 32 bit word are reserved
498 * for flags and the lower 16 bits for data. This word will be
499 * stored in the field "flags" in the BSOCK packet.
500 * Read 2 or 3: depending on if Read 2 is done. This is the data.
502 * For normal comm line compression, the whole data packet is compressed
503 * but not the msglen (first read).
504 * To do data compression rather than full comm line compression, prior to
505 * call send(flags) where the lower 32 bits is the offset to the data to
506 * be compressed. The top 32 bits are reserved for flags that can be
508 * BNET_IS_CMD We are sending a command
509 * BNET_OFFSET An offset is specified (this implies data compression)
510 * BNET_NOCOMPRESS Inhibit normal comm line compression
511 * BNET_DATACOMPRESSED The data using the specified offset was
512 * compressed, and normal comm line compression will
514 * If any of the above bits are set, then BNET_HDR_EXTEND will be set
515 * in the top bits of msglen, and the full set of flags + the offset
516 * will be passed as a 32 bit word just after the msglen, and then
517 * followed by any data that is either compressed or not.
519 * Note, neither comm line nor data compression is not
520 * guaranteed since it may result in more data, in which case, the
521 * record is sent uncompressed and there will be no offset.
522 * On the receive side, if BNET_OFFSET is set, then the data is compressed.
524 * Returns: false on failure
527 bool BSOCK::send(int aflags)
541 if (!m_suppress_error_msgs) {
542 Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n"));
547 if (!m_suppress_error_msgs) {
548 Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"),
549 errors, m_who, m_host, m_port);
553 if (is_terminated()) {
554 if (!m_suppress_error_msgs) {
555 Qmsg4(m_jcr, M_ERROR, 0, _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
556 is_terminated(), m_who, m_host, m_port);
561 if (msglen > 4000000) {
562 if (!m_suppress_error_msgs) {
563 Qmsg4(m_jcr, M_ERROR, 0,
564 _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
565 msglen, m_who, m_host, m_port);
571 if (!send_hook_cb->bsock_send_cb()) {
572 Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
573 Qmsg3(m_jcr, M_ERROR, 0,
574 _("Flowcontrol failure on %s:%s:%d\n"),
575 m_who, m_host, m_port);
583 save_msglen = msglen;
587 offset = aflags & 0xFF; /* offset is 16 bits */
589 m_flags |= BNET_OFFSET;
591 if (m_flags & BNET_DATACOMPRESSED) { /* Check if already compressed */
593 } else if (m_flags & BNET_NOCOMPRESS) {
596 compressed = comm_compress(); /* do requested compression */
598 if (offset && compressed) {
599 m_flags |= BNET_DATACOMPRESSED;
602 m_flags &= ~BNET_COMPRESSED;
605 /* Compute total packet length */
607 hdrsiz = sizeof(pktsiz);
608 pktsiz = hdrsiz; /* signal, no data */
609 } else if (m_flags) {
610 hdrsiz = 2 * sizeof(pktsiz); /* have 64 bit header */
611 pktsiz = msglen + hdrsiz;
613 hdrsiz = sizeof(pktsiz); /* have 32 bit header */
614 pktsiz = msglen + hdrsiz;
617 /* Set special bits */
618 if (m_flags & BNET_OFFSET) { /* if data compression on */
619 compressed = false; /* no comm compression */
622 msglen |= BNET_COMPRESSED; /* comm line compression */
626 msglen |= BNET_HDR_EXTEND; /* extended header */
630 * Store packet length at head of message -- note, we
631 * have reserved an int32_t just before msg, so we can
634 hdrptr = (int32_t *)(msg - hdrsiz);
635 *hdrptr = htonl(msglen); /* store signal/length */
637 *(hdrptr+1) = htonl(m_flags); /* store flags */
640 (*pout_msg_no)++; /* increment message number */
642 /* send data packet */
643 timer_start = watchdog_time; /* start timer */
645 /* Full I/O done in one write */
646 rc = write_nbytes(this, (char *)hdrptr, pktsiz);
647 if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen);
648 timer_start = 0; /* clear timer */
657 if (!m_suppress_error_msgs) {
658 Qmsg5(m_jcr, M_ERROR, 0,
659 _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
661 m_host, m_port, this->bstrerror());
664 Qmsg5(m_jcr, M_ERROR, 0,
665 _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
666 pktsiz, m_who, m_host, m_port, rc);
670 // Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
671 // msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
672 msglen = save_msglen;
674 if (locked) pV(pm_wmutex);
679 * Format and send a message
680 * Returns: false on error
683 bool BSOCK::fsend(const char *fmt, ...)
689 return false; /* do not seg fault */
691 if (errors || is_terminated() || is_closed()) {
694 /* This probably won't work, but we vsnprintf, then if we
695 * get a negative length or a length greater than our buffer
696 * (depending on which library is used), the printf was truncated, so
697 * get a bigger buffer and try again.
700 maxlen = sizeof_pool_memory(msg) - 1;
701 va_start(arg_ptr, fmt);
702 msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
704 if (msglen >= 0 && msglen < (maxlen - 5)) {
707 msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
713 * Receive a message from the other end. Each message consists of
714 * two packets. The first is a header that contains the size
715 * of the data that follows in the second packet.
716 * Returns number of bytes read (may return zero)
717 * Returns -1 on signal (BNET_SIGNAL)
718 * Returns -2 on hard end of file (BNET_HARDEOF)
719 * Returns -3 on error (BNET_ERROR)
720 * Returns -4 on COMMAND (BNET_COMMAND)
721 * Unfortunately, it is a bit complicated because we have these
724 * 2. Signal including end of data stream
725 * 3. Hard end of file
727 * Using bsock->is_stop() and bsock->is_error() you can figure this all out.
729 int32_t BSOCK::recv()
733 int32_t o_pktsiz = 0;
734 bool compressed = false;
735 bool command = false;
738 cmsg[0] = msg[0] = 0;
741 if (errors || is_terminated() || is_closed()) {
749 read_seqno++; /* bump sequence number */
750 timer_start = watchdog_time; /* set start wait time */
752 /* get data size -- in int32_t */
753 if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
754 timer_start = 0; /* clear timer */
755 /* probably pipe broken because client died */
762 nbytes = BNET_HARDEOF; /* assume hard EOF received */
765 timer_start = 0; /* clear timer */
766 if (nbytes != sizeof(int32_t)) {
769 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
770 sizeof(int32_t), nbytes, m_who, m_host, m_port);
775 pktsiz = ntohl(pktsiz); /* decode no. of bytes that follow */
777 /* If extension, read it */
778 if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) {
779 timer_start = watchdog_time; /* set start wait time */
781 if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) {
782 timer_start = 0; /* clear timer */
783 /* probably pipe broken because client died */
790 nbytes = BNET_HARDEOF; /* assume hard EOF received */
793 timer_start = 0; /* clear timer */
794 if (nbytes != sizeof(int32_t)) {
797 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
798 sizeof(int32_t), nbytes, m_who, m_host, m_port);
802 pktsiz &= ~BNET_HDR_EXTEND;
803 m_flags = ntohl(m_flags);
806 if (pktsiz > 0 && (pktsiz & BNET_COMPRESSED)) {
808 pktsiz &= ~BNET_COMPRESSED;
811 if (m_flags & BNET_IS_CMD) {
814 if (m_flags & BNET_OFFSET) {
818 if (pktsiz == 0) { /* No data transferred */
819 timer_start = 0; /* clear timer */
822 nbytes = 0; /* zero bytes read */
826 /* If signal or packet size too big */
827 if (pktsiz < 0 || pktsiz > 1000000) {
828 if (pktsiz > 0) { /* if packet too big */
829 Qmsg4(m_jcr, M_FATAL, 0,
830 _("Packet size=%d too big from \"%s:%s:%d\". Maximum permitted 1000000. Terminating connection.\n"),
831 pktsiz, m_who, m_host, m_port);
832 pktsiz = BNET_TERMINATE; /* hang up */
834 if (pktsiz == BNET_TERMINATE) {
837 timer_start = 0; /* clear timer */
839 msglen = pktsiz; /* signal code */
840 nbytes = BNET_SIGNAL; /* signal */
844 /* Make sure the buffer is big enough + one byte for EOS */
845 if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
846 msg = realloc_pool_memory(msg, pktsiz + 100);
849 timer_start = watchdog_time; /* set start wait time */
851 /* now read the actual data */
852 if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
853 timer_start = 0; /* clear timer */
860 Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
861 m_who, m_host, m_port, this->bstrerror());
865 timer_start = 0; /* clear timer */
868 if (nbytes != pktsiz) {
871 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
872 pktsiz, nbytes, m_who, m_host, m_port);
876 /* If compressed uncompress it */
879 int psize = nbytes * 4;
880 if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
881 cmsg = realloc_pool_memory(cmsg, psize);
883 psize = sizeof_pool_memory(cmsg);
884 if (m_flags & BNET_OFFSET) {
885 offset = m_flags & 0xFF;
889 /* Grow buffer to max approx 4MB */
890 for (int i=0; i < 7; i++) {
891 nbytes = LZ4_decompress_safe(msg, cmsg, msglen, psize);
900 if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
901 cmsg = realloc_pool_memory(cmsg, psize + 100);
904 if (m_flags & BNET_OFFSET) {
909 Jmsg1(m_jcr, M_ERROR, 0, "Decompress error!!!! ERR=%d\n", nbytes);
910 Pmsg3(000, "Decompress error!! pktsiz=%d cmsgsiz=%d nbytes=%d\n", pktsiz,
914 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
915 pktsiz, nbytes, m_who, m_host, m_port);
920 /* Make sure the buffer is big enough + one byte for EOS */
921 if (msglen >= (int32_t)sizeof_pool_memory(msg)) {
922 msg = realloc_pool_memory(msg, msglen + 100);
924 /* If this is a data decompress, leave msg compressed */
925 if (!(m_flags & BNET_OFFSET)) {
926 memcpy(msg, cmsg, msglen);
930 /* always add a zero by to properly terminate any
931 * string that was send to us. Note, we ensured above that the
932 * buffer is at least one byte longer than the message length.
934 msg[nbytes] = 0; /* terminate in case it is a string */
936 * The following uses *lots* of resources so turn it on only for
942 if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen);
943 if (nbytes != BNET_ERROR && command) {
944 nbytes = BNET_COMMAND;
947 if (locked) pV(pm_rmutex);
948 return nbytes; /* return actual length of message */
954 bool BSOCK::signal(int signal)
957 if (signal == BNET_TERMINATE) {
958 m_suppress_error_msgs = true;
964 * Despool spooled attributes
966 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
970 ssize_t last = 0, size = 0;
972 JCR *jcr = get_jcr();
976 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
977 posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
980 while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
982 size += sizeof(int32_t);
983 msglen = ntohl(pktsiz);
985 if (msglen > (int32_t)sizeof_pool_memory(msg)) {
986 msg = realloc_pool_memory(msg, msglen + 1);
988 nbytes = fread(msg, 1, msglen, m_spool_fd);
989 if (nbytes != (size_t)msglen) {
991 Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
992 Qmsg2(get_jcr(), M_FATAL, 0, _("fread attr spool error. Wanted=%d got=%d bytes.\n"),
994 update_attr_spool_size(tsize - last);
998 if ((++count & 0x3F) == 0) {
999 update_attr_spool_size(size - last);
1004 if (jcr && job_canceled(jcr)) {
1008 update_attr_spool_size(tsize - last);
1009 if (ferror(m_spool_fd)) {
1010 Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
1017 * Return the string for the error that occurred
1018 * on the socket. Only the first error is retained.
1020 const char *BSOCK::bstrerror()
1023 if (errmsg == NULL) {
1024 errmsg = get_pool_memory(PM_MESSAGE);
1027 pm_strcpy(errmsg, "I/O Error");
1029 pm_strcpy(errmsg, be.bstrerror(b_errno));
1034 int BSOCK::get_peer(char *buf, socklen_t buflen)
1036 #if !defined(HAVE_WIN32)
1037 if (peer_addr.sin_family == 0) {
1038 socklen_t salen = sizeof(peer_addr);
1039 int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
1040 if (rval < 0) return rval;
1042 if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
1052 * Set the network buffer size, suggested size is in size.
1053 * Actual size obtained is returned in bs->msglen
1055 * Returns: false on failure
1058 bool BSOCK::set_buffer_size(uint32_t size, int rw)
1060 uint32_t dbuf_size, start_size;
1062 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
1064 opt = IPTOS_THROUGHPUT;
1065 setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
1071 dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1073 start_size = dbuf_size;
1074 if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
1075 Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
1080 * If user has not set the size, use the OS default -- i.e. do not
1081 * try to set it. This allows sys admins to set the size they
1082 * want in the OS, and Bacula will comply. See bug #1493
1089 if (rw & BNET_SETBUF_READ) {
1090 while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1091 SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1093 Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1094 dbuf_size -= TAPE_BSIZE;
1096 Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
1097 if (dbuf_size != start_size) {
1098 Qmsg1(get_jcr(), M_WARNING, 0,
1099 _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1105 dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1107 start_size = dbuf_size;
1108 if (rw & BNET_SETBUF_WRITE) {
1109 while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1110 SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1112 Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1113 dbuf_size -= TAPE_BSIZE;
1115 Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
1116 if (dbuf_size != start_size) {
1117 Qmsg1(get_jcr(), M_WARNING, 0,
1118 _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1127 * Set socket non-blocking
1128 * Returns previous socket flag
1130 int BSOCK::set_nonblocking()
1135 /* Get current flags */
1136 if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1138 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1141 /* Set O_NONBLOCK flag */
1142 if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
1144 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1151 u_long ioctlArg = 1;
1154 ioctlsocket(m_fd, FIONBIO, &ioctlArg);
1162 * Set socket blocking
1163 * Returns previous socket flags
1165 int BSOCK::set_blocking()
1169 /* Get current flags */
1170 if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1172 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1175 /* Set O_NONBLOCK flag */
1176 if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
1178 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1185 u_long ioctlArg = 0;
1188 ioctlsocket(m_fd, FIONBIO, &ioctlArg);
1195 void BSOCK::set_killable(bool killable)
1198 m_jcr->set_killable(killable);
1203 * Restores socket flags
1205 void BSOCK::restore_blocking (int flags)
1208 if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
1210 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1213 m_blocking = (flags & O_NONBLOCK) ? true : false;
1215 u_long ioctlArg = flags;
1217 ioctlsocket(m_fd, FIONBIO, &ioctlArg);
1223 * Wait for a specified time for data to appear on
1224 * the BSOCK connection.
1226 * Returns: 1 if data available
1230 int BSOCK::wait_data(int sec, int msec)
1233 switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1234 case 0: /* timeout */
1239 if (errno == EINTR) {
1242 return -1; /* error return */
1246 if (this->tls && !tls_bsock_probe(this)) {
1247 continue; /* false alarm, maybe a session key negotiation in progress on the socket */
1256 * As above, but returns on interrupt
1258 int BSOCK::wait_data_intr(int sec, int msec)
1260 switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1261 case 0: /* timeout */
1266 return -1; /* error return */
1270 if (this->tls && !tls_bsock_probe(this)) {
1271 /* maybe a session key negotiation waked up the socket */
1281 * This routine closes the current BSOCK.
1282 * It does not delete the socket packet
1283 * resources, which are released int
1291 * The JCR is canceled, set terminate for chained BSOCKs starting from master
1293 void BSOCK::cancel()
1296 for (BSOCK *next = m_master; next != NULL; next = next->m_next) {
1297 if (!next->m_closed) {
1298 next->m_terminated = true;
1299 next->m_timed_out = true;
1306 * Note, this routine closes the socket, but leaves the
1307 * bsock memory in place.
1308 * every thread is responsible of closing and destroying its own duped or not
1313 BSOCK *bsock = this;
1315 if (bsock->is_closed()) {
1321 bsock->set_closed();
1322 bsock->set_terminated();
1323 if (!bsock->m_duped) {
1324 /* Shutdown tls cleanly. */
1326 tls_bsock_shutdown(bsock);
1327 free_tls_connection(bsock->tls);
1332 if (!bsock->is_timed_out()) {
1333 win_close_wait(bsock->m_fd); /* Ensure that data is not discarded */
1336 if (bsock->is_timed_out()) {
1337 shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */
1340 /* On Windows this discards data if we did not do a close_wait() */
1341 socketClose(bsock->m_fd); /* normal close */
1347 * Destroy the socket (i.e. release all resources)
1349 void BSOCK::_destroy()
1351 this->close(); /* Ensure that socket is closed */
1354 free_pool_memory(msg);
1357 ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */
1360 free_pool_memory(cmsg);
1364 free_pool_memory(errmsg);
1383 * Destroy the socket (i.e. release all resources)
1384 * including duped sockets.
1385 * should not be called from duped BSOCK
1387 void BSOCK::destroy()
1389 ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n")
1390 ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n")
1391 ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n")
1392 /* I'm the master I must destroy() all the duped BSOCKs */
1395 for (BSOCK *next = m_next; next != NULL; next = ahead) {
1396 ahead = next->m_next;
1403 /* Commands sent to Director */
1404 static char hello[] = "Hello %s calling\n";
1406 /* Response from Director */
1407 static char OKhello[] = "1000 OK:";
1410 * Authenticate Director
1412 bool BSOCK::authenticate_director(const char *name, const char *password,
1413 TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
1415 int tls_local_need = BNET_TLS_NONE;
1416 int tls_remote_need = BNET_TLS_NONE;
1417 int compatible = true;
1418 char bashed_name[MAX_NAME_LENGTH];
1419 BSOCK *dir = this; /* for readability */
1423 * Send my name to the Director then do authentication
1426 /* Timeout Hello after 15 secs */
1427 dir->start_timer(15);
1428 dir->fsend(hello, bashed_name);
1430 if (get_tls_enable(tls_ctx)) {
1431 tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
1434 /* respond to Dir challenge */
1435 if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
1436 /* Now challenge dir */
1437 !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
1438 bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
1439 dir->host(), dir->port());
1443 /* Verify that the remote host is willing to meet our TLS requirements */
1444 if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1445 bsnprintf(errmsg, errmsg_len, _("Authorization error:"
1446 " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
1447 dir->host(), dir->port());
1451 /* Verify that we are willing to meet the remote host's requirements */
1452 if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1453 bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
1454 " Remote server requires TLS.\n"),
1455 dir->host(), dir->port());
1460 /* Is TLS Enabled? */
1462 if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1463 /* Engage TLS! Full Speed Ahead! */
1464 if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1465 bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1466 dir->host(), dir->port());
1472 Dmsg1(6, ">dird: %s", dir->msg);
1473 if (dir->recv() <= 0) {
1475 bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
1476 "The Director at \"%s:%d\" may not be running.\n"),
1477 dir->bstrerror(), dir->host(), dir->port());
1482 Dmsg1(10, "<dird: %s", dir->msg);
1483 if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1484 bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
1485 dir->host(), dir->port());
1488 bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
1494 bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
1495 "Most likely the passwords do not agree.\n"
1496 "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1497 "For help, please see: " MANUAL_AUTH_URL "\n"),
1498 dir->host(), dir->port());
1502 /* Try to limit the bandwidth of a network connection
1504 void BSOCK::control_bwlimit(int bytes)
1511 now = get_current_btime(); /* microseconds */
1512 temp = now - m_last_tick; /* microseconds */
1514 m_nb_bytes += bytes;
1516 if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
1522 /* Less than 0.1ms since the last call, see the next time */
1527 /* Remove what was authorised to be written in temp us */
1528 m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
1530 if (m_nb_bytes < 0) {
1534 /* What exceed should be converted in sleep time */
1535 int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
1536 if (usec_sleep > 100) {
1537 bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
1538 m_last_tick = get_current_btime();
1547 * closesocket is supposed to do a graceful disconnect under Window
1548 * but it doesn't. Comments on http://msdn.microsoft.com/en-us/li
1549 * confirm this behaviour. DisconnectEx is required instead, but
1550 * that function needs to be retrieved via WS IOCTL
1553 win_close_wait(int fd)
1556 GUID disconnectex_guid = WSAID_DISCONNECTEX;
1557 DWORD bytes_returned;
1558 LPFN_DISCONNECTEX DisconnectEx;
1559 ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL);
1560 Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret);
1562 DisconnectEx(fd, NULL, 0, 0);