]> git.sur5r.net Git - openldap/commitdiff
Fix concurrency issues
authorHoward Chu <hyc@openldap.org>
Thu, 25 May 2006 05:08:04 +0000 (05:08 +0000)
committerHoward Chu <hyc@openldap.org>
Thu, 25 May 2006 05:08:04 +0000 (05:08 +0000)
servers/slapd/overlays/pcache.c

index cb91d5ef610bbd6e79fa9de6778173786c370f46..f03d7860f8cdd98c6c7975834734ba66e33352a0 100644 (file)
@@ -49,6 +49,7 @@ struct query_template_s;
 typedef struct Qbase_s {
        Avlnode *scopes[4];             /* threaded AVL trees of cached queries */
        struct berval base;
+       int queries;
 } Qbase;
 
 /* struct representing a cached query */
@@ -106,7 +107,7 @@ struct query_manager_s;
  * 2) query addition, 3) cache replacement
  */
 typedef CachedQuery *  (QCfunc)(Operation *op, struct query_manager_s*, Query*, QueryTemplate*);
-typedef void   (AddQueryfunc)(struct query_manager_s*, Query*, QueryTemplate*, struct berval*);
+typedef CachedQuery *  (AddQueryfunc)(Operation *op, struct query_manager_s*, Query*, QueryTemplate*, int positive);
 typedef void   (CRfunc)(struct query_manager_s*, struct berval * );
 
 /* LDAP query cache */
@@ -233,6 +234,18 @@ static int pcache_dn_cmp( const void *v1, const void *v2 )
        return rc;
 }
 
+static int lex_bvcmp( struct berval *bv1, struct berval *bv2 )
+{
+       int len, dif;
+       dif = bv1->bv_len - bv2->bv_len;
+       len = bv1->bv_len;
+       if ( dif > 0 ) len -= dif;
+       len = memcmp( bv1->bv_val, bv2->bv_val, len );
+       if ( !len )
+               len = dif;
+       return len;
+}
+
 /* compare the first value in each filter */
 static int pcache_filter_cmp( const void *v1, const void *v2 )
 {
@@ -268,15 +281,14 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
                switch( weight1 ) {
                case 0: return 0;
                case 1:
-                       rc = ber_bvcmp( &q1->first->f_ava->aa_value,
-                               &q2->first->f_ava->aa_value );
+                       rc = lex_bvcmp( &q1->first->f_av_value, &q2->first->f_av_value );
                        break;
                case 2:
                        if ( q1->first->f_choice == LDAP_FILTER_SUBSTRINGS ) {
                                rc = 0;
                                if ( !BER_BVISNULL( &q1->first->f_sub_initial )) {
                                        if ( !BER_BVISNULL( &q2->first->f_sub_initial )) {
-                                               rc = ber_bvcmp( &q1->first->f_sub_initial,
+                                               rc = lex_bvcmp( &q1->first->f_sub_initial,
                                                        &q2->first->f_sub_initial );
                                        } else {
                                                rc = 1;
@@ -287,7 +299,7 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
                                if ( rc ) break;
                                if ( q1->first->f_sub_any ) {
                                        if ( q2->first->f_sub_any ) {
-                                               rc = ber_bvcmp( q1->first->f_sub_any,
+                                               rc = lex_bvcmp( q1->first->f_sub_any,
                                                        q2->first->f_sub_any );
                                        } else {
                                                rc = 1;
@@ -298,7 +310,7 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
                                if ( rc ) break;
                                if ( !BER_BVISNULL( &q1->first->f_sub_final )) {
                                        if ( !BER_BVISNULL( &q2->first->f_sub_final )) {
-                                               rc = ber_bvcmp( &q1->first->f_sub_final,
+                                               rc = lex_bvcmp( &q1->first->f_sub_final,
                                                        &q2->first->f_sub_final );
                                        } else {
                                                rc = 1;
@@ -307,7 +319,7 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
                                        rc = -1;
                                }
                        } else {
-                               rc = ber_bvcmp( &q1->first->f_mr_value,
+                               rc = lex_bvcmp( &q1->first->f_mr_value,
                                        &q2->first->f_mr_value );
                        }
                        break;
@@ -322,7 +334,6 @@ static void
 add_query_on_top (query_manager* qm, CachedQuery* qc)
 {
        CachedQuery* top = qm->lru_top;
-       Query* q = (Query*)qc;
 
        qm->lru_top = qc;
 
@@ -334,7 +345,7 @@ add_query_on_top (query_manager* qm, CachedQuery* qc)
        qc->lru_down = top;
        qc->lru_up = NULL;
        Debug( pcache_debug, "Base of added query = %s\n",
-                       q->base.bv_val, 0, 0 );
+                       qc->qbase->base.bv_val, 0, 0 );
 }
 
 /* remove_query from LRU list */
@@ -801,21 +812,23 @@ free_query (CachedQuery* qc)
 
 
 /* Add query to query cache */
-static void add_query(
+static CachedQuery * add_query(
+       Operation *op,
        query_manager* qm,
        Query* query,
        QueryTemplate *templ,
-       struct berval* uuid)
+       int positive)
 {
        CachedQuery* new_cached_query = (CachedQuery*) ch_malloc(sizeof(CachedQuery));
        Qbase *qbase, qb;
+       Filter *first;
+       int rc;
 
        new_cached_query->qtemp = templ;
-       if ( uuid ) {
-               new_cached_query->q_uuid = *uuid;
+       BER_BVZERO( &new_cached_query->q_uuid );
+       if ( positive ) {
                new_cached_query->expiry_time = slap_get_time() + templ->ttl;
        } else {
-               BER_BVZERO( &new_cached_query->q_uuid );
                new_cached_query->expiry_time = slap_get_time() + templ->negttl;
        }
        new_cached_query->lru_up = NULL;
@@ -825,7 +838,7 @@ static void add_query(
 
        new_cached_query->scope = query->scope;
        new_cached_query->filter = query->filter;
-       new_cached_query->first = filter_first( query->filter );
+       new_cached_query->first = first = filter_first( query->filter );
 
        qb.base = query->base;
 
@@ -842,17 +855,25 @@ static void add_query(
                qbase->base.bv_val[qbase->base.bv_len] = '\0';
                avl_insert( &templ->qbase, qbase, pcache_dn_cmp, avl_dup_error );
        }
-       if (templ->query == NULL)
-               templ->query_last = new_cached_query;
-       else
-               templ->query->prev = new_cached_query;
        new_cached_query->next = templ->query;
        new_cached_query->prev = NULL;
        new_cached_query->qbase = qbase;
-       tavl_insert( &qbase->scopes[query->scope], new_cached_query,
-               pcache_filter_cmp, avl_dup_ok );
-       templ->query = new_cached_query;
-       templ->no_of_queries++;
+       rc = tavl_insert( &qbase->scopes[query->scope], new_cached_query,
+               pcache_filter_cmp, avl_dup_error );
+       if ( rc == 0 ) {
+               qbase->queries++;
+               if (templ->query == NULL)
+                       templ->query_last = new_cached_query;
+               else
+                       templ->query->prev = new_cached_query;
+               templ->query = new_cached_query;
+               templ->no_of_queries++;
+       } else {
+               ch_free( new_cached_query );
+               new_cached_query = find_filter( op, qbase->scopes[query->scope],
+                                                       query->filter, first );
+               filter_free( query->filter );
+       }
        Debug( pcache_debug, "TEMPLATE %p QUERIES++ %d\n",
                        templ, templ->no_of_queries, 0 );
 
@@ -861,9 +882,12 @@ static void add_query(
        ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
 
        /* Adding on top of LRU list  */
-       ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
-       add_query_on_top(qm, new_cached_query);
-       ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
+       if ( rc == 0 ) {
+               ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
+               add_query_on_top(qm, new_cached_query);
+               ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
+       }
+       return rc == 0 ? new_cached_query : NULL;
 }
 
 static void
@@ -882,10 +906,8 @@ remove_from_template (CachedQuery* qc, QueryTemplate* template)
                qc->prev->next = qc->next;
        }
        tavl_delete( &qc->qbase->scopes[qc->scope], qc, pcache_filter_cmp );
-       if ( qc->qbase->scopes[0] == NULL &&
-               qc->qbase->scopes[1] == NULL &&
-               qc->qbase->scopes[2] == NULL &&
-               qc->qbase->scopes[3] == NULL ) {
+       qc->qbase->queries--;
+       if ( qc->qbase->queries == 0 ) {
                avl_delete( &template->qbase, qc->qbase, pcache_dn_cmp );
                ch_free( qc->qbase );
                qc->qbase = NULL;
@@ -1272,29 +1294,40 @@ pcache_response(
                }
 
        } else if ( rs->sr_type == REP_RESULT ) {
-               QueryTemplate* templ = si->qtemp;
-               if (( si->count && cache_entries( op, rs, &uuid ) == 0 ) ||
-                       ( templ->negttl && !si->count && !si->over &&
+               if ( si->count ||
+                       ( si->qtemp->negttl && !si->count && !si->over &&
                                rs->sr_err == LDAP_SUCCESS )) {
-                       qm->addfunc(qm, &si->query, si->qtemp,
-                               si->count ? &uuid : NULL);
-
-                       ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
-                       cm->num_cached_queries++;
-                       Debug( pcache_debug, "STORED QUERIES = %lu\n",
-                                       cm->num_cached_queries, 0, 0 );
-                       ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
+                       CachedQuery *qc = qm->addfunc(op, qm, &si->query, si->qtemp,
+                               si->count);
+
+                       if ( qc != NULL ) {
+                               if ( si->count )
+                                       cache_entries( op, rs, &qc->q_uuid );
+                               ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
+                               cm->num_cached_queries++;
+                               Debug( pcache_debug, "STORED QUERIES = %lu\n",
+                                               cm->num_cached_queries, 0, 0 );
+                               ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
 
-                       /* If the consistency checker suspended itself,
-                        * wake it back up
-                        */
-                       if ( cm->cc_paused ) {
-                               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+                               /* If the consistency checker suspended itself,
+                                * wake it back up
+                                */
                                if ( cm->cc_paused ) {
-                                       cm->cc_paused = 0;
-                                       ldap_pvt_runqueue_resched( &slapd_rq, cm->cc_arg, 0 );
+                                       ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+                                       if ( cm->cc_paused ) {
+                                               cm->cc_paused = 0;
+                                               ldap_pvt_runqueue_resched( &slapd_rq, cm->cc_arg, 0 );
+                                       }
+                                       ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+                               }
+                       } else if ( si->count ) {
+                               /* duplicate query, free it */
+                               Entry *e;
+                               for (;si->head; si->head=e) {
+                                       e = si->head->e_private;
+                                       si->head->e_private = NULL;
+                                       entry_free(si->head);
                                }
-                               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
                        }
                } else {
                        filter_free( si->query.filter );
@@ -1453,7 +1486,7 @@ pcache_op_search(
                                continue;
                        cacheable = 1;
                        qtemp = qt;
-                       Debug( LDAP_DEBUG_NONE, "Entering QC, querystr = %s\n",
+                       Debug( pcache_debug, "Entering QC, querystr = %s\n",
                                        op->ors_filterstr.bv_val, 0, 0 );
                        answerable = (*(qm->qcfunc))(op, qm, &query, qt);
 
@@ -1471,7 +1504,6 @@ pcache_op_search(
 
                Debug( pcache_debug, "QUERY ANSWERABLE\n", 0, 0, 0 );
                op->o_tmpfree( filter_attrs, op->o_tmpmemctx );
-               ldap_pvt_thread_rdwr_runlock(&qtemp->t_rwlock);
                if ( BER_BVISNULL( &answerable->q_uuid )) {
                        /* No entries cached, just an empty result set */
                        i = rs->sr_err = 0;
@@ -1481,6 +1513,7 @@ pcache_op_search(
                        op->o_callback = NULL;
                        i = cm->db.bd_info->bi_op_search( op, rs );
                }
+               ldap_pvt_thread_rdwr_runlock(&qtemp->t_rwlock);
                op->o_bd = save_bd;
                op->o_callback = save_cb;
                return i;
@@ -1605,7 +1638,7 @@ consistency_check(
        Operation *op;
 
        SlapReply rs = {REP_RESULT};
-       CachedQuery* query, *query_prev;
+       CachedQuery* query;
        int return_val, pause = 1;
        QueryTemplate* templ;
 
@@ -1623,15 +1656,23 @@ consistency_check(
                if ( query ) pause = 0;
                op->o_time = slap_get_time();
                while (query && (query->expiry_time < op->o_time)) {
+                       int rem = 0;
                        Debug( pcache_debug, "Lock CR index = %p\n",
                                        templ, 0, 0 );
                        ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);
-                       remove_from_template(query, templ);
-                       Debug( pcache_debug, "TEMPLATE %p QUERIES-- %d\n",
-                                       templ, templ->no_of_queries, 0 );
-                       Debug( pcache_debug, "Unlock CR index = %p\n",
-                                       templ, 0, 0 );
+                       if ( query == templ->query_last ) {
+                               rem = 1;
+                               remove_from_template(query, templ);
+                               Debug( pcache_debug, "TEMPLATE %p QUERIES-- %d\n",
+                                               templ, templ->no_of_queries, 0 );
+                               Debug( pcache_debug, "Unlock CR index = %p\n",
+                                               templ, 0, 0 );
+                       }
                        ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
+                       if ( !rem ) {
+                               query = templ->query_last;
+                               continue;
+                       }
                        ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
                        remove_query(qm, query);
                        ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
@@ -1651,9 +1692,8 @@ consistency_check(
                                "STALE QUERY REMOVED, CACHE ="
                                "%d entries\n",
                                cm->cur_entries, 0, 0 );
-                       query_prev = query;
-                       query = query->prev;
-                       free_query(query_prev);
+                       free_query(query);
+                       query = templ->query_last;
                }
        }
        ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
@@ -2043,8 +2083,6 @@ pc_cf_gen( ConfigArgs *c )
                                Debug( pcache_debug, "\t%s\n",
                                        attrarray[i].an_name.bv_val, 0, 0 );
                }
-               temp++; 
-               temp->querystr.bv_val = NULL;
                break;
        case PC_RESP:
                if ( strcasecmp( c->argv[1], "head" ) == 0 ) {