X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslapd%2Fdaemon.c;h=066c8529ff03c11aeb9cd01b09c809a839274caf;hb=HEAD;hp=6f66d220ad636f6b6340f9ae2968e550b3b133a5;hpb=207a43dac2eef40f77c44afeb7de064344b1410b;p=openldap diff --git a/servers/slapd/daemon.c b/servers/slapd/daemon.c index 6f66d220ad..066c8529ff 100644 --- a/servers/slapd/daemon.c +++ b/servers/slapd/daemon.c @@ -1,7 +1,7 @@ /* $OpenLDAP$ */ /* This work is part of OpenLDAP Software . * - * Copyright 1998-2015 The OpenLDAP Foundation. + * Copyright 1998-2018 The OpenLDAP Foundation. * Portions Copyright 2007 by Howard Chu, Symas Corporation. * All rights reserved. * @@ -45,14 +45,18 @@ #include #endif -#if defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL) +#ifdef HAVE_KQUEUE +# include +# include +# include +#elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL) # include #elif defined(SLAP_X_DEVPOLL) && defined(HAVE_SYS_DEVPOLL_H) && defined(HAVE_DEVPOLL) # include # include # include # include -#endif /* ! epoll && ! /dev/poll */ +#endif /* ! kqueue && ! epoll && ! /dev/poll */ #ifdef HAVE_TCPD int allow_severity = LOG_INFO; @@ -93,7 +97,7 @@ static volatile sig_atomic_t listening = 1; /* 0 when slap_listeners closed */ static ldap_pvt_thread_t *listener_tid; #ifndef SLAPD_LISTEN_BACKLOG -#define SLAPD_LISTEN_BACKLOG 1024 +#define SLAPD_LISTEN_BACKLOG 2048 #endif /* ! SLAPD_LISTEN_BACKLOG */ #define DAEMON_ID(fd) (fd & slapd_daemon_mask) @@ -140,7 +144,21 @@ typedef struct slap_daemon_st { int sd_nwriters; int sd_nfds; -#if defined(HAVE_EPOLL) +#if defined(HAVE_KQUEUE) + uint8_t* sd_fdmodes; /* indexed by fd */ + Listener** sd_l; /* indexed by fd */ + /* Double buffer the kqueue changes to avoid holding the sd_mutex \ + * during a kevent() call. \ + */ + struct kq_change { + struct kevent* sd_changes; + int sd_nchanges; + int sd_maxchanges; + } sd_kqc[2]; + int sd_changeidx; /* index to current change buffer */ + int sd_kq; +#elif defined(HAVE_EPOLL) + struct epoll_event *sd_epolls; int *sd_index; int sd_epfd; @@ -150,7 +168,7 @@ typedef struct slap_daemon_st { int *sd_index; Listener **sd_l; int sd_dpfd; -#else /* ! epoll && ! /dev/poll */ +#else /* ! kqueue && ! epoll && ! /dev/poll */ #ifdef HAVE_WINSOCK char *sd_flags; char *sd_rflags; @@ -159,7 +177,7 @@ typedef struct slap_daemon_st { fd_set sd_readers; fd_set sd_writers; #endif /* ! HAVE_WINSOCK */ -#endif /* ! epoll && ! /dev/poll */ +#endif /* ! kqueue && ! epoll && ! /dev/poll */ } slap_daemon_st; static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; @@ -171,11 +189,220 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; * with file descriptors and events respectively * * - SLAP__* for private interface; type by now is one of - * EPOLL, DEVPOLL, SELECT + * EPOLL, DEVPOLL, SELECT, KQUEUE * * private interface should not be used in the code. */ -#if defined(HAVE_EPOLL) +#ifdef HAVE_KQUEUE +# define SLAP_EVENT_FNAME "kqueue" +# define SLAP_EVENTS_ARE_INDEXED 0 +# define SLAP_EVENT_MAX(t) (2 * dtblsize) /* each fd can have a read & a write event */ + +# define SLAP_EVENT_DECL \ + static struct kevent* events = NULL + +# define SLAP_EVENT_INIT(t) do {\ + if (!events) { \ + events = ch_malloc(sizeof(*events) * SLAP_EVENT_MAX(t)); \ + if (!events) { \ + Debug(LDAP_DEBUG_ANY, \ + "daemon: SLAP_EVENT_INIT: ch_malloc of events failed, wanted %d bytes\n", \ + sizeof(*events) * SLAP_EVENT_MAX(t), 0, 0); \ + slapd_shutdown = 2; \ + } \ + } \ +} while (0) + +# define SLAP_SOCK_INIT(t) do { \ + int kq_i; \ + size_t kq_nbytes; \ + Debug(LDAP_DEBUG_ANY, "daemon: SLAP_SOCK_INIT: dtblsize=%d\n", dtblsize, 0, 0); \ + slap_daemon[t].sd_nfds = 0; \ + slap_daemon[t].sd_changeidx = 0; \ + for (kq_i = 0; kq_i < 2; kq_i++) { \ + struct kq_change* kqc = &slap_daemon[t].sd_kqc[kq_i]; \ + kqc->sd_nchanges = 0; \ + kqc->sd_maxchanges = 256; /* will grow as needed */ \ + kq_nbytes = sizeof(*kqc->sd_changes) * kqc->sd_maxchanges; \ + kqc->sd_changes = ch_calloc(1, kq_nbytes); \ + if (!kqc->sd_changes) { \ + Debug(LDAP_DEBUG_ANY, \ + "daemon: SLAP_SOCK_INIT: ch_calloc of slap_daemon.sd_changes[%d] failed, wanted %d bytes, shutting down\n", \ + kq_i, kq_nbytes, 0); \ + slapd_shutdown = 2; \ + } \ + } \ + kq_nbytes = sizeof(*slap_daemon[t].sd_fdmodes) * dtblsize; \ + slap_daemon[t].sd_fdmodes = ch_calloc(1, kq_nbytes); \ + if (!slap_daemon[t].sd_fdmodes) { \ + Debug(LDAP_DEBUG_ANY, \ + "daemon: SLAP_SOCK_INIT: ch_calloc of slap_daemon.sd_fdmodes failed, wanted %d bytes, shutting down\n", \ + kq_nbytes, 0, 0); \ + slapd_shutdown = 2; \ + } \ + kq_nbytes = sizeof(*slap_daemon[t].sd_l) * dtblsize; \ + slap_daemon[t].sd_l = ch_calloc(1, kq_nbytes); \ + if (!slap_daemon[t].sd_l) { \ + Debug(LDAP_DEBUG_ANY, \ + "daemon: SLAP_SOCK_INIT: ch_calloc of slap_daemon.sd_l failed, wanted %d bytes, shutting down\n", \ + kq_nbytes, 0, 0); \ + slapd_shutdown = 2; \ + } \ + slap_daemon[t].sd_kq = kqueue(); \ + if (slap_daemon[t].sd_kq < 0) { \ + Debug(LDAP_DEBUG_ANY, "daemon: SLAP_SOCK_INIT: kqueue() failed, errno=%d, shutting down\n", errno, 0, 0); \ + slapd_shutdown = 2; \ + } \ +} while (0) + +# define SLAP_SOCK_DESTROY(t) do { \ + int kq_i; \ + if (slap_daemon[t].sd_kq > 0) { \ + close(slap_daemon[t].sd_kq); \ + slap_daemon[t].sd_kq = -1; \ + } \ + for (kq_i = 0; kq_i < 2; kq_i++) { \ + if (slap_daemon[t].sd_kqc[kq_i].sd_changes != NULL) { \ + ch_free(slap_daemon[t].sd_kqc[kq_i].sd_changes); \ + slap_daemon[t].sd_kqc[kq_i].sd_changes = NULL; \ + } \ + slap_daemon[t].sd_kqc[kq_i].sd_nchanges = 0; \ + slap_daemon[t].sd_kqc[kq_i].sd_maxchanges = 0; \ + } \ + if (slap_daemon[t].sd_l != NULL) { \ + ch_free(slap_daemon[t].sd_l); \ + slap_daemon[t].sd_l = NULL; \ + } \ + if (slap_daemon[t].sd_fdmodes != NULL) { \ + ch_free(slap_daemon[t].sd_fdmodes); \ + slap_daemon[t].sd_fdmodes = NULL; \ + } \ + slap_daemon[t].sd_nfds = 0; \ +} while (0) + +# define SLAP_KQUEUE_SOCK_ACTIVE 0x01 +# define SLAP_KQUEUE_SOCK_READ_ENABLED 0x02 +# define SLAP_KQUEUE_SOCK_WRITE_ENABLED 0x04 + +# define SLAP_SOCK_IS_ACTIVE(t,s) (slap_daemon[t].sd_fdmodes[(s)] != 0) +# define SLAP_SOCK_NOT_ACTIVE(t,s) (slap_daemon[t].sd_fdmodes[(s)] == 0) +# define SLAP_SOCK_IS_READ(t,s) (slap_daemon[t].sd_fdmodes[(s)] & SLAP_KQUEUE_SOCK_READ_ENABLED) +# define SLAP_SOCK_IS_WRITE(t,s) (slap_daemon[t].sd_fdmodes[(s)] & SLAP_KQUEUE_SOCK_WRITE_ENABLED) + +/* + * SLAP_SOCK_SET_* & SLAP_SOCK_CLR_* get called a _lot_. Since kevent() + * processes changes before it looks for events, batch up the changes which + * will get submitted the next time kevent() is called for events. + */ + +# define SLAP_KQUEUE_CHANGE(t, s, filter, flag) do { \ + /* If maxchanges is reached, have to realloc to make room for more. \ + * Ideally we'd call kevent(), but the daemon thread could be sitting \ + * in kevent() waiting for events. \ + */ \ + struct kq_change* kqc = &slap_daemon[t].sd_kqc[slap_daemon[t].sd_changeidx]; \ + if (kqc->sd_nchanges == kqc->sd_maxchanges) { \ + /* Don't want to do this very often. Double the size. */ \ + size_t kq_nbytes; \ + Debug(LDAP_DEBUG_CONNS, \ + "daemon: SLAP_KQUEUE_CHANGE: increasing slap_daemon.sd_kqc[%d].maxchanges from %d to %d\n", \ + slap_daemon[t].sd_changeidx, kqc->sd_maxchanges, 2*kqc->sd_maxchanges); \ + kqc->sd_maxchanges += kqc->sd_maxchanges; \ + kq_nbytes = sizeof(*kqc->sd_changes) * kqc->sd_maxchanges; \ + kqc->sd_changes = ch_realloc(kqc->sd_changes, kq_nbytes); \ + if (!kqc->sd_changes) { \ + Debug(LDAP_DEBUG_ANY, \ + "daemon: SLAP_KQUEUE_CHANGE: ch_realloc of slap_daemon.sd_kqc[%d].sd_changes failed, wanted %d bytes, shutting down\n", \ + slap_daemon[t].sd_changeidx, kq_nbytes, 0); \ + slapd_shutdown = 2; \ + break; /* Don't want to do the EV_SET if sd_changes is NULL */ \ + } \ + } \ + EV_SET(&kqc->sd_changes[kqc->sd_nchanges++], \ + (s), (filter), (flag), 0, 0, slap_daemon[t].sd_l[(s)]); \ +} while (0) + +# define SLAP_KQUEUE_SOCK_SET(t, s, filter, mode) do { \ + if ((slap_daemon[t].sd_fdmodes[(s)] & (mode)) != (mode)) { \ + slap_daemon[t].sd_fdmodes[(s)] |= (mode); \ + SLAP_KQUEUE_CHANGE(t, (s), (filter), EV_ENABLE); \ + } \ +} while (0) + +# define SLAP_KQUEUE_SOCK_CLR(t, s, filter, mode) do { \ + if (slap_daemon[t].sd_fdmodes[(s)] & (mode)) { \ + slap_daemon[t].sd_fdmodes[(s)] &= ~(mode); \ + SLAP_KQUEUE_CHANGE(t, (s), (filter), EV_DISABLE); \ + } \ +} while (0) + +# define SLAP_SOCK_SET_READ(t, s) SLAP_KQUEUE_SOCK_SET(t, (s), EVFILT_READ, SLAP_KQUEUE_SOCK_READ_ENABLED) +# define SLAP_SOCK_SET_WRITE(t, s) SLAP_KQUEUE_SOCK_SET(t, (s), EVFILT_WRITE, SLAP_KQUEUE_SOCK_WRITE_ENABLED) +# define SLAP_SOCK_CLR_READ(t, s) SLAP_KQUEUE_SOCK_CLR(t, (s), EVFILT_READ, SLAP_KQUEUE_SOCK_READ_ENABLED) +# define SLAP_SOCK_CLR_WRITE(t, s) SLAP_KQUEUE_SOCK_CLR(t, (s), EVFILT_WRITE, SLAP_KQUEUE_SOCK_WRITE_ENABLED) + +/* kqueue doesn't need to do anything to clear the event. */ +# define SLAP_EVENT_CLR_READ(i) do {} while (0) +# define SLAP_EVENT_CLR_WRITE(i) do {} while (0) + +# define SLAP_SOCK_ADD(t, s, l) do { \ + assert( s < dtblsize ); \ + slap_daemon[t].sd_l[(s)] = (l); \ + slap_daemon[t].sd_fdmodes[(s)] = SLAP_KQUEUE_SOCK_ACTIVE | SLAP_KQUEUE_SOCK_READ_ENABLED; \ + ++slap_daemon[t].sd_nfds; \ + SLAP_KQUEUE_CHANGE(t, (s), EVFILT_READ, EV_ADD); \ + SLAP_KQUEUE_CHANGE(t, (s), EVFILT_WRITE, EV_ADD | EV_DISABLE); \ +} while (0) + +# define SLAP_SOCK_DEL(t, s) do { \ + SLAP_KQUEUE_CHANGE(t, (s), EVFILT_READ, EV_DELETE); \ + SLAP_KQUEUE_CHANGE(t, (s), EVFILT_WRITE, EV_DELETE); \ + slap_daemon[t].sd_l[(s)] = NULL; \ + slap_daemon[t].sd_fdmodes[(s)] = 0; \ + --slap_daemon[t].sd_nfds; \ +} while (0) + +# define SLAP_EVENT_FD(t, i) (events[(i)].ident) + +# define SLAP_EVENT_IS_READ(t, i) \ + (events[(i)].filter == EVFILT_READ && SLAP_SOCK_IS_READ(t, SLAP_EVENT_FD(0, i))) + +# define SLAP_EVENT_IS_WRITE(t, i) \ + (events[(i)].filter == EVFILT_WRITE && SLAP_SOCK_IS_WRITE(t, SLAP_EVENT_FD(0, i))) + +# define SLAP_EVENT_IS_LISTENER(t, i) \ + (events[(i)].udata && SLAP_SOCK_IS_READ(t, SLAP_EVENT_FD(t, i))) + +# define SLAP_EVENT_LISTENER(t, i) ((Listener*)(events[(i)].udata)) + +# define SLAP_EVENT_WAIT(t, tvp, nsp) do { \ + struct timespec kq_ts; \ + struct timespec* kq_tsp; \ + int kq_idx; \ + if (tvp) { \ + TIMEVAL_TO_TIMESPEC((tvp), &kq_ts); \ + kq_tsp = &kq_ts; \ + } else { \ + kq_tsp = NULL; \ + } \ + /* Save the change buffer index for use when the mutex is unlocked, \ + * then switch the index so new changes go to the other buffer. \ + */ \ + ldap_pvt_thread_mutex_lock( &slap_daemon[t].sd_mutex ); \ + kq_idx = slap_daemon[t].sd_changeidx; \ + slap_daemon[t].sd_changeidx ^= 1; \ + ldap_pvt_thread_mutex_unlock( &slap_daemon[t].sd_mutex ); \ + *(nsp) = kevent(slap_daemon[t].sd_kq, \ + slap_daemon[t].sd_kqc[kq_idx].sd_nchanges \ + ? slap_daemon[t].sd_kqc[kq_idx].sd_changes : NULL, \ + slap_daemon[t].sd_kqc[kq_idx].sd_nchanges, \ + events, SLAP_EVENT_MAX(t), kq_tsp); \ + slap_daemon[t].sd_kqc[kq_idx].sd_nchanges = 0; \ +} while(0) + +/*-------------------------------------------------------------------------------*/ + +#elif defined(HAVE_EPOLL) /*************************************** * Use epoll infrastructure - epoll(4) * ***************************************/ @@ -407,7 +634,7 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; SLAP_DEVPOLL_SOCK_LX(t,(s)) = (l); \ SLAP_DEVPOLL_SOCK_FD(t,(s)) = (s); \ SLAP_DEVPOLL_SOCK_EV(t,(s)) = POLLIN; \ - SLAP_DEVPOLL_WRITE_POLLFD(t,(s), &SLAP_DEVPOLL_SOCK_EP((s)), 1, "ADD", 1); \ + SLAP_DEVPOLL_WRITE_POLLFD(t,(s), &SLAP_DEVPOLL_SOCK_EP(t, (s)), 1, "ADD", 1); \ slap_daemon[t].sd_nfds++; \ } while (0) @@ -440,8 +667,18 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; # define SLAP_EVENT_IS_READ(i) SLAP_DEVPOLL_EVENT_CHK((i), POLLIN) # define SLAP_EVENT_IS_WRITE(i) SLAP_DEVPOLL_EVENT_CHK((i), POLLOUT) -# define SLAP_EVENT_IS_LISTENER(t,i) SLAP_DEVPOLL_EV_LISTENER(SLAP_DEVPOLL_SOCK_LX(SLAP_EVENT_FD(t,(i)))) -# define SLAP_EVENT_LISTENER(t,i) SLAP_DEVPOLL_SOCK_LX(SLAP_EVENT_FD(t,(i))) +# define SLAP_EVENT_IS_LISTENER(t,i) SLAP_DEVPOLL_EV_LISTENER(SLAP_DEVPOLL_SOCK_LX(t, SLAP_EVENT_FD(t,(i)))) +# define SLAP_EVENT_LISTENER(t,i) SLAP_DEVPOLL_SOCK_LX(t, SLAP_EVENT_FD(t,(i))) + +# define SLAP_SOCK_DESTROY(t) do { \ + if ( slap_daemon[t].sd_pollfd != NULL ) { \ + ch_free( slap_daemon[t].sd_pollfd ); \ + slap_daemon[t].sd_pollfd = NULL; \ + slap_daemon[t].sd_index = NULL; \ + slap_daemon[t].sd_l = NULL; \ + close( slap_daemon[t].sd_dpfd ); \ + } \ +} while ( 0 ) # define SLAP_SOCK_INIT(t) do { \ slap_daemon[t].sd_pollfd = ch_calloc( 1, \ @@ -455,7 +692,7 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; Debug( LDAP_DEBUG_ANY, "daemon: " SLAP_EVENT_FNAME ": " \ "open(\"" SLAP_EVENT_FNAME "\") failed errno=%d\n", \ errno, 0, 0 ); \ - SLAP_SOCK_DESTROY; \ + SLAP_SOCK_DESTROY(t); \ return -1; \ } \ for ( i = 0; i < dtblsize; i++ ) { \ @@ -464,16 +701,6 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; } \ } while (0) -# define SLAP_SOCK_DESTROY(t) do { \ - if ( slap_daemon[t].sd_pollfd != NULL ) { \ - ch_free( slap_daemon[t].sd_pollfd ); \ - slap_daemon[t].sd_pollfd = NULL; \ - slap_daemon[t].sd_index = NULL; \ - slap_daemon[t].sd_l = NULL; \ - close( slap_daemon[t].sd_dpfd ); \ - } \ -} while ( 0 ) - # define SLAP_EVENT_DECL struct pollfd *revents # define SLAP_EVENT_INIT(t) do { \ @@ -488,7 +715,7 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; *(nsp) = ioctl( slap_daemon[t].sd_dpfd, DP_POLL, &sd_dvpoll ); \ } while (0) -#else /* ! epoll && ! /dev/poll */ +#else /* ! kqueue && ! epoll && ! /dev/poll */ # ifdef HAVE_WINSOCK # define SLAP_EVENT_FNAME "WSselect" /* Winsock provides a "select" function but its fd_sets are @@ -676,7 +903,7 @@ static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; nwriters > 0 ? &writefds : NULL, NULL, (tvp) ); \ } while (0) # endif /* !HAVE_WINSOCK */ -#endif /* ! epoll && ! /dev/poll */ +#endif /* ! kqueue && ! epoll && ! /dev/poll */ #ifdef HAVE_SLP /* @@ -1274,7 +1501,7 @@ slap_open_listener( LDAPURLDesc *lud; unsigned short port; int err, addrlen = 0; - struct sockaddr **sal, **psal; + struct sockaddr **sal = NULL, **psal; int socktype = SOCK_STREAM; /* default to COTS */ ber_socket_t s; @@ -2421,6 +2648,7 @@ loop: if ( lr->sl_sd == AC_SOCKET_INVALID ) continue; if ( DAEMON_ID( lr->sl_sd ) != tid ) continue; + if ( !SLAP_SOCK_IS_ACTIVE( tid, lr->sl_sd )) continue; if ( lr->sl_mute || lr->sl_busy ) { @@ -2460,8 +2688,8 @@ loop: ldap_pvt_runqueue_runtask( &slapd_rq, rtask ); ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 ); ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - ldap_pvt_thread_pool_submit( &connection_pool, - rtask->routine, (void *) rtask ); + ldap_pvt_thread_pool_submit2( &connection_pool, + rtask->routine, (void *) rtask, &rtask->pool_cookie ); ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); } rtask = ldap_pvt_runqueue_next_sched( &slapd_rq, &cat ); @@ -2711,8 +2939,13 @@ loop: /* Don't log internal wake events */ if ( fd == wake_sds[tid][0] ) continue; +#ifdef HAVE_KQUEUE + r = SLAP_EVENT_IS_READ( tid, i ); + w = SLAP_EVENT_IS_WRITE( tid, i ); +#else r = SLAP_EVENT_IS_READ( i ); w = SLAP_EVENT_IS_WRITE( i ); +#endif /* HAVE_KQUEUE */ if ( r || w ) { Debug( LDAP_DEBUG_CONNS, " %d%s%s", fd, r ? "r" : "", w ? "w" : "" ); @@ -2743,7 +2976,12 @@ loop: continue; } - if ( SLAP_EVENT_IS_WRITE( i ) ) { +#ifdef HAVE_KQUEUE + if ( SLAP_EVENT_IS_WRITE( tid, i ) ) +#else + if ( SLAP_EVENT_IS_WRITE( i ) ) +#endif /* HAVE_KQUEUE */ + { Debug( LDAP_DEBUG_CONNS, "daemon: write active on %d\n", fd, 0, 0 ); @@ -2762,7 +3000,12 @@ loop: } } /* If event is a read */ - if ( SLAP_EVENT_IS_READ( i )) { +#ifdef HAVE_KQUEUE + if ( SLAP_EVENT_IS_READ( tid, i )) +#else + if ( SLAP_EVENT_IS_READ( i )) +#endif /* HAVE_KQUEUE */ + { r = 1; Debug( LDAP_DEBUG_CONNS, "daemon: read active on %d\n", @@ -2813,13 +3056,17 @@ loop: 0, 0, 0 ); } - close_listeners( 0 ); + close_listeners( 1 ); if ( !slapd_gentle_shutdown ) { slapd_abrupt_shutdown = 1; connections_shutdown(); } +#ifdef HAVE_KQUEUE + close( slap_daemon[tid].sd_kq ); +#endif + if ( LogTest( LDAP_DEBUG_ANY )) { int t = ldap_pvt_thread_pool_backload( &connection_pool ); Debug( LDAP_DEBUG_ANY,