From 8bad70d957cc083d4c850950651be635a975938c Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Wed, 8 Dec 2004 00:47:24 +0000 Subject: [PATCH] Added sessionlog support. consumer needs work... --- servers/slapd/overlays/syncprov.c | 328 +++++++++++++++++++++++++++--- servers/slapd/slap.h | 4 +- 2 files changed, 301 insertions(+), 31 deletions(-) diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 5847451ea2..a751f477a2 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -56,8 +56,8 @@ typedef struct syncops { struct berval s_base; /* ndn of search base */ ID s_eid; /* entryID of search base */ Operation *s_op; /* search op */ - long s_sid; - long s_rid; + int s_sid; + int s_rid; struct berval s_filterstr; int s_flags; /* search status */ int s_inuse; /* reference count */ @@ -91,6 +91,25 @@ typedef struct syncmatches { syncops *sm_op; } syncmatches; +/* Session log data */ +typedef struct slog_entry { + struct slog_entry *se_next; + struct berval se_uuid; + struct berval se_csn; + ber_tag_t se_tag; +} slog_entry; + +typedef struct sessionlog { + struct sessionlog *sl_next; + int sl_sid; + struct berval sl_mincsn; + int sl_num; + int sl_size; + slog_entry *sl_head; + slog_entry *sl_tail; + ldap_pvt_thread_mutex_t sl_mutex; +} sessionlog; + /* The main state for this overlay */ typedef struct syncprov_info_t { syncops *si_ops; @@ -100,6 +119,7 @@ typedef struct syncprov_info_t { int si_numops; /* number of ops since last checkpoint */ time_t si_chklast; /* time of last checkpoint */ Avlnode *si_mods; /* entries being modified */ + sessionlog *si_logs; ldap_pvt_thread_mutex_t si_csn_mutex; ldap_pvt_thread_mutex_t si_ops_mutex; ldap_pvt_thread_mutex_t si_mods_mutex; @@ -539,11 +559,12 @@ findpres_cb( Operation *op, SlapReply *rs ) { slap_callback *sc = op->o_callback; fpres_cookie *pc = sc->sc_private; + Attribute *a; int ret = SLAP_CB_CONTINUE; - if ( rs->sr_type == REP_SEARCH ) { - Attribute *a = attr_find( rs->sr_entry->e_attrs, - slap_schema.si_ad_entryUUID ); + switch ( rs->sr_type ) { + case REP_SEARCH: + a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID ); if ( a ) { pc->uuids[pc->num].bv_val = pc->last; AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val, @@ -553,15 +574,10 @@ findpres_cb( Operation *op, SlapReply *rs ) pc->uuids[pc->num].bv_val = NULL; } ret = LDAP_SUCCESS; - if ( pc->num == SLAP_SYNCUUID_SET_SIZE ) { - ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, - 0, pc->uuids, 0 ); - pc->uuids[pc->num].bv_val = pc->last; - pc->num = 0; - pc->last = pc->uuids[0].bv_val; - } - - } else if ( rs->sr_type == REP_RESULT ) { + if ( pc->num != SLAP_SYNCUUID_SET_SIZE ) + break; + /* FALLTHRU */ + case REP_RESULT: ret = rs->sr_err; if ( pc->num ) { ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, @@ -570,6 +586,9 @@ findpres_cb( Operation *op, SlapReply *rs ) pc->num = 0; pc->last = pc->uuids[0].bv_val; } + break; + default: + break; } return ret; } @@ -1056,6 +1075,183 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on ) opm.o_bd->be_modify( &opm, rs ); } +static void +syncprov_add_slog( Operation *op, struct berval *csn ) +{ + opcookie *opc = op->o_callback->sc_private; + slap_overinst *on = opc->son; + syncprov_info_t *si = on->on_bi.bi_private; + sessionlog *sl; + slog_entry *se; + + for ( sl = si->si_logs; sl; sl=sl->sl_next ) { + /* Allocate a record. UUIDs are not NUL-terminated. */ + se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + + csn->bv_len + 1 ); + se->se_next = NULL; + se->se_tag = op->o_tag; + + se->se_uuid.bv_val = (char *)(se+1); + se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len + 1; + AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); + se->se_uuid.bv_len = opc->suuid.bv_len; + + AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len ); + se->se_csn.bv_val[csn->bv_len] = '\0'; + se->se_csn.bv_len = csn->bv_len; + + ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + if ( sl->sl_head ) { + sl->sl_tail->se_next = se; + } else { + sl->sl_head = se; + } + sl->sl_tail = se; + sl->sl_num++; + while ( sl->sl_num > sl->sl_size ) { + se = sl->sl_head; + sl->sl_head = se->se_next; + strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val ); + sl->sl_mincsn.bv_len = se->se_csn.bv_len; + ch_free( se ); + sl->sl_num--; + if ( !sl->sl_head ) { + sl->sl_tail = NULL; + } + } + ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + } +} + +/* Just set a flag if we found the matching entry */ +static int +playlog_cb( Operation *op, SlapReply *rs ) +{ + if ( rs->sr_type == REP_SEARCH ) { + op->o_callback->sc_private = (void *)1; + } + return rs->sr_err; +} + +/* enter with sl->sl_mutex locked, release before returning */ +static void +syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, + struct berval *oldcsn, struct berval *ctxcsn ) +{ + slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; + syncprov_info_t *si = on->on_bi.bi_private; + slog_entry *se; + int i, j, ndel, num, nmods, mmods; + BerVarray uuids; + + num = sl->sl_num; + i = 0; + nmods = 0; + + uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) + + num * UUID_LEN, op->o_tmpmemctx ); + + uuids[0].bv_val = (char *)(uuids + num + 1); + + /* Make a copy of the relevant UUIDs. Put the Deletes up front + * and everything else at the end. Do this first so we can + * unlock the list mutex. + */ + for ( se=sl->sl_head; se; se=se->se_next ) { + if ( ber_bvcmp( &se->se_csn, oldcsn ) < 0 ) continue; + if ( ber_bvcmp( &se->se_csn, ctxcsn ) > 0 ) break; + if ( se->se_tag == LDAP_REQ_DELETE ) { + j = i; + i++; + } else { + nmods++; + j = num - nmods; + } + uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN); + AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); + uuids[j].bv_len = UUID_LEN; + } + ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + + ndel = i; + + /* Mods must be validated to see if they belong in this delete set. + */ + + mmods = nmods; + /* Strip any duplicates */ + for ( i=0; ibd_info = on->on_info->oi_orig; + cb.sc_private = NULL; + rc = fop.o_bd->be_search( &fop, &frs ); + fop.o_bd->bd_info = (BackendInfo *)on; + op->o_tmpfree( fop.ors_filterstr.bv_val, op->o_tmpmemctx ); + + /* If entry was not found, add to delete list */ + if ( !cb.sc_private ) { + uuids[ndel++] = uuids[num - 1 - i]; + } + } + } + if ( ndel ) { + uuids[ndel].bv_val = NULL; + syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 0, uuids, 1 ); + } +} + static int syncprov_op_response( Operation *op, SlapReply *rs ) { @@ -1066,13 +1262,13 @@ syncprov_op_response( Operation *op, SlapReply *rs ) if ( rs->sr_err == LDAP_SUCCESS ) { - struct berval maxcsn; + struct berval maxcsn = BER_BVNULL, curcsn = BER_BVNULL; char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; /* Update our context CSN */ cbuf[0] = '\0'; ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); - slap_get_commit_csn( op, &maxcsn ); + slap_get_commit_csn( op, &maxcsn, &curcsn ); if ( !BER_BVISNULL( &maxcsn ) ) { strcpy( cbuf, maxcsn.bv_val ); if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) { @@ -1127,6 +1323,11 @@ syncprov_op_response( Operation *op, SlapReply *rs ) } } + /* Add any log records */ + if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) { + syncprov_add_slog( op, &curcsn ); + } + } return SLAP_CB_CONTINUE; } @@ -1255,11 +1456,12 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error ); ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); } - - if ( op->o_tag != LDAP_REQ_ADD ) - syncprov_matchops( op, opc, 1 ); } + if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD ) + syncprov_matchops( op, opc, 1 ); + + return SLAP_CB_CONTINUE; } @@ -1275,6 +1477,7 @@ syncprov_op_extended( Operation *op, SlapReply *rs ) typedef struct searchstate { slap_overinst *ss_on; syncops *ss_so; + int ss_present; } searchstate; static int @@ -1390,11 +1593,12 @@ syncprov_search_response( Operation *op, SlapReply *rs ) op->o_tmpmemctx ); rs->sr_ctrls[1] = NULL; rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls, - 0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS ); + 0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS : + LDAP_SYNC_REFRESH_DELETES ); } else { int locked = 0; /* It's RefreshAndPersist, transition to Persist phase */ - syncprov_sendinfo( op, rs, rs->sr_nentries ? + syncprov_sendinfo( op, rs, ss->ss_present ? LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, &cookie, 1, NULL, 0 ); /* Flush any queued persist messages */ @@ -1457,11 +1661,13 @@ syncprov_op_search( Operation *op, SlapReply *rs ) slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; slap_callback *cb; - int gotstate = 0, nochange = 0; + int gotstate = 0, nochange = 0, do_present = 1; Filter *fand, *fava; syncops *sop = NULL; searchstate *ss; sync_control *srs; + struct berval ctxcsn; + char csnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE; @@ -1508,16 +1714,24 @@ syncprov_op_search( Operation *op, SlapReply *rs ) ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); } - /* If we have a cookie, handle the PRESENT lookups - */ + /* snapshot the ctxcsn */ + ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); + strcpy( csnbuf, si->si_ctxcsnbuf ); + ctxcsn.bv_len = si->si_ctxcsn.bv_len; + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); + ctxcsn.bv_val = csnbuf; + + /* If we have a cookie, handle the PRESENT lookups */ if ( srs->sr_state.ctxcsn ) { + sessionlog *sl; + /* Is the CSN in a valid format? */ if ( srs->sr_state.ctxcsn->bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) { send_ldap_error( op, rs, LDAP_OTHER, "invalid sync cookie" ); return rs->sr_err; } /* If just Refreshing and nothing has changed, shortcut it */ - if ( bvmatch( srs->sr_state.ctxcsn, &si->si_ctxcsn )) { + if ( bvmatch( srs->sr_state.ctxcsn, &ctxcsn )) { nochange = 1; if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { LDAPControl *ctrls[2]; @@ -1534,6 +1748,19 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } goto shortcut; } + /* Do we have a sessionlog for this search? */ + for ( sl=si->si_logs; sl; sl=sl->sl_next ) + if ( sl->sl_sid == srs->sr_state.sid ) break; + if ( sl ) { + ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + if ( ber_bvcmp( srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) { + do_present = 0; + /* mutex is unlocked in playlog */ + syncprov_playlog( op, rs, sl, srs->sr_state.ctxcsn, &ctxcsn ); + } else { + ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + } + } /* Is the CSN still present in the database? */ if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) { /* No, so a reload is required */ @@ -1545,8 +1772,9 @@ syncprov_op_search( Operation *op, SlapReply *rs ) #endif } else { gotstate = 1; - /* If context has changed, check for Present UUIDs */ - if ( syncprov_findcsn( op, FIND_PRESENT ) != LDAP_SUCCESS ) { + /* If changed and doing Present lookup, send Present UUIDs */ + if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) != + LDAP_SUCCESS ) { send_ldap_result( op, rs ); return rs->sr_err; } @@ -1567,9 +1795,7 @@ syncprov_op_search( Operation *op, SlapReply *rs ) fava->f_choice = LDAP_FILTER_LE; fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; - ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); - ber_dupbv_x( &fava->f_ava->aa_value, &si->si_ctxcsn, op->o_tmpmemctx ); - ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); + ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx ); fand->f_and = fava; if ( gotstate ) { fava->f_next = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); @@ -1589,6 +1815,7 @@ shortcut: ss = (searchstate *)(cb+1); ss->ss_on = on; ss->ss_so = sop; + ss->ss_present = do_present; cb->sc_response = syncprov_search_response; cb->sc_cleanup = syncprov_search_cleanup; cb->sc_private = ss; @@ -1679,6 +1906,47 @@ syncprov_db_config( si->si_chktime = atoi( argv[2] ) * 60; return 0; + } else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) { + sessionlog *sl; + int sid, size; + if ( argc != 3 ) { + fprintf( stderr, "%s: line %d: wrong number of arguments in " + "\"syncprov-sessionlog \"\n", fname, lineno ); + return -1; + } + sid = atoi( argv[1] ); + if ( sid < 0 || sid > 999 ) { + fprintf( stderr, + "%s: line %d: session log id %d is out of range [0..999]\n", + fname, lineno, sid ); + return -1; + } + size = atoi( argv[2] ); + if ( size < 0 ) { + fprintf( stderr, + "%s: line %d: session log size %d is negative\n", + fname, lineno, size ); + return -1; + } + for ( sl = si->si_logs; sl; sl=sl->sl_next ) { + if ( sl->sl_sid == sid ) { + sl->sl_size = size; + break; + } + } + if ( !sl ) { + sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE ); + sl->sl_mincsn.bv_val = (char *)(sl+1); + sl->sl_mincsn.bv_len = 0; + sl->sl_sid = sid; + sl->sl_size = size; + sl->sl_num = 0; + sl->sl_head = sl->sl_tail = NULL; + sl->sl_next = si->si_logs; + ldap_pvt_thread_mutex_init( &sl->sl_mutex ); + si->si_logs = sl; + } + return 0; } return SLAP_CONF_UNKNOWN; diff --git a/servers/slapd/slap.h b/servers/slapd/slap.h index 09a1851ee6..a501942d4e 100644 --- a/servers/slapd/slap.h +++ b/servers/slapd/slap.h @@ -2065,11 +2065,12 @@ typedef struct slap_paged_state { #define LDAP_PSEARCH_BY_SCOPEOUT 0x05 #define LDAP_PSEARCH_BY_PREDELETE 0x06 -struct psid_entry { +struct psid_entry { /* DELETE ME */ struct slap_op *ps_op; LDAP_LIST_ENTRY(psid_entry) ps_link; }; +#if 0 /* DELETE ME */ struct slog_entry { struct berval sl_uuid; struct berval sl_name; @@ -2084,6 +2085,7 @@ struct slap_session_entry { struct berval se_spec; LDAP_LIST_ENTRY( slap_session_entry ) se_link; }; +#endif struct slap_csn_entry { struct berval ce_csn; -- 2.39.5