#include "config.h"
#include "ldap_rq.h"
+#ifdef LDAP_DEVEL
#define CHECK_CSN 1
+#endif
/* A modify request on a particular entry */
typedef struct modinst {
int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
+ void *s_pool_cookie;
ldap_pvt_thread_mutex_t s_mutex;
} syncops;
}
}
+#define FS_UNLINK 1
+#define FS_LOCK 2
+
static int
-syncprov_free_syncop( syncops *so, int unlink )
+syncprov_free_syncop( syncops *so, int flags )
{
syncres *sr, *srnext;
GroupAssertion *ga, *gnext;
- ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ if ( flags & FS_LOCK )
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
/* already being freed, or still in use */
if ( !so->s_inuse || --so->s_inuse > 0 ) {
- ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ if ( flags & FS_LOCK )
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return 0;
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- if ( unlink ) {
+ if (( flags & FS_UNLINK ) && so->s_si ) {
syncops **sop;
ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex );
for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) {
static int
syncprov_qplay( Operation *op, syncops *so )
{
- slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
syncres *sr;
int rc = 0;
if ( rc == 0 && so->s_res ) {
syncprov_qstart( so );
- } else {
- so->s_flags ^= PS_TASK_QUEUED;
}
- ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return rc;
}
rc = syncprov_qplay( op, so );
+ /* if an error occurred, or no responses left, task is no longer queued */
+ if ( !rc && !so->s_res )
+ rc = 1;
+
/* decrement use count... */
- syncprov_free_syncop( so, 1 );
+ if ( !syncprov_free_syncop( so, FS_UNLINK )) {
+ if ( rc )
+ /* if we didn't unlink, and task is no longer queued, clear flag */
+ so->s_flags ^= PS_TASK_QUEUED;
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ }
return NULL;
}
{
so->s_flags |= PS_TASK_QUEUED;
so->s_inuse++;
- ldap_pvt_thread_pool_submit( &connection_pool,
- syncprov_qtask, so );
+ ldap_pvt_thread_pool_submit2( &connection_pool,
+ syncprov_qtask, so, &so->s_pool_cookie );
}
/* Queue a persistent search response */
if ( lock )
ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
}
- return syncprov_free_syncop( so, 0 );
+ return syncprov_free_syncop( so, FS_LOCK );
}
static int
* with saveit == TRUE
*/
snext = ss->s_next;
- if ( syncprov_free_syncop( ss, 0 ) ) {
+ if ( syncprov_free_syncop( ss, FS_LOCK ) ) {
*pss = snext;
gonext = 0;
}
for (sm = opc->smatches; sm; sm=snext) {
snext = sm->sm_next;
- syncprov_free_syncop( sm->sm_op, 1 );
+ syncprov_free_syncop( sm->sm_op, FS_LOCK|FS_UNLINK );
op->o_tmpfree( sm, op->o_tmpmemctx );
}
}
sl->sl_num++;
while ( sl->sl_num > sl->sl_size ) {
- int i, j;
+ int i;
se = sl->sl_head;
sl->sl_head = se->se_next;
for ( i=0; i<sl->sl_numcsns; i++ )
/* our state is older, complain to consumer */
rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
rs->sr_text = "consumer state is newer than provider!";
+ Log4( LDAP_DEBUG_SYNC, ldap_syslog_level,
+ "consumer %d state %s is newer than provider %d state %s\n",
+ sids[i], srs->sr_state.ctxcsn[i].bv_val, sids[j], /* == slap_serverID */
+ ctxcsn[j].bv_val);
bailout:
if ( sop ) {
syncops **sp = &si->si_ops;
rs.sr_err = LDAP_UNAVAILABLE;
send_ldap_result( so->s_op, &rs );
sonext=so->s_next;
- syncprov_drop_psearch( so, 0);
+ if ( so->s_flags & PS_TASK_QUEUED )
+ ldap_pvt_thread_pool_retract( so->s_pool_cookie );
+ if ( !syncprov_drop_psearch( so, 0 ))
+ so->s_si = NULL;
}
si->si_ops=NULL;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );