]> git.sur5r.net Git - openldap/commitdiff
ITS#5985 Only play one queued response at a time per psearch thread
authorHoward Chu <hyc@openldap.org>
Thu, 5 Mar 2009 11:21:07 +0000 (11:21 +0000)
committerHoward Chu <hyc@openldap.org>
Thu, 5 Mar 2009 11:21:07 +0000 (11:21 +0000)
servers/slapd/overlays/syncprov.c

index 7520656cf7ddfac6416149b566d19261cc1f1437..a890f5be6f771c15928cc077173271a26157132e 100644 (file)
@@ -71,12 +71,11 @@ typedef struct syncops {
 #define        PS_WROTE_BASE           0x04
 #define        PS_FIND_BASE            0x08
 #define        PS_FIX_FILTER           0x10
+#define        PS_TASK_QUEUED          0x20
 
        int             s_inuse;        /* reference count */
        struct syncres *s_res;
        struct syncres *s_restail;
-       struct re_s     *s_qtask;       /* task for playing psearch responses */
-#define        RUNQ_INTERVAL   36000   /* a long time */
        ldap_pvt_thread_mutex_t s_mutex;
 } syncops;
 
@@ -746,13 +745,6 @@ syncprov_free_syncop( syncops *so )
                ldap_pvt_thread_mutex_unlock( &so->s_mutex );
                return;
        }
-       if ( so->s_qtask ) {
-               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
-               if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) )
-                       ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
-               ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask );
-               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
-       }
        ldap_pvt_thread_mutex_unlock( &so->s_mutex );
        if ( so->s_flags & PS_IS_DETACHED ) {
                filter_free( so->s_op->ors_filter );
@@ -851,11 +843,13 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
        return rs.sr_err;
 }
 
+static void
+syncprov_qstart( syncops *so );
+
 /* Play back queued responses */
 static int
-syncprov_qplay( Operation *op, struct re_s *rtask )
+syncprov_qplay( Operation *op, syncops *so )
 {
-       syncops *so = rtask->arg;
        slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
        syncres *sr;
        Entry *e;
@@ -864,7 +858,7 @@ syncprov_qplay( Operation *op, struct re_s *rtask )
 
        opc.son = on;
 
-       for (;;) {
+       do {
                ldap_pvt_thread_mutex_lock( &so->s_mutex );
                sr = so->s_res;
                if ( sr )
@@ -908,37 +902,31 @@ syncprov_qplay( Operation *op, struct re_s *rtask )
 
                ch_free( sr );
 
-               if ( rc ) {
-                       /* Exit loop with mutex held */
-                       ldap_pvt_thread_mutex_lock( &so->s_mutex );
-                       break;
-               }
-       }
+               /* Exit loop with mutex held */
+               ldap_pvt_thread_mutex_lock( &so->s_mutex );
 
-       /* wait until we get explicitly scheduled again */
-       ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
-       ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
-       if ( rc == 0 ) {
-               ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
-       } else {
-               /* bail out on any error */
-               ldap_pvt_runqueue_remove( &slapd_rq, rtask );
+       } while (0);
+
+       /* We now only send one change at a time, to prevent one
+        * psearch from hogging all the CPU. Resubmit this
+        * task if there are more responses queued.
+        */
 
-               /* Prevent duplicate remove */
-               if ( so->s_qtask == rtask )
-                       so->s_qtask = NULL;
+       if ( so->s_res ) {
+               syncprov_qstart( so );
+       } else {
+               so->s_flags ^= PS_TASK_QUEUED;
        }
-       ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
        ldap_pvt_thread_mutex_unlock( &so->s_mutex );
        return rc;
 }
 
-/* runqueue task for playing back queued responses */
+/* task for playing back queued responses */
 static void *
 syncprov_qtask( void *ctx, void *arg )
 {
-       struct re_s *rtask = arg;
-       syncops *so = rtask->arg;
+       syncops *so = arg;
        OperationBuffer opbuf;
        Operation *op;
        BackendDB be;
@@ -963,22 +951,11 @@ syncprov_qtask( void *ctx, void *arg )
        LDAP_SLIST_FIRST(&op->o_extra) = NULL;
        op->o_callback = NULL;
 
-       rc = syncprov_qplay( op, rtask );
+       rc = syncprov_qplay( op, so );
 
        /* decrement use count... */
        syncprov_free_syncop( so );
 
-#if 0  /* FIXME: connection_close isn't exported from slapd.
-                * should it be?
-                */
-       if ( rc ) {
-               ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
-               if ( connection_state_closing( op->o_conn )) {
-                       connection_close( op->o_conn );
-               }
-               ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
-       }
-#endif
        return NULL;
 }
 
@@ -986,27 +963,10 @@ syncprov_qtask( void *ctx, void *arg )
 static void
 syncprov_qstart( syncops *so )
 {
-       int wake=0;
-       ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
-       if ( !so->s_qtask ) {
-               so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
-                       syncprov_qtask, so, "syncprov_qtask",
-                       so->s_op->o_conn->c_peer_name.bv_val );
-               ++so->s_inuse;
-               wake = 1;
-       } else {
-               if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
-                       !so->s_qtask->next_sched.tv_sec ) {
-                       so->s_qtask->interval.tv_sec = 0;
-                       ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
-                       so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
-                       ++so->s_inuse;
-                       wake = 1;
-               }
-       }
-       ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
-       if ( wake )
-               slap_wake_listener();
+       so->s_flags |= PS_TASK_QUEUED;
+       so->s_inuse++;
+       ldap_pvt_thread_pool_submit( &connection_pool, 
+               syncprov_qtask, so );
 }
 
 /* Queue a persistent search response */
@@ -1071,7 +1031,7 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
                so->s_flags ^= PS_WROTE_BASE;
                so->s_flags |= PS_FIND_BASE;
        }
-       if ( so->s_flags & PS_IS_DETACHED ) {
+       if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
                syncprov_qstart( so );
        }
        ldap_pvt_thread_mutex_unlock( &so->s_mutex );