]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/bsock.c
First try at fixing bug #1897
[bacula/bacula] / bacula / src / lib / bsock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2007-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Network Utility Routines
30  *
31  *  by Kern Sibbald
32  *
33  */
34
35 #include "bacula.h"
36 #include "jcr.h"
37 #include <netdb.h>
38 #include <netinet/tcp.h>
39
40 #ifndef ENODATA                    /* not defined on BSD systems */
41 #define ENODATA EPIPE
42 #endif
43
44 #ifndef SOL_TCP
45 #define SOL_TCP IPPROTO_TCP
46 #endif
47
48 #ifdef HAVE_WIN32
49 #define socketRead(fd, buf, len)  ::recv(fd, buf, len, 0)
50 #define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
51 #define socketClose(fd)           ::closesocket(fd)
52 #else
53 #define socketRead(fd, buf, len)  ::read(fd, buf, len)
54 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
55 #define socketClose(fd)           ::close(fd)
56 #endif
57
58 /*
59  * This is a non-class BSOCK "constructor"  because we want to 
60  *   call the Bacula smartalloc routines instead of new.
61  */
62 BSOCK *new_bsock()
63 {
64    BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
65    bsock->init();
66    return bsock;
67 }
68
69 void BSOCK::init()
70 {
71    memset(this, 0, sizeof(BSOCK));
72    m_blocking = 1;
73    msg = get_pool_memory(PM_BSOCK);
74    errmsg = get_pool_memory(PM_MESSAGE);
75    /*
76     * ****FIXME**** reduce this to a few hours once
77     *   heartbeats are implemented
78     */
79    timeout = 60 * 60 * 6 * 24;   /* 6 days timeout */
80 }
81
82 /*
83  * This is our "class destructor" that ensures that we use
84  *   smartalloc rather than the system free().
85  */
86 void BSOCK::free_bsock()
87 {
88    destroy();
89 }
90
91 void BSOCK::free_tls()
92 {
93    free_tls_connection(this->tls);
94    this->tls = NULL;
95 }   
96
97 /*
98  * Try to connect to host for max_retry_time at retry_time intervals.
99  *   Note, you must have called the constructor prior to calling
100  *   this routine.
101  */
102 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
103                     utime_t heart_beat,
104                     const char *name, char *host, char *service, int port,
105                     int verbose)
106 {
107    bool ok = false;
108    int i;
109    int fatal = 0;
110    time_t begin_time = time(NULL);
111    time_t now;
112    btimer_t *tid = NULL;
113
114    /* Try to trap out of OS call when time expires */
115    if (max_retry_time) {
116       tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
117    }
118    
119    for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
120         i -= retry_interval) {
121       berrno be;
122       if (fatal || (jcr && job_canceled(jcr))) {
123          goto bail_out;
124       }
125       Dmsg4(100, "Unable to connect to %s on %s:%d. ERR=%s\n",
126             name, host, port, be.bstrerror());
127       if (i < 0) {
128          i = 60 * 5;               /* complain again in 5 minutes */
129          if (verbose)
130             Qmsg4(jcr, M_WARNING, 0, _(
131                "Could not connect to %s on %s:%d. ERR=%s\n"
132                "Retrying ...\n"), name, host, port, be.bstrerror());
133       }
134       bmicrosleep(retry_interval, 0);
135       now = time(NULL);
136       if (begin_time + max_retry_time <= now) {
137          Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
138                name, host, port, be.bstrerror());
139          goto bail_out;
140       }
141    }
142    ok = true;
143
144 bail_out:
145    if (tid) {
146       stop_thread_timer(tid);
147    }
148    return ok;
149 }
150
151 /*       
152  * Finish initialization of the pocket structure.
153  */
154 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
155                      struct sockaddr *lclient_addr)
156 {
157    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
158    m_fd = sockfd;
159    set_who(bstrdup(who));
160    set_host(bstrdup(host));
161    set_port(port);
162    memcpy(&client_addr, lclient_addr, sizeof(client_addr));
163    set_jcr(jcr);
164 }
165
166 /*
167  * Copy the address from the configuration dlist that gets passed in
168  */
169 void BSOCK::set_source_address(dlist *src_addr_list)
170 {
171    IPADDR *addr = NULL;
172
173    // delete the object we already have, if it's allocated
174    if (src_addr) {
175      free( src_addr);
176      src_addr = NULL;
177    }
178
179    if (src_addr_list) {
180      addr = (IPADDR*) src_addr_list->first();
181      src_addr = New( IPADDR(*addr));
182    }
183 }
184
185 /*
186  * Open a TCP connection to the server
187  * Returns NULL
188  * Returns BSOCK * pointer on success
189  */
190 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
191                  int port, utime_t heart_beat, int *fatal)
192 {
193    int sockfd = -1;
194    dlist *addr_list;
195    IPADDR *ipaddr, *next;
196    bool connected = false;
197    int turnon = 1;
198    const char *errstr;
199    int save_errno = 0;
200
201    /*
202     * Fill in the structure serv_addr with the address of
203     * the server that we want to connect with.
204     */
205    if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
206       /* Note errstr is not malloc'ed */
207       Qmsg2(jcr, M_ERROR, 0, _("bnet_host2ipaddrs() for host \"%s\" failed: ERR=%s\n"),
208             host, errstr);
209       Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
210             host, errstr);
211       *fatal = 1;
212       return false;
213    }
214
215    /*
216     * Remove any duplicate addresses.
217     */
218    for (ipaddr = (IPADDR *)addr_list->first(); ipaddr; ipaddr = (IPADDR *)addr_list->next(ipaddr)) {
219       for (next = (IPADDR *)addr_list->next(ipaddr); next; next = (IPADDR *)addr_list->next(next)) {
220          if (ipaddr->get_sockaddr_len() == next->get_sockaddr_len() &&
221              !memcmp(ipaddr->get_sockaddr(), next->get_sockaddr(),
222                      ipaddr->get_sockaddr_len())) {
223             addr_list->remove(next);
224          }
225       }
226    }
227
228    foreach_dlist(ipaddr, addr_list) {
229       ipaddr->set_port_net(htons(port));
230       char allbuf[256 * 10];
231       char curbuf[256];
232       Dmsg2(100, "Current %sAll %s\n",
233                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
234                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
235       /* Open a TCP socket */
236       if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM, 0)) < 0) {
237          berrno be;
238          save_errno = errno;
239          switch (errno) {
240 #ifdef EAFNOSUPPORT
241          case EAFNOSUPPORT:
242             /*
243              * The name lookup of the host returned an address in a protocol family
244              * we don't support. Suppress the error and try the next address.
245              */
246             break;
247 #endif
248          default:
249             *fatal = 1;
250             Pmsg3(000, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
251                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
252             break;
253          }
254          continue;
255       }
256
257       /* Bind to the source address if it is set */
258       if (src_addr) {
259          if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
260             berrno be;
261             save_errno = errno;
262             *fatal = 1;
263             Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
264                   src_addr->get_family(), be.bstrerror() );
265             continue;
266          }
267       }
268
269       /*
270        * Keep socket from timing out from inactivity
271        */
272       if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
273          berrno be;
274          Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
275                be.bstrerror());
276       }
277 #if defined(TCP_KEEPIDLE)
278       if (heart_beat) {
279          int opt = heart_beat;
280          if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
281             berrno be;
282             Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
283                   be.bstrerror());
284          }
285       }
286 #endif
287
288       /* connect to server */
289       if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
290          save_errno = errno;
291          socketClose(sockfd);
292          continue;
293       }
294       *fatal = 0;
295       connected = true;
296       break;
297    }
298
299    if (!connected) {
300       free_addresses(addr_list);
301       errno = save_errno | b_errno_win32;
302       return false;
303    }
304    /*
305     * Keep socket from timing out from inactivity
306     *   Do this a second time out of paranoia
307     */
308    if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
309       berrno be;
310       Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
311             be.bstrerror());
312    }
313    fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
314    free_addresses(addr_list);
315    return true;
316 }
317
318 /*
319  * Force read/write to use locking
320  */
321 bool BSOCK::set_locking()
322 {
323    int stat;
324    if (m_use_locking) {
325       return true;                      /* already set */
326    }
327    if ((stat = pthread_mutex_init(&m_mutex, NULL)) != 0) {
328       berrno be;
329       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock mutex. ERR=%s\n"),
330          be.bstrerror(stat));
331       return false;
332    }
333    m_use_locking = true;
334    return true;
335 }
336
337 void BSOCK::clear_locking()
338 {
339    if (!m_use_locking) {
340       return;
341    }
342    m_use_locking = false;
343    pthread_mutex_destroy(&m_mutex);
344    return;
345 }
346
347 /*
348  * Send a message over the network. The send consists of
349  * two network packets. The first is sends a 32 bit integer containing
350  * the length of the data packet which follows.
351  *
352  * Returns: false on failure
353  *          true  on success
354  */
355 bool BSOCK::send()
356 {
357    int32_t rc;
358    int32_t pktsiz;
359    int32_t *hdr;
360    bool ok = true;
361
362    if (errors) {
363       if (!m_suppress_error_msgs) {
364          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket has errors=%d on call to %s:%s:%d\n"),
365              errors, m_who, m_host, m_port);
366       }
367       return false;
368    }
369    if (is_terminated()) {
370       if (!m_suppress_error_msgs) {
371          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket is terminated=%d on call to %s:%s:%d\n"),
372              is_terminated(), m_who, m_host, m_port);
373       }
374       return false;
375    }
376    if (msglen > 4000000) {
377       if (!m_suppress_error_msgs) {
378          Qmsg4(m_jcr, M_ERROR, 0,
379             _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
380              msglen, m_who, m_host, m_port);
381       }
382       return false;
383    }
384
385    if (m_use_locking) P(m_mutex);
386    /* Compute total packet length */
387    if (msglen <= 0) {
388       pktsiz = sizeof(pktsiz);               /* signal, no data */
389    } else {
390       pktsiz = msglen + sizeof(pktsiz);      /* data */
391    }
392    /* Store packet length at head of message -- note, we
393     *  have reserved an int32_t just before msg, so we can
394     *  store there 
395     */
396    hdr = (int32_t *)(msg - (int)sizeof(pktsiz));
397    *hdr = htonl(msglen);                     /* store signal/length */
398
399    out_msg_no++;            /* increment message number */
400
401    /* send data packet */
402    timer_start = watchdog_time;  /* start timer */
403    clear_timed_out();
404    /* Full I/O done in one write */
405    rc = write_nbytes(this, (char *)hdr, pktsiz);
406    timer_start = 0;         /* clear timer */
407    if (rc != pktsiz) {
408       errors++;
409       if (errno == 0) {
410          b_errno = EIO;
411       } else {
412          b_errno = errno;
413       }
414       if (rc < 0) {
415          if (!m_suppress_error_msgs) {
416             Qmsg5(m_jcr, M_ERROR, 0,
417                   _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"), 
418                   msglen, m_who,
419                   m_host, m_port, this->bstrerror());
420          }
421       } else {
422          Qmsg5(m_jcr, M_ERROR, 0,
423                _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
424                msglen, m_who, m_host, m_port, rc);
425       }
426       ok = false;
427    }
428    if (m_use_locking) V(m_mutex);
429    return ok;
430 }
431
432 /*
433  * Format and send a message
434  *  Returns: false on error
435  *           true  on success
436  */
437 bool BSOCK::fsend(const char *fmt, ...)
438 {
439    va_list arg_ptr;
440    int maxlen;
441
442    if (errors || is_terminated()) {
443       return false;
444    }
445    /* This probably won't work, but we vsnprintf, then if we
446     * get a negative length or a length greater than our buffer
447     * (depending on which library is used), the printf was truncated, so
448     * get a bigger buffer and try again.
449     */
450    for (;;) {
451       maxlen = sizeof_pool_memory(msg) - 1;
452       va_start(arg_ptr, fmt);
453       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
454       va_end(arg_ptr);
455       if (msglen > 0 && msglen < (maxlen - 5)) {
456          break;
457       }
458       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
459    }
460    return send();
461 }
462
463 /*
464  * Receive a message from the other end. Each message consists of
465  * two packets. The first is a header that contains the size
466  * of the data that follows in the second packet.
467  * Returns number of bytes read (may return zero)
468  * Returns -1 on signal (BNET_SIGNAL)
469  * Returns -2 on hard end of file (BNET_HARDEOF)
470  * Returns -3 on error  (BNET_ERROR)
471  *
472  *  Unfortunately, it is a bit complicated because we have these
473  *    four return types:
474  *    1. Normal data
475  *    2. Signal including end of data stream
476  *    3. Hard end of file
477  *    4. Error
478  *  Using is_bnet_stop() and is_bnet_error() you can figure this all out.
479  */
480 int32_t BSOCK::recv()
481 {
482    int32_t nbytes;
483    int32_t pktsiz;
484
485    msg[0] = 0;
486    msglen = 0;
487    if (errors || is_terminated()) {
488       return BNET_HARDEOF;
489    }
490
491    if (m_use_locking) P(m_mutex);
492    read_seqno++;            /* bump sequence number */
493    timer_start = watchdog_time;  /* set start wait time */
494    clear_timed_out();
495    /* get data size -- in int32_t */
496    if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
497       timer_start = 0;      /* clear timer */
498       /* probably pipe broken because client died */
499       if (errno == 0) {
500          b_errno = ENODATA;
501       } else {
502          b_errno = errno;
503       }
504       errors++;
505       nbytes = BNET_HARDEOF;        /* assume hard EOF received */
506       goto get_out;
507    }
508    timer_start = 0;         /* clear timer */
509    if (nbytes != sizeof(int32_t)) {
510       errors++;
511       b_errno = EIO;
512       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
513             sizeof(int32_t), nbytes, m_who, m_host, m_port);
514       nbytes = BNET_ERROR;
515       goto get_out;
516    }
517
518    pktsiz = ntohl(pktsiz);         /* decode no. of bytes that follow */
519
520    if (pktsiz == 0) {              /* No data transferred */
521       timer_start = 0;             /* clear timer */
522       in_msg_no++;
523       msglen = 0;
524       nbytes = 0;                  /* zero bytes read */
525       goto get_out;
526    }
527
528    /* If signal or packet size too big */
529    if (pktsiz < 0 || pktsiz > 1000000) {
530       if (pktsiz > 0) {            /* if packet too big */
531          Qmsg3(m_jcr, M_FATAL, 0,
532                _("Packet size too big from \"%s:%s:%d. Terminating connection.\n"),
533                m_who, m_host, m_port);
534          pktsiz = BNET_TERMINATE;  /* hang up */
535       }
536       if (pktsiz == BNET_TERMINATE) {
537          set_terminated();
538       }
539       timer_start = 0;                /* clear timer */
540       b_errno = ENODATA;
541       msglen = pktsiz;                /* signal code */
542       nbytes =  BNET_SIGNAL;          /* signal */
543       goto get_out;
544    }
545
546    /* Make sure the buffer is big enough + one byte for EOS */
547    if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
548       msg = realloc_pool_memory(msg, pktsiz + 100);
549    }
550
551    timer_start = watchdog_time;  /* set start wait time */
552    clear_timed_out();
553    /* now read the actual data */
554    if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
555       timer_start = 0;      /* clear timer */
556       if (errno == 0) {
557          b_errno = ENODATA;
558       } else {
559          b_errno = errno;
560       }
561       errors++;
562       Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
563             m_who, m_host, m_port, this->bstrerror());
564       nbytes = BNET_ERROR;
565       goto get_out;
566    }
567    timer_start = 0;         /* clear timer */
568    in_msg_no++;
569    msglen = nbytes;
570    if (nbytes != pktsiz) {
571       b_errno = EIO;
572       errors++;
573       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
574             pktsiz, nbytes, m_who, m_host, m_port);
575       nbytes = BNET_ERROR;
576       goto get_out;
577    }
578    /* always add a zero by to properly terminate any
579     * string that was send to us. Note, we ensured above that the
580     * buffer is at least one byte longer than the message length.
581     */
582    msg[nbytes] = 0; /* terminate in case it is a string */
583    /*
584     * The following uses *lots* of resources so turn it on only for 
585     * serious debugging.
586     */
587    Dsm_check(300);
588
589 get_out:
590    if (m_use_locking) V(m_mutex);
591    return nbytes;                  /* return actual length of message */
592 }
593
594 /*
595  * Send a signal
596  */
597 bool BSOCK::signal(int signal)
598 {
599    msglen = signal;
600    if (signal == BNET_TERMINATE) {
601       m_suppress_error_msgs = true;
602    }
603    return send();
604 }
605
606 /* 
607  * Despool spooled attributes
608  */
609 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
610 {
611    int32_t pktsiz;
612    size_t nbytes;
613    ssize_t last = 0, size = 0;
614    int count = 0;
615    JCR *jcr = get_jcr();
616
617    rewind(m_spool_fd);
618
619 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
620    posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
621 #endif
622
623    while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
624           sizeof(int32_t)) {
625       size += sizeof(int32_t);
626       msglen = ntohl(pktsiz);
627       if (msglen > 0) {
628          if (msglen > (int32_t)sizeof_pool_memory(msg)) {
629             msg = realloc_pool_memory(msg, msglen + 1);
630          }
631          nbytes = fread(msg, 1, msglen, m_spool_fd);
632          if (nbytes != (size_t)msglen) {
633             berrno be;
634             Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
635             Qmsg1(get_jcr(), M_FATAL, 0, _("fread attr spool error. ERR=%s\n"),
636                   be.bstrerror());
637             update_attr_spool_size(tsize - last);
638             return false;
639          }
640          size += nbytes;
641          if ((++count & 0x3F) == 0) {
642             update_attr_spool_size(size - last);
643             last = size;
644          }
645       }
646       send();
647       if (jcr && job_canceled(jcr)) {
648          return false;
649       }
650    }
651    update_attr_spool_size(tsize - last);
652    if (ferror(m_spool_fd)) {
653       Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
654       return false;
655    }
656    return true;
657 }
658
659 /*
660  * Return the string for the error that occurred
661  * on the socket. Only the first error is retained.
662  */
663 const char *BSOCK::bstrerror()
664 {
665    berrno be;
666    if (errmsg == NULL) {
667       errmsg = get_pool_memory(PM_MESSAGE);
668    }
669    pm_strcpy(errmsg, be.bstrerror(b_errno));
670    return errmsg;
671 }
672
673 int BSOCK::get_peer(char *buf, socklen_t buflen) 
674 {
675 #if !defined(HAVE_WIN32)
676     if (peer_addr.sin_family == 0) {
677         socklen_t salen = sizeof(peer_addr);
678         int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
679         if (rval < 0) return rval;
680     }
681     if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
682         return -1;
683
684     return 0;
685 #else
686     return -1;
687 #endif
688 }
689
690 /*
691  * Set the network buffer size, suggested size is in size.
692  *  Actual size obtained is returned in bs->msglen
693  *
694  *  Returns: false on failure
695  *           true  on success
696  */
697 bool BSOCK::set_buffer_size(uint32_t size, int rw)
698 {
699    uint32_t dbuf_size, start_size;
700
701 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
702    int opt;
703    opt = IPTOS_THROUGHPUT;
704    setsockopt(fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
705 #endif
706
707    if (size != 0) {
708       dbuf_size = size;
709    } else {
710       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
711    }
712    start_size = dbuf_size;
713    if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
714       Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
715       return false;
716    }
717
718    /*
719     * If user has not set the size, use the OS default -- i.e. do not
720     *   try to set it.  This allows sys admins to set the size they
721     *   want in the OS, and Bacula will comply. See bug #1493
722     */
723    if (size == 0) {
724       msglen = dbuf_size;
725       return true;
726    }
727
728    if (rw & BNET_SETBUF_READ) {
729       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
730               SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
731          berrno be;
732          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
733          dbuf_size -= TAPE_BSIZE;
734       }
735       Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
736       if (dbuf_size != start_size) {
737          Qmsg1(get_jcr(), M_WARNING, 0,
738                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
739       }
740    }
741    if (size != 0) {
742       dbuf_size = size;
743    } else {
744       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
745    }
746    start_size = dbuf_size;
747    if (rw & BNET_SETBUF_WRITE) {
748       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
749               SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
750          berrno be;
751          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
752          dbuf_size -= TAPE_BSIZE;
753       }
754       Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
755       if (dbuf_size != start_size) {
756          Qmsg1(get_jcr(), M_WARNING, 0,
757                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
758       }
759    }
760
761    msglen = dbuf_size;
762    return true;
763 }
764
765 /*
766  * Set socket non-blocking
767  * Returns previous socket flag
768  */
769 int BSOCK::set_nonblocking()
770 {
771 #ifndef HAVE_WIN32
772    int oflags;
773
774    /* Get current flags */
775    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
776       berrno be;
777       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
778    }
779
780    /* Set O_NONBLOCK flag */
781    if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
782       berrno be;
783       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
784    }
785
786    m_blocking = 0;
787    return oflags;
788 #else
789    int flags;
790    u_long ioctlArg = 1;
791
792    flags = m_blocking;
793    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
794    m_blocking = 0;
795
796    return flags;
797 #endif
798 }
799
800 /*
801  * Set socket blocking
802  * Returns previous socket flags
803  */
804 int BSOCK::set_blocking()
805 {
806 #ifndef HAVE_WIN32
807    int oflags;
808    /* Get current flags */
809    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
810       berrno be;
811       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
812    }
813
814    /* Set O_NONBLOCK flag */
815    if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
816       berrno be;
817       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
818    }
819
820    m_blocking = 1;
821    return oflags;
822 #else
823    int flags;
824    u_long ioctlArg = 0;
825
826    flags = m_blocking;
827    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
828    m_blocking = 1;
829
830    return flags;
831 #endif
832 }
833
834 void BSOCK::set_killable(bool killable)
835 {
836    if (m_jcr) {
837       m_jcr->set_killable(killable);
838    }
839 }
840
841 /*
842  * Restores socket flags
843  */
844 void BSOCK::restore_blocking (int flags) 
845 {
846 #ifndef HAVE_WIN32
847    if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
848       berrno be;
849       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
850    }
851
852    m_blocking = (flags & O_NONBLOCK) ? true : false;
853 #else
854    u_long ioctlArg = flags;
855
856    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
857    m_blocking = 1;
858 #endif
859 }
860
861 /*
862  * Wait for a specified time for data to appear on
863  * the BSOCK connection.
864  *
865  *   Returns: 1 if data available
866  *            0 if timeout
867  *           -1 if error
868  */
869 int BSOCK::wait_data(int sec, int usec)
870 {
871    fd_set fdset;
872    struct timeval tv;
873
874    FD_ZERO(&fdset);
875    FD_SET((unsigned)m_fd, &fdset);
876    for (;;) {
877       tv.tv_sec = sec;
878       tv.tv_usec = usec;
879       switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
880       case 0:                      /* timeout */
881          b_errno = 0;
882          return 0;
883       case -1:
884          b_errno = errno;
885          if (errno == EINTR) {
886             continue;
887          }
888          return -1;                /* error return */
889       default:
890          b_errno = 0;
891          return 1;
892       }
893    }
894 }
895
896 /*
897  * As above, but returns on interrupt
898  */
899 int BSOCK::wait_data_intr(int sec, int usec)
900 {
901    fd_set fdset;
902    struct timeval tv;
903
904    if (this == NULL) {
905       return -1;
906    }
907    FD_ZERO(&fdset);
908    FD_SET((unsigned)m_fd, &fdset);
909    tv.tv_sec = sec;
910    tv.tv_usec = usec;
911    switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
912    case 0:                      /* timeout */
913       b_errno = 0;
914       return 0;
915    case -1:
916       b_errno = errno;
917       return -1;                /* error return */
918    default:
919       b_errno = 0;
920       break;
921    }
922    return 1;
923 }
924
925 /*
926  * Note, this routine closes and destroys all the sockets
927  *  that are open including the duped ones.
928  */
929 #ifndef SHUT_RDWR
930 #define SHUT_RDWR 2
931 #endif
932
933 void BSOCK::close()
934 {
935    BSOCK *bsock = this;
936    BSOCK *next;
937
938    if (!m_duped) {
939       clear_locking();
940    }
941    for (; bsock; bsock = next) {
942       next = bsock->m_next;           /* get possible pointer to next before destoryed */
943       if (!bsock->m_duped) {
944          /* Shutdown tls cleanly. */
945          if (bsock->tls) {
946             tls_bsock_shutdown(bsock);
947             free_tls_connection(bsock->tls);
948             bsock->tls = NULL;
949          }
950          if (bsock->is_timed_out()) {
951             shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
952          }
953          socketClose(bsock->m_fd);      /* normal close */
954       }
955       bsock->destroy();
956    }
957    return;
958 }
959
960 void BSOCK::destroy()
961 {
962    if (msg) {
963       free_pool_memory(msg);
964       msg = NULL;
965    } else {
966       ASSERT(1 == 0);              /* double close */
967    }
968    if (errmsg) {
969       free_pool_memory(errmsg);
970       errmsg = NULL;
971    }
972    if (m_who) {
973       free(m_who);
974       m_who = NULL;
975    }
976    if (m_host) {
977       free(m_host);
978       m_host = NULL;
979    }
980    if (src_addr) {
981       free(src_addr);
982       src_addr = NULL;
983    } 
984    free(this);
985 }
986
987 /* Commands sent to Director */
988 static char hello[]    = "Hello %s calling\n";
989
990 /* Response from Director */
991 static char OKhello[]   = "1000 OK:";
992
993 /*
994  * Authenticate Director
995  */
996 bool BSOCK::authenticate_director(const char *name, const char *password,
997                                   TLS_CONTEXT *tls_ctx, char *response, int response_len)
998 {
999    int tls_local_need = BNET_TLS_NONE;
1000    int tls_remote_need = BNET_TLS_NONE;
1001    int compatible = true;
1002    char bashed_name[MAX_NAME_LENGTH];
1003    BSOCK *dir = this;        /* for readability */
1004
1005    response[0] = 0;
1006    /*
1007     * Send my name to the Director then do authentication
1008     */
1009
1010    /* Timeout Hello after 15 secs */
1011    dir->start_timer(15);
1012    dir->fsend(hello, bashed_name);
1013
1014    if (get_tls_enable(tls_ctx)) {
1015       tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
1016    }
1017
1018    /* respond to Dir challenge */
1019    if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
1020        /* Now challenge dir */
1021        !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
1022       bsnprintf(response, response_len, _("Director authorization problem at \"%s:%d\"\n"),
1023          dir->host(), dir->port());
1024       goto bail_out;
1025    }
1026
1027    /* Verify that the remote host is willing to meet our TLS requirements */
1028    if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1029       bsnprintf(response, response_len, _("Authorization problem:"
1030              " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
1031              dir->host(), dir->port());
1032       goto bail_out;
1033    }
1034
1035    /* Verify that we are willing to meet the remote host's requirements */
1036    if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1037       bsnprintf(response, response_len, _("Authorization problem with Director at \"%s:%d\":"
1038                      " Remote server requires TLS.\n"),
1039                      dir->host(), dir->port());
1040
1041       goto bail_out;
1042    }
1043
1044    /* Is TLS Enabled? */
1045    if (have_tls) {
1046       if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1047          /* Engage TLS! Full Speed Ahead! */
1048          if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1049             bsnprintf(response, response_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1050                dir->host(), dir->port());
1051             goto bail_out;
1052          }
1053       }
1054    }
1055
1056    Dmsg1(6, ">dird: %s", dir->msg);
1057    if (dir->recv() <= 0) {
1058       dir->stop_timer();
1059       bsnprintf(response, response_len, _("Bad response to Hello command: ERR=%s\n"
1060                       "The Director at \"%s:%d\" is probably not running.\n"),
1061                     dir->bstrerror(), dir->host(), dir->port());
1062       return false;
1063    }
1064
1065    dir->stop_timer();
1066    Dmsg1(10, "<dird: %s", dir->msg);
1067    if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1068       bsnprintf(response, response_len, _("Director at \"%s:%d\" rejected Hello command\n"),
1069          dir->host(), dir->port());
1070       return false;
1071    } else {
1072       bsnprintf(response, response_len, "%s", dir->msg);
1073    }
1074    return true;
1075
1076 bail_out:
1077    dir->stop_timer();
1078    bsnprintf(response, response_len, _("Authorization problem with Director at \"%s:%d\"\n"
1079              "Most likely the passwords do not agree.\n"
1080              "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1081              "Please see " MANUAL_AUTH_URL " for help.\n"),
1082              dir->host(), dir->port());
1083    return false;
1084 }