]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/bsock.c
Fix TCP Heartbeat code
[bacula/bacula] / bacula / src / lib / bsock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2007-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Network Utility Routines
30  *
31  *  by Kern Sibbald
32  *
33  */
34
35 #include "bacula.h"
36 #include "jcr.h"
37 #include <netdb.h>
38 #include <netinet/tcp.h>
39
40 #ifndef ENODATA                    /* not defined on BSD systems */
41 #define ENODATA EPIPE
42 #endif
43
44 #ifdef HAVE_WIN32
45 #define socketRead(fd, buf, len)  ::recv(fd, buf, len, 0)
46 #define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
47 #define socketClose(fd)           ::closesocket(fd)
48 #else
49 #define socketRead(fd, buf, len)  ::read(fd, buf, len)
50 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
51 #define socketClose(fd)           ::close(fd)
52 #endif
53
54 /*
55  * This is a non-class BSOCK "constructor"  because we want to 
56  *   call the Bacula smartalloc routines instead of new.
57  */
58 BSOCK *new_bsock()
59 {
60    BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
61    bsock->init();
62    return bsock;
63 }
64
65 void BSOCK::init()
66 {
67    memset(this, 0, sizeof(BSOCK));
68    m_blocking = 1;
69    msg = get_pool_memory(PM_BSOCK);
70    errmsg = get_pool_memory(PM_MESSAGE);
71    /*
72     * ****FIXME**** reduce this to a few hours once
73     *   heartbeats are implemented
74     */
75    timeout = 60 * 60 * 6 * 24;   /* 6 days timeout */
76 }
77
78 /*
79  * This is our "class destructor" that ensures that we use
80  *   smartalloc rather than the system free().
81  */
82 void BSOCK::free_bsock()
83 {
84    destroy();
85 }
86
87 void BSOCK::free_tls()
88 {
89    free_tls_connection(this->tls);
90    this->tls = NULL;
91 }   
92
93 /*
94  * Try to connect to host for max_retry_time at retry_time intervals.
95  *   Note, you must have called the constructor prior to calling
96  *   this routine.
97  */
98 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
99                     utime_t heart_beat,
100                     const char *name, char *host, char *service, int port,
101                     int verbose)
102 {
103    bool ok = false;
104    int i;
105    int fatal = 0;
106    time_t begin_time = time(NULL);
107    time_t now;
108    btimer_t *tid = NULL;
109
110    /* Try to trap out of OS call when time expires */
111    if (max_retry_time) {
112       tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
113    }
114    
115    for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
116         i -= retry_interval) {
117       berrno be;
118       if (fatal || (jcr && job_canceled(jcr))) {
119          goto bail_out;
120       }
121       Dmsg4(100, "Unable to connect to %s on %s:%d. ERR=%s\n",
122             name, host, port, be.bstrerror());
123       if (i < 0) {
124          i = 60 * 5;               /* complain again in 5 minutes */
125          if (verbose)
126             Qmsg4(jcr, M_WARNING, 0, _(
127                "Could not connect to %s on %s:%d. ERR=%s\n"
128                "Retrying ...\n"), name, host, port, be.bstrerror());
129       }
130       bmicrosleep(retry_interval, 0);
131       now = time(NULL);
132       if (begin_time + max_retry_time <= now) {
133          Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
134                name, host, port, be.bstrerror());
135          goto bail_out;
136       }
137    }
138    ok = true;
139
140 bail_out:
141    if (tid) {
142       stop_thread_timer(tid);
143    }
144    return ok;
145 }
146
147 /*       
148  * Finish initialization of the pocket structure.
149  */
150 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
151                      struct sockaddr *lclient_addr)
152 {
153    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
154    m_fd = sockfd;
155    set_who(bstrdup(who));
156    set_host(bstrdup(host));
157    set_port(port);
158    memcpy(&client_addr, lclient_addr, sizeof(client_addr));
159    set_jcr(jcr);
160 }
161
162 /*
163  * Copy the address from the configuration dlist that gets passed in
164  */
165 void BSOCK::set_source_address(dlist *src_addr_list)
166 {
167    IPADDR *addr = NULL;
168
169    // delete the object we already have, if it's allocated
170    if (src_addr) {
171      free( src_addr);
172      src_addr = NULL;
173    }
174
175    if (src_addr_list) {
176      addr = (IPADDR*) src_addr_list->first();
177      src_addr = New( IPADDR(*addr));
178    }
179 }
180
181 /*
182  * Open a TCP connection to the server
183  * Returns NULL
184  * Returns BSOCK * pointer on success
185  */
186 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
187                  int port, utime_t heart_beat, int *fatal)
188 {
189    int sockfd = -1;
190    dlist *addr_list;
191    IPADDR *ipaddr;
192    bool connected = false;
193    int turnon = 1;
194    const char *errstr;
195    int save_errno = 0;
196
197    /*
198     * Fill in the structure serv_addr with the address of
199     * the server that we want to connect with.
200     */
201    if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
202       /* Note errstr is not malloc'ed */
203       Qmsg2(jcr, M_ERROR, 0, _("bnet_host2ipaddrs() for host \"%s\" failed: ERR=%s\n"),
204             host, errstr);
205       Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
206             host, errstr);
207       *fatal = 1;
208       return false;
209    }
210
211    foreach_dlist(ipaddr, addr_list) {
212       ipaddr->set_port_net(htons(port));
213       char allbuf[256 * 10];
214       char curbuf[256];
215       Dmsg2(100, "Current %sAll %s\n",
216                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
217                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
218       /* Open a TCP socket */
219       if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM, 0)) < 0) {
220          berrno be;
221          save_errno = errno;
222          switch (errno) {
223 #ifdef EAFNOSUPPORT
224          case EAFNOSUPPORT:
225             /*
226              * The name lookup of the host returned an address in a protocol family
227              * we don't support. Suppress the error and try the next address.
228              */
229             break;
230 #endif
231          default:
232             *fatal = 1;
233             Pmsg3(000, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
234                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
235             break;
236          }
237          continue;
238       }
239
240       /* Bind to the source address if it is set */
241       if (src_addr) {
242          if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
243             berrno be;
244             save_errno = errno;
245             *fatal = 1;
246             Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
247                   src_addr->get_family(), be.bstrerror() );
248             continue;
249          }
250       }
251
252       /*
253        * Keep socket from timing out from inactivity
254        */
255       if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
256          berrno be;
257          Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
258                be.bstrerror());
259       }
260 #if defined(TCP_KEEPIDLE)
261       if (heart_beat) {
262          int opt = heart_beat;
263          if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
264             berrno be;
265             Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
266                   be.bstrerror());
267          }
268       }
269 #endif
270
271       /* connect to server */
272       if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
273          save_errno = errno;
274          socketClose(sockfd);
275          continue;
276       }
277       *fatal = 0;
278       connected = true;
279       break;
280    }
281
282    if (!connected) {
283       free_addresses(addr_list);
284       errno = save_errno | b_errno_win32;
285       return false;
286    }
287    /*
288     * Keep socket from timing out from inactivity
289     *   Do this a second time out of paranoia
290     */
291    if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
292       berrno be;
293       Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
294             be.bstrerror());
295    }
296    fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
297    free_addresses(addr_list);
298    return true;
299 }
300
301 /*
302  * Force read/write to use locking
303  */
304 bool BSOCK::set_locking()
305 {
306    int stat;
307    if (m_use_locking) {
308       return true;                      /* already set */
309    }
310    if ((stat = pthread_mutex_init(&m_mutex, NULL)) != 0) {
311       berrno be;
312       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock mutex. ERR=%s\n"),
313          be.bstrerror(stat));
314       return false;
315    }
316    m_use_locking = true;
317    return true;
318 }
319
320 void BSOCK::clear_locking()
321 {
322    if (!m_use_locking) {
323       return;
324    }
325    m_use_locking = false;
326    pthread_mutex_destroy(&m_mutex);
327    return;
328 }
329
330 /*
331  * Send a message over the network. The send consists of
332  * two network packets. The first is sends a 32 bit integer containing
333  * the length of the data packet which follows.
334  *
335  * Returns: false on failure
336  *          true  on success
337  */
338 bool BSOCK::send()
339 {
340    int32_t rc;
341    int32_t pktsiz;
342    int32_t *hdr;
343    bool ok = true;
344
345    if (errors) {
346       if (!m_suppress_error_msgs) {
347          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket has errors=%d on call to %s:%s:%d\n"),
348              errors, m_who, m_host, m_port);
349       }
350       return false;
351    }
352    if (is_terminated()) {
353       if (!m_suppress_error_msgs) {
354          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket is terminated=%d on call to %s:%s:%d\n"),
355              is_terminated(), m_who, m_host, m_port);
356       }
357       return false;
358    }
359    if (msglen > 4000000) {
360       if (!m_suppress_error_msgs) {
361          Qmsg4(m_jcr, M_ERROR, 0,
362             _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
363              msglen, m_who, m_host, m_port);
364       }
365       return false;
366    }
367
368    if (m_use_locking) P(m_mutex);
369    /* Compute total packet length */
370    if (msglen <= 0) {
371       pktsiz = sizeof(pktsiz);               /* signal, no data */
372    } else {
373       pktsiz = msglen + sizeof(pktsiz);      /* data */
374    }
375    /* Store packet length at head of message -- note, we
376     *  have reserved an int32_t just before msg, so we can
377     *  store there 
378     */
379    hdr = (int32_t *)(msg - (int)sizeof(pktsiz));
380    *hdr = htonl(msglen);                     /* store signal/length */
381
382    out_msg_no++;            /* increment message number */
383
384    /* send data packet */
385    timer_start = watchdog_time;  /* start timer */
386    clear_timed_out();
387    /* Full I/O done in one write */
388    rc = write_nbytes(this, (char *)hdr, pktsiz);
389    timer_start = 0;         /* clear timer */
390    if (rc != pktsiz) {
391       errors++;
392       if (errno == 0) {
393          b_errno = EIO;
394       } else {
395          b_errno = errno;
396       }
397       if (rc < 0) {
398          if (!m_suppress_error_msgs) {
399             Qmsg5(m_jcr, M_ERROR, 0,
400                   _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"), 
401                   msglen, m_who,
402                   m_host, m_port, this->bstrerror());
403          }
404       } else {
405          Qmsg5(m_jcr, M_ERROR, 0,
406                _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
407                msglen, m_who, m_host, m_port, rc);
408       }
409       ok = false;
410    }
411    if (m_use_locking) V(m_mutex);
412    return ok;
413 }
414
415 /*
416  * Format and send a message
417  *  Returns: false on error
418  *           true  on success
419  */
420 bool BSOCK::fsend(const char *fmt, ...)
421 {
422    va_list arg_ptr;
423    int maxlen;
424
425    if (errors || is_terminated()) {
426       return false;
427    }
428    /* This probably won't work, but we vsnprintf, then if we
429     * get a negative length or a length greater than our buffer
430     * (depending on which library is used), the printf was truncated, so
431     * get a bigger buffer and try again.
432     */
433    for (;;) {
434       maxlen = sizeof_pool_memory(msg) - 1;
435       va_start(arg_ptr, fmt);
436       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
437       va_end(arg_ptr);
438       if (msglen > 0 && msglen < (maxlen - 5)) {
439          break;
440       }
441       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
442    }
443    return send();
444 }
445
446 /*
447  * Receive a message from the other end. Each message consists of
448  * two packets. The first is a header that contains the size
449  * of the data that follows in the second packet.
450  * Returns number of bytes read (may return zero)
451  * Returns -1 on signal (BNET_SIGNAL)
452  * Returns -2 on hard end of file (BNET_HARDEOF)
453  * Returns -3 on error  (BNET_ERROR)
454  *
455  *  Unfortunately, it is a bit complicated because we have these
456  *    four return types:
457  *    1. Normal data
458  *    2. Signal including end of data stream
459  *    3. Hard end of file
460  *    4. Error
461  *  Using is_bnet_stop() and is_bnet_error() you can figure this all out.
462  */
463 int32_t BSOCK::recv()
464 {
465    int32_t nbytes;
466    int32_t pktsiz;
467
468    msg[0] = 0;
469    msglen = 0;
470    if (errors || is_terminated()) {
471       return BNET_HARDEOF;
472    }
473
474    if (m_use_locking) P(m_mutex);
475    read_seqno++;            /* bump sequence number */
476    timer_start = watchdog_time;  /* set start wait time */
477    clear_timed_out();
478    /* get data size -- in int32_t */
479    if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
480       timer_start = 0;      /* clear timer */
481       /* probably pipe broken because client died */
482       if (errno == 0) {
483          b_errno = ENODATA;
484       } else {
485          b_errno = errno;
486       }
487       errors++;
488       nbytes = BNET_HARDEOF;        /* assume hard EOF received */
489       goto get_out;
490    }
491    timer_start = 0;         /* clear timer */
492    if (nbytes != sizeof(int32_t)) {
493       errors++;
494       b_errno = EIO;
495       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
496             sizeof(int32_t), nbytes, m_who, m_host, m_port);
497       nbytes = BNET_ERROR;
498       goto get_out;
499    }
500
501    pktsiz = ntohl(pktsiz);         /* decode no. of bytes that follow */
502
503    if (pktsiz == 0) {              /* No data transferred */
504       timer_start = 0;             /* clear timer */
505       in_msg_no++;
506       msglen = 0;
507       nbytes = 0;                  /* zero bytes read */
508       goto get_out;
509    }
510
511    /* If signal or packet size too big */
512    if (pktsiz < 0 || pktsiz > 1000000) {
513       if (pktsiz > 0) {            /* if packet too big */
514          Qmsg3(m_jcr, M_FATAL, 0,
515                _("Packet size too big from \"%s:%s:%d. Terminating connection.\n"),
516                m_who, m_host, m_port);
517          pktsiz = BNET_TERMINATE;  /* hang up */
518       }
519       if (pktsiz == BNET_TERMINATE) {
520          set_terminated();
521       }
522       timer_start = 0;                /* clear timer */
523       b_errno = ENODATA;
524       msglen = pktsiz;                /* signal code */
525       nbytes =  BNET_SIGNAL;          /* signal */
526       goto get_out;
527    }
528
529    /* Make sure the buffer is big enough + one byte for EOS */
530    if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
531       msg = realloc_pool_memory(msg, pktsiz + 100);
532    }
533
534    timer_start = watchdog_time;  /* set start wait time */
535    clear_timed_out();
536    /* now read the actual data */
537    if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
538       timer_start = 0;      /* clear timer */
539       if (errno == 0) {
540          b_errno = ENODATA;
541       } else {
542          b_errno = errno;
543       }
544       errors++;
545       Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
546             m_who, m_host, m_port, this->bstrerror());
547       nbytes = BNET_ERROR;
548       goto get_out;
549    }
550    timer_start = 0;         /* clear timer */
551    in_msg_no++;
552    msglen = nbytes;
553    if (nbytes != pktsiz) {
554       b_errno = EIO;
555       errors++;
556       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
557             pktsiz, nbytes, m_who, m_host, m_port);
558       nbytes = BNET_ERROR;
559       goto get_out;
560    }
561    /* always add a zero by to properly terminate any
562     * string that was send to us. Note, we ensured above that the
563     * buffer is at least one byte longer than the message length.
564     */
565    msg[nbytes] = 0; /* terminate in case it is a string */
566    /*
567     * The following uses *lots* of resources so turn it on only for 
568     * serious debugging.
569     */
570    Dsm_check(300);
571
572 get_out:
573    if (m_use_locking) V(m_mutex);
574    return nbytes;                  /* return actual length of message */
575 }
576
577 /*
578  * Send a signal
579  */
580 bool BSOCK::signal(int signal)
581 {
582    msglen = signal;
583    if (signal == BNET_TERMINATE) {
584       m_suppress_error_msgs = true;
585    }
586    return send();
587 }
588
589 /* 
590  * Despool spooled attributes
591  */
592 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
593 {
594    int32_t pktsiz;
595    size_t nbytes;
596    ssize_t last = 0, size = 0;
597    int count = 0;
598    JCR *jcr = get_jcr();
599
600    rewind(m_spool_fd);
601
602 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
603    posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
604 #endif
605
606    while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
607           sizeof(int32_t)) {
608       size += sizeof(int32_t);
609       msglen = ntohl(pktsiz);
610       if (msglen > 0) {
611          if (msglen > (int32_t)sizeof_pool_memory(msg)) {
612             msg = realloc_pool_memory(msg, msglen + 1);
613          }
614          nbytes = fread(msg, 1, msglen, m_spool_fd);
615          if (nbytes != (size_t)msglen) {
616             berrno be;
617             Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
618             Qmsg1(get_jcr(), M_FATAL, 0, _("fread attr spool error. ERR=%s\n"),
619                   be.bstrerror());
620             update_attr_spool_size(tsize - last);
621             return false;
622          }
623          size += nbytes;
624          if ((++count & 0x3F) == 0) {
625             update_attr_spool_size(size - last);
626             last = size;
627          }
628       }
629       send();
630       if (jcr && job_canceled(jcr)) {
631          return false;
632       }
633    }
634    update_attr_spool_size(tsize - last);
635    if (ferror(m_spool_fd)) {
636       Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
637       return false;
638    }
639    return true;
640 }
641
642 /*
643  * Return the string for the error that occurred
644  * on the socket. Only the first error is retained.
645  */
646 const char *BSOCK::bstrerror()
647 {
648    berrno be;
649    if (errmsg == NULL) {
650       errmsg = get_pool_memory(PM_MESSAGE);
651    }
652    pm_strcpy(errmsg, be.bstrerror(b_errno));
653    return errmsg;
654 }
655
656 int BSOCK::get_peer(char *buf, socklen_t buflen) 
657 {
658 #if !defined(HAVE_WIN32)
659     if (peer_addr.sin_family == 0) {
660         socklen_t salen = sizeof(peer_addr);
661         int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
662         if (rval < 0) return rval;
663     }
664     if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
665         return -1;
666
667     return 0;
668 #else
669     return -1;
670 #endif
671 }
672
673 /*
674  * Set the network buffer size, suggested size is in size.
675  *  Actual size obtained is returned in bs->msglen
676  *
677  *  Returns: false on failure
678  *           true  on success
679  */
680 bool BSOCK::set_buffer_size(uint32_t size, int rw)
681 {
682    uint32_t dbuf_size, start_size;
683
684 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
685    int opt;
686    opt = IPTOS_THROUGHPUT;
687    setsockopt(fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
688 #endif
689
690    if (size != 0) {
691       dbuf_size = size;
692    } else {
693       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
694    }
695    start_size = dbuf_size;
696    if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
697       Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
698       return false;
699    }
700
701    /*
702     * If user has not set the size, use the OS default -- i.e. do not
703     *   try to set it.  This allows sys admins to set the size they
704     *   want in the OS, and Bacula will comply. See bug #1493
705     */
706    if (size == 0) {
707       msglen = dbuf_size;
708       return true;
709    }
710
711    if (rw & BNET_SETBUF_READ) {
712       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
713               SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
714          berrno be;
715          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
716          dbuf_size -= TAPE_BSIZE;
717       }
718       Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
719       if (dbuf_size != start_size) {
720          Qmsg1(get_jcr(), M_WARNING, 0,
721                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
722       }
723       if (dbuf_size % TAPE_BSIZE != 0) {
724          Qmsg1(get_jcr(), M_ABORT, 0,
725                _("Network buffer size %d not multiple of tape block size.\n"),
726                dbuf_size);
727       }
728    }
729    if (size != 0) {
730       dbuf_size = size;
731    } else {
732       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
733    }
734    start_size = dbuf_size;
735    if (rw & BNET_SETBUF_WRITE) {
736       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
737               SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
738          berrno be;
739          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
740          dbuf_size -= TAPE_BSIZE;
741       }
742       Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
743       if (dbuf_size != start_size) {
744          Qmsg1(get_jcr(), M_WARNING, 0,
745                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
746       }
747       if (dbuf_size % TAPE_BSIZE != 0) {
748          Qmsg1(get_jcr(), M_ABORT, 0,
749                _("Network buffer size %d not multiple of tape block size.\n"),
750                dbuf_size);
751       }
752    }
753
754    msglen = dbuf_size;
755    return true;
756 }
757
758 /*
759  * Set socket non-blocking
760  * Returns previous socket flag
761  */
762 int BSOCK::set_nonblocking()
763 {
764 #ifndef HAVE_WIN32
765    int oflags;
766
767    /* Get current flags */
768    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
769       berrno be;
770       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
771    }
772
773    /* Set O_NONBLOCK flag */
774    if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
775       berrno be;
776       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
777    }
778
779    m_blocking = 0;
780    return oflags;
781 #else
782    int flags;
783    u_long ioctlArg = 1;
784
785    flags = m_blocking;
786    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
787    m_blocking = 0;
788
789    return flags;
790 #endif
791 }
792
793 /*
794  * Set socket blocking
795  * Returns previous socket flags
796  */
797 int BSOCK::set_blocking()
798 {
799 #ifndef HAVE_WIN32
800    int oflags;
801    /* Get current flags */
802    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
803       berrno be;
804       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
805    }
806
807    /* Set O_NONBLOCK flag */
808    if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
809       berrno be;
810       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
811    }
812
813    m_blocking = 1;
814    return oflags;
815 #else
816    int flags;
817    u_long ioctlArg = 0;
818
819    flags = m_blocking;
820    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
821    m_blocking = 1;
822
823    return flags;
824 #endif
825 }
826
827 void BSOCK::set_killable(bool killable)
828 {
829    if (m_jcr) {
830       m_jcr->set_killable(killable);
831    }
832 }
833
834 /*
835  * Restores socket flags
836  */
837 void BSOCK::restore_blocking (int flags) 
838 {
839 #ifndef HAVE_WIN32
840    if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
841       berrno be;
842       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
843    }
844
845    m_blocking = (flags & O_NONBLOCK) ? true : false;
846 #else
847    u_long ioctlArg = flags;
848
849    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
850    m_blocking = 1;
851 #endif
852 }
853
854 /*
855  * Wait for a specified time for data to appear on
856  * the BSOCK connection.
857  *
858  *   Returns: 1 if data available
859  *            0 if timeout
860  *           -1 if error
861  */
862 int BSOCK::wait_data(int sec, int usec)
863 {
864    fd_set fdset;
865    struct timeval tv;
866
867    FD_ZERO(&fdset);
868    FD_SET((unsigned)m_fd, &fdset);
869    for (;;) {
870       tv.tv_sec = sec;
871       tv.tv_usec = usec;
872       switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
873       case 0:                      /* timeout */
874          b_errno = 0;
875          return 0;
876       case -1:
877          b_errno = errno;
878          if (errno == EINTR) {
879             continue;
880          }
881          return -1;                /* error return */
882       default:
883          b_errno = 0;
884          return 1;
885       }
886    }
887 }
888
889 /*
890  * As above, but returns on interrupt
891  */
892 int BSOCK::wait_data_intr(int sec, int usec)
893 {
894    fd_set fdset;
895    struct timeval tv;
896
897    if (this == NULL) {
898       return -1;
899    }
900    FD_ZERO(&fdset);
901    FD_SET((unsigned)m_fd, &fdset);
902    tv.tv_sec = sec;
903    tv.tv_usec = usec;
904    switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
905    case 0:                      /* timeout */
906       b_errno = 0;
907       return 0;
908    case -1:
909       b_errno = errno;
910       return -1;                /* error return */
911    default:
912       b_errno = 0;
913       break;
914    }
915    return 1;
916 }
917
918 /*
919  * Note, this routine closes and destroys all the sockets
920  *  that are open including the duped ones.
921  */
922 #ifndef SHUT_RDWR
923 #define SHUT_RDWR 2
924 #endif
925
926 void BSOCK::close()
927 {
928    BSOCK *bsock = this;
929    BSOCK *next;
930
931    if (!m_duped) {
932       clear_locking();
933    }
934    for (; bsock; bsock = next) {
935       next = bsock->m_next;           /* get possible pointer to next before destoryed */
936       if (!bsock->m_duped) {
937          /* Shutdown tls cleanly. */
938          if (bsock->tls) {
939             tls_bsock_shutdown(bsock);
940             free_tls_connection(bsock->tls);
941             bsock->tls = NULL;
942          }
943          if (bsock->is_timed_out()) {
944             shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
945          }
946          socketClose(bsock->m_fd);      /* normal close */
947       }
948       bsock->destroy();
949    }
950    return;
951 }
952
953 void BSOCK::destroy()
954 {
955    if (msg) {
956       free_pool_memory(msg);
957       msg = NULL;
958    } else {
959       ASSERT(1 == 0);              /* double close */
960    }
961    if (errmsg) {
962       free_pool_memory(errmsg);
963       errmsg = NULL;
964    }
965    if (m_who) {
966       free(m_who);
967       m_who = NULL;
968    }
969    if (m_host) {
970       free(m_host);
971       m_host = NULL;
972    }
973    if (src_addr) {
974       free(src_addr);
975       src_addr = NULL;
976    } 
977    free(this);
978 }
979
980 /* Commands sent to Director */
981 static char hello[]    = "Hello %s calling\n";
982
983 /* Response from Director */
984 static char OKhello[]   = "1000 OK:";
985
986 /*
987  * Authenticate Director
988  */
989 bool BSOCK::authenticate_director(const char *name, const char *password,
990                                   TLS_CONTEXT *tls_ctx, char *response, int response_len)
991 {
992    int tls_local_need = BNET_TLS_NONE;
993    int tls_remote_need = BNET_TLS_NONE;
994    int compatible = true;
995    char bashed_name[MAX_NAME_LENGTH];
996    BSOCK *dir = this;        /* for readability */
997
998    response[0] = 0;
999    /*
1000     * Send my name to the Director then do authentication
1001     */
1002
1003    /* Timeout Hello after 15 secs */
1004    dir->start_timer(15);
1005    dir->fsend(hello, bashed_name);
1006
1007    if (get_tls_enable(tls_ctx)) {
1008       tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
1009    }
1010
1011    /* respond to Dir challenge */
1012    if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
1013        /* Now challenge dir */
1014        !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
1015       bsnprintf(response, response_len, _("Director authorization problem at \"%s:%d\"\n"),
1016          dir->host(), dir->port());
1017       goto bail_out;
1018    }
1019
1020    /* Verify that the remote host is willing to meet our TLS requirements */
1021    if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1022       bsnprintf(response, response_len, _("Authorization problem:"
1023              " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
1024              dir->host(), dir->port());
1025       goto bail_out;
1026    }
1027
1028    /* Verify that we are willing to meet the remote host's requirements */
1029    if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
1030       bsnprintf(response, response_len, _("Authorization problem with Director at \"%s:%d\":"
1031                      " Remote server requires TLS.\n"),
1032                      dir->host(), dir->port());
1033
1034       goto bail_out;
1035    }
1036
1037    /* Is TLS Enabled? */
1038    if (have_tls) {
1039       if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1040          /* Engage TLS! Full Speed Ahead! */
1041          if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1042             bsnprintf(response, response_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1043                dir->host(), dir->port());
1044             goto bail_out;
1045          }
1046       }
1047    }
1048
1049    Dmsg1(6, ">dird: %s", dir->msg);
1050    if (dir->recv() <= 0) {
1051       dir->stop_timer();
1052       bsnprintf(response, response_len, _("Bad response to Hello command: ERR=%s\n"
1053                       "The Director at \"%s:%d\" is probably not running.\n"),
1054                     dir->bstrerror(), dir->host(), dir->port());
1055       return false;
1056    }
1057
1058    dir->stop_timer();
1059    Dmsg1(10, "<dird: %s", dir->msg);
1060    if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1061       bsnprintf(response, response_len, _("Director at \"%s:%d\" rejected Hello command\n"),
1062          dir->host(), dir->port());
1063       return false;
1064    } else {
1065       bsnprintf(response, response_len, "%s", dir->msg);
1066    }
1067    return true;
1068
1069 bail_out:
1070    dir->stop_timer();
1071    bsnprintf(response, response_len, _("Authorization problem with Director at \"%s:%d\"\n"
1072              "Most likely the passwords do not agree.\n"
1073              "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1074              "Please see " MANUAL_AUTH_URL " for help.\n"),
1075              dir->host(), dir->port());
1076    return false;
1077 }