/* syncprov.c - syncrepl provider */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
- * Copyright 2004-2010 The OpenLDAP Foundation.
+ * Copyright 2004-2011 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
if ( ret < 0 ) {
Debug( LDAP_DEBUG_TRACE,
- "slap_build_sync_ctrl: ber_flatten2 failed\n",
- 0, 0, 0 );
+ "slap_build_sync_ctrl: ber_flatten2 failed (%d)\n",
+ ret, 0, 0 );
send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
- return ret;
+ return LDAP_OTHER;
}
return LDAP_SUCCESS;
if ( ret < 0 ) {
Debug( LDAP_DEBUG_TRACE,
- "syncprov_done_ctrl: ber_flatten2 failed\n",
- 0, 0, 0 );
+ "syncprov_done_ctrl: ber_flatten2 failed (%d)\n",
+ ret, 0, 0 );
send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
- return ret;
+ return LDAP_OTHER;
}
return LDAP_SUCCESS;
if ( ret < 0 ) {
Debug( LDAP_DEBUG_TRACE,
- "syncprov_sendinfo: ber_flatten2 failed\n",
- 0, 0, 0 );
+ "syncprov_sendinfo: ber_flatten2 failed (%d)\n",
+ ret, 0, 0 );
send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
- return ret;
+ return LDAP_OTHER;
}
rs->sr_rspoid = LDAP_SYNC_INFO;
}
static int
-syncprov_findcsn( Operation *op, find_csn_t mode )
+syncprov_findcsn( Operation *op, find_csn_t mode, struct berval *csn )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = on->on_bi.bi_private;
break;
case FIND_CSN:
if ( BER_BVISEMPTY( &cf.f_av_value )) {
- cf.f_av_value = srs->sr_state.ctxcsn[0];
- /* If there are multiple CSNs, use the smallest */
- for ( i=1; i<srs->sr_state.numcsns; i++ ) {
- if ( ber_bvcmp( &cf.f_av_value, &srs->sr_state.ctxcsn[i] )
- > 0 ) {
- cf.f_av_value = srs->sr_state.ctxcsn[i];
- }
- }
+ cf.f_av_value = *csn;
}
+ fop.o_dn = op->o_bd->be_rootdn;
+ fop.o_ndn = op->o_bd->be_rootndn;
+ fop.o_req_dn = op->o_bd->be_suffix[0];
+ fop.o_req_ndn = op->o_bd->be_nsuffix[0];
/* Look for exact match the first time */
if ( findcsn_retry ) {
cf.f_choice = LDAP_FILTER_EQUALITY;
/* If we didn't find an exact match, then try for <= */
if ( findcsn_retry ) {
findcsn_retry = 0;
+ rs_reinit( &frs, REP_RESULT );
goto again;
}
rc = LDAP_NO_SUCH_OBJECT;
static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
{
- slap_overinst *on = opc->son;
-
SlapReply rs = { REP_SEARCH };
LDAPControl *ctrls[2];
struct berval cookie = BER_BVNULL, csns[2];
{
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
syncres *sr;
- Entry *e;
opcookie opc;
int rc = 0;
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
syncmatches *sm, *snext;
- modtarget *mt, mtdummy;
+ modtarget *mt;
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
if ( si->si_active )
}
static void
-syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
+syncprov_checkpoint( Operation *op, slap_overinst *on )
{
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
Modifications mod;
Operation opm;
- SlapReply rsm = { 0 };
+ SlapReply rsm = {REP_RESULT};
slap_callback cb = {0};
BackendDB be;
BackendInfo *bi;
char txtbuf[SLAP_TEXT_BUFLEN];
size_t textlen = sizeof txtbuf;
Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL );
+ rs_reinit( &rsm, REP_RESULT );
slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen);
opm.ora_e = e;
opm.o_bd->be_add( &opm, &rsm );
sl = si->si_logs;
{
+ if ( BER_BVISEMPTY( &op->o_csn ) ) {
+ /* During the syncrepl refresh phase we can receive operations
+ * without a csn. We cannot reliably determine the consumers
+ * state with respect to such operations, so we ignore them and
+ * wipe out anything in the log if we see them.
+ */
+ ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ while ( se = sl->sl_head ) {
+ sl->sl_head = se->se_next;
+ ch_free( se );
+ }
+ sl->sl_tail = NULL;
+ sl->sl_num = 0;
+ ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ return;
+ }
+
/* Allocate a record. UUIDs are not NUL-terminated. */
se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
op->o_csn.bv_len + 1 );
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
if ( sl->sl_head ) {
- sl->sl_tail->se_next = se;
+ /* Keep the list in csn order. */
+ if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) {
+ sl->sl_tail->se_next = se;
+ sl->sl_tail = se;
+ } else {
+ slog_entry **sep;
+
+ for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) {
+ if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) {
+ se->se_next = *sep;
+ *sep = se;
+ break;
+ }
+ }
+ }
} else {
sl->sl_head = se;
+ sl->sl_tail = se;
}
- sl->sl_tail = se;
sl->sl_num++;
while ( sl->sl_num > sl->sl_size ) {
se = sl->sl_head;
sl->sl_head = se->se_next;
- strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val );
+ AC_MEMCPY( sl->sl_mincsn.bv_val, se->se_csn.bv_val, se->se_csn.bv_len );
sl->sl_mincsn.bv_len = se->se_csn.bv_len;
ch_free( se );
sl->sl_num--;
delcsn[0].bv_len = se->se_csn.bv_len;
delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
} else {
+ if ( se->se_tag == LDAP_REQ_ADD )
+ continue;
nmods++;
j = num - nmods;
}
if ( mmods ) {
Operation fop;
- SlapReply frs = { REP_RESULT };
int rc;
Filter mf, af;
AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
fop.o_bd->bd_info = (BackendInfo *)on->on_info;
for ( i=ndel; i<num; i++ ) {
- if ( uuids[i].bv_len == 0 ) continue;
+ if ( uuids[i].bv_len != 0 ) {
+ SlapReply frs = { REP_RESULT };
mf.f_av_value = uuids[i];
cb.sc_private = NULL;
fop.ors_slimit = 1;
- frs.sr_nentries = 0;
rc = fop.o_bd->be_search( &fop, &frs );
/* If entry was not found, add to delete list */
if ( !cb.sc_private ) {
uuids[ndel++] = uuids[i];
}
+ }
}
fop.o_bd->bd_info = (BackendInfo *)on;
}
* that changed, and only one can be passed in the csn queue.
*/
Modifications *mod = op->orm_modlist;
- int i, j, sid;
+ unsigned i;
+ int j, sid;
for ( i=0; i<mod->sml_numvals; i++ ) {
sid = slap_parse_csn_sid( &mod->sml_values[i] );
ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
if ( csn_changed ) {
+ syncops *ss;
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
- have_psearches = ( si->si_ops != NULL );
- ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
-
- if ( have_psearches ) {
- for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
- if ( sm->sm_op->s_op->o_abandon )
- continue;
- syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_NEW_COOKIE );
- }
+ for ( ss = si->si_ops; ss; ss = ss->s_next ) {
+ if ( ss->s_op->o_abandon )
+ continue;
+ /* Send the updated csn to all syncrepl consumers,
+ * including the server from which it originated.
+ * The syncrepl consumer and syncprov provider on
+ * the originating server may be configured to store
+ * their csn values in different entries.
+ */
+ syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
}
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
}
} else {
ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
if ( do_check ) {
ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
- syncprov_checkpoint( op, rs, on );
+ syncprov_checkpoint( op, on );
ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
}
}
/* Add any log records */
- if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) {
+ if ( si->si_logs ) {
syncprov_add_slog( op );
}
leave: ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex );
rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
op->o_tmpmemctx );
rs->sr_ctrls[1] = NULL;
+ rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
/* If we're in delta-sync mode, always send a cookie */
if ( si->si_nopres && si->si_usehint && a ) {
struct berval cookie;
sync_control *srs;
BerVarray ctxcsn;
int i, *sids, numcsns;
- struct berval mincsn;
+ struct berval mincsn, maxcsn;
int dirty = 0;
if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
i++;
}
- /* Find the smallest CSN */
- mincsn = srs->sr_state.ctxcsn[0];
- for ( i=1; i<srs->sr_state.numcsns; i++ ) {
- if ( ber_bvcmp( &mincsn, &srs->sr_state.ctxcsn[i] ) > 0 )
- mincsn = srs->sr_state.ctxcsn[i];
- }
+ /* Find the smallest CSN which differs from contextCSN */
+ mincsn.bv_len = 0;
+ maxcsn.bv_len = 0;
+ for ( i=0; i<srs->sr_state.numcsns; i++ ) {
+ for ( j=0; j<numcsns; j++ ) {
+ if ( srs->sr_state.sids[i] != sids[j] )
+ continue;
+ if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
+ &srs->sr_state.ctxcsn[i] ) < 0 ) {
+ maxcsn = srs->sr_state.ctxcsn[i];
+ }
+ if ( ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ) < 0) {
+ if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
+ &srs->sr_state.ctxcsn[i] ) > 0 ) {
+ mincsn = srs->sr_state.ctxcsn[i];
+ }
+ }
+ }
+ }
+ if ( BER_BVISEMPTY( &mincsn ))
+ mincsn = maxcsn;
/* If nothing has changed, shortcut it */
if ( srs->sr_state.numcsns == numcsns ) {
}
}
/* Is the CSN still present in the database? */
- if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) {
+ if ( syncprov_findcsn( op, FIND_CSN, &mincsn ) != LDAP_SUCCESS ) {
/* No, so a reload is required */
/* the 2.2 consumer doesn't send this hint */
if ( si->si_usehint && srs->sr_rhint == 0 ) {
} else {
gotstate = 1;
/* If changed and doing Present lookup, send Present UUIDs */
- if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) !=
+ if ( do_present && syncprov_findcsn( op, FIND_PRESENT, 0 ) !=
LDAP_SUCCESS ) {
if ( ctxcsn )
ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
}
if ( !ap ) {
- if ( !(rs->sr_flags & REP_ENTRY_MODIFIABLE) ) {
- Entry *e = entry_dup( rs->sr_entry );
- if ( rs->sr_flags & REP_ENTRY_MUSTRELEASE ) {
- overlay_entry_release_ov( op, rs->sr_entry, 0, on );
- rs->sr_flags ^= REP_ENTRY_MUSTRELEASE;
- } else if ( rs->sr_flags & REP_ENTRY_MUSTBEFREED ) {
- entry_free( rs->sr_entry );
- }
- rs->sr_entry = e;
- rs->sr_flags |=
- REP_ENTRY_MODIFIABLE|REP_ENTRY_MUSTBEFREED;
+ if ( rs_entry2modifiable( op, rs, on )) {
a = attr_find( rs->sr_entry->e_attrs,
slap_schema.si_ad_contextCSN );
}
void *ptr
)
{
- syncprov_findcsn( ptr, FIND_MAXCSN );
+ syncprov_findcsn( ptr, FIND_MAXCSN, 0 );
return NULL;
}
si->si_numops++;
}
+ /* Initialize the sessionlog mincsn */
+ if ( si->si_logs && si->si_numcsns ) {
+ sessionlog *sl = si->si_logs;
+ int i;
+ /* If there are multiple, find the newest */
+ for ( i=0; i < si->si_numcsns; i++ ) {
+ if ( ber_bvcmp( &sl->sl_mincsn, &si->si_ctxcsn[i] ) < 0 ) {
+ AC_MEMCPY( sl->sl_mincsn.bv_val, si->si_ctxcsn[i].bv_val,
+ si->si_ctxcsn[i].bv_len );
+ sl->sl_mincsn.bv_len = si->si_ctxcsn[i].bv_len;
+ }
+ }
+ }
+
out:
op->o_bd->bd_info = (BackendInfo *)on;
return 0;
Connection conn = {0};
OperationBuffer opbuf;
Operation *op;
- SlapReply rs = {REP_RESULT};
void *thrctx;
thrctx = ldap_pvt_thread_pool_context();
op->o_bd = be;
op->o_dn = be->be_rootdn;
op->o_ndn = be->be_rootndn;
- syncprov_checkpoint( op, &rs, on );
+ syncprov_checkpoint( op, on );
}
#ifdef SLAP_CONFIG_DELETE
se = se_next;
}
+ ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
ch_free( si->si_logs );
}
if ( si->si_ctxcsn )