]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/bsock.h
Big backport from Enterprise
[bacula/bacula] / bacula / src / lib / bsock.h
index 97f84bcf2e3c61cafcfa03dc6ffc15e24674bf7c..e2de186b52190e72d1f8a1935cee88121a987cd3 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2016 Kern Sibbald
+   Copyright (C) 2000-2017 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -11,7 +11,7 @@
    Public License, v3.0 ("AGPLv3") and some additional permissions and
    terms pursuant to its AGPLv3 Section 7.
 
-   This notice must be preserved when any source code is 
+   This notice must be preserved when any source code is
    conveyed and/or propagated.
 
    Bacula(R) is a registered trademark of Kern Sibbald.
@@ -28,7 +28,6 @@
  *
  * Negative msglen, is special "signal" (no data follows).
  *   See below for SIGNAL codes.
- *
  */
 
 #ifndef __BSOCK_H_
@@ -41,6 +40,12 @@ class BSOCK;
 btimer_t *start_bsock_timer(BSOCK *bs, uint32_t wait);
 void stop_bsock_timer(btimer_t *wid);
 
+class BSOCKCallback {
+public:
+   BSOCKCallback();
+   virtual ~BSOCKCallback();
+   virtual bool bsock_send_cb() = 0;
+};
 
 class BSOCK {
 /*
@@ -50,6 +55,7 @@ class BSOCK {
 public:
    uint64_t read_seqno;               /* read sequence number */
    POOLMEM *msg;                      /* message pool buffer */
+   POOLMEM *cmsg;                     /* Compress buffer */
    POOLMEM *errmsg;                   /* edited error message */
    RES *res;                          /* Resource to which we are connected */
    FILE *m_spool_fd;                  /* spooling file */
@@ -70,11 +76,29 @@ public:
    struct sockaddr client_addr;       /* client's IP address */
    struct sockaddr_in peer_addr;      /* peer's IP address */
 
+   /* when "installed", send_hook_cb->bsock_send_cb() is called before
+    * any ::send().
+    */
+   BSOCKCallback *send_hook_cb;
+
 private:
+   /* m_master is used by "duped" BSOCK to access some attributes of the "parent"
+    * thread to have an up2date status (for example when the job is canceled,
+    * the "parent" BSOCK is "terminated", but the duped BSOCK is unchanged)
+    * In the future more attributes and method could use the "m_master"
+    * indirection.
+    * master->m_rmutex could replace pm_rmutex, idem for the (w)rite" mutex
+    * "m_master->error" should be incremented instead of "error", but
+    * this require a lock.
+    *
+    * USAGE: the parent thread MUST be sure that the child thread have quit
+    * before to free the "parent" BSOCK.
+    */
    BSOCK *m_next;                     /* next BSOCK if duped (not actually used) */
    JCR *m_jcr;                        /* jcr or NULL for error msgs */
    pthread_mutex_t m_rmutex;          /* for read locking if use_locking set */
    pthread_mutex_t m_wmutex;          /* for write locking if use_locking set */
+   mutable pthread_mutex_t m_mmutex;  /* when accessing the master/next chain */
    pthread_mutex_t *pm_rmutex;        /* Pointer to the read mutex */
    pthread_mutex_t *pm_wmutex;        /* Pointer to the write mutex */
    char *m_who;                       /* Name of daemon to which we are talking */
@@ -91,18 +115,25 @@ private:
    bool m_closed: 1;                  /* set when socket is closed */
    bool m_duped: 1;                   /* set if duped BSOCK */
    bool m_spool: 1;                   /* set for spooling */
-   bool m_use_locking: 1;             /* set to use locking */
+   bool m_compress: 1;                /* set to use comm line compression */
+   bool m_use_locking;                /* set to use locking (out of a bitfield */
+                                      /* to avoid race conditions) */
 
    int64_t m_bwlimit;                 /* set to limit bandwidth */
    int64_t m_nb_bytes;                /* bytes sent/recv since the last tick */
    btime_t m_last_tick;               /* last tick used by bwlimit */
+   uint64_t m_CommBytes;              /* Bytes sent */
+   uint64_t m_CommCompressedBytes;    /* Compressed bytes sent */
 
    void fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
                struct sockaddr *lclient_addr);
    bool open(JCR *jcr, const char *name, char *host, char *service,
                int port, utime_t heart_beat, int *fatal);
+   void master_lock() const { if (m_use_locking) pP((&m_mmutex)); };
+   void master_unlock() const { if (m_use_locking) pV((&m_mmutex)); };
 
 public:
+   BSOCK *m_master;                    /* "this" or the "parent" BSOCK if duped */
    /* methods -- in bsock.c */
    void init();
    void free_tls();
@@ -115,7 +146,9 @@ public:
    bool fsend(const char*, ...);
    bool signal(int signal);
    void close();                      /* close connection and destroy packet */
+   void _destroy();                   /* called by destroy() */
    void destroy();                    /* destroy socket packet */
+   bool comm_compress();              /* in bsock.c */
    const char *bstrerror();           /* last error on socket */
    int get_peer(char *buf, socklen_t buflen);
    bool despool(void update_attr_spool_size(ssize_t size), ssize_t tsize);
@@ -124,8 +157,8 @@ public:
    int set_blocking();
    void restore_blocking(int flags);
    void set_killable(bool killable);
-   int wait_data(int sec, int usec=0);
-   int wait_data_intr(int sec, int usec=0);
+   int wait_data(int sec, int msec=0);
+   int wait_data_intr(int sec, int msec=0);
    bool authenticate_director(const char *name, const char *password,
            TLS_CONTEXT *tls_ctx, char *response, int response_len);
    bool set_locking();                /* in bsock.c */
@@ -150,8 +183,9 @@ public:
    bool is_timed_out() const { return m_timed_out; };
    bool is_closed() const { return m_closed; };
    bool is_open() const { return !m_closed; };
-   bool is_stop() const { return errors || is_terminated() || is_closed(); }
+   bool is_stop() const { return errors || is_terminated() || is_closed(); };
    bool is_error() { errno = b_errno; return errors; };
+   bool can_compress() const { return m_compress; };
    void set_data_end(int32_t FileIndex) {
           if (m_spool && FileIndex > m_FileIndex) {
               m_lastFileIndex = m_FileIndex;
@@ -162,11 +196,16 @@ public:
         };
    boffset_t get_last_data_end() { return m_last_data_end; };
    int32_t get_lastFileIndex() { return m_lastFileIndex; };
+   uint32_t CommBytes() { return m_CommBytes; };
+   uint32_t CommCompressedBytes() { return m_CommCompressedBytes; };
    void set_bwlimit(int64_t maxspeed) { m_bwlimit = maxspeed; };
    bool use_bwlimit() { return m_bwlimit > 0;};
    void set_spooling() { m_spool = true; };
    void clear_spooling() { m_spool = false; };
+   void set_compress() { m_compress = true; };
+   void clear_compress() { m_compress = false; };
    void set_duped() { m_duped = true; };
+   void set_master(BSOCK *master) { master_lock(); m_master = master; m_next = master->m_next; master->m_next = this; master_unlock(); };
    void set_timed_out() { m_timed_out = true; };
    void clear_timed_out() { m_timed_out = false; };
    void set_terminated() { m_terminated = true; };
@@ -174,6 +213,9 @@ public:
    void start_timer(int sec) { m_tid = start_bsock_timer(this, sec); };
    void stop_timer() { stop_bsock_timer(m_tid); };
    void swap_msgs();
+   void install_send_hook_cb(BSOCKCallback *obj) { send_hook_cb=obj; };
+   void uninstall_send_hook_cb() { send_hook_cb=NULL; };
+   void cancel(); /* call it when JCR is canceled */
 
 };
 
@@ -210,9 +252,31 @@ enum {
    BNET_START_RTREE    = -25,         /* Start restore tree mode */
    BNET_END_RTREE      = -26,         /* End restore tree mode */
    BNET_SUB_PROMPT     = -27,         /* Indicate we are at a subprompt */
-   BNET_TEXT_INPUT     = -28          /* Get text input from user */
+   BNET_TEXT_INPUT     = -28,         /* Get text input from user */
+   BNET_EXT_TERMINATE  = -29,         /* A Terminate condition has been met and
+                                         already reported somewhere else */
+   BNET_FDCALLED       = -30          /* The FD should keep the connection for a new job */
 };
 
+/*
+ * These bits ares set in the packet length field.  Attempt to
+ *  keep the number of bits to a minimum and instead use the new
+ *  flag field for passing bits using the BNET_HDR_EXTEND bit.
+ *  Note: we must not set the high bit as that indicates a signal.
+ */
+#define BNET_COMPRESSED (1<<30)       /* set for lz4 compressed data */
+#define BNET_HDR_EXTEND (1<<29)       /* extended header */
+
+/*
+ * The following bits are kept in flags.  The high 16 bits are
+ *  for flags, and the low 16 bits are for other info such as
+ *  compressed data offset (BNET_OFFSET)
+ */
+#define BNET_IS_CMD     (1<<28)       /* set for command data */
+#define BNET_OFFSET     (1<<27)       /* Data compression offset specified */
+#define BNET_NOCOMPRESS (1<<25)       /* Disable compression */
+#define BNET_DATACOMPRESSED (1<<24)   /* Data compression */
+
 #define BNET_SETBUF_READ  1           /* Arg for bnet_set_buffer_size */
 #define BNET_SETBUF_WRITE 2           /* Arg for bnet_set_buffer_size */
 
@@ -224,9 +288,30 @@ enum {
 enum {
    BNET_SIGNAL         = -1,
    BNET_HARDEOF        = -2,
-   BNET_ERROR          = -3
+   BNET_ERROR          = -3,
+   BNET_COMMAND        = -4,
+};
+
+/*
+ * Inter-daemon commands
+ * When BNET_IS_CMD is on, the next int32 is a command
+ */
+#define BNET_CMD_SIZE sizeof(int32_t)
+
+enum {
+   BNET_CMD_NONE       =  0, /* reserved */
+   BNET_CMD_ACK_HASH   =  1, /* backup  SD->FD  SD already know this hash, don't need the block */
+   BNET_CMD_UNK_HASH   =  2, /* restore SD->FD  hash is unknown */
+   BNET_CMD_GET_HASH   =  3, /* backup  SD->FD  SD ask FD to send the corresponding block */
+                             /* restore FD->SD  FD ask SD to send the corresponding block */
+   BNET_CMD_STO_BLOCK  =  4, /* backup  FD->SD  FD send requested block */
+   BNET_CMD_REC_ACK    =  5, /* restore FD->SD  FD has consumed records from the buffer */
+   BNET_CMD_STP_THREAD =  6, /* restore FD->SD  SD must stop thread */
+   BNET_CMD_STP_FLOWCTRL =  7, /* backup FD->SD  SD must stop sending flowcontrol information */
 };
 
+const char *bnet_cmd_to_name(int val);
+
 /*
  * TLS enabling values. Value is important for comparison, ie:
  * if (tls_remote_need < BNET_TLS_REQUIRED) { ... }