} slog_entry;
typedef struct sessionlog {
- struct berval sl_mincsn;
+ BerVarray sl_mincsn;
+ int *sl_sids;
+ int sl_numcsns;
int sl_num;
int sl_size;
slog_entry *sl_head;
/* The main state for this overlay */
typedef struct syncprov_info_t {
syncops *si_ops;
- BerVarray si_ctxcsn; /* ldapsync context */
struct berval si_contextdn;
+ BerVarray si_ctxcsn; /* ldapsync context */
int *si_sids;
int si_numcsns;
int si_chkops; /* checkpointing info */
} else {
sl->sl_head = se;
sl->sl_tail = se;
+ if ( !sl->sl_mincsn ) {
+ sl->sl_numcsns = 1;
+ sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
+ sl->sl_sids = ch_malloc( sizeof( int ));
+ sl->sl_sids[0] = se->se_sid;
+ ber_dupbv( sl->sl_mincsn, &se->se_csn );
+ BER_BVZERO( &sl->sl_mincsn[1] );
+ }
}
sl->sl_num++;
while ( sl->sl_num > sl->sl_size ) {
+ int i, j;
se = sl->sl_head;
sl->sl_head = se->se_next;
- AC_MEMCPY( sl->sl_mincsn.bv_val, se->se_csn.bv_val, se->se_csn.bv_len );
- sl->sl_mincsn.bv_len = se->se_csn.bv_len;
+ for ( i=0; i<sl->sl_numcsns; i++ )
+ if ( sl->sl_sids[i] >= se->se_sid )
+ break;
+ if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
+ slap_insert_csn_sids( (struct sync_cookie *)sl,
+ i, se->se_sid, &se->se_csn );
+ } else {
+ ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
+ }
ch_free( se );
sl->sl_num--;
}
#endif
sid = slap_parse_csn_sid( &maxcsn );
for ( i=0; i<si->si_numcsns; i++ ) {
+ if ( sid < si->si_sids[i] )
+ break;
if ( sid == si->si_sids[i] ) {
if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) {
ber_bvreplace( &si->si_ctxcsn[i], &maxcsn );
}
}
/* It's a new SID for us */
- if ( i == si->si_numcsns ) {
- value_add_one( &si->si_ctxcsn, &maxcsn );
+ if ( i == si->si_numcsns || sid != si->si_sids[i] ) {
+ slap_insert_csn_sids((struct sync_cookie *)&(si->si_ctxcsn),
+ i, sid, &maxcsn );
csn_changed = 1;
- si->si_numcsns++;
- si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
- sizeof(int));
- si->si_sids[i] = sid;
}
}
for ( i=0; i<mod->sml_numvals; i++ ) {
sid = slap_parse_csn_sid( &mod->sml_values[i] );
for ( j=0; j<si->si_numcsns; j++ ) {
+ if ( sid < si->si_sids[j] )
+ break;
if ( sid == si->si_sids[j] ) {
if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) {
ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] );
}
}
- if ( j == si->si_numcsns ) {
- value_add_one( &si->si_ctxcsn, &mod->sml_values[i] );
- si->si_numcsns++;
- si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
- sizeof(int));
- si->si_sids[j] = sid;
+ if ( j == si->si_numcsns || sid != si->si_sids[j] ) {
+ slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn,
+ j, sid, &mod->sml_values[i] );
csn_changed = 1;
}
}
BerVarray ctxcsn;
int i, *sids, numcsns;
struct berval mincsn, maxcsn;
+ int minsid, maxsid;
int dirty = 0;
if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
/* If there are SIDs we don't recognize in the cookie, drop them */
for (i=0; i<srs->sr_state.numcsns; ) {
- for (j=0; j<numcsns; j++) {
- if ( srs->sr_state.sids[i] == sids[j] ) {
+ for (j=i; j<numcsns; j++) {
+ if ( srs->sr_state.sids[i] <= sids[j] ) {
break;
}
}
/* not found */
- if ( j == numcsns ) {
- struct berval tmp = srs->sr_state.ctxcsn[i];
- j = srs->sr_state.numcsns - 1;
- srs->sr_state.ctxcsn[i] = srs->sr_state.ctxcsn[j];
- tmp.bv_len = 0;
- srs->sr_state.ctxcsn[j] = tmp;
- srs->sr_state.numcsns = j;
- srs->sr_state.sids[i] = srs->sr_state.sids[j];
+ if ( j == numcsns || srs->sr_state.sids[i] != sids[j] ) {
+ char *tmp = srs->sr_state.ctxcsn[i].bv_val;
+ srs->sr_state.numcsns--;
+ for ( j=i; j<srs->sr_state.numcsns; j++ ) {
+ srs->sr_state.ctxcsn[j] = srs->sr_state.ctxcsn[j+1];
+ srs->sr_state.sids[j] = srs->sr_state.sids[j+1];
+ }
+ srs->sr_state.ctxcsn[j].bv_val = tmp;
+ srs->sr_state.ctxcsn[j].bv_len = 0;
continue;
}
i++;
/* Find the smallest CSN which differs from contextCSN */
mincsn.bv_len = 0;
maxcsn.bv_len = 0;
- for ( i=0; i<srs->sr_state.numcsns; i++ ) {
- for ( j=0; j<numcsns; j++ ) {
- if ( srs->sr_state.sids[i] != sids[j] )
- continue;
- if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
- &srs->sr_state.ctxcsn[i] ) < 0 ) {
- maxcsn = srs->sr_state.ctxcsn[i];
+ for ( i=0,j=0; i<srs->sr_state.numcsns; i++ ) {
+ int newer;
+ while ( srs->sr_state.sids[i] != sids[j] ) j++;
+ if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
+ &srs->sr_state.ctxcsn[i] ) < 0 ) {
+ maxcsn = srs->sr_state.ctxcsn[i];
+ maxsid = sids[j];
+ }
+ newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
+ /* If our state is newer, tell consumer about changes */
+ if ( newer < 0) {
+ changed = SS_CHANGED;
+ if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
+ &srs->sr_state.ctxcsn[i] ) > 0 ) {
+ mincsn = srs->sr_state.ctxcsn[i];
+ minsid = sids[j];
}
- if ( ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ) < 0) {
- if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
- &srs->sr_state.ctxcsn[i] ) > 0 ) {
- mincsn = srs->sr_state.ctxcsn[i];
- }
+ } else if ( newer > 0 ) {
+ /* our state is older, complain to consumer */
+ rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
+ rs->sr_text = "consumer state is newer than provider!";
+bailout:
+ if ( sop ) {
+ syncops **sp = &si->si_ops;
+
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+ while ( *sp != sop )
+ sp = &(*sp)->s_next;
+ *sp = sop->s_next;
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+ ch_free( sop );
}
+ rs->sr_ctrls = NULL;
+ send_ldap_result( op, rs );
+ return rs->sr_err;
}
}
- if ( BER_BVISEMPTY( &mincsn ))
+ if ( BER_BVISEMPTY( &mincsn )) {
mincsn = maxcsn;
+ minsid = maxsid;
+ }
/* If nothing has changed, shortcut it */
if ( srs->sr_state.numcsns == numcsns ) {
- int i, j, newer;
- for ( i=0; i<srs->sr_state.numcsns; i++ ) {
- for ( j=0; j<numcsns; j++ ) {
- if ( srs->sr_state.sids[i] != sids[j] )
- continue;
- newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
- /* If our state is newer, tell consumer about changes */
- if ( newer < 0 )
- changed = SS_CHANGED;
- else if ( newer > 0 ) {
- /* our state is older, complain to consumer */
- rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
- rs->sr_text = "consumer state is newer than provider!";
-bailout:
- if ( sop ) {
- syncops **sp = &si->si_ops;
-
- ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
- while ( *sp != sop )
- sp = &(*sp)->s_next;
- *sp = sop->s_next;
- ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
- ch_free( sop );
- }
- rs->sr_ctrls = NULL;
- send_ldap_result( op, rs );
- return rs->sr_err;
- }
- break;
- }
- if ( changed )
- break;
- }
if ( !changed && !dirty ) {
do_present = 0;
no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
/* Do we have a sessionlog for this search? */
sl=si->si_logs;
if ( sl ) {
+ int do_play = 0;
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
/* Are there any log entries, and is the consumer state
* present in the session log?
*/
- if ( sl->sl_num > 0 && ber_bvcmp( &mincsn, &sl->sl_mincsn ) >= 0 ) {
+ if ( sl->sl_num > 0 ) {
+ int i;
+ for ( i=0; i<sl->sl_numcsns; i++ ) {
+ /* SID not present == new enough */
+ if ( minsid < sl->sl_sids[i] ) {
+ do_play = 1;
+ break;
+ }
+ /* SID present and new enough */
+ if ( minsid == sl->sl_sids[i]
+ && ber_bvcmp( &mincsn, &sl->sl_mincsn[i] ) >= 0 ) {
+ do_play = 1;
+ break;
+ }
+ }
+ /* SID not present == new enough */
+ if ( i == sl->sl_numcsns )
+ do_play = 1;
+ }
+ if ( do_play ) {
do_present = 0;
/* mutex is unlocked in playlog */
syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
}
sl = si->si_logs;
if ( !sl ) {
- sl = ch_malloc( sizeof( sessionlog ) + LDAP_PVT_CSNSTR_BUFSIZE );
- sl->sl_mincsn.bv_val = (char *)(sl+1);
- sl->sl_mincsn.bv_len = 0;
+ sl = ch_malloc( sizeof( sessionlog ));
+ sl->sl_mincsn = NULL;
+ sl->sl_sids = NULL;
sl->sl_num = 0;
+ sl->sl_numcsns = 0;
sl->sl_head = sl->sl_tail = NULL;
ldap_pvt_thread_mutex_init( &sl->sl_mutex );
si->si_logs = sl;
return NULL;
}
+
/* Read any existing contextCSN from the underlying db.
* Then search for any entries newer than that. If no value exists,
* just generate it. Cache whatever result.
ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL );
si->si_numcsns = a->a_numvals;
si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL );
+ slap_sort_csn_sids( si->si_ctxcsn, si->si_sids, si->si_numcsns, NULL );
}
overlay_entry_release_ov( op, e, 0, on );
if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) {
if ( si->si_logs && si->si_numcsns ) {
sessionlog *sl = si->si_logs;
int i;
- /* If there are multiple, find the newest */
- for ( i=0; i < si->si_numcsns; i++ ) {
- if ( ber_bvcmp( &sl->sl_mincsn, &si->si_ctxcsn[i] ) < 0 ) {
- AC_MEMCPY( sl->sl_mincsn.bv_val, si->si_ctxcsn[i].bv_val,
- si->si_ctxcsn[i].bv_len );
- sl->sl_mincsn.bv_len = si->si_ctxcsn[i].bv_len;
- }
- }
+ ber_bvarray_dup_x( &sl->sl_mincsn, si->si_ctxcsn, NULL );
+ sl->sl_numcsns = si->si_numcsns;
+ sl->sl_sids = ch_malloc( si->si_numcsns * sizeof(int) );
+ for ( i=0; i < si->si_numcsns; i++ )
+ sl->sl_sids[i] = si->si_sids[i];
}
out:
if ( si ) {
if ( si->si_logs ) {
- slog_entry *se = si->si_logs->sl_head;
+ sessionlog *sl = si->si_logs;
+ slog_entry *se = sl->sl_head;
while ( se ) {
slog_entry *se_next = se->se_next;
ch_free( se );
se = se_next;
}
-
+ if ( sl->sl_mincsn )
+ ber_bvarray_free( sl->sl_mincsn );
+ if ( sl->sl_sids )
+ ch_free( sl->sl_sids );
+
ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
ch_free( si->si_logs );
}