]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/sssvlv.c
Partial revert of f30269f5d2e4bb5ee7486fe6542078d1b59dba6d
[openldap] / servers / slapd / overlays / sssvlv.c
index 5c82b91212aef88870978452e6c317747de7e90e..543d00f2c180b2354878121e1c73fa572d46342a 100644 (file)
@@ -2,7 +2,7 @@
 /* $OpenLDAP$ */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *
- * Copyright 2009 The OpenLDAP Foundation.
+ * Copyright 2009-2012 The OpenLDAP Foundation.
  * Portions copyright 2009 Symas Corporation.
  * All rights reserved.
  *
@@ -16,7 +16,8 @@
  */
 /* ACKNOWLEDGEMENTS:
  * This work was initially developed by Howard Chu for inclusion in
- * OpenLDAP Software.
+ * OpenLDAP Software. Support for multiple sorts per connection added
+ * by Raphael Ouazana.
  */
 
 #include "portable.h"
 #define SAFESTR(macro_str, macro_def) ((macro_str) ? (macro_str) : (macro_def))
 
 #define SSSVLV_DEFAULT_MAX_KEYS        5
+#define SSSVLV_DEFAULT_MAX_REQUEST_PER_CONN 5
+
+#define NO_PS_COOKIE (PagedResultsCookie) -1
+#define NO_VC_CONTEXT (unsigned long) -1
 
 typedef struct vlv_ctrl {
        int vc_before;
@@ -85,6 +90,7 @@ typedef struct sort_ctrl {
 typedef struct sort_node
 {
        int sn_conn;
+       int sn_session;
        struct berval sn_dn;
        struct berval *sn_vals;
 } sort_node;
@@ -94,6 +100,7 @@ typedef struct sssvlv_info
        int svi_max;    /* max concurrent sorts */
        int svi_num;    /* current # sorts */
        int svi_max_keys;       /* max sort keys per request */
+       int svi_max_percon; /* max concurrent sorts per con */
 } sssvlv_info;
 
 typedef struct sort_op
@@ -107,12 +114,15 @@ typedef struct sort_op
        int so_vlv;
        int so_vlv_rc;
        int so_vlv_target;
+       int so_session;
        unsigned long so_vcontext;
 } sort_op;
 
 /* There is only one conn table for all overlay instances */
-static sort_op **sort_conns;
+/* Each conn can handle one session by context */
+static sort_op ***sort_conns;
 static ldap_pvt_thread_mutex_t sort_conns_mutex;
+static int ov_count;
 static const char *debug_header = "sssvlv";
 
 static int sss_cid;
@@ -130,7 +140,8 @@ static struct berval* select_value(
 {
        struct berval* ber1, *ber2;
        MatchingRule *mr = key->sk_ordering;
-       int i, cmp;
+       unsigned i;
+       int cmp;
 
        ber1 = &(attr->a_nvals[0]);
        ber2 = ber1+1;
@@ -153,9 +164,13 @@ static int node_cmp( const void* val1, const void* val2 )
 {
        sort_node *sn1 = (sort_node *)val1;
        sort_node *sn2 = (sort_node *)val2;
-       sort_ctrl *sc = sort_conns[sn1->sn_conn]->so_ctrl;
+       sort_ctrl *sc;
        MatchingRule *mr;
        int i, cmp = 0;
+       assert( sort_conns[sn1->sn_conn]
+               && sort_conns[sn1->sn_conn][sn1->sn_session]
+               && sort_conns[sn1->sn_conn][sn1->sn_session]->so_ctrl );
+       sc = sort_conns[sn1->sn_conn][sn1->sn_session]->so_ctrl;
 
        for ( i=0; cmp == 0 && i<sc->sc_nkeys; i++ ) {
                if ( BER_BVISNULL( &sn1->sn_vals[i] )) {
@@ -197,7 +212,7 @@ static int pack_vlv_response_control(
        ber_init2( ber, NULL, LBER_USE_DER );
        ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
 
-       rc = ber_printf( ber, "{iii", so->so_vlv_target, so->so_nentries,
+       rc = ber_printf( ber, "{iie", so->so_vlv_target, so->so_nentries,
                so->so_vlv_rc );
 
        if ( rc != -1 && so->so_vcontext ) {
@@ -326,21 +341,89 @@ static int pack_sss_response_control(
        return rs->sr_err;
 }
 
+/* Return the session id or -1 if unknown */
+static int find_session_by_so(
+       int svi_max_percon,
+       int conn_id,
+       sort_op *so )
+{
+       int sess_id;
+       if (so == NULL) {
+               return -1;
+       }
+       for (sess_id = 0; sess_id < svi_max_percon; sess_id++) {
+               if ( sort_conns[conn_id] && sort_conns[conn_id][sess_id] == so )
+                       return sess_id;
+       }
+       return -1;
+}
+
+/* Return the session id or -1 if unknown */
+static int find_session_by_context(
+       int svi_max_percon,
+       int conn_id,
+       unsigned long vc_context,
+       PagedResultsCookie ps_cookie )
+{
+       int sess_id;
+       for(sess_id = 0; sess_id < svi_max_percon; sess_id++) {
+               if( sort_conns[conn_id] && sort_conns[conn_id][sess_id] &&
+                   ( sort_conns[conn_id][sess_id]->so_vcontext == vc_context || 
+                      (PagedResultsCookie) sort_conns[conn_id][sess_id]->so_tree == ps_cookie ) )
+                       return sess_id;
+       }
+       return -1;
+}
+
+static int find_next_session(
+       int svi_max_percon,
+       int conn_id )
+{
+       int sess_id;
+       assert(sort_conns[conn_id] != NULL);
+       for(sess_id = 0; sess_id < svi_max_percon; sess_id++) {
+               if(!sort_conns[conn_id][sess_id]) {
+                       return sess_id;
+               }
+       }
+       if (sess_id >= svi_max_percon) {
+               return -1;
+       } else {
+               return sess_id;
+       }
+}
+       
 static void free_sort_op( Connection *conn, sort_op *so )
 {
+       int sess_id;
        if ( so->so_tree ) {
                tavl_free( so->so_tree, ch_free );
                so->so_tree = NULL;
        }
 
        ldap_pvt_thread_mutex_lock( &sort_conns_mutex );
-       sort_conns[conn->c_conn_idx] = NULL;
+       sess_id = find_session_by_so( so->so_info->svi_max_percon, conn->c_conn_idx, so );
+       sort_conns[conn->c_conn_idx][sess_id] = NULL;
        so->so_info->svi_num--;
        ldap_pvt_thread_mutex_unlock( &sort_conns_mutex );
 
        ch_free( so );
 }
 
+static void free_sort_ops( Connection *conn, sort_op **sos, int svi_max_percon )
+{
+       int sess_id;
+       sort_op *so;
+
+       for( sess_id = 0; sess_id < svi_max_percon ; sess_id++ ) {
+               so = sort_conns[conn->c_conn_idx][sess_id];
+               if ( so ) {
+                       free_sort_op( conn, so );
+                       sort_conns[conn->c_conn_idx][sess_id] = NULL;
+               }
+       }
+}
+       
 static void send_list(
        Operation               *op,
        SlapReply               *rs,
@@ -353,6 +436,8 @@ static void send_list(
        Entry *e;
        LDAPControl *ctrls[2];
 
+       rs->sr_attrs = op->ors_attrs;
+
        /* FIXME: it may be better to just flatten the tree into
         * an array before doing all of this...
         */
@@ -425,14 +510,17 @@ range_err:
                        sc->sc_nkeys * sizeof(struct berval), op->o_tmpmemctx );
                sn->sn_vals = (struct berval *)(sn+1);
                sn->sn_conn = op->o_conn->c_conn_idx;
+               sn->sn_session = find_session_by_so( so->so_info->svi_max_percon, op->o_conn->c_conn_idx, so );
                sn->sn_vals[0] = bv;
                for (i=1; i<sc->sc_nkeys; i++) {
                        BER_BVZERO( &sn->sn_vals[i] );
                }
                cur_node = tavl_find3( so->so_tree, sn, node_cmp, &j );
                /* didn't find >= match */
-               if ( j > 0 )
-                       cur_node = NULL;
+               if ( j > 0 ) {
+                       if ( cur_node )
+                               cur_node = tavl_next( cur_node, TAVL_DIR_RIGHT );
+               }
                op->o_tmpfree( sn, op->o_tmpmemctx );
 
                if ( !cur_node ) {
@@ -450,7 +538,7 @@ range_err:
                        }
                        for (i=0; tmp_node != cur_node;
                                tmp_node = tavl_next( tmp_node, dir ), i++);
-                       so->so_vlv_target = i;
+                       so->so_vlv_target = (dir == TAVL_DIR_RIGHT) ? i+1 : so->so_nentries - i;
                }
                if ( bv.bv_val != vc->vc_value.bv_val )
                        op->o_tmpfree( bv.bv_val, op->o_tmpmemctx );
@@ -470,7 +558,9 @@ range_err:
        be = op->o_bd;
        for ( i=0; i<j; i++ ) {
                sort_node *sn = cur_node->avl_data;
-               
+
+               if ( slapd_shutdown ) break;
+
                op->o_bd = select_backend( &sn->sn_dn, 0 );
                e = NULL;
                rc = be_entry_get_rw( op, &sn->sn_dn, NULL, NULL, 0, &e );
@@ -495,13 +585,16 @@ static void send_page( Operation *op, SlapReply *rs, sort_op *so )
        Avlnode         *cur_node               = so->so_tree;
        Avlnode         *next_node              = NULL;
        BackendDB *be = op->o_bd;
-       sort_node *sn;
        Entry *e;
        int rc;
 
+       rs->sr_attrs = op->ors_attrs;
+
        while ( cur_node && rs->sr_nentries < so->so_page_size ) {
                sort_node *sn = cur_node->avl_data;
 
+               if ( slapd_shutdown ) break;
+
                next_node = tavl_next( cur_node, TAVL_DIR_RIGHT );
 
                op->o_bd = select_backend( &sn->sn_dn, 0 );
@@ -660,6 +753,7 @@ static int sssvlv_op_response(
                op->o_tmpfree( sn, op->o_tmpmemctx );
                sn = sn2;
                sn->sn_conn = op->o_conn->c_conn_idx;
+               sn->sn_session = find_session_by_so( so->so_info->svi_max_percon, op->o_conn->c_conn_idx, so );
 
                /* Insert into the AVL tree */
                tavl_insert(&(so->so_tree), sn, node_insert, avl_dup_error);
@@ -695,10 +789,11 @@ static int sssvlv_op_search(
        sssvlv_info                             *si                     = on->on_bi.bi_private;
        int                                             rc                      = SLAP_CB_CONTINUE;
        int     ok;
-       sort_op *so, so2;
+       sort_op *so = NULL, so2;
        sort_ctrl *sc;
        PagedResultsState *ps;
        vlv_ctrl *vc;
+       int sess_id;
 
        if ( op->o_ctrlflag[sss_cid] <= SLAP_CONTROL_IGNORED ) {
                if ( op->o_ctrlflag[vlv_cid] > SLAP_CONTROL_IGNORED ) {
@@ -707,6 +802,7 @@ static int sssvlv_op_search(
                        so2.so_vlv_target = 0;
                        so2.so_nentries = 0;
                        so2.so_vlv_rc = LDAP_VLV_SSS_MISSING;
+                       so2.so_vlv = op->o_ctrlflag[vlv_cid];
                        rc = pack_vlv_response_control( op, rs, &so2, ctrls );
                        if ( rc == LDAP_SUCCESS ) {
                                ctrls[1] = NULL;
@@ -745,9 +841,10 @@ static int sssvlv_op_search(
 
        ok = 1;
        ldap_pvt_thread_mutex_lock( &sort_conns_mutex );
-       so = sort_conns[op->o_conn->c_conn_idx];
        /* Is there already a sort running on this conn? */
-       if ( so ) {
+       sess_id = find_session_by_context( si->svi_max_percon, op->o_conn->c_conn_idx, vc ? vc->vc_context : NO_VC_CONTEXT, ps ? ps->ps_cookie : NO_PS_COOKIE );
+       if ( sess_id >= 0 ) {
+               so = sort_conns[op->o_conn->c_conn_idx][sess_id];
                /* Is it a continuation of a VLV search? */
                if ( !vc || so->so_vlv <= SLAP_CONTROL_IGNORED ||
                        vc->vc_context != so->so_vcontext ) {
@@ -772,21 +869,27 @@ static int sssvlv_op_search(
        /* Are there too many running overall? */
        } else if ( si->svi_num >= si->svi_max ) {
                ok = 0;
+       } else if ( ( sess_id = find_next_session(si->svi_max_percon, op->o_conn->c_conn_idx ) ) < 0 ) {
+               ok = 0;
        } else {
                /* OK, this connection now has a sort running */
                si->svi_num++;
-               sort_conns[op->o_conn->c_conn_idx] = &so2;
+               sort_conns[op->o_conn->c_conn_idx][sess_id] = &so2;
+               sort_conns[op->o_conn->c_conn_idx][sess_id]->so_session = sess_id;
        }
        ldap_pvt_thread_mutex_unlock( &sort_conns_mutex );
        if ( ok ) {
+               /* If we're a global overlay, this check got bypassed */
+               if ( !op->ors_limit && limits_check( op, rs ))
+                       return rs->sr_err;
                /* are we continuing a VLV search? */
-               if ( vc && vc->vc_context ) {
+               if ( so && vc && vc->vc_context ) {
                        so->so_ctrl = sc;
                        send_list( op, rs, so );
                        send_result( op, rs, so );
                        rc = LDAP_SUCCESS;
                /* are we continuing a paged search? */
-               } else if ( ps && ps->ps_cookie ) {
+               } else if ( so && ps && ps->ps_cookie ) {
                        so->so_ctrl = sc;
                        send_page( op, rs, so );
                        send_result( op, rs, so );
@@ -796,11 +899,11 @@ static int sssvlv_op_search(
                                op->o_tmpmemctx );
                        /* Install serversort response callback to handle a new search */
                        if ( ps || vc ) {
-                               so = ch_malloc( sizeof(sort_op));
+                               so = ch_calloc( 1, sizeof(sort_op));
                        } else {
-                               so = op->o_tmpalloc( sizeof(sort_op), op->o_tmpmemctx );
+                               so = op->o_tmpcalloc( 1, sizeof(sort_op), op->o_tmpmemctx );
                        }
-                       sort_conns[op->o_conn->c_conn_idx] = so;
+                       sort_conns[op->o_conn->c_conn_idx][sess_id] = so;
 
                        cb->sc_cleanup          = NULL;
                        cb->sc_response         = sssvlv_op_response;
@@ -821,8 +924,12 @@ static int sssvlv_op_search(
                                        so->so_vlv = op->o_ctrlflag[vlv_cid];
                                        so->so_vlv_target = 0;
                                        so->so_vlv_rc = 0;
+                               } else {
+                                       so->so_vlv = SLAP_CONTROL_NONE;
                                }
                        }
+                       so->so_session = sess_id;
+                       so->so_vlv = op->o_ctrlflag[vlv_cid];
                        so->so_vcontext = (unsigned long)so;
                        so->so_nentries = 0;
 
@@ -1030,7 +1137,6 @@ static int vlv_parseCtrl(
        BerElement                      *ber;
        ber_tag_t               tag;
        ber_len_t               len;
-       int                                     i;
        vlv_ctrl        *vc, vc2;
 
        rs->sr_err = LDAP_PROTOCOL_ERROR;
@@ -1094,9 +1200,11 @@ static int vlv_parseCtrl(
 static int sssvlv_connection_destroy( BackendDB *be, Connection *conn )
 {
        slap_overinst   *on             = (slap_overinst *)be->bd_info;
+       sssvlv_info *si = on->on_bi.bi_private;
 
-       if ( sort_conns[conn->c_conn_idx] )
-               free_sort_op( conn, sort_conns[conn->c_conn_idx] );
+       if ( sort_conns[conn->c_conn_idx] ) {
+               free_sort_ops( conn, sort_conns[conn->c_conn_idx], si->svi_max_percon );
+       }
 
        return LDAP_SUCCESS;
 }
@@ -1108,11 +1216,22 @@ static int sssvlv_db_open(
        slap_overinst   *on = (slap_overinst *)be->bd_info;
        sssvlv_info *si = on->on_bi.bi_private;
        int rc;
+       int conn_index;
 
        /* If not set, default to 1/2 of available threads */
        if ( !si->svi_max )
                si->svi_max = connection_pool_max / 2;
 
+       if ( dtblsize && !sort_conns ) {
+               ldap_pvt_thread_mutex_init( &sort_conns_mutex );
+               /* accommodate for c_conn_idx == -1 */
+               sort_conns = ch_calloc( dtblsize + 1, sizeof(sort_op **) );
+               for ( conn_index = 0 ; conn_index < dtblsize + 1 ; conn_index++ ) {
+                       sort_conns[conn_index] = ch_calloc( si->svi_max_percon, sizeof(sort_op *) );
+               }
+               sort_conns++;
+       }
+
        rc = overlay_register_control( be, LDAP_CONTROL_SORTREQUEST );
        if ( rc == LDAP_SUCCESS )
                rc = overlay_register_control( be, LDAP_CONTROL_VLVREQUEST );
@@ -1132,6 +1251,12 @@ static ConfigTable sssvlv_cfg[] = {
                "( OLcfgOvAt:21.2 NAME 'olcSssVlvMaxKeys' "
                        "DESC 'Maximum number of Keys in a Sort request' "
                        "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
+       { "sssvlv-maxpercon", "num",
+               2, 2, 0, ARG_INT|ARG_OFFSET,
+                       (void *)offsetof(sssvlv_info, svi_max_percon),
+               "( OLcfgOvAt:21.3 NAME 'olcSssVlvMaxPerConn' "
+                       "DESC 'Maximum number of concurrent paged search requests per connection' "
+                       "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
        { NULL, NULL, 0, 0, 0, ARG_IGNORED }
 };
 
@@ -1151,6 +1276,38 @@ static int sssvlv_db_init(
 {
        slap_overinst   *on = (slap_overinst *)be->bd_info;
        sssvlv_info *si;
+
+       if ( ov_count == 0 ) {
+               int rc;
+
+               rc = register_supported_control2( LDAP_CONTROL_SORTREQUEST,
+                       SLAP_CTRL_SEARCH,
+                       NULL,
+                       sss_parseCtrl,
+                       1 /* replace */,
+                       &sss_cid );
+               if ( rc != LDAP_SUCCESS ) {
+                       Debug( LDAP_DEBUG_ANY, "Failed to register Sort Request control '%s' (%d)\n",
+                               LDAP_CONTROL_SORTREQUEST, rc, 0 );
+                       return rc;
+               }
+
+               rc = register_supported_control2( LDAP_CONTROL_VLVREQUEST,
+                       SLAP_CTRL_SEARCH,
+                       NULL,
+                       vlv_parseCtrl,
+                       1 /* replace */,
+                       &vlv_cid );
+               if ( rc != LDAP_SUCCESS ) {
+                       Debug( LDAP_DEBUG_ANY, "Failed to register VLV Request control '%s' (%d)\n",
+                               LDAP_CONTROL_VLVREQUEST, rc, 0 );
+#ifdef SLAP_CONFIG_DELETE
+                       overlay_unregister_control( be, LDAP_CONTROL_SORTREQUEST );
+                       unregister_supported_control( LDAP_CONTROL_SORTREQUEST );
+#endif /* SLAP_CONFIG_DELETE */
+                       return rc;
+               }
+       }
        
        si = (sssvlv_info *)ch_malloc(sizeof(sssvlv_info));
        on->on_bi.bi_private = si;
@@ -1158,13 +1315,9 @@ static int sssvlv_db_init(
        si->svi_max = 0;
        si->svi_num = 0;
        si->svi_max_keys = SSSVLV_DEFAULT_MAX_KEYS;
+       si->svi_max_percon = SSSVLV_DEFAULT_MAX_REQUEST_PER_CONN;
 
-       if ( dtblsize && !sort_conns ) {
-               ldap_pvt_thread_mutex_init( &sort_conns_mutex );
-               /* accommodate for c_conn_idx == -1 */
-               sort_conns = ch_calloc( sizeof(sort_op *), dtblsize + 1 );
-               sort_conns++;
-       }
+       ov_count++;
 
        return LDAP_SUCCESS;
 }
@@ -1175,7 +1328,27 @@ static int sssvlv_db_destroy(
 {
        slap_overinst   *on = (slap_overinst *)be->bd_info;
        sssvlv_info *si = (sssvlv_info *)on->on_bi.bi_private;
-       
+       int conn_index;
+
+       ov_count--;
+       if ( !ov_count && sort_conns) {
+               sort_conns--;
+               for ( conn_index = 0 ; conn_index < dtblsize + 1 ; conn_index++ ) {
+                       ch_free(sort_conns[conn_index]);
+               }
+               ch_free(sort_conns);
+               ldap_pvt_thread_mutex_destroy( &sort_conns_mutex );
+       }
+
+#ifdef SLAP_CONFIG_DELETE
+       overlay_unregister_control( be, LDAP_CONTROL_SORTREQUEST );
+       overlay_unregister_control( be, LDAP_CONTROL_VLVREQUEST );
+       if ( ov_count == 0 ) {
+               unregister_supported_control( LDAP_CONTROL_SORTREQUEST );
+               unregister_supported_control( LDAP_CONTROL_VLVREQUEST );
+       }
+#endif /* SLAP_CONFIG_DELETE */
+
        if ( si ) {
                ch_free( si );
                on->on_bi.bi_private = NULL;
@@ -1202,30 +1375,9 @@ int sssvlv_initialize()
        if ( rc )
                return rc;
 
-       rc = register_supported_control2( LDAP_CONTROL_SORTREQUEST,
-                       SLAP_CTRL_SEARCH,
-                       NULL,
-                       sss_parseCtrl,
-                       1 /* replace */,
-                       &sss_cid );
-
-       if ( rc == LDAP_SUCCESS ) {
-               rc = register_supported_control2( LDAP_CONTROL_VLVREQUEST,
-                       SLAP_CTRL_SEARCH,
-                       NULL,
-                       vlv_parseCtrl,
-                       1 /* replace */,
-                       &vlv_cid );
-       }
-
-       if ( rc == LDAP_SUCCESS ) {
-               rc = overlay_register( &sssvlv );
-               if ( rc != LDAP_SUCCESS ) {
-                       fprintf( stderr, "Failed to register server side sort overlay\n" );
-               }
-       }
-       else {
-               fprintf( stderr, "Failed to register control %d\n", rc );
+       rc = overlay_register( &sssvlv );
+       if ( rc != LDAP_SUCCESS ) {
+               Debug( LDAP_DEBUG_ANY, "Failed to register server side sort overlay\n", 0, 0, 0 );
        }
 
        return rc;