2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Network Utility Routines
22 * Written by Kern Sibbald
29 #include <netinet/tcp.h>
31 #if !defined(ENODATA) /* not defined on BSD systems */
35 #if !defined(SOL_TCP) /* Not defined on some systems */
36 #define SOL_TCP IPPROTO_TCP
39 #define socketRead(fd, buf, len) ::read(fd, buf, len)
40 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
41 #define socketClose(fd) ::close(fd)
44 * make a nice dump of a message
// Emit a debug trace for one BSOCK message.
// what is a short caller-supplied tag (send() passes SEND, recv() passes RECV);
// sock and msgno identify the descriptor and the message sequence number.
// Three output shapes appear below: a SIGNAL line that decodes msglen with
// bnet_sig_to_ascii(), and two len= lines that print the smartdump() rendering
// of msg, one with the text quoted and one without.
46 void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen)
53 Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
55 smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
57 Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
59 Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
65 BSOCKCallback::BSOCKCallback()
69 BSOCKCallback::~BSOCKCallback()
75 * This is a non-class BSOCK "constructor" because we want to
76 * call the Bacula smartalloc routines instead of new.
80 BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
87 memset(this, 0, sizeof(BSOCK));
92 pout_msg_no = &out_msg_no;
93 uninstall_send_hook_cb();
94 msg = get_pool_memory(PM_BSOCK);
95 cmsg = get_pool_memory(PM_BSOCK);
96 errmsg = get_pool_memory(PM_MESSAGE);
97 timeout = BSOCK_TIMEOUT;
// Release the TLS connection attached to this socket by handing this->tls
// to free_tls_connection().
100 void BSOCK::free_tls()
102 free_tls_connection(this->tls);
107 * Try to connect to host for max_retry_time at retry_time intervals.
108 * Note, you must have called the constructor prior to calling
// Keep calling open() until it succeeds, reports a fatal error, the job is
// canceled, or max_retry_time has elapsed.
//  - When max_retry_time is non-zero a thread timer is armed with that value
//    (truncated to uint32_t) and later released through stop_thread_timer().
//  - Every failed attempt is traced at debug level 50 and followed by
//    bmicrosleep(retry_interval, 0).
//  - The counter i drops by retry_interval per attempt and is reset to 60 * 5
//    next to the M_WARNING, so the warning is repeated about every five minutes.
//  - Once begin_time + max_retry_time <= now an M_FATAL message is queued.
111 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
113 const char *name, char *host, char *service, int port,
119 time_t begin_time = time(NULL);
121 btimer_t *tid = NULL;
123 /* Try to trap out of OS call when time expires */
124 if (max_retry_time) {
125 tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
128 for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
129 i -= retry_interval) {
131 if (fatal || (jcr && job_canceled(jcr))) {
134 Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
135 name, host, port, be.bstrerror());
137 i = 60 * 5; /* complain again in 5 minutes */
139 Qmsg4(jcr, M_WARNING, 0, _(
140 "Could not connect to %s on %s:%d. ERR=%s\n"
141 "Retrying ...\n"), name, host, port, be.bstrerror());
143 bmicrosleep(retry_interval, 0);
145 if (begin_time + max_retry_time <= now) {
146 Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
147 name, host, port, be.bstrerror());
155 stop_thread_timer(tid);
161 * Finish initialization of the packet structure.
// Record the peer description on this BSOCK once a socket has been obtained.
// who and host are passed through bstrdup() before being stored with
// set_who() and set_host(). lclient_addr is copied by value into client_addr.
// NOTE(review): the memcpy reads sizeof(client_addr) bytes from lclient_addr;
// confirm every caller supplies storage at least that large.
163 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
164 struct sockaddr *lclient_addr)
166 Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
174 set_who(bstrdup(who));
175 set_host(bstrdup(host));
177 memcpy(&client_addr, lclient_addr, sizeof(client_addr));
182 * Copy the address from the configuration dlist that gets passed in
// Set the local source address that open() binds to before connecting.
// Only the first entry of src_addr_list is used: it is copy-constructed into
// a new IPADDR that is stored in src_addr.
// NOTE(review): the result of first() is dereferenced directly; confirm an
// empty list cannot reach this statement.
184 void BSOCK::set_source_address(dlist *src_addr_list)
188 // delete the object we already have, if it's allocated
195 addr = (IPADDR*) src_addr_list->first();
196 src_addr = New( IPADDR(*addr));
201 * Open a TCP connection to the server
203 * Returns true on success
// Resolve host and try to establish a TCP connection to one of its addresses.
// Steps visible in the body:
//  - bnet_host2ipaddrs() resolves host; a failure queues an M_ERROR.
//  - Duplicate addresses are removed, then each address receives the port
//    (converted with htons) and a socket is created with
//    SOCK_STREAM|SOCK_CLOEXEC.
//  - The socket is bound to src_addr when a source address is set; a bind
//    failure is reported and the socket closed.
//  - SO_KEEPALIVE is enabled before ::connect() and once more after it; where
//    TCP_KEEPIDLE is defined it is set from heart_beat.
//  - On success fin_init() stores the peer details, the address list is freed
//    and the per-connection state flags are reset.
//  - On failure the address list is freed and errno is assigned from
//    save_errno combined with b_errno_win32.
// fatal is an out-parameter supplied by connect(), which stops retrying when
// it is set.
// NOTE(review): the resolver error text names gethostbyname() although the
// call made here is bnet_host2ipaddrs().
// NOTE(review): heart_beat is a utime_t assigned to an int for TCP_KEEPIDLE;
// confirm out-of-range values cannot occur.
205 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
206 int port, utime_t heart_beat, int *fatal)
211 bool connected = false;
217 * Fill in the structure serv_addr with the address of
218 * the server that we want to connect with.
220 if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
221 /* Note errstr is not malloc'ed */
222 Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
224 Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
230 remove_duplicate_addresses(addr_list);
231 foreach_dlist(ipaddr, addr_list) {
232 ipaddr->set_port_net(htons(port));
233 char allbuf[256 * 10];
235 Dmsg2(100, "Current %sAll %s\n",
236 ipaddr->build_address_str(curbuf, sizeof(curbuf)),
237 build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
238 /* Open a TCP socket */
239 if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
246 * The name lookup of the host returned an address in a protocol family
247 * we don't support. Suppress the error and try the next address.
251 #ifdef EPROTONOSUPPORT
252 /* See above comments */
253 case EPROTONOSUPPORT:
257 /* See above comments */
263 Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
264 ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
265 Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
266 ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
272 /* Bind to the source address if it is set */
274 if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
278 Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
279 src_addr->get_family(), be.bstrerror() );
280 Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
281 src_addr->get_family(), be.bstrerror() );
282 if (sockfd >= 0) socketClose(sockfd);
288 * Keep socket from timing out from inactivity
290 if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
292 Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
295 #if defined(TCP_KEEPIDLE)
297 int opt = heart_beat;
298 if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
300 Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
306 /* connect to server */
307 if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
309 if (sockfd >= 0) socketClose(sockfd);
319 free_addresses(addr_list);
320 errno = save_errno | b_errno_win32;
321 Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
322 name, host, port, be.bstrerror());
326 * Keep socket from timing out from inactivity
327 * Do this a second time out of paranoia
329 if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
331 Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
334 fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
335 free_addresses(addr_list);
337 /* Clean the packet a bit */
341 m_use_locking = false;
343 m_terminated = false;
344 m_suppress_error_msgs = false;
348 Dmsg3(50, "OK connected to server %s %s:%d.\n",
355 * Force read/write to use locking
// Switch this BSOCK to locked reads and writes.
// Returns true straight away when locking is already set. Otherwise
// pm_rmutex and pm_wmutex are pointed at the member mutexes m_rmutex and
// m_wmutex, and those two plus m_mmutex are initialised with
// pthread_mutex_init(); each failing init queues its own M_FATAL message.
// m_use_locking is set after the three initialisations.
357 bool BSOCK::set_locking()
361 return true; /* already set */
363 pm_rmutex = &m_rmutex;
364 pm_wmutex = &m_wmutex;
365 if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
367 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"),
371 if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
373 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"),
377 if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
379 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"),
383 m_use_locking = true;
// Counterpart of set_locking(). The teardown is guarded by a test on
// m_use_locking and m_duped; it clears m_use_locking and destroys the read,
// write and attribute mutexes (pm_rmutex, pm_wmutex, m_mmutex).
387 void BSOCK::clear_locking()
389 if (!m_use_locking || m_duped) {
392 m_use_locking = false;
393 pthread_mutex_destroy(pm_rmutex);
394 pthread_mutex_destroy(pm_wmutex);
395 pthread_mutex_destroy(&m_mmutex);
402 * Do comm line compression (LZ4) of a bsock message.
403 * Returns: true if the compression was done
404 * false if no compression was done
405 * The "offset" defines where to start compressing the message. This
406 * allows passing "header" information uncompressed and the actual
407 * data part compressed.
409 * Note, we don't compress lines less than 20 characters because we
410 * want to save at least 10 characters otherwise compression doesn't
411 * help enough to warrant doing the decompression.
// LZ4-compress the current message from msg into cmsg.
//  - The data offset is the low 8 bits of m_flags (mask 0xFF).
//  - Compression is attempted only when can_compress() is true and the socket
//    is not spooling. m_CommBytes grows by the uncompressed length.
//  - cmsg is enlarged when LZ4_compressBound(msglen) does not fit in it.
//  - LZ4_compress_default() receives msglen as the destination capacity, so
//    the call cannot produce more bytes than the input had; the result is
//    accepted only when it saves at least 10 bytes (clen + 10 <= msglen).
//  - The section that decompresses into rmsg and asserts equality with msg is
//    labelled debug code by its own comment.
//  - m_CommCompressedBytes grows by msglen.
413 bool BSOCK::comm_compress()
415 bool compress = false;
416 bool compressed = false;
417 int offset = m_flags & 0xFF;
420 * Enable compress if allowed and not spooling and the
421 * message is long enough (>20) to get some reasonable savings.
424 compress = can_compress() && !is_spooling();
426 m_CommBytes += msglen; /* uncompressed bytes */
427 Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
428 can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
433 ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
434 ASSERT2(offset < 255, "Offset greater than 254\n");
435 need_size = LZ4_compressBound(msglen);
436 if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
437 cmsg = realloc_pool_memory(cmsg, need_size + 100);
442 clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
443 //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
444 /* Compression should save at least 10 characters */
445 if (clen > 0 && clen + 10 <= msglen) {
448 /* Debug code -- decompress and compare */
449 int blen, rlen, olen;
451 POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
452 blen = sizeof_pool_memory(msg) * 2;
453 if (blen >= sizeof_pool_memory(rmsg)) {
454 rmsg = realloc_pool_memory(rmsg, blen);
456 rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
457 //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
458 ASSERT(olen == rlen);
459 ASSERT(memcmp(msg, rmsg, olen) == 0);
460 free_pool_memory(rmsg);
472 m_CommCompressedBytes += msglen;
478 * Send a message over the network. Everything is sent in one
479 * write request, but depending on the mode you are using
480 * there will be either two or three read requests done.
481 * Read 1: 32 bits, gets essentially the packet length, but may
482 * have the upper bits set to indicate compression or
483 * an extended header packet.
484 * Read 2: 32 bits, this read is done only if BNET_HDR_EXTEND is set.
485 * In this case the top 16 bits of this 32 bit word are reserved
486 * for flags and the lower 16 bits for data. This word will be
487 * stored in the field "flags" in the BSOCK packet.
488 * Read 2 or 3: depending on if Read 2 is done. This is the data.
490 * For normal comm line compression, the whole data packet is compressed
491 * but not the msglen (first read).
492 * To do data compression rather than full comm line compression,
493 * call send(flags) where the lower 16 bits hold the offset to the data to
494 * be compressed. The top 16 bits are reserved for flags that can be
496 * BNET_IS_CMD We are sending a command
497 * BNET_OFFSET An offset is specified (this implies data compression)
498 * BNET_NOCOMPRESS Inhibit normal comm line compression
499 * BNET_DATACOMPRESSED The data using the specified offset was
500 * compressed, and normal comm line compression will
502 * If any of the above bits are set, then BNET_HDR_EXTEND will be set
503 * in the top bits of msglen, and the full set of flags + the offset
504 * will be passed as a 32 bit word just after the msglen, and then
505 * followed by any data that is either compressed or not.
507 * Note, neither comm line nor data compression is
508 * guaranteed since it may result in more data, in which case, the
509 * record is sent uncompressed and there will be no offset.
510 * On the receive side, if BNET_OFFSET is set, then the data is compressed.
512 * Returns: false on failure
// Transmit the message held in msg (length msglen) to the peer.
// aflags: caller flags; the low 8 bits are read as the data offset.
// Checks made before sending, each queuing an M_ERROR unless
// m_suppress_error_msgs is set: socket closed, errors recorded on the socket,
// socket terminated, msglen above 4000000. A false result from
// send_hook_cb->bsock_send_cb() is reported as a flow control failure.
// Header layout: a 32 bit length word is stored immediately in front of msg;
// when m_flags is non-zero the header is two 32 bit words and the second one
// carries m_flags. BNET_COMPRESSED and BNET_HDR_EXTEND are ORed into the
// length word. Both words are converted with htonl().
// Header and data leave in a single write_nbytes() call; failed or short
// writes queue an M_ERROR. msglen is restored from save_msglen afterwards and
// the write mutex is released when it was taken.
// NOTE(review): the size limit here is 4000000 while recv() rejects packets
// above 1000000; confirm the asymmetry is intended.
515 bool BSOCK::send(int aflags)
529 if (!m_suppress_error_msgs) {
530 Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n"));
535 if (!m_suppress_error_msgs) {
536 Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"),
537 errors, m_who, m_host, m_port);
541 if (is_terminated()) {
542 if (!m_suppress_error_msgs) {
543 Qmsg4(m_jcr, M_ERROR, 0, _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
544 is_terminated(), m_who, m_host, m_port);
549 if (msglen > 4000000) {
550 if (!m_suppress_error_msgs) {
551 Qmsg4(m_jcr, M_ERROR, 0,
552 _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
553 msglen, m_who, m_host, m_port);
559 if (!send_hook_cb->bsock_send_cb()) {
560 Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
561 Qmsg3(m_jcr, M_ERROR, 0,
562 _("Flowcontrol failure on %s:%s:%d\n"),
563 m_who, m_host, m_port);
571 save_msglen = msglen;
575 offset = aflags & 0xFF; /* offset is the low 8 bits of aflags */
577 m_flags |= BNET_OFFSET;
579 if (m_flags & BNET_DATACOMPRESSED) { /* Check if already compressed */
581 } else if (m_flags & BNET_NOCOMPRESS) {
584 compressed = comm_compress(); /* do requested compression */
586 if (offset && compressed) {
587 m_flags |= BNET_DATACOMPRESSED;
590 m_flags &= ~BNET_COMPRESSED;
593 /* Compute total packet length */
595 hdrsiz = sizeof(pktsiz);
596 pktsiz = hdrsiz; /* signal, no data */
597 } else if (m_flags) {
598 hdrsiz = 2 * sizeof(pktsiz); /* have 64 bit header */
599 pktsiz = msglen + hdrsiz;
601 hdrsiz = sizeof(pktsiz); /* have 32 bit header */
602 pktsiz = msglen + hdrsiz;
605 /* Set special bits */
606 if (m_flags & BNET_OFFSET) { /* if data compression on */
607 compressed = false; /* no comm compression */
610 msglen |= BNET_COMPRESSED; /* comm line compression */
614 msglen |= BNET_HDR_EXTEND; /* extended header */
618 * Store packet length at head of message -- note, we
619 * have reserved an int32_t just before msg, so we can
622 hdrptr = (int32_t *)(msg - hdrsiz);
623 *hdrptr = htonl(msglen); /* store signal/length */
625 *(hdrptr+1) = htonl(m_flags); /* store flags */
628 (*pout_msg_no)++; /* increment message number */
630 /* send data packet */
631 timer_start = watchdog_time; /* start timer */
633 /* Full I/O done in one write */
634 rc = write_nbytes(this, (char *)hdrptr, pktsiz);
635 if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen);
636 timer_start = 0; /* clear timer */
645 if (!m_suppress_error_msgs) {
646 Qmsg5(m_jcr, M_ERROR, 0,
647 _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
649 m_host, m_port, this->bstrerror());
652 Qmsg5(m_jcr, M_ERROR, 0,
653 _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
654 pktsiz, m_who, m_host, m_port, rc);
658 // Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
659 // msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
660 msglen = save_msglen;
662 if (locked) pV(pm_wmutex);
667 * Format and send a message
668 * Returns: false on error
// printf-style send: format the arguments into msg with bvsnprintf().
// A guard on errors, is_terminated() and is_closed() precedes the formatting.
// The formatted length is accepted when it is non-negative and smaller than
// maxlen - 5; otherwise msg is enlarged by half of maxlen so the formatting
// can be tried again with a bigger buffer.
671 bool BSOCK::fsend(const char *fmt, ...)
677 return false; /* do not seg fault */
679 if (errors || is_terminated() || is_closed()) {
682 /* This probably won't work, but we vsnprintf, then if we
683 * get a negative length or a length greater than our buffer
684 * (depending on which library is used), the printf was truncated, so
685 * get a bigger buffer and try again.
688 maxlen = sizeof_pool_memory(msg) - 1;
689 va_start(arg_ptr, fmt);
690 msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
692 if (msglen >= 0 && msglen < (maxlen - 5)) {
695 msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
701 * Receive a message from the other end. Each message consists of
702 * two packets. The first is a header that contains the size
703 * of the data that follows in the second packet.
704 * Returns number of bytes read (may return zero)
705 * Returns -1 on signal (BNET_SIGNAL)
706 * Returns -2 on hard end of file (BNET_HARDEOF)
707 * Returns -3 on error (BNET_ERROR)
708 * Returns -4 on COMMAND (BNET_COMMAND)
709 * Unfortunately, it is a bit complicated because we have these
712 * 2. Signal including end of data stream
713 * 3. Hard end of file
715 * Using bsock->is_stop() and bsock->is_error() you can figure this all out.
// Read one message from the peer into msg and return its length or a
// negative BNET_ status code.
// Sequence visible below:
//  1. Read a 32 bit length word and convert it with ntohl().
//  2. If the word is positive and has BNET_HDR_EXTEND set, read a second
//     32 bit word into m_flags and convert it with ntohl().
//  3. Clear BNET_COMPRESSED from the length; inspect BNET_IS_CMD and
//     BNET_OFFSET in m_flags.
//  4. A zero length means no data (nbytes = 0). A negative length, or one
//     above 1000000, is handled as a signal: msglen receives the code and
//     nbytes becomes BNET_SIGNAL. Oversize packets queue an M_FATAL and are
//     turned into BNET_TERMINATE.
//  5. Otherwise msg is enlarged if needed and pktsiz bytes are read. A
//     compressed packet is expanded with LZ4_decompress_safe() into cmsg,
//     with up to seven attempts while cmsg is enlarged.
//  6. msg is terminated with a zero byte. When the command flag was seen and
//     nbytes is not BNET_ERROR the return value becomes BNET_COMMAND.
// NOTE(review): both Read expected messages that pass sizeof(int32_t) hand a
// size_t to a %d conversion; confirm the message macro tolerates that or cast
// the argument to int.
// NOTE(review): the 1000000 byte cap here is smaller than the 4000000 byte cap
// enforced by send().
717 int32_t BSOCK::recv()
721 int32_t o_pktsiz = 0;
722 bool compressed = false;
723 bool command = false;
726 cmsg[0] = msg[0] = 0;
729 if (errors || is_terminated() || is_closed()) {
737 read_seqno++; /* bump sequence number */
738 timer_start = watchdog_time; /* set start wait time */
740 /* get data size -- in int32_t */
741 if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
742 timer_start = 0; /* clear timer */
743 /* probably pipe broken because client died */
750 nbytes = BNET_HARDEOF; /* assume hard EOF received */
753 timer_start = 0; /* clear timer */
754 if (nbytes != sizeof(int32_t)) {
757 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
758 sizeof(int32_t), nbytes, m_who, m_host, m_port);
763 pktsiz = ntohl(pktsiz); /* decode no. of bytes that follow */
765 /* If extension, read it */
766 if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) {
767 timer_start = watchdog_time; /* set start wait time */
769 if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) {
770 timer_start = 0; /* clear timer */
771 /* probably pipe broken because client died */
778 nbytes = BNET_HARDEOF; /* assume hard EOF received */
781 timer_start = 0; /* clear timer */
782 if (nbytes != sizeof(int32_t)) {
785 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
786 sizeof(int32_t), nbytes, m_who, m_host, m_port);
790 pktsiz &= ~BNET_HDR_EXTEND;
791 m_flags = ntohl(m_flags);
794 if (pktsiz > 0 && (pktsiz & BNET_COMPRESSED)) {
796 pktsiz &= ~BNET_COMPRESSED;
799 if (m_flags & BNET_IS_CMD) {
802 if (m_flags & BNET_OFFSET) {
806 if (pktsiz == 0) { /* No data transferred */
807 timer_start = 0; /* clear timer */
810 nbytes = 0; /* zero bytes read */
814 /* If signal or packet size too big */
815 if (pktsiz < 0 || pktsiz > 1000000) {
816 if (pktsiz > 0) { /* if packet too big */
817 Qmsg4(m_jcr, M_FATAL, 0,
818 _("Packet size=%d too big from \"%s:%s:%d\". Maximum permitted 1000000. Terminating connection.\n"),
819 pktsiz, m_who, m_host, m_port);
820 pktsiz = BNET_TERMINATE; /* hang up */
822 if (pktsiz == BNET_TERMINATE) {
825 timer_start = 0; /* clear timer */
827 msglen = pktsiz; /* signal code */
828 nbytes = BNET_SIGNAL; /* signal */
832 /* Make sure the buffer is big enough + one byte for EOS */
833 if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
834 msg = realloc_pool_memory(msg, pktsiz + 100);
837 timer_start = watchdog_time; /* set start wait time */
839 /* now read the actual data */
840 if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
841 timer_start = 0; /* clear timer */
848 Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
849 m_who, m_host, m_port, this->bstrerror());
853 timer_start = 0; /* clear timer */
856 if (nbytes != pktsiz) {
859 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
860 pktsiz, nbytes, m_who, m_host, m_port);
864 /* If compressed uncompress it */
867 int psize = nbytes * 4;
868 if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
869 cmsg = realloc_pool_memory(cmsg, psize);
871 psize = sizeof_pool_memory(cmsg);
872 if (m_flags & BNET_OFFSET) {
873 offset = m_flags & 0xFF;
877 /* Grow buffer to max approx 4MB */
878 for (int i=0; i < 7; i++) {
879 nbytes = LZ4_decompress_safe(msg, cmsg, msglen, psize);
888 if (psize >= ((int32_t)sizeof_pool_memory(cmsg))) {
889 cmsg = realloc_pool_memory(cmsg, psize + 100);
892 if (m_flags & BNET_OFFSET) {
897 Jmsg1(m_jcr, M_ERROR, 0, "Decompress error!!!! ERR=%d\n", nbytes);
898 Pmsg3(000, "Decompress error!! pktsiz=%d cmsgsiz=%d nbytes=%d\n", pktsiz,
902 Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
903 pktsiz, nbytes, m_who, m_host, m_port);
908 /* Make sure the buffer is big enough + one byte for EOS */
909 if (msglen >= (int32_t)sizeof_pool_memory(msg)) {
910 msg = realloc_pool_memory(msg, msglen + 100);
912 /* If this is a data decompress, leave msg compressed */
913 if (!(m_flags & BNET_OFFSET)) {
914 memcpy(msg, cmsg, msglen);
918 /* always add a zero byte to properly terminate any
919 * string that was sent to us. Note, we ensured above that the
920 * buffer is at least one byte longer than the message length.
922 msg[nbytes] = 0; /* terminate in case it is a string */
924 * The following uses *lots* of resources so turn it on only for
930 if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "RECV", nbytes, o_pktsiz, m_flags, msg, msglen);
931 if (nbytes != BNET_ERROR && command) {
932 nbytes = BNET_COMMAND;
935 if (locked) pV(pm_rmutex);
936 return nbytes; /* return actual length of message */
// Handle an outgoing signal code. For BNET_TERMINATE the socket also sets
// m_suppress_error_msgs, which silences the error messages that send()
// guards with that flag.
942 bool BSOCK::signal(int signal)
945 if (signal == BNET_TERMINATE) {
946 m_suppress_error_msgs = true;
952 * Despool spooled attributes
// Replay spooled attribute records from m_spool_fd.
// Each record starts with a 32 bit length in network byte order (decoded with
// ntohl into msglen) followed by the payload, which is read into msg; msg is
// enlarged to msglen + 1 when msglen exceeds the current pool buffer size.
// update_attr_spool_size is a caller-supplied callback: it is invoked with
// size - last on every 64th record ((++count & 0x3F) == 0) and with
// tsize - last on a short payload read and once more before the ferror() test.
// A short payload read queues an M_FATAL, as does ferror() on the spool file.
// Job cancellation is tested with job_canceled(jcr).
// Where posix_fadvise and POSIX_FADV_WILLNEED exist the spool file is advised
// as WILLNEED before reading.
954 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
958 ssize_t last = 0, size = 0;
960 JCR *jcr = get_jcr();
964 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
965 posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
968 while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
970 size += sizeof(int32_t);
971 msglen = ntohl(pktsiz);
973 if (msglen > (int32_t)sizeof_pool_memory(msg)) {
974 msg = realloc_pool_memory(msg, msglen + 1);
976 nbytes = fread(msg, 1, msglen, m_spool_fd);
977 if (nbytes != (size_t)msglen) {
979 Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
980 Qmsg2(get_jcr(), M_FATAL, 0, _("fread attr spool error. Wanted=%d got=%d bytes.\n"),
982 update_attr_spool_size(tsize - last);
986 if ((++count & 0x3F) == 0) {
987 update_attr_spool_size(size - last);
992 if (jcr && job_canceled(jcr)) {
996 update_attr_spool_size(tsize - last);
997 if (ferror(m_spool_fd)) {
998 Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
1005 * Return the string for the error that occurred
1006 * on the socket. Only the first error is retained.
// Produce the error text for this socket in errmsg. errmsg is allocated from
// the PM_MESSAGE pool when it is still NULL, then filled either with the fixed
// text I/O Error or with the system message for b_errno.
1008 const char *BSOCK::bstrerror()
1011 if (errmsg == NULL) {
1012 errmsg = get_pool_memory(PM_MESSAGE);
1015 pm_strcpy(errmsg, "I/O Error");
1017 pm_strcpy(errmsg, be.bstrerror(b_errno));
// Write the textual address of the connected peer into buf (capacity buflen)
// with inet_ntop(). On non-Windows builds the address is fetched with
// getpeername() only while peer_addr.sin_family is still 0 and is then kept in
// peer_addr; a negative getpeername() result is returned to the caller as is.
// NOTE(review): &peer_addr.sin_addr is passed to inet_ntop() whatever the
// address family is; confirm how IPv6 peers are expected to be handled.
1022 int BSOCK::get_peer(char *buf, socklen_t buflen)
1024 #if !defined(HAVE_WIN32)
1025 if (peer_addr.sin_family == 0) {
1026 socklen_t salen = sizeof(peer_addr);
1027 int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
1028 if (rval < 0) return rval;
1030 if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
1040 * Set the network buffer size, suggested size is in size.
1041 * Actual size obtained is returned in bs->msglen
1043 * Returns: false on failure
// Size the message buffer and, on request, the kernel socket buffers.
// rw selects the directions to adjust through BNET_SETBUF_READ (SO_RCVBUF)
// and BNET_SETBUF_WRITE (SO_SNDBUF).
// msg is reallocated to dbuf_size + 100 bytes; a NULL result queues an
// M_FATAL. For each selected direction setsockopt() is repeated with
// dbuf_size lowered by TAPE_BSIZE after every failure, for as long as
// dbuf_size stays above TAPE_BSIZE. Every failure queues an M_ERROR, and a
// final size that differs from the starting size queues an M_WARNING.
// Where IP_TOS and IPTOS_THROUGHPUT are defined the socket is also given
// IPTOS_THROUGHPUT; the result of that setsockopt() call is not checked.
1046 bool BSOCK::set_buffer_size(uint32_t size, int rw)
1048 uint32_t dbuf_size, start_size;
1050 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
1052 opt = IPTOS_THROUGHPUT;
1053 setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
1059 dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1061 start_size = dbuf_size;
1062 if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
1063 Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
1068 * If user has not set the size, use the OS default -- i.e. do not
1069 * try to set it. This allows sys admins to set the size they
1070 * want in the OS, and Bacula will comply. See bug #1493
1077 if (rw & BNET_SETBUF_READ) {
1078 while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1079 SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1081 Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1082 dbuf_size -= TAPE_BSIZE;
1084 Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
1085 if (dbuf_size != start_size) {
1086 Qmsg1(get_jcr(), M_WARNING, 0,
1087 _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1093 dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
1095 start_size = dbuf_size;
1096 if (rw & BNET_SETBUF_WRITE) {
1097 while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
1098 SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
1100 Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
1101 dbuf_size -= TAPE_BSIZE;
1103 Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
1104 if (dbuf_size != start_size) {
1105 Qmsg1(get_jcr(), M_WARNING, 0,
1106 _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
1115 * Set socket non-blocking
1116 * Returns previous socket flag
// Put the descriptor m_fd into non-blocking mode: read the current flags with
// F_GETFL, then write them back with O_NONBLOCK added. A failure of either
// fcntl() call queues an M_ABORT message.
1118 int BSOCK::set_nonblocking()
1122 /* Get current flags */
1123 if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1125 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1128 /* Set O_NONBLOCK flag */
1129 if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
1131 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1139 * Set socket blocking
1140 * Returns previous socket flags
// Put the descriptor m_fd into blocking mode: read the current flags with
// F_GETFL, then write them back with O_NONBLOCK removed. A failure of either
// fcntl() call queues an M_ABORT message.
1142 int BSOCK::set_blocking()
1145 /* Get current flags */
1146 if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
1148 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
1151 /* Clear O_NONBLOCK flag */
1152 if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
1154 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
// Forward the killable setting to the JCR attached to this socket (m_jcr).
1161 void BSOCK::set_killable(bool killable)
1164 m_jcr->set_killable(killable);
1169 * Restores socket flags
// Write a saved flag word back to the descriptor with F_SETFL; flags is
// presumably the value handed back earlier by set_nonblocking() or
// set_blocking(). A failing fcntl() queues an M_ABORT message.
// NOTE(review): m_blocking becomes true when O_NONBLOCK is present in flags,
// which reads as inverted for a member with that name; confirm the intended
// meaning before relying on m_blocking after a restore.
1171 void BSOCK::restore_blocking (int flags)
1173 if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
1175 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
1178 m_blocking = (flags & O_NONBLOCK) ? true : false;
1182 * Wait for a specified time for data to appear on
1183 * the BSOCK connection.
1185 * Returns: 1 if data available
// Wait for readable data on m_fd; sec and msec are passed straight through to
// fd_wait_data() with WAIT_READ.
// A result of 0 from fd_wait_data() is the timeout case. In the error case
// EINTR is singled out from other errors, which return -1. When TLS is active
// and tls_bsock_probe() reports nothing to read, the wakeup is treated as a
// false alarm and the wait continues.
1189 int BSOCK::wait_data(int sec, int msec)
1192 switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1193 case 0: /* timeout */
1198 if (errno == EINTR) {
1201 return -1; /* error return */
1205 if (this->tls && !tls_bsock_probe(this)) {
1206 continue; /* false alarm, maybe a session key negotiation in progress on the socket */
1215 * As above, but returns on interrupt
// Variant of wait_data() built on a single fd_wait_data() call with
// WAIT_READ. A result of 0 is the timeout case and the error case returns -1;
// no EINTR test appears here. With TLS active the readiness is checked
// against tls_bsock_probe().
1217 int BSOCK::wait_data_intr(int sec, int msec)
1219 switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
1220 case 0: /* timeout */
1225 return -1; /* error return */
1229 if (this->tls && !tls_bsock_probe(this)) {
1230 /* maybe a session key negotiation waked up the socket */
1240 * This routine closes the current BSOCK.
1241 * It does not delete the socket packet
1242 * resources, which are released int
1250 * The JCR is canceled, set terminate for chained BSOCKs starting from master
// Walk the chain that starts at m_master and is linked through m_next, and
// mark every socket that is not closed as both terminated and timed out.
1252 void BSOCK::cancel()
1255 for (BSOCK *next = m_master; next != NULL; next = next->m_next) {
1256 if (!next->m_closed) {
1257 next->m_terminated = true;
1258 next->m_timed_out = true;
1265 * Note, this routine closes the socket, but leaves the
1266 * bsock memory in place.
1267 * every thread is responsible for closing and destroying its own duped or not
1272 BSOCK *bsock = this;
1274 if (bsock->is_closed()) {
1280 bsock->set_closed();
1281 bsock->set_terminated();
1282 if (!bsock->m_duped) {
1283 /* Shutdown tls cleanly. */
1285 tls_bsock_shutdown(bsock);
1286 free_tls_connection(bsock->tls);
1290 if (bsock->is_timed_out()) {
1291 shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */
1293 /* On Windows this discards data if we did not do a close_wait() */
1294 socketClose(bsock->m_fd); /* normal close */
1300 * Destroy the socket (i.e. release all resources)
// Close the socket and give the msg, cmsg and errmsg buffers back to the
// memory pool. The ASSERT2 whose condition is the constant 1 == 0 marks the
// path taken when the socket is destroyed a second time.
1302 void BSOCK::_destroy()
1304 this->close(); /* Ensure that socket is closed */
1307 free_pool_memory(msg);
1310 ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */
1313 free_pool_memory(cmsg);
1317 free_pool_memory(errmsg);
1336 * Destroy the socket (i.e. release all resources)
1337 * including duped sockets.
1338 * should not be called from duped BSOCK
// Tear down the master BSOCK together with the sockets chained through m_next.
// The three ASSERTD checks reject a repeated call, a call on a socket that is
// not its own master, and a call on a duped socket.
// The loop stores next->m_next in ahead before the current element is
// processed, presumably because the element is released inside the loop.
// NOTE(review): the 0xaaaaaaaaaaaaaaaa pattern is a 64 bit constant compared
// with a uintptr_t; where uintptr_t is 32 bits wide the comparison can never
// match, so the repeated-call check would be ineffective there.
1340 void BSOCK::destroy()
1342 ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n")
1343 ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n")
1344 ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n")
1345 /* I'm the master I must destroy() all the duped BSOCKs */
1348 for (BSOCK *next = m_next; next != NULL; next = ahead) {
1349 ahead = next->m_next;
1356 /* Hello line sent to the Director; %s receives the caller name */
1357 static char hello[] = "Hello %s calling\n";
1359 /* Prefix that the Director reply to Hello must start with */
1360 static char OKhello[] = "1000 OK:";
1363 * Authenticate Director
// Run the Hello and CRAM-MD5 handshake with a Director over this socket.
// name:     caller name; the Hello line carries bashed_name, presumably
//           derived from name.
// password: shared secret used for both CRAM-MD5 directions.
// tls_ctx:  local TLS configuration; also handed to bnet_tls_client().
// errmsg, errmsg_len: caller buffer that receives a readable description of
//           the outcome.
// Flow visible below:
//  1. Start a 15 second timer and send the Hello line.
//  2. Answer the Director challenge with cram_md5_respond(), then challenge
//     the Director with cram_md5_challenge().
//  3. Compare tls_remote_need with tls_local_need in both directions and
//     report a mismatch.
//  4. When both sides are at least BNET_TLS_OK, start TLS through
//     bnet_tls_client().
//  5. Read the reply with recv() and require it to begin with the OKhello
//     prefix; the reply text is copied into errmsg with a plain %s format.
// NOTE(review): inside the if on get_tls_enable(tls_ctx) the conditional
// expression tests get_tls_enable(tls_ctx) again, so BNET_TLS_OK cannot be
// chosen there unless the two calls return different values; confirm whether
// a separate require test was intended.
1365 bool BSOCK::authenticate_director(const char *name, const char *password,
1366 TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
1368 int tls_local_need = BNET_TLS_NONE;
1369 int tls_remote_need = BNET_TLS_NONE;
1370 int compatible = true;
1371 char bashed_name[MAX_NAME_LENGTH];
1372 BSOCK *dir = this; /* for readability */
1376 * Send my name to the Director then do authentication
1379 /* Timeout Hello after 15 secs */
1380 dir->start_timer(15);
1381 dir->fsend(hello, bashed_name);
1383 if (get_tls_enable(tls_ctx)) {
1384 tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
1387 /* respond to Dir challenge */
1388 if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
1389 /* Now challenge dir */
1390 !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
1391 bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
1392 dir->host(), dir->port());
1396 /* Verify that the remote host is willing to meet our TLS requirements */
1397 if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1398 bsnprintf(errmsg, errmsg_len, _("Authorization error:"
1399 " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
1400 dir->host(), dir->port());
1404 /* Verify that we are willing to meet the remote host's requirements */
1405 if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1406 bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
1407 " Remote server requires TLS.\n"),
1408 dir->host(), dir->port());
1413 /* Is TLS Enabled? */
1415 if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1416 /* Engage TLS! Full Speed Ahead! */
1417 if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1418 bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1419 dir->host(), dir->port());
1425 Dmsg1(6, ">dird: %s", dir->msg);
1426 if (dir->recv() <= 0) {
1428 bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
1429 "The Director at \"%s:%d\" may not be running.\n"),
1430 dir->bstrerror(), dir->host(), dir->port());
1435 Dmsg1(10, "<dird: %s", dir->msg);
1436 if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1437 bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
1438 dir->host(), dir->port());
1441 bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
1447 bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
1448 "Most likely the passwords do not agree.\n"
1449 "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1450 "For help, please see: " MANUAL_AUTH_URL "\n"),
1451 dir->host(), dir->port());
1455 /* Try to limit the bandwidth of a network connection
1457 void BSOCK::control_bwlimit(int bytes)
1464 now = get_current_btime(); /* microseconds */
1465 temp = now - m_last_tick; /* microseconds */
1467 m_nb_bytes += bytes;
1469 if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
1475 /* Less than 0.1ms since the last call, see the next time */
1480 /* Remove what was authorised to be written in temp us */
1481 m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
1483 if (m_nb_bytes < 0) {
1487 /* The excess should be converted into sleep time */
1488 int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
1489 if (usec_sleep > 100) {
1490 bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
1491 m_last_tick = get_current_btime();