From a5cfab44d7768505d0d9a9b4b357629501ca4bb1 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Mon, 17 Aug 2009 12:02:48 +0000 Subject: [PATCH] ITS#6152 preliminary refresh support, untested --- servers/slapd/overlays/pcache.c | 221 +++++++++++++++++++++++++++++--- 1 file changed, 204 insertions(+), 17 deletions(-) diff --git a/servers/slapd/overlays/pcache.c b/servers/slapd/overlays/pcache.c index 6c03df4b29..14e1be942b 100644 --- a/servers/slapd/overlays/pcache.c +++ b/servers/slapd/overlays/pcache.c @@ -2664,6 +2664,206 @@ get_attr_set( return i; } +/* Refresh a cached query: + * 1: Replay the query on the remote DB and merge each entry into + * the local DB. Remember the DNs of each remote entry. + * 2: Search the local DB for all entries matching this queryID. + * Delete any entry whose DN is not in the list from (1). + */ +typedef struct dnlist { + struct dnlist *next; + struct berval dn; +} dnlist; + +typedef struct refresh_info { + dnlist *ri_dns; + dnlist *ri_tail; + dnlist *ri_dels; + BackendDB *ri_be; + CachedQuery *ri_q; +} refresh_info; + +static dnlist *dnl_alloc( Operation *op, struct berval *bvdn ) +{ + dnlist *dn = op->o_tmpalloc( sizeof(dnlist) + bvdn->bv_len + 1, + op->o_tmpmemctx ); + dn->dn.bv_len = bvdn->bv_len; + dn->dn.bv_val = (char *)(dn+1); + AC_MEMCPY( dn->dn.bv_val, bvdn->bv_val, dn->dn.bv_len ); + dn->dn.bv_val[dn->dn.bv_len] = '\0'; + return dn; +} + +static int +refresh_merge( Operation *op, SlapReply *rs ) +{ + if ( rs->sr_type == REP_SEARCH ) { + refresh_info *ri = op->o_callback->sc_private; + BackendDB *be = op->o_bd; + Entry *e; + dnlist *dn; + slap_callback *ocb; + int rc; + + ocb = op->o_callback; + /* Find local entry, merge */ + op->o_bd = ri->ri_be; + rc = be_entry_get_rw( op, &rs->sr_entry->e_nname, NULL, NULL, 0, &e ); + if ( rc != LDAP_SUCCESS || e == NULL ) { + /* No local entry, just add it. FIXME: we are not checking + * the cache entry limit here + */ + merge_entry( op, rs->sr_entry, &ri->ri_q->q_uuid ); + } else { + /* Entry exists, update it */ + Entry ne; + Attribute *a, **b; + Modifications *modlist, *mods; + const char* text = NULL; + char textbuf[SLAP_TEXT_BUFLEN]; + size_t textlen = sizeof(textbuf); + slap_callback cb = { NULL, slap_null_cb, NULL, NULL }; + + ne = *e; + b = &ne.e_attrs; + /* Get a copy of only the attrs we requested */ + for ( a=e->e_attrs; a; a=a->a_next ) { + if ( ad_inlist( a->a_desc, rs->sr_attrs )) { + *b = attr_alloc( a->a_desc ); + *(*b) = *a; + b = &((*b)->a_next); + } + } + *b = NULL; + slap_entry2mods( &ne, &modlist, &text, textbuf, textlen ); + syncrepl_diff_entry( op, ne.e_attrs, rs->sr_entry->e_attrs, + &mods, &modlist, 0 ); + be_entry_release_r( op, e ); + op->o_tag = LDAP_REQ_MODIFY; + op->orm_modlist = mods; + op->o_callback = &cb; + op->o_bd->be_modify( op, rs ); + slap_mods_free( mods, 1 ); + } + + /* Add DN to list */ + dn = dnl_alloc( op, &rs->sr_entry->e_nname ); + dn->next = NULL; + if ( ri->ri_tail ) { + ri->ri_tail->next = dn; + } else { + ri->ri_dns = dn; + } + ri->ri_tail = dn; + op->o_callback = ocb; + } + return 0; +} + +static int +refresh_purge( Operation *op, SlapReply *rs ) +{ + if ( rs->sr_type == REP_SEARCH ) { + refresh_info *ri = op->o_callback->sc_private; + dnlist **dn; + int del = 1; + + /* Did the entry exist on the remote? */ + for ( dn=&ri->ri_dns; *dn; dn = &(*dn)->next ) { + if ( dnmatch( &(*dn)->dn, &rs->sr_entry->e_nname )) { + dnlist *dnext = (*dn)->next; + op->o_tmpfree( *dn, op->o_tmpmemctx ); + *dn = dnext; + del = 0; + break; + } + } + /* No, so put it on the list to delete */ + if ( del ) { + dnlist *dnl = dnl_alloc( op, &rs->sr_entry->e_nname ); + dnl->next = ri->ri_dels; + ri->ri_dels = dnl; + } + } + return 0; +} + +static int +refresh_query( Operation *op, SlapReply *rs, CachedQuery *query, + slap_overinst *on ) +{ + slap_callback cb = { 0 }; + refresh_info ri = { 0 }; + char filter_str[ LDAP_LUTIL_UUIDSTR_BUFSIZE + STRLENOF( "(queryId=)" ) ]; + AttributeAssertion ava = ATTRIBUTEASSERTION_INIT; + Filter filter = {LDAP_FILTER_EQUALITY}; + dnlist *dn; + int i, rc; + + cb.sc_response = refresh_merge; + cb.sc_private = &ri; + + /* cache DB */ + ri.ri_be = op->o_bd; + ri.ri_q = query; + + op->o_tag = LDAP_REQ_SEARCH; + op->o_protocol = LDAP_VERSION3; + op->o_callback = &cb; + op->o_do_not_cache = 1; + + op->o_req_dn = query->qbase->base; + op->o_req_ndn = query->qbase->base; + op->ors_scope = query->scope; + op->ors_slimit = SLAP_NO_LIMIT; + op->ors_tlimit = SLAP_NO_LIMIT; + op->ors_filter = query->filter; + filter2bv_x( op, query->filter, &op->ors_filterstr ); + op->ors_attrs = query->qtemp->t_attrs.attrs; + op->ors_attrsonly = 0; + + op->o_bd = on->on_info->oi_origdb; + rc = op->o_bd->be_search( op, rs ); + if ( rc ) { + op->o_bd = ri.ri_be; + goto leave; + } + + /* Get the DNs of all entries matching this query */ + cb.sc_response = refresh_purge; + + op->o_bd = ri.ri_be; + op->o_req_dn = op->o_bd->be_suffix[0]; + op->o_req_ndn = op->o_bd->be_nsuffix[0]; + op->ors_scope = LDAP_SCOPE_SUBTREE; + op->ors_deref = LDAP_DEREF_NEVER; + op->ors_filterstr.bv_len = snprintf(filter_str, sizeof(filter_str), + "(%s=%s)", ad_queryId->ad_cname.bv_val, query->q_uuid.bv_val); + filter.f_ava = &ava; + filter.f_av_desc = ad_queryId; + filter.f_av_value = query->q_uuid; + op->ors_attrs = slap_anlist_no_attrs; + op->ors_attrsonly = 1; + rs->sr_entry = NULL; + rs->sr_nentries = 0; + rc = op->o_bd->be_search( op, rs ); + if ( rc ) goto leave; + + op->o_tag = LDAP_REQ_DELETE; + while (( dn = ri.ri_dels )) { + op->o_req_dn = dn->dn; + op->o_req_ndn = dn->dn; + op->o_bd->be_delete( op, rs ); + ri.ri_dels = dn->next; + op->o_tmpfree( dn, op->o_tmpmemctx ); + } + +leave: + /* reset our local heap, we're done with it */ + slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, op->o_threadctx, 1 ); + return rc; +} + static void* consistency_check( void *ctx, @@ -2720,27 +2920,14 @@ consistency_check( * expiration has been hit, then skip the refresh since * we're just going to discard the result anyway. */ - if ( query->refcnt ) { + if ( query->refcnt ) query->expiry_time = op->o_time + templ->ttl; - } else if ( query->expiry_time < op->o_time ) { - goto expire; + if ( query->expiry_time > op->o_time ) { + refresh_query( op, &rs, query, on ); + continue; } - /* Do the refresh */ - /* FIXME: and then a miracle occurs... - * the next step requires firing off a search request - * for a single entry and merging the result into any - * already existing entry in the DB. This is similar - * to what syncrepl already does, but using the wrong - * data structures. Need to see what refactoring can be - * done to share the code... - */ - query->refcnt = 0; - - - continue; } -expire: if (query->expiry_time < op->o_time) { int rem = 0; Debug( pcache_debug, "Lock CR index = %p\n", -- 2.39.5