X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fbsys.c;h=4a4569ad53d316a9ef922e428f29c4a926e03c35;hb=10cfd798ced2d27f61ead2de6fe9b1bcc8e3468d;hp=7900e3cf58776420a7e3241155fde9e9bce0c9a5;hpb=82201676d0c997033333adf6c7719534519b8102;p=bacula%2Fbacula diff --git a/bacula/src/lib/bsys.c b/bacula/src/lib/bsys.c index 7900e3cf58..4a4569ad53 100644 --- a/bacula/src/lib/bsys.c +++ b/bacula/src/lib/bsys.c @@ -1,29 +1,20 @@ /* - Bacula® - The Network Backup Solution - - Copyright (C) 2000-2012 Free Software Foundation Europe e.V. - - The main author of Bacula is Kern Sibbald, with contributions from - many others, a complete list can be found in the file AUTHORS. - This program is Free Software; you can redistribute it and/or - modify it under the terms of version three of the GNU Affero General Public - License as published by the Free Software Foundation and included - in the file LICENSE. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. - - Bacula® is a registered trademark of Kern Sibbald. - The licensor of Bacula is the Free Software Foundation Europe - (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, - Switzerland, email:ftf@fsfeurope.org. + Bacula(R) - The Network Backup Solution + + Copyright (C) 2000-2017 Kern Sibbald + + The original author of Bacula is Kern Sibbald, with contributions + from many others, a complete list can be found in the file AUTHORS. + + You may use this file and others of this release according to the + license defined in the LICENSE file, which includes the Affero General + Public License, v3.0 ("AGPLv3") and some additional permissions and + terms pursuant to its AGPLv3 Section 7. + + This notice must be preserved when any source code is + conveyed and/or propagated. + + Bacula(R) is a registered trademark of Kern Sibbald. */ /* * Miscellaneous Bacula memory and thread safe routines @@ -44,10 +35,91 @@ static pthread_mutex_t timer_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t timer = PTHREAD_COND_INITIALIZER; +/* + * Quote a string + */ +POOLMEM *quote_string(POOLMEM *snew, const char *old) +{ + char *n; + int i; + + if (!old) { + strcpy(snew, "null"); + return snew; + } + n = snew; + *n++ = '"'; + for (i=0; old[i]; i++) { + switch (old[i]) { + case '"': + *n++ = '\\'; + *n++ = '"'; + break; + case '\\': + *n++ = '\\'; + *n++ = '\\'; + break; + case '\r': + *n++ = '\\'; + *n++ = 'r'; + break; + case '\n': + *n++ = '\\'; + *n++ = 'n'; + break; + default: + *n++ = old[i]; + break; + } + } + *n++ = '"'; + *n = 0; + return snew; +} + +/* + * Quote a where (list of addresses separated by spaces) + */ +POOLMEM *quote_where(POOLMEM *snew, const char *old) +{ + char *n; + int i; + + if (!old) { + strcpy(snew, "null"); + return snew; + } + n = snew; + *n++ = '"'; + for (i=0; old[i]; i++) { + switch (old[i]) { + case ' ': + *n++ = '"'; + *n++ = ','; + *n++ = '"'; + break; + case '"': + *n++ = '\\'; + *n++ = '"'; + break; + case '\\': + *n++ = '\\'; + *n++ = '\\'; + break; + default: + *n++ = old[i]; + break; + } + } + *n++ = '"'; + *n = 0; + return snew; +} + /* * This routine is a somewhat safer unlink in that it - * allows you to run a regex on the filename before - * excepting it. It also requires the file to be in + * allows you to run a regex on the filename before + * excepting it. It also requires the file to be in * the working directory. */ int safer_unlink(const char *pathname, const char *regx) @@ -119,7 +191,7 @@ int bmicrosleep(int32_t sec, int32_t usec) timeout.tv_sec++; } - Dmsg2(200, "pthread_cond_timedwait sec=%lld usec=%d\n", sec, usec); + Dmsg2(200, "pthread_cond_timedwait sec=%d usec=%d\n", sec, usec); /* Note, this unlocks mutex during the sleep */ P(timer_mutex); stat = pthread_cond_timedwait(&timer, &timer_mutex, &timeout); @@ -190,15 +262,26 @@ bool bstrcmp(const char *s1, const char *s2) return strcmp(s1, s2) == 0; } +/* + * Allows one or both pointers to be NULL + */ +bool bstrcasecmp(const char *s1, const char *s2) +{ + if (s1 == s2) return true; + if (s1 == NULL || s2 == NULL) return false; + return strcasecmp(s1, s2) == 0; +} + + /* * Get character length of UTF-8 string * * Valid UTF-8 codes - * U-00000000 - U-0000007F: 0xxxxxxx - * U-00000080 - U-000007FF: 110xxxxx 10xxxxxx - * U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx - * U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx - * U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx + * U-00000000 - U-0000007F: 0xxxxxxx + * U-00000080 - U-000007FF: 110xxxxx 10xxxxxx + * U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx + * U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx + * U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx * U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx */ int cstrlen(const char *str) @@ -244,7 +327,14 @@ int cstrlen(const char *str) return len; } - +/* We need to disable the malloc() macro if SMARTALLOC is not used, + * else, it points to b_malloc() and causes problems. + */ +#ifndef SMARTALLOC + #ifdef malloc + #undef malloc + #endif +#endif #ifndef bmalloc void *bmalloc(size_t size) @@ -444,20 +534,39 @@ void b_memset(const char *file, int line, void *mem, int val, size_t num) #if !defined(HAVE_WIN32) static int del_pid_file_ok = FALSE; #endif +static int pid_fd = -1; -/* - * Create a standard "Unix" pid file. +#ifdef HAVE_FCNTL_LOCK +/* a convenient function [un]lock file using fnctl() + * code must be in F_UNLCK, F_RDLCK, F_WRLCK + * return -1 for error and errno is set */ -void create_pid_file(char *dir, const char *progname, int port) +int fcntl_lock(int fd, int code) { + struct flock l; + l.l_type = code; + l.l_whence = l.l_start = l.l_len = 0; + l.l_len = 1; + return fcntl(fd, F_SETLK, &l); +} +#endif + +/* Create a disk pid "lock" file + * returns + * 0: Error with the error message in errmsg + * 1: Succcess + * 2: Successs, but a previous file was found + */ +#if !defined(HAVE_FCNTL_LOCK) || defined(HAVE_WIN32) +int create_lock_file(char *fname, const char *progname, const char *filetype, POOLMEM **errmsg, int *fd) +{ + int ret = 1; #if !defined(HAVE_WIN32) int pidfd, len; int oldpid; char pidbuf[20]; - POOLMEM *fname = get_pool_memory(PM_FNAME); struct stat statp; - Mmsg(&fname, "%s/%s.%d.pid", dir, progname, port); if (stat(fname, &statp) == 0) { /* File exists, see what we have */ *pidbuf = 0; @@ -465,8 +574,10 @@ void create_pid_file(char *dir, const char *progname, int port) read(pidfd, &pidbuf, sizeof(pidbuf)) < 0 || sscanf(pidbuf, "%d", &oldpid) != 1) { berrno be; - Emsg2(M_ERROR_TERM, 0, _("Cannot open pid file. %s ERR=%s\n"), fname, - be.bstrerror()); + Mmsg(errmsg, _("Cannot open %s file. %s ERR=%s\n"), filetype, fname, + be.bstrerror()); + close(pidfd); /* if it was successfully opened */ + return 0; } /* Some OSes (IRIX) don't bother to clean out the old pid files after a crash, and * since they use a deterministic algorithm for assigning PIDs, we can have @@ -480,27 +591,84 @@ void create_pid_file(char *dir, const char *progname, int port) * For more details see bug #797. */ if ((oldpid != (int)getpid()) && (kill(oldpid, 0) != -1 || errno != ESRCH)) { - Emsg3(M_ERROR_TERM, 0, _("%s is already running. pid=%d\nCheck file %s\n"), + Mmsg(errmsg, _("%s is already running. pid=%d\nCheck file %s\n"), progname, oldpid, fname); + return 0; } /* He is not alive, so take over file ownership */ unlink(fname); /* remove stale pid file */ + ret = 2; } /* Create new pid file */ if ((pidfd = open(fname, O_CREAT|O_TRUNC|O_WRONLY|O_BINARY, 0640)) >= 0) { len = sprintf(pidbuf, "%d\n", (int)getpid()); write(pidfd, pidbuf, len); close(pidfd); - del_pid_file_ok = TRUE; /* we created it so we can delete it */ + /* ret is already 1 */ } else { berrno be; - Emsg2(M_ERROR_TERM, 0, _("Could not open pid file. %s ERR=%s\n"), fname, - be.bstrerror()); + Mmsg(errmsg, _("Could not open %s file. %s ERR=%s\n"), filetype, fname, be.bstrerror()); + return 0; } - free_pool_memory(fname); #endif + return ret; } +#else /* defined(HAVE_FCNTL_LOCK) */ +int create_lock_file(char *fname, const char *progname, const char *filetype, POOLMEM **errmsg, int *fd) +{ + int len; + int oldpid; + char pidbuf[20]; + /* Open the pidfile for writing */ + if ((*fd = open(fname, O_CREAT|O_RDWR, 0640)) >= 0) { + if (fcntl_lock(*fd, F_WRLCK) == -1) { + berrno be; + /* already locked by someone else, try to read the pid */ + if (read(*fd, &pidbuf, sizeof(pidbuf)) > 0 && + sscanf(pidbuf, "%d", &oldpid) == 1) { + Mmsg(errmsg, _("%s is already running. pid=%d, check file %s\n"), + progname, oldpid, fname); + } else { + Mmsg(errmsg, _("Cannot lock %s file. %s ERR=%s\n"), filetype, fname, be.bstrerror()); + } + close(*fd); + *fd=-1; + return 0; + } + /* write the pid */ + len = sprintf(pidbuf, "%d\n", (int)getpid()); + write(*fd, pidbuf, len); + /* KEEP THE FILE OPEN TO KEEP THE LOCK !!! */ + return 1; + } else { + berrno be; + Mmsg(errmsg, _("Cannot not open %s file. %s ERR=%s\n"), filetype, fname, be.bstrerror()); + return 0; + } +} +#endif + +/* + * Create a standard "Unix" pid file. + */ +void create_pid_file(char *dir, const char *progname, int port) +{ + POOLMEM *errmsg = get_pool_memory(PM_MESSAGE); + POOLMEM *fname = get_pool_memory(PM_FNAME); + + Mmsg(fname, "%s/%s.%d.pid", dir, progname, port); + if (create_lock_file(fname, progname, "pid", &errmsg, &pid_fd) == 0) { + Emsg1(M_ERROR_TERM, 0, "%s", errmsg); + /* never return */ + } +#if !defined(HAVE_WIN32) + del_pid_file_ok = TRUE; /* we created it so we can delete it */ +#endif + + free_pool_memory(fname); + free_pool_memory(errmsg); +} /* * Delete the pid file if we created it @@ -509,7 +677,9 @@ int delete_pid_file(char *dir, const char *progname, int port) { #if !defined(HAVE_WIN32) POOLMEM *fname = get_pool_memory(PM_FNAME); - + if (pid_fd!=-1) { + close(pid_fd); + } if (!del_pid_file_ok) { free_pool_memory(fname); return 0; @@ -553,7 +723,7 @@ void read_state_file(char *dir, const char *progname, int port) if ((sfd = open(fname, O_RDONLY|O_BINARY)) < 0) { berrno be; Dmsg3(010, "Could not open state file. sfd=%d size=%d: ERR=%s\n", - sfd, sizeof(hdr), be.bstrerror()); + sfd, (int)sizeof(hdr), be.bstrerror()); goto bail_out; } if ((stat=read(sfd, &hdr, hdr_size)) != hdr_size) { @@ -597,7 +767,7 @@ void write_state_file(char *dir, const char *progname, int port) int sfd; bool ok = false; POOLMEM *fname = get_pool_memory(PM_FNAME); - + P(state_mutex); /* Only one job at a time can call here */ Mmsg(&fname, "%s/%s.%d.state", dir, progname, port); /* Create new state file */ @@ -774,7 +944,26 @@ char *escape_filename(const char *file_path) return escaped_path; } +/* + * For the moment preventing suspensions is only + * implemented on Windows. + */ +#ifndef HAVE_WIN32 +void prevent_os_suspensions() +{ } + +void allow_os_suspensions() +{ } +#endif + + #if HAVE_BACKTRACE && HAVE_GCC +/* if some names are not resolved you can try using : addr2line, like this + * $ addr2line -e bin/bacula-sd -a 0x43cd11 + * OR + * use the the -rdynamic option in the linker, like this + * $ LDFLAGS="-rdynamic" make setup + */ #include #include void stack_trace() @@ -783,10 +972,10 @@ void stack_trace() size_t stack_depth; void *stack_addrs[max_depth]; char **stack_strings; - + stack_depth = backtrace(stack_addrs, max_depth); stack_strings = backtrace_symbols(stack_addrs, stack_depth); - + for (size_t i = 3; i < stack_depth; i++) { size_t sz = 200; /* just a guess, template names will go much wider */ char *function = (char *)actuallymalloc(sz); @@ -803,7 +992,7 @@ void stack_trace() *begin++ = '\0'; *end = '\0'; /* found our mangled name, now in [begin, end] */ - + int status; char *ret = abi::__cxa_demangle(begin, function, &sz, &status); if (ret) { @@ -828,3 +1017,423 @@ void stack_trace() #else /* HAVE_BACKTRACE && HAVE_GCC */ void stack_trace() {} #endif /* HAVE_BACKTRACE && HAVE_GCC */ + +#ifdef HAVE_SYS_STATVFS_H +#include +#else +#define statvfs statfs +#endif +/* statvfs.h defines ST_APPEND, which is also used by Bacula */ +#undef ST_APPEND + + +int fs_get_free_space(const char *path, int64_t *freeval, int64_t *totalval) +{ +#ifndef HAVE_WIN32 + struct statvfs st; + + if (statvfs(path, &st) == 0) { + *freeval = (uint64_t)st.f_bavail * (uint64_t)st.f_frsize; + *totalval = (uint64_t)st.f_blocks * (uint64_t)st.f_frsize; + return 0; + } +#endif + + *totalval = *freeval = 0; + return -1; +} + +/* This function is used after a fork, the memory manager is not be initialized + * properly, so we must stay simple. + */ +void setup_env(char *envp[]) +{ + if (envp) { +#if defined(HAVE_SETENV) + char *p; + for (int i=0; envp[i] ; i++) { + p = strchr(envp[i], '='); /* HOME=/tmp */ + if (p) { + *p=0; /* HOME\0tmp\0 */ + setenv(envp[i], p+1, true); + *p='='; + } + } +#elif defined(HAVE_PUTENV) + for (int i=0; envp[i] ; i++) { + putenv(envp[i]); + } +#else +#error "putenv() and setenv() are not available on this system" +#endif + } +} + +/* Small function to copy a file somewhere else, + * for debug purpose. + */ +int copyfile(const char *src, const char *dst) +{ + int fd_src=-1, fd_dst=-1; + ssize_t len, lenw; + char buf[4096]; + berrno be; + fd_src = open(src, O_RDONLY); + if (fd_src < 0) { + Dmsg2(0, "Unable to open %s ERR=%s\n", src, be.bstrerror(errno)); + goto bail_out; + } + fd_dst = open(dst, O_WRONLY | O_CREAT | O_EXCL, 0600); + if (fd_dst < 0) { + Dmsg2(0, "Unable to open %s ERR=%s\n", dst, be.bstrerror(errno)); + goto bail_out; + } + + while ((len = read(fd_src, buf, sizeof(buf))) > 0) + { + char *out_ptr = buf; + do { + lenw = write(fd_dst, out_ptr, len); + if (lenw >= 0) { + len -= lenw; + out_ptr += lenw; + } else if (errno != EINTR) { + Dmsg3(0, "Unable to write %d bytes in %s. ERR=%s\n", len, dst, be.bstrerror(errno)); + goto bail_out; + } + } while (len > 0); + } + + if (len == 0) { + close(fd_src); + if (close(fd_dst) < 0) { + Dmsg2(0, "Unable to close %s properly. ERR=%s\n", dst, be.bstrerror(errno)); + return -1; + } + /* Success! */ + return 0; + } +bail_out: + close(fd_src); + close(fd_dst); + return -1; +} + +/* The poll() code is currently disabled */ +#ifdef HAVE_POLL + +#include +#define NB_EVENT 1 + +int fd_wait_data(int fd, fd_wait_mode mode, int sec, int msec) +{ + int ret; + struct pollfd fds[NB_EVENT]; /* The structure for one event */ + + fds[0].fd = fd; + fds[0].events = (mode == WAIT_READ) ? POLLIN : POLLOUT; + + ret = poll(fds, NB_EVENT, sec * 1000 + msec); + + /* Check if poll actually succeed */ + switch(ret) { + case 0: /* timeout; no event detected */ + return 0; + + case -1: /* report error and abort */ + return -1; + + default: + if (fds[0].revents & POLLIN || fds[0].revents & POLLOUT) { + return 1; + + } else { + return -1; /* unexpected... */ + } + } + return -1; /* unexpected... */ +} +#else + +/* The select() code with a bigger fd_set was tested on Linux, FreeBSD and SunOS */ +#if defined(HAVE_LINUX_OS) || defined(HAVE_FREEBSD_OS) || defined(HAVE_SUN_OS) || defined(HAVE_WIN32) + #define SELECT_MAX_FD 7990 +#else + #define SELECT_MAX_FD 1023 /* For others, we keep it low */ +#endif + +int fd_wait_data(int fd, fd_wait_mode mode, int sec, int msec) +{ + + /* TODO: Allocate the fd_set when fd > SELECT_MAX_FD */ + union { + fd_set fdset; + char bfd_buf[1000]; + }; + struct timeval tv; + int ret; + + if (fd > SELECT_MAX_FD) { + Pmsg1(0, "Too many open files for the current system fd=%d\n", fd); + return -1; + } + + memset(&bfd_buf, 0, sizeof(bfd_buf)); /* FD_ZERO(&fdset) */ + FD_SET((unsigned)fd, &fdset); + + tv.tv_sec = sec; + tv.tv_usec = msec * 1000; + + if (mode == WAIT_READ) { + ret = select(fd + 1, &fdset, NULL, NULL, &tv); + + } else { /* WAIT_WRITE */ + ret = select(fd + 1, NULL, &fdset, NULL, &tv); + } + + switch (ret) { + case 0: /* timeout */ + return 0; + case -1: + return -1; /* error return */ + default: + break; + } + return 1; +} +#endif + +/* Use SOCK_CLOEXEC option when calling accept(). If not available, + * do it ourself (but with a race condition...) + */ +int baccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + int fd; +#ifdef HAVE_ACCEPT4 + fd = accept4(sockfd, addr, addrlen, SOCK_CLOEXEC); +#else + fd = accept(sockfd, addr, addrlen); + +# ifdef HAVE_DECL_FD_CLOEXEC + if (fd >= 0) { + int tmp_errno = errno; + if (fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC) < 0) { + berrno be; + Dmsg2(0, "Unable to set the CLOEXEC flag on fd=%d ERR=%s\n", fd, be.bstrerror()); + } + errno = tmp_errno; + } + +# endif /* HAVE_DECL_FD_CLOEXEC */ +#endif /* HAVE_ACCEPT4 */ + return fd; +} + +#undef fopen +FILE *bfopen(const char *path, const char *mode) +{ + char options[50]; + FILE *fp; + + bstrncpy(options, mode, sizeof(options)); + +#if defined(HAVE_STREAM_CLOEXEC) + bstrncat(options, STREAM_CLOEXEC, sizeof(options)); +#endif + + fp = fopen(path, options); + +#if !defined(HAVE_STREAM_CLOEXEC) && defined(HAVE_DECL_FD_CLOEXEC) + if (fp) { + int fd = fileno(fp); + if (fd >= 0) { + int tmp_errno = errno; + if (fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC) < 0) { + berrno be; + Dmsg2(0, "Unable to set the CLOEXEC flag on fd=%d ERR=%s\n", fd, be.bstrerror()); + } + errno = tmp_errno; + } + } +#endif + return fp; +} + + +#ifdef TEST_PROGRAM +/* The main idea of the test is pretty simple, we have a writer and a reader, and + * they wait a little bit to read or send data over the fifo. + * So, for the first packets, the writer will wait, then the reader will wait + * read/write requests should always be fast. Only the time of the fd_wait_data() + * should be long. + */ +#include "findlib/namedpipe.h" +#define PIPENAME "/tmp/wait.pipe.%d" + +#define NBPACKETS 10 +#define BUFSIZE 128*512 /* The pipe size looks to be 65K */ + +typedef struct { + int nb; + pthread_t writer; + pthread_t reader; +} job; + +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER; +int nb_ready=0; + +void *th1(void *a) +{ + NamedPipe p; + int fd, r; + btime_t s, e; + ssize_t nb; + char buf[BUFSIZE]; + job *j = (job *)a; + + namedpipe_init(&p); + bsnprintf(buf, sizeof(buf), PIPENAME, j->nb); + if (namedpipe_create(&p, buf, 0600) < 0) { + berrno be; + Dmsg2(0, "R: Unable to create the fifo %s. ERR=%s\n", buf, be.bstrerror()); + namedpipe_free(&p); + exit(2); + } + fd = namedpipe_open(&p, buf, O_RDONLY); + if (fd < 0) { + berrno be; + Dmsg2(0, "R: Unable to open the fifo %s. ERR=%s\n", buf, be.bstrerror()); + return NULL; + } + P(cond_mutex); + nb_ready++; + pthread_cond_wait(&cond, &cond_mutex); + V(cond_mutex); + for (int i = 0; i < NBPACKETS; i++) { + if (i < (NBPACKETS/2)) { + bmicrosleep(5, 0); + } + s = get_current_btime(); + r = fd_wait_data(fd, WAIT_READ, 10, 0); + if (r > 0) { + e = get_current_btime(); + Dmsg2(0, "Wait to read pkt %d %lldms\n",i, (int64_t) (e - s)); + + if (i <= NBPACKETS/2) { + ASSERT2((e-s) < 10000, "In the 1st phase, we are blocking the process"); + } else { + ASSERT2((e-s) > 10000, "In the 2nd phase, the writer is slowing down things"); + } + + s = get_current_btime(); + nb = read(fd, buf, sizeof(buf)); + e = get_current_btime(); + Dmsg3(0, "Read pkt %d %d bytes in %lldms\n",i, (int)nb, (int64_t) (e - s)); + ASSERT2((e-s) < 10000, "The read operation should be FAST"); + } + } + namedpipe_free(&p); + return NULL; +} + +void *th2(void *a) +{ + NamedPipe p; + btime_t s, e; + job *j = (job *)a; + char buf[BUFSIZE]; + int fd; + ssize_t nb; + + bsnprintf(buf, sizeof(buf), PIPENAME, j->nb); + namedpipe_init(&p); + if (namedpipe_create(&p, buf, 0600) < 0) { + berrno be; + Dmsg2(0, "W: Unable to create the fifo %s. ERR=%s\n", buf, be.bstrerror()); + namedpipe_free(&p); + exit(2); + } + + fd = namedpipe_open(&p, buf, O_WRONLY); + if (fd < 0) { + berrno be; + Dmsg2(0, "W: Unable to open the fifo %s. ERR=%s\n", buf, be.bstrerror()); + namedpipe_free(&p); + exit(2); + } + + P(cond_mutex); + nb_ready++; + pthread_cond_wait(&cond, &cond_mutex); + V(cond_mutex); + + unlink(buf); + + for (int i=0; i < NBPACKETS; i++) { + if (i > (NBPACKETS/2)) { + bmicrosleep(5, 0); + } + s = get_current_btime(); + if (fd_wait_data(fd, WAIT_WRITE, 10, 0) > 0) { + e = get_current_btime(); + Dmsg2(0, "Wait to write pkt %d %lldms\n",i, (int64_t) (e - s)); + + if (i == 0 || i > NBPACKETS/2) { /* The first packet doesn't count */ + ASSERT2((e-s) < 100000, "In the 2nd phase, it's fast to send, we are the blocker"); + } else { + ASSERT2((e-s) > 100000, "In the 1st phase, we wait for the reader"); + } + + s = get_current_btime(); + nb = write(fd, buf, sizeof(buf)); + e = get_current_btime(); + Dmsg3(0, "Wrote pkt %d %d bytes in %lldms\n", i, (int)nb, (int64_t) (e - s)); + ASSERT2((e-s) < 100000, "The write operation should never block"); + } + } + namedpipe_free(&p); + return NULL; +} + +int main(int argc, char **argv) +{ + job pthread_list[10000]; + int j = (argc >= 2) ? atoi(argv[1]) : 1; + int maxfd = (argc == 3) ? atoi(argv[2]) : 0; + + j = MIN(10000, j); + + lmgr_init_thread(); + set_debug_flags((char *)"h"); + + for (int i=3; i < maxfd; i++) { + open("/dev/null", O_RDONLY); + } + + for (int i=0; i < j; i++) { + pthread_list[i].nb=i; + pthread_create(&pthread_list[i].writer, NULL, th2, &pthread_list[i]); + pthread_create(&pthread_list[i].reader, NULL, th1, &pthread_list[i]); + } + + while (nb_ready < j*2) { + bmicrosleep(1, 0); + } + + Dmsg0(0, "All threads are started\n"); + P(cond_mutex); + pthread_cond_broadcast(&cond); + V(cond_mutex); + + for (int i=0; i < j; i++) { + pthread_join(pthread_list[i].writer, NULL); + pthread_join(pthread_list[i].reader, NULL); + } + + for (int i=3; i < maxfd; i++) { + close(i); + } + return 0; +} +#endif