]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/message.c
Ensure we have a DIR connection in dequeue_messages
[bacula/bacula] / bacula / src / lib / message.c
index 4c32a12825708a39e63bba7e0c4ee43e0f1b648d..e39f678ed434ce23c8aad3630d8d49dabfd18907 100644 (file)
@@ -1,8 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2015 Kern Sibbald
-   Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2017 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -41,6 +40,9 @@ sql_escape_call p_sql_escape = NULL;
  *  This is where we define "Globals" because all the
  *    daemons include this file.
  */
+dlist *daemon_msg_queue = NULL;
+pthread_mutex_t daemon_msg_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+static bool dequeuing_daemon_msgs = false;
 const char *working_directory = NULL; /* working directory path stored here */
 const char *assert_msg = NULL;        /* ASSERT2 error message */
 const char *version = VERSION " (" BDATE ")";
@@ -421,7 +423,7 @@ void init_console_msg(const char *wd)
       console_msg_pending = 1;
    }
    close(fd);
-   con_fd = fopen(con_fname, "a+b");
+   con_fd = bfopen(con_fname, "a+b");
    if (!con_fd) {
       berrno be;
       Emsg2(M_ERROR, 0, _("Could not open console message file %s: ERR=%s\n"),
@@ -743,7 +745,7 @@ void term_msg()
 
 static bool open_dest_file(JCR *jcr, DEST *d, const char *mode)
 {
-   d->fd = fopen(d->where, mode);
+   d->fd = bfopen(d->where, mode);
    if (!d->fd) {
       berrno be;
       delivery_error(_("fopen %s failed: ERR=%s\n"), d->where, be.bstrerror());
@@ -786,6 +788,7 @@ void dispatch_message(JCR *jcr, int type, utime_t mtime, char *msg)
     MSGS *msgs;
     BPIPE *bpipe;
     const char *mode;
+    bool created_jcr = false;
 
     Dmsg2(850, "Enter dispatch_msg type=%d msg=%s", type, msg);
 
@@ -831,6 +834,10 @@ void dispatch_message(JCR *jcr, int type, utime_t mtime, char *msg)
     if (!jcr) {
        jcr = get_jcr_from_tsd();
     }
+    if (!jcr) {
+       jcr = new_jcr(sizeof(JCR), NULL);
+       created_jcr = true;
+    }
     if (jcr) {
        msgs = jcr->jcr_msgs;
     }
@@ -849,6 +856,7 @@ void dispatch_message(JCR *jcr, int type, utime_t mtime, char *msg)
        return;
     }
 
+
     for (d=msgs->dest_chain; d; d=d->next) {
        if (bit_is_set(type, d->msg_types)) {
           bool ok;
@@ -881,7 +889,7 @@ void dispatch_message(JCR *jcr, int type, utime_t mtime, char *msg)
              case MD_CONSOLE:
                 Dmsg1(850, "CONSOLE for following msg: %s", msg);
                 if (!con_fd) {
-                   con_fd = fopen(con_fname, "a+b");
+                   con_fd = bfopen(con_fname, "a+b");
                    Dmsg0(850, "Console file not open.\n");
                 }
                 if (con_fd) {
@@ -941,7 +949,7 @@ void dispatch_message(JCR *jcr, int type, utime_t mtime, char *msg)
                 if (!d->fd) {
                    POOLMEM *name = get_pool_memory(PM_MESSAGE);
                    make_unique_mail_filename(jcr, name, d);
-                   d->fd = fopen(name, "w+b");
+                   d->fd = bfopen(name, "w+b");
                    if (!d->fd) {
                       berrno be;
                       delivery_error(_("Msg delivery error: fopen %s failed: ERR=%s\n"), name,
@@ -992,8 +1000,8 @@ send_to_file:
              case MD_DIRECTOR:
                 Dmsg1(850, "DIRECTOR for following msg: %s", msg);
                 if (jcr && jcr->dir_bsock && !jcr->dir_bsock->errors) {
-                   jcr->dir_bsock->fsend("Jmsg Job=%s type=%d level=%lld %s",
-                      jcr->Job, type, mtime, msg);
+                   jcr->dir_bsock->fsend("Jmsg JobId=%ld type=%d level=%lld %s",
+                      jcr->JobId, type, mtime, msg);
                 } else {
                    Dmsg1(800, "no jcr for following msg: %s", msg);
                 }
@@ -1017,6 +1025,9 @@ send_to_file:
           }
        }
     }
+    if (created_jcr) {
+       free_jcr(jcr);
+    }
 }
 
 /*********************************************************************
@@ -1053,7 +1064,7 @@ static void pt_out(char *buf)
        if (!trace_fd) {
           char fn[200];
           bsnprintf(fn, sizeof(fn), "%s/%s.trace", working_directory ? working_directory : "./", my_name);
-          trace_fd = fopen(fn, "a+b");
+          trace_fd = bfopen(fn, "a+b");
        }
        if (trace_fd) {
           fputs(buf, trace_fd);
@@ -1151,9 +1162,7 @@ void set_trace(int trace_flag)
 
 void set_hangup(int hangup_value)
 {
-   if (hangup_value < 0) {
-      return;
-   } else {
+   if (hangup_value != -1) {
       hangup = hangup_value;
    }
 }
@@ -1165,9 +1174,7 @@ int get_hangup(void)
 
 void set_blowup(int blowup_value)
 {
-   if (blowup_value < 0) {
-      return;
-   } else {
+   if (blowup_value != -1) {
       blowup = blowup_value;
    }
 }
@@ -1177,6 +1184,36 @@ int get_blowup(void)
    return blowup;
 }
 
+bool handle_hangup_blowup(JCR *jcr, uint32_t file_count, uint64_t byte_count)
+{
+   if (hangup == 0 && blowup == 0) {
+      /* quick check */
+      return false;
+   }
+   /* Debug code: check if we must hangup  or blowup */
+   if ((hangup > 0 && (file_count > (uint32_t)hangup)) ||
+       (hangup < 0 && (byte_count/1024 > (uint32_t)-hangup)))  {
+      jcr->setJobStatus(JS_Incomplete);
+      if (hangup > 0) {
+         Jmsg1(jcr, M_FATAL, 0, "Debug hangup requested after %d files.\n", hangup);
+      } else {
+         Jmsg1(jcr, M_FATAL, 0, "Debug hangup requested after %d Kbytes.\n", -hangup);
+      }
+      set_hangup(0);
+      return true;
+   }
+   if ((blowup > 0 && (file_count > (uint32_t)blowup)) ||
+       (blowup < 0 && (byte_count/1024 > (uint32_t)-blowup)))  {
+      if (blowup > 0) {
+         Jmsg1(jcr, M_ABORT, 0, "Debug blowup requested after %d files.\n", blowup);
+      } else {
+         Jmsg1(jcr, M_ABORT, 0, "Debug blowup requested after %d Kbytes.\n", -blowup);
+      }
+      /* will never reach this line */
+      return true;
+   }
+   return false;
+}
 
 bool get_trace(void)
 {
@@ -1247,7 +1284,7 @@ t_msg(const char *file, int line, int64_t level, const char *fmt,...)
     if (level <= debug_level) {
        if (!trace_fd) {
           bsnprintf(buf, sizeof(buf), "%s/%s.trace", working_directory ? working_directory : ".", my_name);
-          trace_fd = fopen(buf, "a+b");
+          trace_fd = bfopen(buf, "a+b");
        }
 
 #ifdef FULL_LOCATION
@@ -1337,6 +1374,23 @@ e_msg(const char *file, int line, int type, int level, const char *fmt,...)
     }
 }
 
+/* Check in the msgs resource if a given type is defined */
+bool is_message_type_set(JCR *jcr, int type)
+{
+   MSGS *msgs = NULL;
+   if (jcr) {
+       msgs = jcr->jcr_msgs;
+   }
+   if (!msgs) {
+      msgs = daemon_msgs;            /* if no jcr, we use daemon handler */
+   }
+   if (msgs && (type != M_ABORT && type != M_ERROR_TERM) &&
+       !bit_is_set(type, msgs->send_msg)) {
+      return false;                 /* no destination */
+   }
+   return true;
+}
+
 /* *********************************************************
  *
  * Generate a Job message
@@ -1633,14 +1687,17 @@ void Qmsg(JCR *jcr, int type, utime_t mtime, const char *fmt,...)
    }
 
    if (jcr && type==M_FATAL) {
-      // TODO ASX MUST use a lock to protect access jcr->JobStatus from another thread
       jcr->setJobStatus(JS_FatalError);
     }
 
    /* If no jcr or no queue or dequeuing send to syslog */
    if (!jcr || !jcr->msg_queue || jcr->dequeuing_msgs) {
       syslog(LOG_DAEMON|LOG_ERR, "%s", item->msg);
-      free(item);
+      P(daemon_msg_queue_mutex);
+      if (daemon_msg_queue) {
+         daemon_msg_queue->append(item);
+      }
+      V(daemon_msg_queue_mutex);
    } else {
       /* Queue message for later sending */
       P(jcr->msg_queue_mutex);
@@ -1656,14 +1713,45 @@ void Qmsg(JCR *jcr, int type, utime_t mtime, const char *fmt,...)
 void dequeue_messages(JCR *jcr)
 {
    MQUEUE_ITEM *item;
-   if (!jcr->msg_queue) {
+   JobId_t JobId;
+
+   /* Avoid bad calls, recursion and no Director connection */
+   if (jcr == NULL || jcr->dequeuing_msgs ||
+       jcr->dir_bsock == NULL || jcr->dir_bsock->is_closed()) {
+      return;
+   }
+
+   /* Dequeue daemon messages */
+   if (daemon_msg_queue && !dequeuing_daemon_msgs) {
+      P(daemon_msg_queue_mutex);
+      dequeuing_daemon_msgs = true;
+      jcr->dequeuing_msgs = true;
+      JobId = jcr->JobId;
+      jcr->JobId = 0;       /* set daemon JobId == 0 */
+      jcr->dir_bsock->suppress_error_messages(true);
+      foreach_dlist(item, daemon_msg_queue) {
+         Jmsg(jcr, item->type, item->mtime, "%s", item->msg);
+      }
+      jcr->dir_bsock->suppress_error_messages(false);
+      /* Remove messages just sent */
+      daemon_msg_queue->destroy();
+      jcr->JobId = JobId;   /* restore JobId */
+      jcr->dequeuing_msgs = false;
+      dequeuing_daemon_msgs = false;
+      V(daemon_msg_queue_mutex);
+   }
+
+   /* Dequeue Job specific messages */
+   if (!jcr->msg_queue || jcr->dequeuing_msgs) {
       return;
    }
    P(jcr->msg_queue_mutex);
    jcr->dequeuing_msgs = true;
+   jcr->dir_bsock->suppress_error_messages(true);
    foreach_dlist(item, jcr->msg_queue) {
       Jmsg(jcr, item->type, item->mtime, "%s", item->msg);
    }
+   jcr->dir_bsock->suppress_error_messages(false);
    /* Remove messages just sent */
    jcr->msg_queue->destroy();
    jcr->dequeuing_msgs = false;