]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/bsock.c
Add compress/decompress of Object Record data
[bacula/bacula] / bacula / src / lib / bsock.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2007-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Network Utility Routines
30  *
31  *  by Kern Sibbald
32  *
33  */
34
35
36 #include "bacula.h"
37 #include "jcr.h"
38 #include <netdb.h>
39
40 #ifndef ENODATA                    /* not defined on BSD systems */
41 #define ENODATA EPIPE
42 #endif
43
44 #ifdef HAVE_WIN32
45 #define socketRead(fd, buf, len)  ::recv(fd, buf, len, 0)
46 #define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
47 #define socketClose(fd)           ::closesocket(fd)
48 #else
49 #define socketRead(fd, buf, len)  ::read(fd, buf, len)
50 #define socketWrite(fd, buf, len) ::write(fd, buf, len)
51 #define socketClose(fd)           ::close(fd)
52 #endif
53
54 /*
55  * This is a non-class BSOCK "constructor"  because we want to 
56  *   call the Bacula smartalloc routines instead of new.
57  */
58 BSOCK *new_bsock()
59 {
60    BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
61    bsock->init();
62    return bsock;
63 }
64
65 void BSOCK::init()
66 {
67    memset(this, 0, sizeof(BSOCK));
68    m_blocking = 1;
69    msg = get_pool_memory(PM_MESSAGE);
70    errmsg = get_pool_memory(PM_MESSAGE);
71    /*
72     * ****FIXME**** reduce this to a few hours once
73     *   heartbeats are implemented
74     */
75    timeout = 60 * 60 * 6 * 24;   /* 6 days timeout */
76 }
77
78 /*
79  * This is our "class destructor" that ensures that we use
80  *   smartalloc rather than the system free().
81  */
82 void BSOCK::free_bsock()
83 {
84    destroy();
85 }
86
87 void BSOCK::free_tls()
88 {
89    free_tls_connection(this->tls);
90    this->tls = NULL;
91 }   
92
93 /*
94  * Try to connect to host for max_retry_time at retry_time intervals.
95  *   Note, you must have called the constructor prior to calling
96  *   this routine.
97  */
98 bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
99                     utime_t heart_beat,
100                     const char *name, char *host, char *service, int port,
101                     int verbose)
102 {
103    bool ok = false;
104    int i;
105    int fatal = 0;
106    time_t begin_time = time(NULL);
107    time_t now;
108    btimer_t *tid = NULL;
109
110    /* Try to trap out of OS call when time expires */
111    if (max_retry_time) {
112       tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
113    }
114    
115    for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
116         i -= retry_interval) {
117       berrno be;
118       if (fatal || (jcr && job_canceled(jcr))) {
119          goto bail_out;
120       }
121       Dmsg4(100, "Unable to connect to %s on %s:%d. ERR=%s\n",
122             name, host, port, be.bstrerror());
123       if (i < 0) {
124          i = 60 * 5;               /* complain again in 5 minutes */
125          if (verbose)
126             Qmsg4(jcr, M_WARNING, 0, _(
127                "Could not connect to %s on %s:%d. ERR=%s\n"
128                "Retrying ...\n"), name, host, port, be.bstrerror());
129       }
130       bmicrosleep(retry_interval, 0);
131       now = time(NULL);
132       if (begin_time + max_retry_time <= now) {
133          Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
134                name, host, port, be.bstrerror());
135          goto bail_out;
136       }
137    }
138    ok = true;
139
140 bail_out:
141    if (tid) {
142       stop_thread_timer(tid);
143    }
144    return ok;
145 }
146
147
148 /*       
149  * Finish initialization of the pocket structure.
150  */
151 void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
152             struct sockaddr *lclient_addr)
153 {
154    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
155    m_fd = sockfd;
156    set_who(bstrdup(who));
157    set_host(bstrdup(host));
158    set_port(port);
159    memcpy(&client_addr, lclient_addr, sizeof(client_addr));
160    set_jcr(jcr);
161 }
162
163 /*
164  * Copy the address from the configuration dlist that gets passed in
165  */
166 void BSOCK::set_source_address(dlist *src_addr_list)
167 {
168    IPADDR *addr = NULL;
169
170    // delete the object we already have, if it's allocated
171    if (src_addr) {
172      free( src_addr);
173      src_addr = NULL;
174    }
175
176    if (src_addr_list) {
177      addr = (IPADDR*) src_addr_list->first();
178      src_addr = New( IPADDR(*addr));
179    }
180 }
181
182
183 /*
184  * Open a TCP connection to the server
185  * Returns NULL
186  * Returns BSOCK * pointer on success
187  *
188  */
189 bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
190             int port, utime_t heart_beat, int *fatal)
191 {
192    int sockfd = -1;
193    dlist *addr_list;
194    IPADDR *ipaddr;
195    bool connected = false;
196    int turnon = 1;
197    const char *errstr;
198    int save_errno = 0;
199
200    /*
201     * Fill in the structure serv_addr with the address of
202     * the server that we want to connect with.
203     */
204    if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
205       /* Note errstr is not malloc'ed */
206       Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
207             host, errstr);
208       Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
209             host, errstr);
210       *fatal = 1;
211       return false;
212    }
213
214    foreach_dlist(ipaddr, addr_list) {
215       ipaddr->set_port_net(htons(port));
216       char allbuf[256 * 10];
217       char curbuf[256];
218       Dmsg2(100, "Current %sAll %s\n",
219                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
220                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
221       /* Open a TCP socket */
222       if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM, 0)) < 0) {
223          berrno be;
224          save_errno = errno;
225          *fatal = 1;
226          Pmsg3(000, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
227             ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
228          continue;
229       }
230
231       /* Bind to the source address if it is set */
232       if (src_addr) {
233          if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
234             berrno be;
235             save_errno = errno;
236             *fatal = 1;
237             Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
238                   src_addr->get_family(), be.bstrerror() );
239             continue;
240          }
241       }
242
243       /*
244        * Keep socket from timing out from inactivity
245        */
246       if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
247          berrno be;
248          Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
249                be.bstrerror());
250       }
251 #if defined(TCP_KEEPIDLE)
252       if (heart_beat) {
253          int opt = heart_beat
254          if (setsockopt(sockfd, IPPROTO_IP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
255             berrno be;
256             Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPIDLE on socket: %s\n"),
257                   be.bstrerror());
258          }
259       }
260 #endif
261
262       /* connect to server */
263       if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
264          save_errno = errno;
265          socketClose(sockfd);
266          continue;
267       }
268       *fatal = 0;
269       connected = true;
270       break;
271    }
272
273    if (!connected) {
274       free_addresses(addr_list);
275       errno = save_errno | b_errno_win32;
276       return false;
277    }
278    /*
279     * Keep socket from timing out from inactivity
280     *   Do this a second time out of paranoia
281     */
282    if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
283       berrno be;
284       Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
285             be.bstrerror());
286    }
287    fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
288    free_addresses(addr_list);
289    return true;
290 }
291
292 /*
293  * Force read/write to use locking
294  */
295 bool BSOCK::set_locking()
296 {
297    int stat;
298    if (m_use_locking) {
299       return true;                      /* already set */
300    }
301    if ((stat = pthread_mutex_init(&m_mutex, NULL)) != 0) {
302       berrno be;
303       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock mutex. ERR=%s\n"),
304          be.bstrerror(stat));
305       return false;
306    }
307    m_use_locking = true;
308    return true;
309 }
310
311 void BSOCK::clear_locking()
312 {
313    if (!m_use_locking) {
314       return;
315    }
316    m_use_locking = false;
317    pthread_mutex_destroy(&m_mutex);
318    return;
319 }
320
321 /*
322  * Send a message over the network. The send consists of
323  * two network packets. The first is sends a 32 bit integer containing
324  * the length of the data packet which follows.
325  *
326  * Returns: false on failure
327  *          true  on success
328  */
329 bool BSOCK::send()
330 {
331    int32_t rc;
332    int32_t pktsiz;
333    int32_t *hdr;
334    bool ok = true;
335
336    if (errors) {
337       if (!m_suppress_error_msgs) {
338          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket has errors=%d on call to %s:%s:%d\n"),
339              errors, m_who, m_host, m_port);
340       }
341       return false;
342    }
343    if (is_terminated()) {
344       if (!m_suppress_error_msgs) {
345          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket is terminated=%d on call to %s:%s:%d\n"),
346              is_terminated(), m_who, m_host, m_port);
347       }
348       return false;
349    }
350    if (msglen > 4000000) {
351       if (!m_suppress_error_msgs) {
352          Qmsg4(m_jcr, M_ERROR, 0,
353             _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
354              msglen, m_who, m_host, m_port);
355       }
356       return false;
357    }
358
359    if (m_use_locking) P(m_mutex);
360    /* Compute total packet length */
361    if (msglen <= 0) {
362       pktsiz = sizeof(pktsiz);               /* signal, no data */
363    } else {
364       pktsiz = msglen + sizeof(pktsiz);      /* data */
365    }
366    /* Store packet length at head of message -- note, we
367     *  have reserved an int32_t just before msg, so we can
368     *  store there 
369     */
370    hdr = (int32_t *)(msg - (int)sizeof(pktsiz));
371    *hdr = htonl(msglen);                     /* store signal/length */
372
373    out_msg_no++;            /* increment message number */
374
375    /* send data packet */
376    timer_start = watchdog_time;  /* start timer */
377    clear_timed_out();
378    /* Full I/O done in one write */
379    rc = write_nbytes(this, (char *)hdr, pktsiz);
380    timer_start = 0;         /* clear timer */
381    if (rc != pktsiz) {
382       errors++;
383       if (errno == 0) {
384          b_errno = EIO;
385       } else {
386          b_errno = errno;
387       }
388       if (rc < 0) {
389          if (!m_suppress_error_msgs) {
390             Qmsg5(m_jcr, M_ERROR, 0,
391                   _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"), 
392                   msglen, m_who,
393                   m_host, m_port, this->bstrerror());
394          }
395       } else {
396          Qmsg5(m_jcr, M_ERROR, 0,
397                _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
398                msglen, m_who, m_host, m_port, rc);
399       }
400       ok = false;
401    }
402    if (m_use_locking) V(m_mutex);
403    return ok;
404 }
405
406 /*
407  * Format and send a message
408  *  Returns: false on error
409  *           true  on success
410  */
411 bool BSOCK::fsend(const char *fmt, ...)
412 {
413    va_list arg_ptr;
414    int maxlen;
415
416    if (errors || is_terminated()) {
417       return false;
418    }
419    /* This probably won't work, but we vsnprintf, then if we
420     * get a negative length or a length greater than our buffer
421     * (depending on which library is used), the printf was truncated, so
422     * get a bigger buffer and try again.
423     */
424    for (;;) {
425       maxlen = sizeof_pool_memory(msg) - 1;
426       va_start(arg_ptr, fmt);
427       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
428       va_end(arg_ptr);
429       if (msglen > 0 && msglen < (maxlen - 5)) {
430          break;
431       }
432       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
433    }
434    return send();
435 }
436
437 /*
438  * Receive a message from the other end. Each message consists of
439  * two packets. The first is a header that contains the size
440  * of the data that follows in the second packet.
441  * Returns number of bytes read (may return zero)
442  * Returns -1 on signal (BNET_SIGNAL)
443  * Returns -2 on hard end of file (BNET_HARDEOF)
444  * Returns -3 on error  (BNET_ERROR)
445  *
446  *  Unfortunately, it is a bit complicated because we have these
447  *    four return types:
448  *    1. Normal data
449  *    2. Signal including end of data stream
450  *    3. Hard end of file
451  *    4. Error
452  *  Using is_bnet_stop() and is_bnet_error() you can figure this all out.
453  */
454 int32_t BSOCK::recv()
455 {
456    int32_t nbytes;
457    int32_t pktsiz;
458
459    msg[0] = 0;
460    msglen = 0;
461    if (errors || is_terminated()) {
462       return BNET_HARDEOF;
463    }
464
465    if (m_use_locking) P(m_mutex);
466    read_seqno++;            /* bump sequence number */
467    timer_start = watchdog_time;  /* set start wait time */
468    clear_timed_out();
469    /* get data size -- in int32_t */
470    if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
471       timer_start = 0;      /* clear timer */
472       /* probably pipe broken because client died */
473       if (errno == 0) {
474          b_errno = ENODATA;
475       } else {
476          b_errno = errno;
477       }
478       errors++;
479       nbytes = BNET_HARDEOF;        /* assume hard EOF received */
480       goto get_out;
481    }
482    timer_start = 0;         /* clear timer */
483    if (nbytes != sizeof(int32_t)) {
484       errors++;
485       b_errno = EIO;
486       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
487             sizeof(int32_t), nbytes, m_who, m_host, m_port);
488       nbytes = BNET_ERROR;
489       goto get_out;
490    }
491
492    pktsiz = ntohl(pktsiz);         /* decode no. of bytes that follow */
493
494    if (pktsiz == 0) {              /* No data transferred */
495       timer_start = 0;             /* clear timer */
496       in_msg_no++;
497       msglen = 0;
498       nbytes = 0;                  /* zero bytes read */
499       goto get_out;
500    }
501
502    /* If signal or packet size too big */
503    if (pktsiz < 0 || pktsiz > 1000000) {
504       if (pktsiz > 0) {            /* if packet too big */
505          Qmsg3(m_jcr, M_FATAL, 0,
506                _("Packet size too big from \"%s:%s:%d. Terminating connection.\n"),
507                m_who, m_host, m_port);
508          pktsiz = BNET_TERMINATE;  /* hang up */
509       }
510       if (pktsiz == BNET_TERMINATE) {
511          set_terminated();
512       }
513       timer_start = 0;                /* clear timer */
514       b_errno = ENODATA;
515       msglen = pktsiz;                /* signal code */
516       nbytes =  BNET_SIGNAL;          /* signal */
517       goto get_out;
518    }
519
520    /* Make sure the buffer is big enough + one byte for EOS */
521    if (pktsiz >= (int32_t) sizeof_pool_memory(msg)) {
522       msg = realloc_pool_memory(msg, pktsiz + 100);
523    }
524
525    timer_start = watchdog_time;  /* set start wait time */
526    clear_timed_out();
527    /* now read the actual data */
528    if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
529       timer_start = 0;      /* clear timer */
530       if (errno == 0) {
531          b_errno = ENODATA;
532       } else {
533          b_errno = errno;
534       }
535       errors++;
536       Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
537             m_who, m_host, m_port, this->bstrerror());
538       nbytes = BNET_ERROR;
539       goto get_out;
540    }
541    timer_start = 0;         /* clear timer */
542    in_msg_no++;
543    msglen = nbytes;
544    if (nbytes != pktsiz) {
545       b_errno = EIO;
546       errors++;
547       Qmsg5(m_jcr, M_ERROR, 0, _("Read expected %d got %d from %s:%s:%d\n"),
548             pktsiz, nbytes, m_who, m_host, m_port);
549       nbytes = BNET_ERROR;
550       goto get_out;
551    }
552    /* always add a zero by to properly terminate any
553     * string that was send to us. Note, we ensured above that the
554     * buffer is at least one byte longer than the message length.
555     */
556    msg[nbytes] = 0; /* terminate in case it is a string */
557    sm_check(__FILE__, __LINE__, false);
558
559 get_out:
560    if (m_use_locking) V(m_mutex);
561    return nbytes;                  /* return actual length of message */
562 }
563
564
565 /*
566  * Send a signal
567  */
568 bool BSOCK::signal(int signal)
569 {
570    msglen = signal;
571    if (signal == BNET_TERMINATE) {
572       m_suppress_error_msgs = true;
573    }
574    return send();
575 }
576
577 /* 
578  * Despool spooled attributes
579  */
580 bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize)
581 {
582    int32_t pktsiz;
583    size_t nbytes;
584    ssize_t last = 0, size = 0;
585    int count = 0;
586    JCR *jcr = get_jcr();
587
588    rewind(m_spool_fd);
589
590 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
591    posix_fadvise(fileno(m_spool_fd), 0, 0, POSIX_FADV_WILLNEED);
592 #endif
593
594    while (fread((char *)&pktsiz, 1, sizeof(int32_t), m_spool_fd) ==
595           sizeof(int32_t)) {
596       size += sizeof(int32_t);
597       msglen = ntohl(pktsiz);
598       if (msglen > 0) {
599          if (msglen > (int32_t)sizeof_pool_memory(msg)) {
600             msg = realloc_pool_memory(msg, msglen + 1);
601          }
602          nbytes = fread(msg, 1, msglen, m_spool_fd);
603          if (nbytes != (size_t)msglen) {
604             berrno be;
605             Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen);
606             Qmsg1(get_jcr(), M_FATAL, 0, _("fread attr spool error. ERR=%s\n"),
607                   be.bstrerror());
608             update_attr_spool_size(tsize - last);
609             return false;
610          }
611          size += nbytes;
612          if ((++count & 0x3F) == 0) {
613             update_attr_spool_size(size - last);
614             last = size;
615          }
616       }
617       send();
618       if (jcr && job_canceled(jcr)) {
619          return false;
620       }
621    }
622    update_attr_spool_size(tsize - last);
623    if (ferror(m_spool_fd)) {
624       Qmsg(jcr, M_FATAL, 0, _("fread attr spool I/O error.\n"));
625       return false;
626    }
627    return true;
628 }
629
630 /*
631  * Return the string for the error that occurred
632  * on the socket. Only the first error is retained.
633  */
634 const char *BSOCK::bstrerror()
635 {
636    berrno be;
637    if (errmsg == NULL) {
638       errmsg = get_pool_memory(PM_MESSAGE);
639    }
640    pm_strcpy(errmsg, be.bstrerror(b_errno));
641    return errmsg;
642 }
643
644 int BSOCK::get_peer(char *buf, socklen_t buflen) 
645 {
646 #if !defined(HAVE_WIN32)
647     if (peer_addr.sin_family == 0) {
648         socklen_t salen = sizeof(peer_addr);
649         int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
650         if (rval < 0) return rval;
651     }
652     if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
653         return -1;
654
655     return 0;
656 #else
657     return -1;
658 #endif
659 }
660
661 /*
662  * Set the network buffer size, suggested size is in size.
663  *  Actual size obtained is returned in bs->msglen
664  *
665  *  Returns: false on failure
666  *           true  on success
667  */
668 bool BSOCK::set_buffer_size(uint32_t size, int rw)
669 {
670    uint32_t dbuf_size, start_size;
671 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
672    int opt;
673    opt = IPTOS_THROUGHPUT;
674    setsockopt(fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
675 #endif
676
677    if (size != 0) {
678       dbuf_size = size;
679    } else {
680       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
681    }
682    start_size = dbuf_size;
683    if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
684       Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
685       return false;
686    }
687    if (rw & BNET_SETBUF_READ) {
688       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
689               SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
690          berrno be;
691          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
692          dbuf_size -= TAPE_BSIZE;
693       }
694       Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
695       if (dbuf_size != start_size) {
696          Qmsg1(get_jcr(), M_WARNING, 0,
697                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
698       }
699       if (dbuf_size % TAPE_BSIZE != 0) {
700          Qmsg1(get_jcr(), M_ABORT, 0,
701                _("Network buffer size %d not multiple of tape block size.\n"),
702                dbuf_size);
703       }
704    }
705    if (size != 0) {
706       dbuf_size = size;
707    } else {
708       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
709    }
710    start_size = dbuf_size;
711    if (rw & BNET_SETBUF_WRITE) {
712       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
713               SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
714          berrno be;
715          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
716          dbuf_size -= TAPE_BSIZE;
717       }
718       Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
719       if (dbuf_size != start_size) {
720          Qmsg1(get_jcr(), M_WARNING, 0,
721                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
722       }
723       if (dbuf_size % TAPE_BSIZE != 0) {
724          Qmsg1(get_jcr(), M_ABORT, 0,
725                _("Network buffer size %d not multiple of tape block size.\n"),
726                dbuf_size);
727       }
728    }
729
730    msglen = dbuf_size;
731    return true;
732 }
733
734 /*
735  * Set socket non-blocking
736  * Returns previous socket flag
737  */
738 int BSOCK::set_nonblocking()
739 {
740 #ifndef HAVE_WIN32
741    int oflags;
742
743    /* Get current flags */
744    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
745       berrno be;
746       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
747    }
748
749    /* Set O_NONBLOCK flag */
750    if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
751       berrno be;
752       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
753    }
754
755    m_blocking = 0;
756    return oflags;
757 #else
758    int flags;
759    u_long ioctlArg = 1;
760
761    flags = m_blocking;
762    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
763    m_blocking = 0;
764
765    return flags;
766 #endif
767 }
768
769 /*
770  * Set socket blocking
771  * Returns previous socket flags
772  */
773 int BSOCK::set_blocking()
774 {
775 #ifndef HAVE_WIN32
776    int oflags;
777    /* Get current flags */
778    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
779       berrno be;
780       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
781    }
782
783    /* Set O_NONBLOCK flag */
784    if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
785       berrno be;
786       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
787    }
788
789    m_blocking = 1;
790    return oflags;
791 #else
792    int flags;
793    u_long ioctlArg = 0;
794
795    flags = m_blocking;
796    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
797    m_blocking = 1;
798
799    return flags;
800 #endif
801 }
802
803 /*
804  * Restores socket flags
805  */
806 void BSOCK::restore_blocking (int flags) 
807 {
808 #ifndef HAVE_WIN32
809    if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
810       berrno be;
811       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
812    }
813
814    m_blocking = (flags & O_NONBLOCK) ? true : false;
815 #else
816    u_long ioctlArg = flags;
817
818    ioctlsocket(m_fd, FIONBIO, &ioctlArg);
819    m_blocking = 1;
820 #endif
821 }
822
823 /*
824  * Wait for a specified time for data to appear on
825  * the BSOCK connection.
826  *
827  *   Returns: 1 if data available
828  *            0 if timeout
829  *           -1 if error
830  */
831 int BSOCK::wait_data(int sec, int usec)
832 {
833    fd_set fdset;
834    struct timeval tv;
835
836    FD_ZERO(&fdset);
837    FD_SET((unsigned)m_fd, &fdset);
838    for (;;) {
839       tv.tv_sec = sec;
840       tv.tv_usec = usec;
841       switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
842       case 0:                      /* timeout */
843          b_errno = 0;
844          return 0;
845       case -1:
846          b_errno = errno;
847          if (errno == EINTR) {
848             continue;
849          }
850          return -1;                /* error return */
851       default:
852          b_errno = 0;
853          return 1;
854       }
855    }
856 }
857
858 /*
859  * As above, but returns on interrupt
860  */
861 int BSOCK::wait_data_intr(int sec, int usec)
862 {
863    fd_set fdset;
864    struct timeval tv;
865
866    if (this == NULL) {
867       return -1;
868    }
869    FD_ZERO(&fdset);
870    FD_SET((unsigned)m_fd, &fdset);
871    tv.tv_sec = sec;
872    tv.tv_usec = usec;
873    switch (select(m_fd + 1, &fdset, NULL, NULL, &tv)) {
874    case 0:                      /* timeout */
875       b_errno = 0;
876       return 0;
877    case -1:
878       b_errno = errno;
879       return -1;                /* error return */
880    default:
881       b_errno = 0;
882       break;
883    }
884    return 1;
885 }
886
887 /*
888  * Note, this routine closes and destroys all the sockets
889  *  that are open including the duped ones.
890  */
891 #ifndef SHUT_RDWR
892 #define SHUT_RDWR 2
893 #endif
894
895 void BSOCK::close()
896 {
897    BSOCK *bsock = this;
898    BSOCK *next;
899
900    if (!m_duped) {
901       clear_locking();
902    }
903    for (; bsock; bsock = next) {
904       next = bsock->m_next;           /* get possible pointer to next before destoryed */
905       if (!bsock->m_duped) {
906          /* Shutdown tls cleanly. */
907          if (bsock->tls) {
908             tls_bsock_shutdown(bsock);
909             free_tls_connection(bsock->tls);
910             bsock->tls = NULL;
911          }
912          if (bsock->is_timed_out()) {
913             shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
914          }
915          socketClose(bsock->m_fd);      /* normal close */
916       }
917       bsock->destroy();
918    }
919    return;
920 }
921
922 void BSOCK::destroy()
923 {
924    if (msg) {
925       free_pool_memory(msg);
926       msg = NULL;
927    } else {
928       ASSERT(1 == 0);              /* double close */
929    }
930    if (errmsg) {
931       free_pool_memory(errmsg);
932       errmsg = NULL;
933    }
934    if (m_who) {
935       free(m_who);
936       m_who = NULL;
937    }
938    if (m_host) {
939       free(m_host);
940       m_host = NULL;
941    }
942    if (src_addr) {
943       free(src_addr);
944       src_addr = NULL;
945    } 
946    free(this);
947 }
948
949 /* Commands sent to Director */
950 static char hello[]    = "Hello %s calling\n";
951
952 /* Response from Director */
953 static char OKhello[]   = "1000 OK:";
954
955 /*
956  * Authenticate Director
957  */
958 bool BSOCK::authenticate_director(const char *name, const char *password,
959                TLS_CONTEXT *tls_ctx, char *msg, int msglen)
960 {
961    int tls_local_need = BNET_TLS_NONE;
962    int tls_remote_need = BNET_TLS_NONE;
963    int compatible = true;
964    char bashed_name[MAX_NAME_LENGTH];
965    BSOCK *dir = this;        /* for readability */
966
967    msg[0] = 0;
968    /*
969     * Send my name to the Director then do authentication
970     */
971
972    /* Timeout Hello after 15 secs */
973    dir->start_timer(15);
974    dir->fsend(hello, bashed_name);
975
976    if (get_tls_enable(tls_ctx)) {
977       tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
978    }
979
980    /* respond to Dir challenge */
981    if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
982        /* Now challenge dir */
983        !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
984       bsnprintf(msg, msglen, _("Director authorization problem at \"%s:%d\"\n"),
985          dir->host(), dir->port());
986       goto bail_out;
987    }
988
989    /* Verify that the remote host is willing to meet our TLS requirements */
990    if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
991       bsnprintf(msg, msglen, _("Authorization problem:"
992              " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
993              dir->host(), dir->port());
994       goto bail_out;
995    }
996
997    /* Verify that we are willing to meet the remote host's requirements */
998    if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
999       bsnprintf(msg, msglen, _("Authorization problem with Director at \"%s:%d\":"
1000                      " Remote server requires TLS.\n"),
1001                      dir->host(), dir->port());
1002
1003       goto bail_out;
1004    }
1005
1006    /* Is TLS Enabled? */
1007    if (have_tls) {
1008       if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
1009          /* Engage TLS! Full Speed Ahead! */
1010          if (!bnet_tls_client(tls_ctx, dir, NULL)) {
1011             bsnprintf(msg, msglen, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
1012                dir->host(), dir->port());
1013             goto bail_out;
1014          }
1015       }
1016    }
1017
1018    Dmsg1(6, ">dird: %s", dir->msg);
1019    if (dir->recv() <= 0) {
1020       dir->stop_timer();
1021       bsnprintf(msg, msglen, _("Bad response to Hello command: ERR=%s\n"
1022                       "The Director at \"%s:%d\" is probably not running.\n"),
1023                     dir->bstrerror(), dir->host(), dir->port());
1024       return false;
1025    }
1026
1027   dir->stop_timer();
1028    Dmsg1(10, "<dird: %s", dir->msg);
1029    if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
1030       bsnprintf(msg, msglen, _("Director at \"%s:%d\" rejected Hello command\n"),
1031          dir->host(), dir->port());
1032       return false;
1033    } else {
1034       bsnprintf(msg, msglen, "%s", dir->msg);
1035    }
1036    return true;
1037
1038 bail_out:
1039    dir->stop_timer();
1040    bsnprintf(msg, msglen, _("Authorization problem with Director at \"%s:%d\"\n"
1041              "Most likely the passwords do not agree.\n"
1042              "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
1043              "Please see http://www.bacula.org/en/rel-manual/Bacula_Freque_Asked_Questi.html#SECTION003760000000000000000 for help.\n"), 
1044              dir->host(), dir->port());
1045    return false;
1046 }