ldap_pvt_thread_mutex_t mt_mutex;
} modtarget;
+/* All the info of a psearch result that's shared between
+ * multiple queues
+ */
+typedef struct resinfo {
+ struct syncres *ri_list;
+ Entry *ri_e;
+ struct berval ri_dn;
+ struct berval ri_ndn;
+ struct berval ri_uuid;
+ struct berval ri_csn;
+ struct berval ri_cookie;
+ char ri_isref;
+ ldap_pvt_thread_mutex_t ri_mutex;
+} resinfo;
+
/* A queued result of a persistent search */
typedef struct syncres {
- struct syncres *s_next;
- Entry *s_e;
- struct berval s_dn;
- struct berval s_ndn;
- struct berval s_uuid;
- struct berval s_csn;
+ struct syncres *s_next; /* list of results on this psearch queue */
+ struct syncres *s_rilist; /* list of psearches using this result */
+ resinfo *s_info;
char s_mode;
- char s_isreference;
} syncres;
/* Record of a persistent search */
typedef struct syncops {
struct syncops *s_next;
+ struct syncprov_info_t *s_si;
struct berval s_base; /* ndn of search base */
ID s_eid; /* entryID of search base */
Operation *s_op; /* search op */
short osid; /* sid of op csn */
short rsid; /* sid of relay */
short sreference; /* Is the entry a reference? */
+ syncres ssres;
} opcookie;
-typedef struct mutexint {
- ldap_pvt_thread_mutex_t mi_mutex;
- int mi_int;
-} mutexint;
-
typedef struct fbase_cookie {
struct berval *fdn; /* DN of a modified entry, for scope testing */
syncops *fss; /* persistent search we're testing against */
return rc;
}
-/* Should find a place to cache these */
-static mutexint *get_mutexint()
+static void free_resinfo( syncres *sr )
{
- mutexint *mi = ch_malloc( sizeof( mutexint ));
- ldap_pvt_thread_mutex_init( &mi->mi_mutex );
- mi->mi_int = 1;
- return mi;
-}
-
-static void inc_mutexint( mutexint *mi )
-{
- ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
- mi->mi_int++;
- ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
-}
-
-/* return resulting counter */
-static int dec_mutexint( mutexint *mi )
-{
- int i;
- ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
- i = --mi->mi_int;
- ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
- if ( !i ) {
- ldap_pvt_thread_mutex_destroy( &mi->mi_mutex );
- ch_free( mi );
- }
- return i;
+ syncres **st;
+ int freeit = 0;
+ ldap_pvt_thread_mutex_lock( &sr->s_info->ri_mutex );
+ for (st = &sr->s_info->ri_list; *st; st = &(*st)->s_rilist) {
+ if (*st == sr) {
+ *st = sr->s_rilist;
+ break;
+ }
+ }
+ if ( !sr->s_info->ri_list )
+ freeit = 1;
+ ldap_pvt_thread_mutex_unlock( &sr->s_info->ri_mutex );
+ if ( freeit ) {
+ ldap_pvt_thread_mutex_destroy( &sr->s_info->ri_mutex );
+ if ( sr->s_info->ri_e )
+ entry_free( sr->s_info->ri_e );
+ if ( !BER_BVISNULL( &sr->s_info->ri_cookie ))
+ ch_free( sr->s_info->ri_cookie.bv_val );
+ ch_free( sr->s_info );
+ }
}
-static void
-syncprov_free_syncop( syncops *so )
+static int
+syncprov_free_syncop( syncops *so, int unlink )
{
syncres *sr, *srnext;
GroupAssertion *ga, *gnext;
/* already being freed, or still in use */
if ( !so->s_inuse || --so->s_inuse > 0 ) {
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- return;
+ return 0;
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ if ( unlink ) {
+ syncops **sop;
+ ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex );
+ for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) {
+ if ( *sop == so ) {
+ *sop = so->s_next;
+ break;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex );
+ }
if ( so->s_flags & PS_IS_DETACHED ) {
filter_free( so->s_op->ors_filter );
for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
ch_free( so->s_base.bv_val );
for ( sr=so->s_res; sr; sr=srnext ) {
srnext = sr->s_next;
- if ( sr->s_e ) {
- if ( !dec_mutexint( sr->s_e->e_private )) {
- sr->s_e->e_private = NULL;
- entry_free( sr->s_e );
- }
- }
+ free_resinfo( sr );
ch_free( sr );
}
ldap_pvt_thread_mutex_destroy( &so->s_mutex );
ch_free( so );
+ return 1;
}
/* Send a persistent search response */
static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
+syncprov_sendresp( Operation *op, resinfo *ri, syncops *so, int mode )
{
SlapReply rs = { REP_SEARCH };
struct berval cookie, csns[2];
rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx );
rs.sr_ctrls[1] = NULL;
rs.sr_flags = REP_CTRLS_MUSTBEFREED;
- csns[0] = opc->sctxcsn;
+ csns[0] = ri->ri_csn;
BER_BVZERO( &csns[1] );
slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 );
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
- a_uuid.a_nvals = &opc->suuid;
+ a_uuid.a_nvals = &ri->ri_uuid;
rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
mode, rs.sr_ctrls, 0, 1, &cookie );
op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
rs.sr_entry = &e_uuid;
if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) {
- e_uuid = *opc->se;
+ e_uuid = *ri->ri_e;
e_uuid.e_private = NULL;
}
switch( mode ) {
case LDAP_SYNC_ADD:
- if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
+ if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
rs.sr_err = send_search_reference( op, &rs );
ber_bvarray_free( rs.sr_ref );
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
- e_uuid.e_name = opc->sdn;
- e_uuid.e_nname = opc->sndn;
- if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
+ e_uuid.e_name = ri->ri_dn;
+ e_uuid.e_nname = ri->ri_ndn;
+ if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
struct berval bv = BER_BVNULL;
rs.sr_ref = &bv;
rs.sr_err = send_search_reference( op, &rs );
{
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
syncres *sr;
- opcookie opc;
int rc = 0;
- opc.son = on;
-
do {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
sr = so->s_res;
SlapReply rs = { REP_INTERMEDIATE };
rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
- &sr->s_csn, 0, NULL, 0 );
+ &sr->s_info->ri_cookie, 0, NULL, 0 );
} else {
- opc.sdn = sr->s_dn;
- opc.sndn = sr->s_ndn;
- opc.suuid = sr->s_uuid;
- opc.sctxcsn = sr->s_csn;
- opc.sreference = sr->s_isreference;
- opc.se = sr->s_e;
-
- rc = syncprov_sendresp( op, &opc, so, sr->s_mode );
+ rc = syncprov_sendresp( op, sr->s_info, so, sr->s_mode );
}
}
- if ( sr->s_e ) {
- if ( !dec_mutexint( sr->s_e->e_private )) {
- sr->s_e->e_private = NULL;
- entry_free ( sr->s_e );
- }
- }
+ free_resinfo( sr );
ch_free( sr );
if ( so->s_op->o_abandon )
rc = syncprov_qplay( op, so );
/* decrement use count... */
- syncprov_free_syncop( so );
+ syncprov_free_syncop( so, 1 );
return NULL;
}
syncprov_qresp( opcookie *opc, syncops *so, int mode )
{
syncres *sr;
+ resinfo *ri;
int srsize;
- struct berval cookie = opc->sctxcsn;
-
- if ( mode == LDAP_SYNC_NEW_COOKIE ) {
- syncprov_info_t *si = opc->son->on_bi.bi_private;
+ struct berval csn = opc->sctxcsn;
- slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn,
- so->s_rid, slap_serverID ? slap_serverID : -1);
- }
-
- srsize = sizeof(syncres) + opc->suuid.bv_len + 1 +
- opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1;
- if ( cookie.bv_len )
- srsize += cookie.bv_len + 1;
- sr = ch_malloc( srsize );
+ sr = ch_malloc( sizeof( syncres ));
sr->s_next = NULL;
- sr->s_e = opc->se;
- /* bump refcount on this entry */
- if ( opc->se )
- inc_mutexint( opc->se->e_private );
- sr->s_dn.bv_val = (char *)(sr + 1);
- sr->s_dn.bv_len = opc->sdn.bv_len;
sr->s_mode = mode;
- sr->s_isreference = opc->sreference;
- sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val,
- opc->sdn.bv_val ) + 1;
- sr->s_ndn.bv_len = opc->sndn.bv_len;
- sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val,
- opc->sndn.bv_val ) + 1;
- sr->s_uuid.bv_len = opc->suuid.bv_len;
- AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
- if ( cookie.bv_len ) {
- sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1;
- strcpy( sr->s_csn.bv_val, cookie.bv_val );
- } else {
- sr->s_csn.bv_val = NULL;
- }
- sr->s_csn.bv_len = cookie.bv_len;
+ if ( !opc->ssres.s_info ) {
+
+ srsize = sizeof( resinfo );
+ if ( csn.bv_len )
+ srsize += csn.bv_len + 1;
+
+ if ( opc->se ) {
+ Attribute *a;
+ ri = ch_malloc( srsize );
+ ri->ri_dn = opc->se->e_name;
+ ri->ri_ndn = opc->se->e_nname;
+ a = attr_find( opc->se->e_attrs, slap_schema.si_ad_entryUUID );
+ if ( a )
+ ri->ri_uuid = a->a_nvals[0];
+ else
+ ri->ri_uuid.bv_len = 0;
+ if ( csn.bv_len ) {
+ ri->ri_csn.bv_val = (char *)(ri + 1);
+ ri->ri_csn.bv_len = csn.bv_len;
+ memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len );
+ ri->ri_csn.bv_val[csn.bv_len] = '\0';
+ } else {
+ ri->ri_csn.bv_val = NULL;
+ }
+ } else {
+ srsize += opc->suuid.bv_len +
+ opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1;
+ ri = ch_malloc( srsize );
+ ri->ri_dn.bv_val = (char *)(ri + 1);
+ ri->ri_dn.bv_len = opc->sdn.bv_len;
+ ri->ri_ndn.bv_val = lutil_strcopy( ri->ri_dn.bv_val,
+ opc->sdn.bv_val ) + 1;
+ ri->ri_ndn.bv_len = opc->sndn.bv_len;
+ ri->ri_uuid.bv_val = lutil_strcopy( ri->ri_ndn.bv_val,
+ opc->sndn.bv_val ) + 1;
+ ri->ri_uuid.bv_len = opc->suuid.bv_len;
+ AC_MEMCPY( ri->ri_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
+ if ( csn.bv_len ) {
+ ri->ri_csn.bv_val = ri->ri_uuid.bv_val + ri->ri_uuid.bv_len;
+ memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len );
+ ri->ri_csn.bv_val[csn.bv_len] = '\0';
+ } else {
+ ri->ri_csn.bv_val = NULL;
+ }
+ }
+ ri->ri_list = &opc->ssres;
+ ri->ri_e = opc->se;
+ ri->ri_csn.bv_len = csn.bv_len;
+ ri->ri_isref = opc->sreference;
+ BER_BVZERO( &ri->ri_cookie );
+ ldap_pvt_thread_mutex_init( &ri->ri_mutex );
+ opc->se = NULL;
+ opc->ssres.s_info = ri;
+ }
+ ri = opc->ssres.s_info;
+ sr->s_info = ri;
+ ldap_pvt_thread_mutex_lock( &ri->ri_mutex );
+ sr->s_rilist = ri->ri_list;
+ ri->ri_list = sr;
+ if ( mode == LDAP_SYNC_NEW_COOKIE && BER_BVISNULL( &ri->ri_cookie )) {
+ syncprov_info_t *si = opc->son->on_bi.bi_private;
- if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) {
- ch_free( cookie.bv_val );
+ slap_compose_sync_cookie( NULL, &ri->ri_cookie, si->si_ctxcsn,
+ so->s_rid, slap_serverID ? slap_serverID : -1);
}
+ ldap_pvt_thread_mutex_unlock( &ri->ri_mutex );
ldap_pvt_thread_mutex_lock( &so->s_mutex );
if ( !so->s_res ) {
if ( lock )
ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
}
- syncprov_free_syncop( so );
-
- return 0;
+ return syncprov_free_syncop( so, 0 );
}
static int
{
slap_callback *sc = op->o_callback;
op->o_callback = sc->sc_next;
- syncprov_drop_psearch( op->o_callback->sc_private, 0 );
+ syncprov_drop_psearch( sc->sc_private, 0 );
op->o_tmpfree( sc, op->o_tmpmemctx );
return 0;
}
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = on->on_bi.bi_private;
- syncops *so, *soprev;
+ syncops *so, **sop;
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
- for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so;
- soprev=so, so=so->s_next ) {
+ for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) {
if ( so->s_op->o_connid == op->o_connid &&
so->s_op->o_msgid == op->orn_msgid ) {
so->s_op->o_abandon = 1;
- soprev->s_next = so->s_next;
+ *sop = so->s_next;
break;
}
}
syncprov_info_t *si = on->on_bi.bi_private;
fbase_cookie fc;
- syncops *ss, *sprev, *snext;
+ syncops **pss;
Entry *e = NULL;
Attribute *a;
- int rc;
+ int rc, gonext;
struct berval newdn;
int freefdn = 0;
BackendDB *b0 = op->o_bd, db;
rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
/* If we're sending responses now, make a copy and unlock the DB */
if ( e && !saveit ) {
- if ( !opc->se ) {
+ if ( !opc->se )
opc->se = entry_dup( e );
- opc->se->e_private = get_mutexint();
- }
overlay_entry_release_ov( op, e, 0, on );
e = opc->se;
}
} else {
e = op->ora_e;
if ( !saveit ) {
- if ( !opc->se ) {
+ if ( !opc->se )
opc->se = entry_dup( e );
- opc->se->e_private = get_mutexint();
- }
e = opc->se;
}
}
}
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
- for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss;
- sprev = ss, ss=snext)
+ for (pss = &si->si_ops; *pss; pss = gonext ? &(*pss)->s_next : pss)
{
Operation op2;
Opheader oh;
syncmatches *sm;
int found = 0;
+ syncops *snext, *ss = *pss;
- snext = ss->s_next;
+ gonext = 1;
if ( ss->s_op->o_abandon )
continue;
SlapReply rs = {REP_RESULT};
send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED,
"search base has changed" );
- sprev->s_next = snext;
- syncprov_drop_psearch( ss, 1 );
- ss = sprev;
+ snext = ss->s_next;
+ if ( syncprov_drop_psearch( ss, 1 ) )
+ *pss = snext;
+ gonext = 0;
continue;
}
phase otherwise (ITS#6555) */
op2.ors_filter = ss->s_op->ors_filter->f_and->f_next;
}
- ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
rc = test_filter( &op2, e, op2.ors_filter );
+ ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
}
Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n",
/* Decrement s_inuse, was incremented when called
* with saveit == TRUE
*/
- syncprov_free_syncop( ss );
+ snext = ss->s_next;
+ if ( syncprov_free_syncop( ss, 0 ) ) {
+ *pss = snext;
+ gonext = 0;
+ }
}
}
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
overlay_entry_release_ov( op, e, 0, on );
op->o_bd = b0;
}
- if ( opc->se && !saveit ) {
- if ( !dec_mutexint( opc->se->e_private )) {
- opc->se->e_private = NULL;
+ if ( !saveit ) {
+ if ( opc->ssres.s_info )
+ free_resinfo( &opc->ssres );
+ else if ( opc->se )
entry_free( opc->se );
- opc->se = NULL;
- }
}
if ( freefdn ) {
op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
for (sm = opc->smatches; sm; sm=snext) {
snext = sm->sm_next;
- syncprov_free_syncop( sm->sm_op );
+ syncprov_free_syncop( sm->sm_op, 1 );
op->o_tmpfree( sm, op->o_tmpmemctx );
}
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
}
sop->s_next = si->si_ops;
+ sop->s_si = si;
si->si_ops = sop;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
}