From: Howard Chu Date: Fri, 19 May 2006 15:25:33 +0000 (+0000) Subject: ITS#4549, rewritten query_containment etc... X-Git-Tag: OPENLDAP_REL_ENG_2_4_3ALPHA~9^2~240 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=e6475734fab18021e0ebfc564c446ae1e15cec4f;p=openldap ITS#4549, rewritten query_containment etc... --- diff --git a/servers/slapd/overlays/pcache.c b/servers/slapd/overlays/pcache.c index d61712cef2..d144a36c41 100644 --- a/servers/slapd/overlays/pcache.c +++ b/servers/slapd/overlays/pcache.c @@ -31,6 +31,7 @@ #include "slap.h" #include "lutil.h" #include "ldap_rq.h" +#include "avl.h" #include "config.h" @@ -45,9 +46,17 @@ typedef struct Query_s { struct query_template_s; +typedef struct Qbase_s { + Avlnode *scopes[4]; /* threaded AVL trees of cached queries */ + struct berval base; +} Qbase; + /* struct representing a cached query */ typedef struct cached_query_s { - Query query; /* LDAP query */ + Filter *filter; + Filter *first; + Qbase *qbase; + int scope; struct berval q_uuid; /* query identifier */ struct query_template_s *qtemp; /* template of the query */ time_t expiry_time; /* time till the query is considered valid */ @@ -78,6 +87,7 @@ typedef struct query_template_s { struct query_template_s *qtnext; struct query_template_s *qmnext; + Avlnode* qbase; CachedQuery* query; /* most recent query cached for the template */ CachedQuery* query_last; /* oldest query cached for the template */ ldap_pvt_thread_rdwr_t *t_rwlock; /* Rd/wr lock for accessing queries in the template */ @@ -212,44 +222,99 @@ merge_entry( return rc; } -/* compare base and scope of incoming and cached queries */ -static int base_scope_compare( - struct berval* ndn_stored, - struct berval* ndn_incoming, - int scope_stored, - int scope_incoming ) +/* Length-ordered sort on normalized DNs */ +static int pcache_dn_cmp( const void *v1, const void *v2 ) { - struct berval pdn_incoming = BER_BVNULL; - - if (scope_stored < scope_incoming) - return 0; + const Qbase *q1 = v1, *q2 = v2; - if ( !dnIsSuffix(ndn_incoming, ndn_stored)) - return 0; - - switch(scope_stored) { - case LDAP_SCOPE_BASE: - return (ndn_incoming->bv_len == ndn_stored->bv_len); - - case LDAP_SCOPE_ONELEVEL: - switch(scope_incoming){ - case LDAP_SCOPE_BASE: - dnParent(ndn_incoming, &pdn_incoming); - return (pdn_incoming.bv_len == ndn_stored->bv_len); + int rc = q1->base.bv_len - q2->base.bv_len; + if ( rc == 0 ) + rc = strncmp( q1->base.bv_val, q2->base.bv_val, q1->base.bv_len ); + return rc; +} - case LDAP_SCOPE_ONELEVEL: - return (ndn_incoming->bv_len == ndn_stored->bv_len); +/* compare the first value in each filter */ +static int pcache_filter_cmp( const void *v1, const void *v2 ) +{ + const CachedQuery *q1 = v1, *q2 =v2; + int rc, weight1, weight2; - default: - return 0; - } - case LDAP_SCOPE_SUBTREE: - return 1; + switch( q1->first->f_choice ) { + case LDAP_FILTER_PRESENT: + weight1 = 0; + break; + case LDAP_FILTER_EQUALITY: + case LDAP_FILTER_GE: + case LDAP_FILTER_LE: + weight1 = 1; break; default: - return 0; + weight1 = 2; + } + switch( q2->first->f_choice ) { + case LDAP_FILTER_PRESENT: + weight2 = 0; + break; + case LDAP_FILTER_EQUALITY: + case LDAP_FILTER_GE: + case LDAP_FILTER_LE: + weight2 = 1; break; - } + default: + weight2 = 2; + } + rc = weight1 - weight2; + if ( !rc ) { + switch( weight1 ) { + case 0: return 0; + case 1: + rc = ber_bvcmp( &q1->first->f_ava->aa_value, + &q2->first->f_ava->aa_value ); + break; + case 2: + if ( q1->first->f_choice == LDAP_FILTER_SUBSTRINGS ) { + rc = 0; + if ( !BER_BVISNULL( &q1->first->f_sub_initial )) { + if ( !BER_BVISNULL( &q2->first->f_sub_initial )) { + rc = ber_bvcmp( &q1->first->f_sub_initial, + &q2->first->f_sub_initial ); + } else { + rc = 1; + } + } else if ( !BER_BVISNULL( &q2->first->f_sub_initial )) { + rc = -1; + } + if ( rc ) break; + if ( q1->first->f_sub_any ) { + if ( q2->first->f_sub_any ) { + rc = ber_bvcmp( q1->first->f_sub_any, + q2->first->f_sub_any ); + } else { + rc = 1; + } + } else if ( q2->first->f_sub_any ) { + rc = -1; + } + if ( rc ) break; + if ( !BER_BVISNULL( &q1->first->f_sub_final )) { + if ( !BER_BVISNULL( &q2->first->f_sub_final )) { + rc = ber_bvcmp( &q1->first->f_sub_final, + &q2->first->f_sub_final ); + } else { + rc = 1; + } + } else if ( !BER_BVISNULL( &q2->first->f_sub_final )) { + rc = -1; + } + } else { + rc = ber_bvcmp( &q1->first->f_mr_value, + &q2->first->f_mr_value ); + } + break; + } + } + + return rc; } /* add query on top of LRU list */ @@ -474,6 +539,133 @@ final: return rc; } +static Filter * +filter_first( Filter *f ) +{ + while ( f->f_choice == LDAP_FILTER_OR || f->f_choice == LDAP_FILTER_AND ) + f = f->f_and; + return f; +} + + +static CachedQuery * +find_filter( Operation *op, Avlnode *root, Filter *inputf, Filter *first ) +{ + Filter* fs; + Filter* fi; + const char* text; + MatchingRule* mrule = NULL; + int res=0; + int ret, rc, dir; + Avlnode *ptr; + CachedQuery cq, *qc; + + cq.filter = inputf; + cq.first = first; + + /* substring matches sort to the end, and we just have to + * walk the entire list. + */ + if ( first->f_choice == LDAP_FILTER_SUBSTRINGS ) { + ptr = tavl_end( root, 1 ); + dir = TAVL_DIR_LEFT; + } else { + ptr = tavl_find3( root, &cq, pcache_filter_cmp, &ret ); + dir = (first->f_choice == LDAP_FILTER_LE) ? TAVL_DIR_LEFT : + TAVL_DIR_RIGHT; + } + + while (ptr) { + qc = ptr->avl_data; + fi = inputf; + fs = qc->filter; + do { + res=0; + switch (fs->f_choice) { + case LDAP_FILTER_EQUALITY: + if (fi->f_choice == LDAP_FILTER_EQUALITY) + mrule = fs->f_ava->aa_desc->ad_type->sat_equality; + else + ret = 1; + break; + case LDAP_FILTER_GE: + case LDAP_FILTER_LE: + mrule = fs->f_ava->aa_desc->ad_type->sat_ordering; + break; + default: + mrule = NULL; + } + if (mrule) { + const char *text; + rc = value_match(&ret, fs->f_ava->aa_desc, mrule, + SLAP_MR_VALUE_OF_ASSERTION_SYNTAX, + &(fi->f_ava->aa_value), + &(fs->f_ava->aa_value), &text); + if (rc != LDAP_SUCCESS) { + return NULL; + } + } + switch (fs->f_choice) { + case LDAP_FILTER_OR: + case LDAP_FILTER_AND: + fs = fs->f_and; + fi = fi->f_and; + res=1; + break; + case LDAP_FILTER_SUBSTRINGS: + /* check if the equality query can be + * answered with cached substring query */ + if ((fi->f_choice == LDAP_FILTER_EQUALITY) + && substr_containment_equality( op, + fs, fi)) + res=1; + /* check if the substring query can be + * answered with cached substring query */ + if ((fi->f_choice ==LDAP_FILTER_SUBSTRINGS + ) && substr_containment_substr( op, + fs, fi)) + res= 1; + fs=fs->f_next; + fi=fi->f_next; + break; + case LDAP_FILTER_PRESENT: + res=1; + fs=fs->f_next; + fi=fi->f_next; + break; + case LDAP_FILTER_EQUALITY: + if (ret == 0) + res = 1; + fs=fs->f_next; + fi=fi->f_next; + break; + case LDAP_FILTER_GE: + if (mrule && ret >= 0) + res = 1; + fs=fs->f_next; + fi=fi->f_next; + break; + case LDAP_FILTER_LE: + if (mrule && ret <= 0) + res = 1; + fs=fs->f_next; + fi=fi->f_next; + break; + case LDAP_FILTER_NOT: + res=0; + break; + default: + break; + } + } while((res) && (fi != NULL) && (fs != NULL)); + + if ( res ) + return qc; + ptr = tavl_next( ptr, dir ); + } + return NULL; +} + /* check whether query is contained in any of * the cached queries in template */ @@ -483,120 +675,87 @@ query_containment(Operation *op, query_manager *qm, QueryTemplate *templa) { CachedQuery* qc; - Query* q; - Filter* inputf = query->filter; - struct berval* base = &(query->base); - int scope = query->scope; - int res=0; - Filter* fs; - Filter* fi; - int ret, rc; - const char* text; + int depth = 0, tscope; + Qbase qbase, *qbptr = NULL; + struct berval pdn; + + if (query->filter != NULL) { + Filter *first; - MatchingRule* mrule = NULL; - if (inputf != NULL) { Debug( pcache_debug, "Lock QC index = %p\n", templa, 0, 0 ); + qbase.base = query->base; + + first = filter_first( query->filter ); + ldap_pvt_thread_rdwr_rlock(templa->t_rwlock); - for(qc=templa->query; qc != NULL; qc= qc->next) { - q = (Query*)qc; - if(base_scope_compare(&(q->base), base, q->scope, scope)) { - fi = inputf; - fs = q->filter; - do { - res=0; - switch (fs->f_choice) { - case LDAP_FILTER_EQUALITY: - if (fi->f_choice == LDAP_FILTER_EQUALITY) - mrule = fs->f_ava->aa_desc->ad_type->sat_equality; - else - ret = 1; - break; - case LDAP_FILTER_GE: - case LDAP_FILTER_LE: - mrule = fs->f_ava->aa_desc->ad_type->sat_ordering; - break; - default: - mrule = NULL; - } - if (mrule) { - rc = value_match(&ret, fs->f_ava->aa_desc, mrule, - SLAP_MR_VALUE_OF_ASSERTION_SYNTAX, - &(fi->f_ava->aa_value), - &(fs->f_ava->aa_value), &text); - if (rc != LDAP_SUCCESS) { - ldap_pvt_thread_rdwr_runlock(templa->t_rwlock); - Debug( pcache_debug, - "Unlock: Exiting QC index=%p\n", - templa, 0, 0 ); - return NULL; - } - } - switch (fs->f_choice) { - case LDAP_FILTER_OR: - case LDAP_FILTER_AND: - fs = fs->f_and; - fi = fi->f_and; - res=1; + for( ;; ) { + /* Find the base */ + qbptr = avl_find( templa->qbase, &qbase, pcache_dn_cmp ); + if ( qbptr ) { + tscope = query->scope; + /* Find a matching scope: + * match at depth 0 OK + * scope is BASE, + * one at depth 1 OK + * subord at depth > 0 OK + * subtree at any depth OK + * scope is ONE, + * subtree or subord at any depth OK + * scope is SUBORD, + * subtree or subord at any depth OK + * scope is SUBTREE, + * subord at depth > 0 OK + * subtree at any depth OK + */ + for ( tscope = 0 ; tscope <= LDAP_SCOPE_CHILDREN; tscope++ ) { + switch ( query->scope ) { + case LDAP_SCOPE_BASE: + if ( tscope == LDAP_SCOPE_BASE && depth ) continue; + if ( tscope == LDAP_SCOPE_ONE && depth != 1) continue; + if ( tscope == LDAP_SCOPE_CHILDREN && !depth ) continue; break; - case LDAP_FILTER_SUBSTRINGS: - /* check if the equality query can be - * answered with cached substring query */ - if ((fi->f_choice == LDAP_FILTER_EQUALITY) - && substr_containment_equality( op, - fs, fi)) - res=1; - /* check if the substring query can be - * answered with cached substring query */ - if ((fi->f_choice ==LDAP_FILTER_SUBSTRINGS - ) && substr_containment_substr( op, - fs, fi)) - res= 1; - fs=fs->f_next; - fi=fi->f_next; + case LDAP_SCOPE_ONE: + if ( tscope == LDAP_SCOPE_BASE ) + tscope = LDAP_SCOPE_ONE; + if ( tscope == LDAP_SCOPE_ONE && depth ) continue; + if ( !depth ) break; + if ( tscope < LDAP_SCOPE_SUBTREE ) + tscope = LDAP_SCOPE_SUBTREE; break; - case LDAP_FILTER_PRESENT: - res=1; - fs=fs->f_next; - fi=fi->f_next; + case LDAP_SCOPE_SUBTREE: + if ( tscope < LDAP_SCOPE_SUBTREE ) + tscope = LDAP_SCOPE_SUBTREE; + if ( tscope == LDAP_SCOPE_CHILDREN && !depth ) continue; break; - case LDAP_FILTER_EQUALITY: - if (ret == 0) - res = 1; - fs=fs->f_next; - fi=fi->f_next; - break; - case LDAP_FILTER_GE: - if (mrule && ret >= 0) - res = 1; - fs=fs->f_next; - fi=fi->f_next; - break; - case LDAP_FILTER_LE: - if (mrule && ret <= 0) - res = 1; - fs=fs->f_next; - fi=fi->f_next; - break; - case LDAP_FILTER_NOT: - res=0; - break; - default: + case LDAP_SCOPE_CHILDREN: + if ( tscope < LDAP_SCOPE_SUBTREE ) + tscope = LDAP_SCOPE_SUBTREE; break; } - } while((res) && (fi != NULL) && (fs != NULL)); - - if(res) { - ldap_pvt_thread_mutex_lock(&qm->lru_mutex); - if (qm->lru_top != qc) { - remove_query(qm, qc); - add_query_on_top(qm, qc); + if ( !qbptr->scopes[tscope] ) continue; + + /* Find filter */ + qc = find_filter( op, qbptr->scopes[tscope], + query->filter, first ); + if ( qc ) { + ldap_pvt_thread_mutex_lock(&qm->lru_mutex); + if (qm->lru_top != qc) { + remove_query(qm, qc); + add_query_on_top(qm, qc); + } + ldap_pvt_thread_mutex_unlock(&qm->lru_mutex); + return qc; } - ldap_pvt_thread_mutex_unlock(&qm->lru_mutex); - return qc; } } + if ( be_issuffix( op->o_bd, &qbase.base )) + break; + /* Up a level */ + dnParent( &qbase.base, &pdn ); + qbase.base = pdn; } + Debug( pcache_debug, "Not answerable: Unlock QC index=%p\n", templa, 0, 0 ); @@ -608,11 +767,8 @@ query_containment(Operation *op, query_manager *qm, static void free_query (CachedQuery* qc) { - Query* q = (Query*)qc; - free(qc->q_uuid.bv_val); - filter_free(q->filter); - free (q->base.bv_val); + filter_free(qc->filter); free(qc); } @@ -625,7 +781,8 @@ static void add_query( struct berval* uuid) { CachedQuery* new_cached_query = (CachedQuery*) ch_malloc(sizeof(CachedQuery)); - Query* new_query; + Qbase *qbase, qb; + new_cached_query->qtemp = templ; if ( uuid ) { new_cached_query->q_uuid = *uuid; @@ -638,22 +795,35 @@ static void add_query( new_cached_query->lru_down = NULL; Debug( pcache_debug, "Added query expires at %ld\n", (long) new_cached_query->expiry_time, 0, 0 ); - new_query = (Query*)new_cached_query; - ber_dupbv(&new_query->base, &query->base); - new_query->scope = query->scope; - new_query->filter = query->filter; + new_cached_query->scope = query->scope; + new_cached_query->filter = query->filter; + new_cached_query->first = filter_first( query->filter ); + + qb.base = query->base; /* Adding a query */ Debug( pcache_debug, "Lock AQ index = %p\n", templ, 0, 0 ); ldap_pvt_thread_rdwr_wlock(templ->t_rwlock); + qbase = avl_find( templ->qbase, &qb, pcache_dn_cmp ); + if ( !qbase ) { + qbase = ch_calloc( 1, sizeof(Qbase) + qb.base.bv_len + 1 ); + qbase->base.bv_len =qb.base.bv_len; + qbase->base.bv_val = (char *)(qbase+1); + memcpy( qbase->base.bv_val, qb.base.bv_val, qb.base.bv_len ); + qbase->base.bv_val[qbase->base.bv_len] = '\0'; + avl_insert( &templ->qbase, qbase, pcache_dn_cmp, avl_dup_error ); + } if (templ->query == NULL) templ->query_last = new_cached_query; else templ->query->prev = new_cached_query; new_cached_query->next = templ->query; new_cached_query->prev = NULL; + new_cached_query->qbase = qbase; + tavl_insert( &qbase->scopes[query->scope], new_cached_query, + pcache_filter_cmp, avl_dup_ok ); templ->query = new_cached_query; templ->no_of_queries++; Debug( pcache_debug, "TEMPLATE %p QUERIES++ %d\n", @@ -684,6 +854,15 @@ remove_from_template (CachedQuery* qc, QueryTemplate* template) qc->next->prev = qc->prev; qc->prev->next = qc->next; } + tavl_delete( &qc->qbase->scopes[qc->scope], qc, pcache_filter_cmp ); + if ( qc->qbase->scopes[0] == NULL && + qc->qbase->scopes[1] == NULL && + qc->qbase->scopes[2] == NULL && + qc->qbase->scopes[3] == NULL ) { + avl_delete( &template->qbase, qc->qbase, pcache_dn_cmp ); + ch_free( qc->qbase ); + qc->qbase = NULL; + } template->no_of_queries--; } @@ -1999,6 +2178,17 @@ pcache_db_open( return rc; } +static void +pcache_free_qbase( void *v ) +{ + Qbase *qb = v; + int i; + + for (i=0; i<3; i++) + tavl_free( qb->scopes[i], NULL ); + ch_free( qb ); +} + static int pcache_db_close( BackendDB *be @@ -2024,6 +2214,7 @@ pcache_db_close( qn = qc->next; free_query( qc ); } + avl_free( tm->qbase, pcache_free_qbase ); free( tm->querystr.bv_val ); ldap_pvt_thread_rdwr_destroy( tm->t_rwlock ); ch_free( tm->t_rwlock );