]> git.sur5r.net Git - openldap/commitdiff
ITS#4549, rewritten query_containment etc...
authorHoward Chu <hyc@openldap.org>
Fri, 19 May 2006 15:25:33 +0000 (15:25 +0000)
committerHoward Chu <hyc@openldap.org>
Fri, 19 May 2006 15:25:33 +0000 (15:25 +0000)
servers/slapd/overlays/pcache.c

index d61712cef276ac8741ccead4be544b1611b7905f..d144a36c410d4de7743ada55d4c8af5f8053cf3a 100644 (file)
@@ -31,6 +31,7 @@
 #include "slap.h"
 #include "lutil.h"
 #include "ldap_rq.h"
+#include "avl.h"
 
 #include "config.h"
 
@@ -45,9 +46,17 @@ typedef struct Query_s {
 
 struct query_template_s;
 
+typedef struct Qbase_s {
+       Avlnode *scopes[4];             /* threaded AVL trees of cached queries */
+       struct berval base;
+} Qbase;
+
 /* struct representing a cached query */
 typedef struct cached_query_s {
-       Query                           query;          /* LDAP query */
+       Filter                                  *filter;
+       Filter                                  *first;
+       Qbase                                   *qbase;
+       int                                             scope;
        struct berval                   q_uuid;         /* query identifier */
        struct query_template_s *qtemp; /* template of the query */
        time_t                          expiry_time;    /* time till the query is considered valid */
@@ -78,6 +87,7 @@ typedef struct query_template_s {
        struct query_template_s *qtnext;
        struct query_template_s *qmnext;
 
+       Avlnode*                qbase;
        CachedQuery*    query;          /* most recent query cached for the template */
        CachedQuery*    query_last;     /* oldest query cached for the template */
        ldap_pvt_thread_rdwr_t *t_rwlock; /* Rd/wr lock for accessing queries in the template */
@@ -212,44 +222,99 @@ merge_entry(
        return rc;
 }
 
-/* compare base and scope of incoming and cached queries */
-static int base_scope_compare(
-       struct berval* ndn_stored,
-       struct berval* ndn_incoming,
-       int scope_stored,
-       int scope_incoming      )
+/* Length-ordered sort on normalized DNs */
+static int pcache_dn_cmp( const void *v1, const void *v2 )
 {
-       struct berval pdn_incoming = BER_BVNULL;
-
-       if (scope_stored < scope_incoming)
-               return 0;
+       const Qbase *q1 = v1, *q2 = v2;
 
-       if ( !dnIsSuffix(ndn_incoming, ndn_stored))
-               return 0;
-
-       switch(scope_stored) {
-       case LDAP_SCOPE_BASE:
-               return (ndn_incoming->bv_len == ndn_stored->bv_len);
-
-       case LDAP_SCOPE_ONELEVEL:
-               switch(scope_incoming){
-               case LDAP_SCOPE_BASE:
-                       dnParent(ndn_incoming, &pdn_incoming);
-                       return (pdn_incoming.bv_len == ndn_stored->bv_len);
+       int rc = q1->base.bv_len - q2->base.bv_len;
+       if ( rc == 0 )
+               rc = strncmp( q1->base.bv_val, q2->base.bv_val, q1->base.bv_len );
+       return rc;
+}
 
-               case LDAP_SCOPE_ONELEVEL:
-                       return (ndn_incoming->bv_len == ndn_stored->bv_len);
+/* compare the first value in each filter */
+static int pcache_filter_cmp( const void *v1, const void *v2 )
+{
+       const CachedQuery *q1 = v1, *q2 =v2;
+       int rc, weight1, weight2;
 
-               default:
-                       return 0;
-               }
-       case LDAP_SCOPE_SUBTREE:
-               return 1;
+       switch( q1->first->f_choice ) {
+       case LDAP_FILTER_PRESENT:
+               weight1 = 0;
+               break;
+       case LDAP_FILTER_EQUALITY:
+       case LDAP_FILTER_GE:
+       case LDAP_FILTER_LE:
+               weight1 = 1;
                break;
        default:
-               return 0;
+               weight1 = 2;
+       }
+       switch( q2->first->f_choice ) {
+       case LDAP_FILTER_PRESENT:
+               weight2 = 0;
+               break;
+       case LDAP_FILTER_EQUALITY:
+       case LDAP_FILTER_GE:
+       case LDAP_FILTER_LE:
+               weight2 = 1;
                break;
-    }
+       default:
+               weight2 = 2;
+       }
+       rc = weight1 - weight2;
+       if ( !rc ) {
+               switch( weight1 ) {
+               case 0: return 0;
+               case 1:
+                       rc = ber_bvcmp( &q1->first->f_ava->aa_value,
+                               &q2->first->f_ava->aa_value );
+                       break;
+               case 2:
+                       if ( q1->first->f_choice == LDAP_FILTER_SUBSTRINGS ) {
+                               rc = 0;
+                               if ( !BER_BVISNULL( &q1->first->f_sub_initial )) {
+                                       if ( !BER_BVISNULL( &q2->first->f_sub_initial )) {
+                                               rc = ber_bvcmp( &q1->first->f_sub_initial,
+                                                       &q2->first->f_sub_initial );
+                                       } else {
+                                               rc = 1;
+                                       }
+                               } else if ( !BER_BVISNULL( &q2->first->f_sub_initial )) {
+                                       rc = -1;
+                               }
+                               if ( rc ) break;
+                               if ( q1->first->f_sub_any ) {
+                                       if ( q2->first->f_sub_any ) {
+                                               rc = ber_bvcmp( q1->first->f_sub_any,
+                                                       q2->first->f_sub_any );
+                                       } else {
+                                               rc = 1;
+                                       }
+                               } else if ( q2->first->f_sub_any ) {
+                                       rc = -1;
+                               }
+                               if ( rc ) break;
+                               if ( !BER_BVISNULL( &q1->first->f_sub_final )) {
+                                       if ( !BER_BVISNULL( &q2->first->f_sub_final )) {
+                                               rc = ber_bvcmp( &q1->first->f_sub_final,
+                                                       &q2->first->f_sub_final );
+                                       } else {
+                                               rc = 1;
+                                       }
+                               } else if ( !BER_BVISNULL( &q2->first->f_sub_final )) {
+                                       rc = -1;
+                               }
+                       } else {
+                               rc = ber_bvcmp( &q1->first->f_mr_value,
+                                       &q2->first->f_mr_value );
+                       }
+                       break;
+               }
+       }
+
+       return rc;
 }
 
 /* add query on top of LRU list */
@@ -474,6 +539,133 @@ final:
        return rc;
 }
 
+static Filter *
+filter_first( Filter *f )
+{
+       while ( f->f_choice == LDAP_FILTER_OR || f->f_choice == LDAP_FILTER_AND )
+               f = f->f_and;
+       return f;
+}
+
+
+static CachedQuery *
+find_filter( Operation *op, Avlnode *root, Filter *inputf, Filter *first )
+{
+       Filter* fs;
+       Filter* fi;
+       const char* text;
+       MatchingRule* mrule = NULL;
+       int res=0;
+       int ret, rc, dir;
+       Avlnode *ptr;
+       CachedQuery cq, *qc;
+
+       cq.filter = inputf;
+       cq.first = first;
+
+       /* substring matches sort to the end, and we just have to
+        * walk the entire list.
+        */
+       if ( first->f_choice == LDAP_FILTER_SUBSTRINGS ) {
+               ptr = tavl_end( root, 1 );
+               dir = TAVL_DIR_LEFT;
+       } else {
+               ptr = tavl_find3( root, &cq, pcache_filter_cmp, &ret );
+               dir = (first->f_choice == LDAP_FILTER_LE) ? TAVL_DIR_LEFT :
+                       TAVL_DIR_RIGHT;
+       }
+
+       while (ptr) {
+               qc = ptr->avl_data;
+               fi = inputf;
+               fs = qc->filter;
+               do {
+                       res=0;
+                       switch (fs->f_choice) {
+                       case LDAP_FILTER_EQUALITY:
+                               if (fi->f_choice == LDAP_FILTER_EQUALITY)
+                                       mrule = fs->f_ava->aa_desc->ad_type->sat_equality;
+                               else
+                                       ret = 1;
+                               break;
+                       case LDAP_FILTER_GE:
+                       case LDAP_FILTER_LE:
+                               mrule = fs->f_ava->aa_desc->ad_type->sat_ordering;
+                               break;
+                       default:
+                               mrule = NULL; 
+                       }
+                       if (mrule) {
+                               const char *text;
+                               rc = value_match(&ret, fs->f_ava->aa_desc, mrule,
+                                       SLAP_MR_VALUE_OF_ASSERTION_SYNTAX,
+                                       &(fi->f_ava->aa_value),
+                                       &(fs->f_ava->aa_value), &text);
+                               if (rc != LDAP_SUCCESS) {
+                                       return NULL;
+                               }
+                       }
+                       switch (fs->f_choice) {
+                       case LDAP_FILTER_OR:
+                       case LDAP_FILTER_AND:
+                               fs = fs->f_and;
+                               fi = fi->f_and;
+                               res=1;
+                               break;
+                       case LDAP_FILTER_SUBSTRINGS:
+                               /* check if the equality query can be
+                               * answered with cached substring query */
+                               if ((fi->f_choice == LDAP_FILTER_EQUALITY)
+                                       && substr_containment_equality( op,
+                                       fs, fi))
+                                       res=1;
+                               /* check if the substring query can be
+                               * answered with cached substring query */
+                               if ((fi->f_choice ==LDAP_FILTER_SUBSTRINGS
+                                       ) && substr_containment_substr( op,
+                                       fs, fi))
+                                       res= 1;
+                               fs=fs->f_next;
+                               fi=fi->f_next;
+                               break;
+                       case LDAP_FILTER_PRESENT:
+                               res=1;
+                               fs=fs->f_next;
+                               fi=fi->f_next;
+                               break;
+                       case LDAP_FILTER_EQUALITY:
+                               if (ret == 0)
+                                       res = 1;
+                               fs=fs->f_next;
+                               fi=fi->f_next;
+                               break;
+                       case LDAP_FILTER_GE:
+                               if (mrule && ret >= 0)
+                                       res = 1;
+                               fs=fs->f_next;
+                               fi=fi->f_next;
+                               break;
+                       case LDAP_FILTER_LE:
+                               if (mrule && ret <= 0)
+                                       res = 1;
+                               fs=fs->f_next;
+                               fi=fi->f_next;
+                               break;
+                       case LDAP_FILTER_NOT:
+                               res=0;
+                               break;
+                       default:
+                               break;
+                       }
+               } while((res) && (fi != NULL) && (fs != NULL));
+
+               if ( res )
+                       return qc;
+               ptr = tavl_next( ptr, dir );
+       }
+       return NULL;
+}
+
 /* check whether query is contained in any of
  * the cached queries in template
  */
@@ -483,120 +675,87 @@ query_containment(Operation *op, query_manager *qm,
                  QueryTemplate *templa)
 {
        CachedQuery* qc;
-       Query* q;
-       Filter* inputf = query->filter;
-       struct berval* base = &(query->base);
-       int scope = query->scope;
-       int res=0;
-       Filter* fs;
-       Filter* fi;
-       int ret, rc;
-       const char* text;
+       int depth = 0, tscope;
+       Qbase qbase, *qbptr = NULL;
+       struct berval pdn;
+
+       if (query->filter != NULL) {
+               Filter *first;
 
-       MatchingRule* mrule = NULL;
-       if (inputf != NULL) {
                Debug( pcache_debug, "Lock QC index = %p\n",
                                templa, 0, 0 );
+               qbase.base = query->base;
+
+               first = filter_first( query->filter );
+
                ldap_pvt_thread_rdwr_rlock(templa->t_rwlock);
-               for(qc=templa->query; qc != NULL; qc= qc->next) {
-                       q = (Query*)qc;
-                       if(base_scope_compare(&(q->base), base, q->scope, scope)) {
-                               fi = inputf;
-                               fs = q->filter;
-                               do {
-                                       res=0;
-                                       switch (fs->f_choice) {
-                                       case LDAP_FILTER_EQUALITY:
-                                               if (fi->f_choice == LDAP_FILTER_EQUALITY)
-                                                       mrule = fs->f_ava->aa_desc->ad_type->sat_equality;
-                                               else
-                                                       ret = 1;
-                                               break;
-                                       case LDAP_FILTER_GE:
-                                       case LDAP_FILTER_LE:
-                                               mrule = fs->f_ava->aa_desc->ad_type->sat_ordering;
-                                               break;
-                                       default:
-                                               mrule = NULL; 
-                                       }
-                                       if (mrule) {
-                                               rc = value_match(&ret, fs->f_ava->aa_desc, mrule,
-                                                       SLAP_MR_VALUE_OF_ASSERTION_SYNTAX,
-                                                       &(fi->f_ava->aa_value),
-                                                       &(fs->f_ava->aa_value), &text);
-                                               if (rc != LDAP_SUCCESS) {
-                                                       ldap_pvt_thread_rdwr_runlock(templa->t_rwlock);
-                                                       Debug( pcache_debug,
-                                                       "Unlock: Exiting QC index=%p\n",
-                                                       templa, 0, 0 );
-                                                       return NULL;
-                                               }
-                                       }
-                                       switch (fs->f_choice) {
-                                       case LDAP_FILTER_OR:
-                                       case LDAP_FILTER_AND:
-                                               fs = fs->f_and;
-                                               fi = fi->f_and;
-                                               res=1;
+               for( ;; ) {
+                       /* Find the base */
+                       qbptr = avl_find( templa->qbase, &qbase, pcache_dn_cmp );
+                       if ( qbptr ) {
+                               tscope = query->scope;
+                               /* Find a matching scope:
+                                * match at depth 0 OK
+                                * scope is BASE,
+                                *      one at depth 1 OK
+                                *  subord at depth > 0 OK
+                                *      subtree at any depth OK
+                                * scope is ONE,
+                                *  subtree or subord at any depth OK
+                                * scope is SUBORD,
+                                *  subtree or subord at any depth OK
+                                * scope is SUBTREE,
+                                *  subord at depth > 0 OK
+                                *  subtree at any depth OK
+                                */
+                               for ( tscope = 0 ; tscope <= LDAP_SCOPE_CHILDREN; tscope++ ) {
+                                       switch ( query->scope ) {
+                                       case LDAP_SCOPE_BASE:
+                                               if ( tscope == LDAP_SCOPE_BASE && depth ) continue;
+                                               if ( tscope == LDAP_SCOPE_ONE && depth != 1) continue;
+                                               if ( tscope == LDAP_SCOPE_CHILDREN && !depth ) continue;
                                                break;
-                                       case LDAP_FILTER_SUBSTRINGS:
-                                               /* check if the equality query can be
-                                               * answered with cached substring query */
-                                               if ((fi->f_choice == LDAP_FILTER_EQUALITY)
-                                                       && substr_containment_equality( op,
-                                                       fs, fi))
-                                                       res=1;
-                                               /* check if the substring query can be
-                                               * answered with cached substring query */
-                                               if ((fi->f_choice ==LDAP_FILTER_SUBSTRINGS
-                                                       ) && substr_containment_substr( op,
-                                                       fs, fi))
-                                                       res= 1;
-                                               fs=fs->f_next;
-                                               fi=fi->f_next;
+                                       case LDAP_SCOPE_ONE:
+                                               if ( tscope == LDAP_SCOPE_BASE )
+                                                       tscope = LDAP_SCOPE_ONE;
+                                               if ( tscope == LDAP_SCOPE_ONE && depth ) continue;
+                                               if ( !depth ) break;
+                                               if ( tscope < LDAP_SCOPE_SUBTREE )
+                                                       tscope = LDAP_SCOPE_SUBTREE;
                                                break;
-                                       case LDAP_FILTER_PRESENT:
-                                               res=1;
-                                               fs=fs->f_next;
-                                               fi=fi->f_next;
+                                       case LDAP_SCOPE_SUBTREE:
+                                               if ( tscope < LDAP_SCOPE_SUBTREE )
+                                                       tscope = LDAP_SCOPE_SUBTREE;
+                                               if ( tscope == LDAP_SCOPE_CHILDREN && !depth ) continue;
                                                break;
-                                       case LDAP_FILTER_EQUALITY:
-                                               if (ret == 0)
-                                                       res = 1;
-                                               fs=fs->f_next;
-                                               fi=fi->f_next;
-                                               break;
-                                       case LDAP_FILTER_GE:
-                                               if (mrule && ret >= 0)
-                                                       res = 1;
-                                               fs=fs->f_next;
-                                               fi=fi->f_next;
-                                               break;
-                                       case LDAP_FILTER_LE:
-                                               if (mrule && ret <= 0)
-                                                       res = 1;
-                                               fs=fs->f_next;
-                                               fi=fi->f_next;
-                                               break;
-                                       case LDAP_FILTER_NOT:
-                                               res=0;
-                                               break;
-                                       default:
+                                       case LDAP_SCOPE_CHILDREN:
+                                               if ( tscope < LDAP_SCOPE_SUBTREE )
+                                                       tscope = LDAP_SCOPE_SUBTREE;
                                                break;
                                        }
-                               } while((res) && (fi != NULL) && (fs != NULL));
-
-                               if(res) {
-                                       ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
-                                       if (qm->lru_top != qc) {
-                                               remove_query(qm, qc);
-                                               add_query_on_top(qm, qc);
+                                       if ( !qbptr->scopes[tscope] ) continue;
+
+                                       /* Find filter */
+                                       qc = find_filter( op, qbptr->scopes[tscope],
+                                                       query->filter, first );
+                                       if ( qc ) {
+                                               ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
+                                               if (qm->lru_top != qc) {
+                                                       remove_query(qm, qc);
+                                                       add_query_on_top(qm, qc);
+                                               }
+                                               ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
+                                               return qc;
                                        }
-                                       ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
-                                       return qc;
                                }
                        }
+                       if ( be_issuffix( op->o_bd, &qbase.base ))
+                               break;
+                       /* Up a level */
+                       dnParent( &qbase.base, &pdn );
+                       qbase.base = pdn;
                }
+
                Debug( pcache_debug,
                        "Not answerable: Unlock QC index=%p\n",
                        templa, 0, 0 );
@@ -608,11 +767,8 @@ query_containment(Operation *op, query_manager *qm,
 static void
 free_query (CachedQuery* qc)
 {
-       Query* q = (Query*)qc;
-
        free(qc->q_uuid.bv_val);
-       filter_free(q->filter);
-       free (q->base.bv_val);
+       filter_free(qc->filter);
        free(qc);
 }
 
@@ -625,7 +781,8 @@ static void add_query(
        struct berval* uuid)
 {
        CachedQuery* new_cached_query = (CachedQuery*) ch_malloc(sizeof(CachedQuery));
-       Query* new_query;
+       Qbase *qbase, qb;
+
        new_cached_query->qtemp = templ;
        if ( uuid ) {
                new_cached_query->q_uuid = *uuid;
@@ -638,22 +795,35 @@ static void add_query(
        new_cached_query->lru_down = NULL;
        Debug( pcache_debug, "Added query expires at %ld\n",
                        (long) new_cached_query->expiry_time, 0, 0 );
-       new_query = (Query*)new_cached_query;
 
-       ber_dupbv(&new_query->base, &query->base);
-       new_query->scope = query->scope;
-       new_query->filter = query->filter;
+       new_cached_query->scope = query->scope;
+       new_cached_query->filter = query->filter;
+       new_cached_query->first = filter_first( query->filter );
+
+       qb.base = query->base;
 
        /* Adding a query    */
        Debug( pcache_debug, "Lock AQ index = %p\n",
                        templ, 0, 0 );
        ldap_pvt_thread_rdwr_wlock(templ->t_rwlock);
+       qbase = avl_find( templ->qbase, &qb, pcache_dn_cmp );
+       if ( !qbase ) {
+               qbase = ch_calloc( 1, sizeof(Qbase) + qb.base.bv_len + 1 );
+               qbase->base.bv_len =qb.base.bv_len;
+               qbase->base.bv_val = (char *)(qbase+1);
+               memcpy( qbase->base.bv_val, qb.base.bv_val, qb.base.bv_len );
+               qbase->base.bv_val[qbase->base.bv_len] = '\0';
+               avl_insert( &templ->qbase, qbase, pcache_dn_cmp, avl_dup_error );
+       }
        if (templ->query == NULL)
                templ->query_last = new_cached_query;
        else
                templ->query->prev = new_cached_query;
        new_cached_query->next = templ->query;
        new_cached_query->prev = NULL;
+       new_cached_query->qbase = qbase;
+       tavl_insert( &qbase->scopes[query->scope], new_cached_query,
+               pcache_filter_cmp, avl_dup_ok );
        templ->query = new_cached_query;
        templ->no_of_queries++;
        Debug( pcache_debug, "TEMPLATE %p QUERIES++ %d\n",
@@ -684,6 +854,15 @@ remove_from_template (CachedQuery* qc, QueryTemplate* template)
                qc->next->prev = qc->prev;
                qc->prev->next = qc->next;
        }
+       tavl_delete( &qc->qbase->scopes[qc->scope], qc, pcache_filter_cmp );
+       if ( qc->qbase->scopes[0] == NULL &&
+               qc->qbase->scopes[1] == NULL &&
+               qc->qbase->scopes[2] == NULL &&
+               qc->qbase->scopes[3] == NULL ) {
+               avl_delete( &template->qbase, qc->qbase, pcache_dn_cmp );
+               ch_free( qc->qbase );
+               qc->qbase = NULL;
+       }
 
        template->no_of_queries--;
 }
@@ -1999,6 +2178,17 @@ pcache_db_open(
        return rc;
 }
 
+static void
+pcache_free_qbase( void *v )
+{
+       Qbase *qb = v;
+       int i;
+
+       for (i=0; i<3; i++)
+               tavl_free( qb->scopes[i], NULL );
+       ch_free( qb );
+}
+
 static int
 pcache_db_close(
        BackendDB *be
@@ -2024,6 +2214,7 @@ pcache_db_close(
                        qn = qc->next;
                        free_query( qc );
                }
+               avl_free( tm->qbase, pcache_free_qbase );
                free( tm->querystr.bv_val );
                ldap_pvt_thread_rdwr_destroy( tm->t_rwlock );
                ch_free( tm->t_rwlock );