#define PS_WROTE_BASE 0x04
#define PS_FIND_BASE 0x08
#define PS_FIX_FILTER 0x10
+#define PS_TASK_QUEUED 0x20
int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
- struct re_s *s_qtask; /* task for playing psearch responses */
-#define RUNQ_INTERVAL 36000 /* a long time */
ldap_pvt_thread_mutex_t s_mutex;
} syncops;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return;
}
- if ( so->s_qtask ) {
- ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
- if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) )
- ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
- ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask );
- ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
- }
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( so->s_flags & PS_IS_DETACHED ) {
filter_free( so->s_op->ors_filter );
return rs.sr_err;
}
+static void
+syncprov_qstart( syncops *so );
+
/* Play back queued responses */
static int
-syncprov_qplay( Operation *op, struct re_s *rtask )
+syncprov_qplay( Operation *op, syncops *so )
{
- syncops *so = rtask->arg;
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
syncres *sr;
Entry *e;
opc.son = on;
- for (;;) {
+ do {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
sr = so->s_res;
if ( sr )
ch_free( sr );
- if ( rc ) {
- /* Exit loop with mutex held */
- ldap_pvt_thread_mutex_lock( &so->s_mutex );
- break;
- }
- }
+ /* Exit loop with mutex held */
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
- /* wait until we get explicitly scheduled again */
- ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
- ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
- if ( rc == 0 ) {
- ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
- } else {
- /* bail out on any error */
- ldap_pvt_runqueue_remove( &slapd_rq, rtask );
+ } while (0);
+
+ /* We now only send one change at a time, to prevent one
+ * psearch from hogging all the CPU. Resubmit this
+ * task if there are more responses queued.
+ */
- /* Prevent duplicate remove */
- if ( so->s_qtask == rtask )
- so->s_qtask = NULL;
+ if ( so->s_res ) {
+ syncprov_qstart( so );
+ } else {
+ so->s_flags ^= PS_TASK_QUEUED;
}
- ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return rc;
}
-/* runqueue task for playing back queued responses */
+/* task for playing back queued responses */
static void *
syncprov_qtask( void *ctx, void *arg )
{
- struct re_s *rtask = arg;
- syncops *so = rtask->arg;
+ syncops *so = arg;
OperationBuffer opbuf;
Operation *op;
BackendDB be;
LDAP_SLIST_FIRST(&op->o_extra) = NULL;
op->o_callback = NULL;
- rc = syncprov_qplay( op, rtask );
+ rc = syncprov_qplay( op, so );
/* decrement use count... */
syncprov_free_syncop( so );
-#if 0 /* FIXME: connection_close isn't exported from slapd.
- * should it be?
- */
- if ( rc ) {
- ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
- if ( connection_state_closing( op->o_conn )) {
- connection_close( op->o_conn );
- }
- ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
- }
-#endif
return NULL;
}
static void
syncprov_qstart( syncops *so )
{
- int wake=0;
- ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
- if ( !so->s_qtask ) {
- so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
- syncprov_qtask, so, "syncprov_qtask",
- so->s_op->o_conn->c_peer_name.bv_val );
- ++so->s_inuse;
- wake = 1;
- } else {
- if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
- !so->s_qtask->next_sched.tv_sec ) {
- so->s_qtask->interval.tv_sec = 0;
- ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
- so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
- ++so->s_inuse;
- wake = 1;
- }
- }
- ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
- if ( wake )
- slap_wake_listener();
+ so->s_flags |= PS_TASK_QUEUED;
+ so->s_inuse++;
+ ldap_pvt_thread_pool_submit( &connection_pool,
+ syncprov_qtask, so );
}
/* Queue a persistent search response */
so->s_flags ^= PS_WROTE_BASE;
so->s_flags |= PS_FIND_BASE;
}
- if ( so->s_flags & PS_IS_DETACHED ) {
+ if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
syncprov_qstart( so );
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );