+ return fp;
+}
+
+
+#ifdef TEST_PROGRAM
+/* The main idea of the test is pretty simple, we have a writer and a reader, and
+ * they wait a little bit to read or send data over the fifo.
+ * So, for the first packets, the writer will wait, then the reader will wait
+ * read/write requests should always be fast. Only the time of the fd_wait_data()
+ * should be long.
+ */
+#include "findlib/namedpipe.h"
+#define PIPENAME "/tmp/wait.pipe.%d"
+
+#define NBPACKETS 10
+#define BUFSIZE 128*512 /* The pipe size looks to be 65K */
+
+typedef struct {
+ int nb;
+ pthread_t writer;
+ pthread_t reader;
+} job;
+
+pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
+int nb_ready=0;
+
+void *th1(void *a)
+{
+ NamedPipe p;
+ int fd, r;
+ btime_t s, e;
+ ssize_t nb;
+ char buf[BUFSIZE];
+ job *j = (job *)a;
+
+ namedpipe_init(&p);
+ bsnprintf(buf, sizeof(buf), PIPENAME, j->nb);
+ if (namedpipe_create(&p, buf, 0600) < 0) {
+ berrno be;
+ Dmsg2(0, "R: Unable to create the fifo %s. ERR=%s\n", buf, be.bstrerror());
+ namedpipe_free(&p);
+ exit(2);
+ }
+ fd = namedpipe_open(&p, buf, O_RDONLY);
+ if (fd < 0) {
+ berrno be;
+ Dmsg2(0, "R: Unable to open the fifo %s. ERR=%s\n", buf, be.bstrerror());
+ return NULL;
+ }
+ P(cond_mutex);
+ nb_ready++;
+ pthread_cond_wait(&cond, &cond_mutex);
+ V(cond_mutex);
+ for (int i = 0; i < NBPACKETS; i++) {
+ if (i < (NBPACKETS/2)) {
+ bmicrosleep(5, 0);
+ }
+ s = get_current_btime();
+ r = fd_wait_data(fd, WAIT_READ, 10, 0);
+ if (r > 0) {
+ e = get_current_btime();
+ Dmsg2(0, "Wait to read pkt %d %lldms\n",i, (int64_t) (e - s));
+
+ if (i <= NBPACKETS/2) {
+ ASSERT2((e-s) < 10000, "In the 1st phase, we are blocking the process");
+ } else {
+ ASSERT2((e-s) > 10000, "In the 2nd phase, the writer is slowing down things");
+ }
+
+ s = get_current_btime();
+ nb = read(fd, buf, sizeof(buf));
+ e = get_current_btime();
+ Dmsg3(0, "Read pkt %d %d bytes in %lldms\n",i, (int)nb, (int64_t) (e - s));
+ ASSERT2((e-s) < 10000, "The read operation should be FAST");
+ }
+ }
+ namedpipe_free(&p);
+ return NULL;
+}
+
+void *th2(void *a)
+{
+ NamedPipe p;
+ btime_t s, e;
+ job *j = (job *)a;
+ char buf[BUFSIZE];
+ int fd;
+ ssize_t nb;
+
+ bsnprintf(buf, sizeof(buf), PIPENAME, j->nb);
+ namedpipe_init(&p);
+ if (namedpipe_create(&p, buf, 0600) < 0) {
+ berrno be;
+ Dmsg2(0, "W: Unable to create the fifo %s. ERR=%s\n", buf, be.bstrerror());
+ namedpipe_free(&p);
+ exit(2);
+ }
+
+ fd = namedpipe_open(&p, buf, O_WRONLY);
+ if (fd < 0) {
+ berrno be;
+ Dmsg2(0, "W: Unable to open the fifo %s. ERR=%s\n", buf, be.bstrerror());
+ namedpipe_free(&p);
+ exit(2);
+ }
+
+ P(cond_mutex);
+ nb_ready++;
+ pthread_cond_wait(&cond, &cond_mutex);
+ V(cond_mutex);
+
+ unlink(buf);
+
+ for (int i=0; i < NBPACKETS; i++) {
+ if (i > (NBPACKETS/2)) {
+ bmicrosleep(5, 0);
+ }
+ s = get_current_btime();
+ if (fd_wait_data(fd, WAIT_WRITE, 10, 0) > 0) {
+ e = get_current_btime();
+ Dmsg2(0, "Wait to write pkt %d %lldms\n",i, (int64_t) (e - s));
+
+ if (i == 0 || i > NBPACKETS/2) { /* The first packet doesn't count */
+ ASSERT2((e-s) < 100000, "In the 2nd phase, it's fast to send, we are the blocker");
+ } else {
+ ASSERT2((e-s) > 100000, "In the 1st phase, we wait for the reader");
+ }
+
+ s = get_current_btime();
+ nb = write(fd, buf, sizeof(buf));
+ e = get_current_btime();
+ Dmsg3(0, "Wrote pkt %d %d bytes in %lldms\n", i, (int)nb, (int64_t) (e - s));
+ ASSERT2((e-s) < 100000, "The write operation should never block");
+ }
+ }
+ namedpipe_free(&p);
+ return NULL;
+}
+
+int main(int argc, char **argv)
+{
+ job pthread_list[10000];
+ int j = (argc >= 2) ? atoi(argv[1]) : 1;
+ int maxfd = (argc == 3) ? atoi(argv[2]) : 0;
+
+ j = MIN(10000, j);
+
+ lmgr_init_thread();
+ set_debug_flags((char *)"h");
+
+ for (int i=3; i < maxfd; i++) {
+ open("/dev/null", O_RDONLY);
+ }
+
+ for (int i=0; i < j; i++) {
+ pthread_list[i].nb=i;
+ pthread_create(&pthread_list[i].writer, NULL, th2, &pthread_list[i]);
+ pthread_create(&pthread_list[i].reader, NULL, th1, &pthread_list[i]);
+ }
+
+ while (nb_ready < j*2) {
+ bmicrosleep(1, 0);
+ }
+
+ Dmsg0(0, "All threads are started\n");
+ P(cond_mutex);
+ pthread_cond_broadcast(&cond);
+ V(cond_mutex);
+
+ for (int i=0; i < j; i++) {
+ pthread_join(pthread_list[i].writer, NULL);
+ pthread_join(pthread_list[i].reader, NULL);
+ }
+
+ for (int i=3; i < maxfd; i++) {
+ close(i);
+ }
+ return 0;