/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2016 Kern Sibbald
+ Copyright (C) 2000-2017 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
Public License, v3.0 ("AGPLv3") and some additional permissions and
terms pursuant to its AGPLv3 Section 7.
- This notice must be preserved when any source code is
+ This notice must be preserved when any source code is
conveyed and/or propagated.
Bacula(R) is a registered trademark of Kern Sibbald.
*
* Negative msglen, is special "signal" (no data follows).
* See below for SIGNAL codes.
- *
*/
#ifndef __BSOCK_H_
btimer_t *start_bsock_timer(BSOCK *bs, uint32_t wait);
void stop_bsock_timer(btimer_t *wid);
+class BSOCKCallback {
+public:
+ BSOCKCallback();
+ virtual ~BSOCKCallback();
+ virtual bool bsock_send_cb() = 0;
+};
class BSOCK {
/*
public:
uint64_t read_seqno; /* read sequence number */
POOLMEM *msg; /* message pool buffer */
+ POOLMEM *cmsg; /* Compress buffer */
POOLMEM *errmsg; /* edited error message */
RES *res; /* Resource to which we are connected */
FILE *m_spool_fd; /* spooling file */
struct sockaddr client_addr; /* client's IP address */
struct sockaddr_in peer_addr; /* peer's IP address */
+ /* when "installed", send_hook_cb->bsock_send_cb() is called before
+ * any ::send().
+ */
+ BSOCKCallback *send_hook_cb;
+
private:
+ /* m_master is used by "duped" BSOCK to access some attributes of the "parent"
+ * thread to have an up2date status (for example when the job is canceled,
+ * the "parent" BSOCK is "terminated", but the duped BSOCK is unchanged)
+ * In the future more attributes and method could use the "m_master"
+ * indirection.
+ * master->m_rmutex could replace pm_rmutex, idem for the (w)rite" mutex
+ * "m_master->error" should be incremented instead of "error", but
+ * this require a lock.
+ *
+ * USAGE: the parent thread MUST be sure that the child thread have quit
+ * before to free the "parent" BSOCK.
+ */
BSOCK *m_next; /* next BSOCK if duped (not actually used) */
JCR *m_jcr; /* jcr or NULL for error msgs */
pthread_mutex_t m_rmutex; /* for read locking if use_locking set */
pthread_mutex_t m_wmutex; /* for write locking if use_locking set */
+ mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */
pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */
pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */
char *m_who; /* Name of daemon to which we are talking */
bool m_closed: 1; /* set when socket is closed */
bool m_duped: 1; /* set if duped BSOCK */
bool m_spool: 1; /* set for spooling */
- bool m_use_locking: 1; /* set to use locking */
+ bool m_compress: 1; /* set to use comm line compression */
+ bool m_use_locking; /* set to use locking (out of a bitfield */
+ /* to avoid race conditions) */
int64_t m_bwlimit; /* set to limit bandwidth */
int64_t m_nb_bytes; /* bytes sent/recv since the last tick */
btime_t m_last_tick; /* last tick used by bwlimit */
+ uint64_t m_CommBytes; /* Bytes sent */
+ uint64_t m_CommCompressedBytes; /* Compressed bytes sent */
void fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
struct sockaddr *lclient_addr);
bool open(JCR *jcr, const char *name, char *host, char *service,
int port, utime_t heart_beat, int *fatal);
+ void master_lock() const { if (m_use_locking) pP((&m_mmutex)); };
+ void master_unlock() const { if (m_use_locking) pV((&m_mmutex)); };
public:
+ BSOCK *m_master; /* "this" or the "parent" BSOCK if duped */
/* methods -- in bsock.c */
void init();
void free_tls();
bool fsend(const char*, ...);
bool signal(int signal);
void close(); /* close connection and destroy packet */
+ void _destroy(); /* called by destroy() */
void destroy(); /* destroy socket packet */
+ bool comm_compress(); /* in bsock.c */
const char *bstrerror(); /* last error on socket */
int get_peer(char *buf, socklen_t buflen);
bool despool(void update_attr_spool_size(ssize_t size), ssize_t tsize);
int set_blocking();
void restore_blocking(int flags);
void set_killable(bool killable);
- int wait_data(int sec, int usec=0);
- int wait_data_intr(int sec, int usec=0);
+ int wait_data(int sec, int msec=0);
+ int wait_data_intr(int sec, int msec=0);
bool authenticate_director(const char *name, const char *password,
TLS_CONTEXT *tls_ctx, char *response, int response_len);
bool set_locking(); /* in bsock.c */
bool is_timed_out() const { return m_timed_out; };
bool is_closed() const { return m_closed; };
bool is_open() const { return !m_closed; };
- bool is_stop() const { return errors || is_terminated() || is_closed(); }
+ bool is_stop() const { return errors || is_terminated() || is_closed(); };
bool is_error() { errno = b_errno; return errors; };
+ bool can_compress() const { return m_compress; };
void set_data_end(int32_t FileIndex) {
if (m_spool && FileIndex > m_FileIndex) {
m_lastFileIndex = m_FileIndex;
};
boffset_t get_last_data_end() { return m_last_data_end; };
int32_t get_lastFileIndex() { return m_lastFileIndex; };
+ uint32_t CommBytes() { return m_CommBytes; };
+ uint32_t CommCompressedBytes() { return m_CommCompressedBytes; };
void set_bwlimit(int64_t maxspeed) { m_bwlimit = maxspeed; };
bool use_bwlimit() { return m_bwlimit > 0;};
void set_spooling() { m_spool = true; };
void clear_spooling() { m_spool = false; };
+ void set_compress() { m_compress = true; };
+ void clear_compress() { m_compress = false; };
void set_duped() { m_duped = true; };
+ void set_master(BSOCK *master) { master_lock(); m_master = master; m_next = master->m_next; master->m_next = this; master_unlock(); };
void set_timed_out() { m_timed_out = true; };
void clear_timed_out() { m_timed_out = false; };
void set_terminated() { m_terminated = true; };
void start_timer(int sec) { m_tid = start_bsock_timer(this, sec); };
void stop_timer() { stop_bsock_timer(m_tid); };
void swap_msgs();
+ void install_send_hook_cb(BSOCKCallback *obj) { send_hook_cb=obj; };
+ void uninstall_send_hook_cb() { send_hook_cb=NULL; };
+ void cancel(); /* call it when JCR is canceled */
};
BNET_START_RTREE = -25, /* Start restore tree mode */
BNET_END_RTREE = -26, /* End restore tree mode */
BNET_SUB_PROMPT = -27, /* Indicate we are at a subprompt */
- BNET_TEXT_INPUT = -28 /* Get text input from user */
+ BNET_TEXT_INPUT = -28, /* Get text input from user */
+ BNET_EXT_TERMINATE = -29, /* A Terminate condition has been met and
+ already reported somewhere else */
+ BNET_FDCALLED = -30 /* The FD should keep the connection for a new job */
};
+/*
+ * These bits ares set in the packet length field. Attempt to
+ * keep the number of bits to a minimum and instead use the new
+ * flag field for passing bits using the BNET_HDR_EXTEND bit.
+ * Note: we must not set the high bit as that indicates a signal.
+ */
+#define BNET_COMPRESSED (1<<30) /* set for lz4 compressed data */
+#define BNET_HDR_EXTEND (1<<29) /* extended header */
+
+/*
+ * The following bits are kept in flags. The high 16 bits are
+ * for flags, and the low 16 bits are for other info such as
+ * compressed data offset (BNET_OFFSET)
+ */
+#define BNET_IS_CMD (1<<28) /* set for command data */
+#define BNET_OFFSET (1<<27) /* Data compression offset specified */
+#define BNET_NOCOMPRESS (1<<25) /* Disable compression */
+#define BNET_DATACOMPRESSED (1<<24) /* Data compression */
+
#define BNET_SETBUF_READ 1 /* Arg for bnet_set_buffer_size */
#define BNET_SETBUF_WRITE 2 /* Arg for bnet_set_buffer_size */
enum {
BNET_SIGNAL = -1,
BNET_HARDEOF = -2,
- BNET_ERROR = -3
+ BNET_ERROR = -3,
+ BNET_COMMAND = -4,
+};
+
+/*
+ * Inter-daemon commands
+ * When BNET_IS_CMD is on, the next int32 is a command
+ */
+#define BNET_CMD_SIZE sizeof(int32_t)
+
+enum {
+ BNET_CMD_NONE = 0, /* reserved */
+ BNET_CMD_ACK_HASH = 1, /* backup SD->FD SD already know this hash, don't need the block */
+ BNET_CMD_UNK_HASH = 2, /* restore SD->FD hash is unknown */
+ BNET_CMD_GET_HASH = 3, /* backup SD->FD SD ask FD to send the corresponding block */
+ /* restore FD->SD FD ask SD to send the corresponding block */
+ BNET_CMD_STO_BLOCK = 4, /* backup FD->SD FD send requested block */
+ BNET_CMD_REC_ACK = 5, /* restore FD->SD FD has consumed records from the buffer */
+ BNET_CMD_STP_THREAD = 6, /* restore FD->SD SD must stop thread */
+ BNET_CMD_STP_FLOWCTRL = 7, /* backup FD->SD SD must stop sending flowcontrol information */
};
+const char *bnet_cmd_to_name(int val);
+
/*
* TLS enabling values. Value is important for comparison, ie:
* if (tls_remote_need < BNET_TLS_REQUIRED) { ... }