]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/sssvlv.c
more for #6397
[openldap] / servers / slapd / overlays / sssvlv.c
index 0131c21a571779203dd1c926605beaf70fb4cad0..5c82b91212aef88870978452e6c317747de7e90e 100644 (file)
@@ -21,8 +21,6 @@
 
 #include "portable.h"
 
-#define SLAPD_OVER_SSSVLV      SLAPD_MOD_DYNAMIC
-
 #ifdef SLAPD_OVER_SSSVLV
 
 #include <stdio.h>
@@ -34,6 +32,9 @@
 
 #include "slap.h"
 #include "lutil.h"
+#include "config.h"
+
+#include "../../../libraries/liblber/lber-int.h"       /* ber_rewind */
 
 /* RFC2891: Server Side Sorting
  * RFC2696: Paged Results
@@ -57,6 +58,8 @@
 
 #define SAFESTR(macro_str, macro_def) ((macro_str) ? (macro_str) : (macro_def))
 
+#define SSSVLV_DEFAULT_MAX_KEYS        5
+
 typedef struct vlv_ctrl {
        int vc_before;
        int vc_after;
@@ -75,7 +78,7 @@ typedef struct sort_key
 
 typedef struct sort_ctrl {
        int sc_nkeys;
-       sort_key sc_keys[0];
+       sort_key sc_keys[1];
 } sort_ctrl;
 
 
@@ -166,10 +169,17 @@ static int node_cmp( const void* val1, const void* val2 )
                        mr = sc->sc_keys[i].sk_ordering;
                        mr->smr_match( &cmp, 0, mr->smr_syntax, mr,
                                &sn1->sn_vals[i], &sn2->sn_vals[i] );
+                       if ( cmp )
+                               cmp *= sc->sc_keys[i].sk_direction;
                }
        }
+       return cmp;
+}
+
+static int node_insert( const void *val1, const void *val2 )
+{
        /* Never return equal so that new entries are always inserted */
-       return cmp < 0 ? -1 : 1;
+       return node_cmp( val1, val2 ) < 0 ? -1 : 1;
 }
 
 static int pack_vlv_response_control(
@@ -181,7 +191,6 @@ static int pack_vlv_response_control(
        LDAPControl                     *ctrl;
        BerElementBuffer        berbuf;
        BerElement                      *ber            = (BerElement *)&berbuf;
-       PagedResultsCookie      resp_cookie;
        struct berval           cookie, bv;
        int                                     rc;
 
@@ -332,6 +341,155 @@ static void free_sort_op( Connection *conn, sort_op *so )
        ch_free( so );
 }
 
+static void send_list(
+       Operation               *op,
+       SlapReply               *rs,
+       sort_op                 *so)
+{
+       Avlnode *cur_node, *tmp_node;
+       vlv_ctrl *vc = op->o_controls[vlv_cid];
+       int i, j, dir, rc;
+       BackendDB *be;
+       Entry *e;
+       LDAPControl *ctrls[2];
+
+       /* FIXME: it may be better to just flatten the tree into
+        * an array before doing all of this...
+        */
+
+       /* Are we just counting an offset? */
+       if ( BER_BVISNULL( &vc->vc_value )) {
+               if ( vc->vc_offset == vc->vc_count ) {
+                       /* wants the last entry in the list */
+                       cur_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+                       so->so_vlv_target = so->so_nentries;
+               } else if ( vc->vc_offset == 1 ) {
+                       /* wants the first entry in the list */
+                       cur_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+                       so->so_vlv_target = 1;
+               } else {
+                       int target;
+                       /* Just iterate to the right spot */
+                       if ( vc->vc_count && vc->vc_count != so->so_nentries ) {
+                               if ( vc->vc_offset > vc->vc_count )
+                                       goto range_err;
+                               target = so->so_nentries * vc->vc_offset / vc->vc_count;
+                       } else {
+                               if ( vc->vc_offset > so->so_nentries ) {
+range_err:
+                                       so->so_vlv_rc = LDAP_VLV_RANGE_ERROR;
+                                       pack_vlv_response_control( op, rs, so, ctrls );
+                                       ctrls[1] = NULL;
+                                       slap_add_ctrls( op, rs, ctrls );
+                                       rs->sr_err = LDAP_VLV_ERROR;
+                                       return;
+                               }
+                               target = vc->vc_offset;
+                       }
+                       so->so_vlv_target = target;
+                       /* Start at left and go right, or start at right and go left? */
+                       if ( target < so->so_nentries / 2 ) {
+                               cur_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+                               dir = TAVL_DIR_RIGHT;
+                       } else {
+                               cur_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+                               dir = TAVL_DIR_LEFT;
+                               target = so->so_nentries - target + 1;
+                       }
+                       for ( i=1; i<target; i++ )
+                               cur_node = tavl_next( cur_node, dir );
+               }
+       } else {
+       /* we're looking for a specific value */
+               sort_ctrl *sc = so->so_ctrl;
+               MatchingRule *mr = sc->sc_keys[0].sk_ordering;
+               sort_node *sn;
+               struct berval bv;
+
+               if ( mr->smr_normalize ) {
+                       rc = mr->smr_normalize( SLAP_MR_VALUE_OF_SYNTAX,
+                               mr->smr_syntax, mr, &vc->vc_value, &bv, op->o_tmpmemctx );
+                       if ( rc ) {
+                               so->so_vlv_rc = LDAP_INAPPROPRIATE_MATCHING;
+                               pack_vlv_response_control( op, rs, so, ctrls );
+                               ctrls[1] = NULL;
+                               slap_add_ctrls( op, rs, ctrls );
+                               rs->sr_err = LDAP_VLV_ERROR;
+                               return;
+                       }
+               } else {
+                       bv = vc->vc_value;
+               }
+
+               sn = op->o_tmpalloc( sizeof(sort_node) +
+                       sc->sc_nkeys * sizeof(struct berval), op->o_tmpmemctx );
+               sn->sn_vals = (struct berval *)(sn+1);
+               sn->sn_conn = op->o_conn->c_conn_idx;
+               sn->sn_vals[0] = bv;
+               for (i=1; i<sc->sc_nkeys; i++) {
+                       BER_BVZERO( &sn->sn_vals[i] );
+               }
+               cur_node = tavl_find3( so->so_tree, sn, node_cmp, &j );
+               /* didn't find >= match */
+               if ( j > 0 )
+                       cur_node = NULL;
+               op->o_tmpfree( sn, op->o_tmpmemctx );
+
+               if ( !cur_node ) {
+                       so->so_vlv_target = so->so_nentries + 1;
+               } else {
+                       sort_node *sn = so->so_tree->avl_data;
+                       /* start from the left or the right side? */
+                       mr->smr_match( &i, 0, mr->smr_syntax, mr, &bv, &sn->sn_vals[0] );
+                       if ( i > 0 ) {
+                               tmp_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+                               dir = TAVL_DIR_LEFT;
+                       } else {
+                               tmp_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+                               dir = TAVL_DIR_RIGHT;
+                       }
+                       for (i=0; tmp_node != cur_node;
+                               tmp_node = tavl_next( tmp_node, dir ), i++);
+                       so->so_vlv_target = i;
+               }
+               if ( bv.bv_val != vc->vc_value.bv_val )
+                       op->o_tmpfree( bv.bv_val, op->o_tmpmemctx );
+       }
+       if ( !cur_node ) {
+               i = 1;
+               cur_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+       } else {
+               i = 0;
+       }
+       for ( ; i<vc->vc_before; i++ ) {
+               tmp_node = tavl_next( cur_node, TAVL_DIR_LEFT );
+               if ( !tmp_node ) break;
+               cur_node = tmp_node;
+       }
+       j = i + vc->vc_after + 1;
+       be = op->o_bd;
+       for ( i=0; i<j; i++ ) {
+               sort_node *sn = cur_node->avl_data;
+               
+               op->o_bd = select_backend( &sn->sn_dn, 0 );
+               e = NULL;
+               rc = be_entry_get_rw( op, &sn->sn_dn, NULL, NULL, 0, &e );
+
+               if ( e && rc == LDAP_SUCCESS ) {
+                       rs->sr_entry = e;
+                       rs->sr_flags = REP_ENTRY_MUSTRELEASE;
+                       rs->sr_err = send_search_entry( op, rs );
+                       if ( rs->sr_err == LDAP_UNAVAILABLE )
+                               break;
+               }
+               cur_node = tavl_next( cur_node, TAVL_DIR_RIGHT );
+               if ( !cur_node ) break;
+       }
+       so->so_vlv_rc = LDAP_SUCCESS;
+
+       op->o_bd = be;
+}
+
 static void send_page( Operation *op, SlapReply *rs, sort_op *so )
 {
        Avlnode         *cur_node               = so->so_tree;
@@ -346,7 +504,6 @@ static void send_page( Operation *op, SlapReply *rs, sort_op *so )
 
                next_node = tavl_next( cur_node, TAVL_DIR_RIGHT );
 
-
                op->o_bd = select_backend( &sn->sn_dn, 0 );
                e = NULL;
                rc = be_entry_get_rw( op, &sn->sn_dn, NULL, NULL, 0, &e );
@@ -372,7 +529,7 @@ static void send_page( Operation *op, SlapReply *rs, sort_op *so )
        op->o_bd = be;
 }
 
-static int send_entry(
+static void send_entry(
        Operation               *op,
        SlapReply               *rs,
        sort_op                 *so)
@@ -381,6 +538,9 @@ static int send_entry(
                "%s: response control: status=%d, text=%s\n",
                debug_header, rs->sr_err, SAFESTR(rs->sr_text, "<None>"));
 
+       if ( !so->so_tree )
+               return;
+
        /* RFC 2891: If critical then send the entries iff they were
         * succesfully sorted.  If non-critical send all entries
         * whether they were sorted or not.
@@ -388,22 +548,24 @@ static int send_entry(
        if ( (op->o_ctrlflag[sss_cid] != SLAP_CONTROL_CRITICAL) ||
                 (rs->sr_err == LDAP_SUCCESS) )
        {
-               /* Get the first node to send */
-               Avlnode *start_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
-               so->so_tree = start_node;
-
-               if ( so->so_paged <= SLAP_CONTROL_IGNORED ) {
-                       /* Not paged result search.  Send all entries.
-                        * Set the page size to the number of entries
-                        * so that send_page() will send all entries.
-                        */
-                       so->so_page_size = so->so_nentries;
-               }
+               if ( so->so_vlv > SLAP_CONTROL_IGNORED ) {
+                       send_list( op, rs, so );
+               } else {
+                       /* Get the first node to send */
+                       Avlnode *start_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+                       so->so_tree = start_node;
+
+                       if ( so->so_paged <= SLAP_CONTROL_IGNORED ) {
+                               /* Not paged result search.  Send all entries.
+                                * Set the page size to the number of entries
+                                * so that send_page() will send all entries.
+                                */
+                               so->so_page_size = so->so_nentries;
+                       }
 
-               send_page( op, rs, so );
+                       send_page( op, rs, so );
+               }
        }
-
-       return SLAP_CB_CONTINUE;
 }
 
 static void send_result(
@@ -412,15 +574,21 @@ static void send_result(
        sort_op                 *so)
 {
        LDAPControl *ctrls[3];
-       int rc;
+       int rc, i = 0;
 
        rc = pack_sss_response_control( op, rs, ctrls );
-       if ( rc == LDAP_SUCCESS && so->so_paged > SLAP_CONTROL_IGNORED ) {
-               rc = pack_pagedresult_response_control( op, rs, so, ctrls+1 );
-               ctrls[2] = NULL;
-       } else {
-               ctrls[1] = NULL;
+       if ( rc == LDAP_SUCCESS ) {
+               i++;
+               rc = -1;
+               if ( so->so_paged > SLAP_CONTROL_IGNORED ) {
+                       rc = pack_pagedresult_response_control( op, rs, so, ctrls+1 );
+               } else if ( so->so_vlv > SLAP_CONTROL_IGNORED ) {
+                       rc = pack_vlv_response_control( op, rs, so, ctrls+1 );
+               }
+               if ( rc == LDAP_SUCCESS )
+                       i++;
        }
+       ctrls[i] = NULL;
 
        if ( ctrls[0] != NULL )
                slap_add_ctrls( op, rs, ctrls );
@@ -438,7 +606,6 @@ static int sssvlv_op_response(
 {
        sort_ctrl *sc = op->o_controls[sss_cid];
        sort_op *so = op->o_callback->sc_private;
-       int rc = SLAP_CB_CONTINUE;
 
        if ( rs->sr_type == REP_SEARCH ) {
                int i;
@@ -474,34 +641,35 @@ static int sssvlv_op_response(
                sn2->sn_vals = (struct berval *)(sn2+1);
                AC_MEMCPY( sn2->sn_vals, sn->sn_vals,
                                sc->sc_nkeys * sizeof(struct berval));
-               sn = sn2;
 
-               ptr = (char *)(sn->sn_vals + sc->sc_nkeys);
-               sn->sn_dn.bv_val = ptr;
-               sn->sn_dn.bv_len = rs->sr_entry->e_nname.bv_len;
+               ptr = (char *)(sn2->sn_vals + sc->sc_nkeys);
+               sn2->sn_dn.bv_val = ptr;
+               sn2->sn_dn.bv_len = rs->sr_entry->e_nname.bv_len;
                AC_MEMCPY( ptr, rs->sr_entry->e_nname.bv_val,
                        rs->sr_entry->e_nname.bv_len );
                ptr += rs->sr_entry->e_nname.bv_len;
                *ptr++ = '\0';
                for ( i=0; i<sc->sc_nkeys; i++ ) {
-                       if ( !BER_BVISNULL( &sn->sn_vals[i] )) {
-                               AC_MEMCPY( ptr, sn->sn_vals[i].bv_val, sn->sn_vals[i].bv_len );
-                               sn->sn_vals[i].bv_val = ptr;
-                               ptr += sn->sn_vals[i].bv_len;
+                       if ( !BER_BVISNULL( &sn2->sn_vals[i] )) {
+                               AC_MEMCPY(ptr, sn2->sn_vals[i].bv_val, sn2->sn_vals[i].bv_len);
+                               sn2->sn_vals[i].bv_val = ptr;
+                               ptr += sn2->sn_vals[i].bv_len;
                                *ptr++ = '\0';
                        }
                }
+               op->o_tmpfree( sn, op->o_tmpmemctx );
+               sn = sn2;
                sn->sn_conn = op->o_conn->c_conn_idx;
 
                /* Insert into the AVL tree */
-               tavl_insert(&(so->so_tree), sn, node_cmp, avl_dup_ok);
+               tavl_insert(&(so->so_tree), sn, node_insert, avl_dup_error);
 
                so->so_nentries++;
 
                /* Collected the keys so that they can be sorted.  Thus, stop
                 * the entry from propagating.
                 */
-               rc = LDAP_SUCCESS;
+               rs->sr_err = LDAP_SUCCESS;
        }
        else if ( rs->sr_type == REP_RESULT ) {
                /* Remove serversort response callback.
@@ -512,11 +680,11 @@ static int sssvlv_op_response(
                        op->o_callback = op->o_callback->sc_next;
                }
 
-               rc = send_entry( op, rs, so );
+               send_entry( op, rs, so );
                send_result( op, rs, so );
        }
 
-       return rc;
+       return rs->sr_err;
 }
 
 static int sssvlv_op_search(
@@ -528,7 +696,7 @@ static int sssvlv_op_search(
        int                                             rc                      = SLAP_CB_CONTINUE;
        int     ok;
        sort_op *so, so2;
-       sort_ctrl *sc = op->o_controls[sss_cid];
+       sort_ctrl *sc;
        PagedResultsState *ps;
        vlv_ctrl *vc;
 
@@ -557,6 +725,7 @@ static int sssvlv_op_search(
                op->o_req_dn.bv_val, op->ors_filterstr.bv_val,
                op->o_ctrlflag[sss_cid]);
 
+       sc = op->o_controls[sss_cid];
        if ( sc->sc_nkeys > si->svi_max_keys ) {
                rs->sr_text = "Too many sort keys";
                rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
@@ -589,8 +758,17 @@ static int sssvlv_op_search(
                        } else if ( !ps->ps_size ) {
                        /* Abandoning current request */
                                ok = 0;
+                               so->so_nentries = 0;
+                               rs->sr_err = LDAP_SUCCESS;
                        }
                }
+               if (( vc && so->so_paged > SLAP_CONTROL_IGNORED ) ||
+                       ( ps && so->so_vlv > SLAP_CONTROL_IGNORED )) {
+                       /* changed from paged to vlv or vice versa, abandon */
+                       ok = 0;
+                       so->so_nentries = 0;
+                       rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
+               }
        /* Are there too many running overall? */
        } else if ( si->svi_num >= si->svi_max ) {
                ok = 0;
@@ -601,18 +779,23 @@ static int sssvlv_op_search(
        }
        ldap_pvt_thread_mutex_unlock( &sort_conns_mutex );
        if ( ok ) {
+               /* are we continuing a VLV search? */
+               if ( vc && vc->vc_context ) {
+                       so->so_ctrl = sc;
+                       send_list( op, rs, so );
+                       send_result( op, rs, so );
+                       rc = LDAP_SUCCESS;
                /* are we continuing a paged search? */
-               if ( ps && ps->ps_cookie ) {
+               } else if ( ps && ps->ps_cookie ) {
                        so->so_ctrl = sc;
                        send_page( op, rs, so );
                        send_result( op, rs, so );
                        rc = LDAP_SUCCESS;
-               }
-               else {
+               } else {
                        slap_callback *cb = op->o_tmpalloc( sizeof(slap_callback),
                                op->o_tmpmemctx );
                        /* Install serversort response callback to handle a new search */
-                       if ( ps ) {
+                       if ( ps || vc ) {
                                so = ch_malloc( sizeof(sort_op));
                        } else {
                                so = op->o_tmpalloc( sizeof(sort_op), op->o_tmpmemctx );
@@ -634,15 +817,20 @@ static int sssvlv_op_search(
                        } else {
                                so->so_paged = 0;
                                so->so_page_size = 0;
+                               if ( vc ) {
+                                       so->so_vlv = op->o_ctrlflag[vlv_cid];
+                                       so->so_vlv_target = 0;
+                                       so->so_vlv_rc = 0;
+                               }
                        }
+                       so->so_vcontext = (unsigned long)so;
                        so->so_nentries = 0;
 
                        op->o_callback          = cb;
                }
        } else {
-               if ( ps && !ps->ps_size ) {
+               if ( so && !so->so_nentries ) {
                        free_sort_op( op->o_conn, so );
-                       rs->sr_err = LDAP_SUCCESS;
                } else {
                        rs->sr_text = "Other sort requests already in progress";
                        rs->sr_err = LDAP_BUSY;
@@ -815,7 +1003,7 @@ static int sss_parseCtrl(
        i = count_key( ber );
 
        sc = op->o_tmpalloc( sizeof(sort_ctrl) +
-               i * sizeof(sort_key), op->o_tmpmemctx );
+               (i-1) * sizeof(sort_key), op->o_tmpmemctx );
        sc->sc_nkeys = i;
        op->o_controls[sss_cid] = sc;
 
@@ -873,7 +1061,7 @@ static int vlv_parseCtrl(
 
        tag = ber_peek_tag( ber, &len );
        if ( tag == LDAP_VLVBYINDEX_IDENTIFIER ) {
-               tag = ber_scanf( ber, "ii", &vc2.vc_offset, &vc2.vc_count );
+               tag = ber_scanf( ber, "{ii}", &vc2.vc_offset, &vc2.vc_count );
                if ( tag == LBER_ERROR )
                        return rs->sr_err;
                BER_BVZERO( &vc2.vc_value );
@@ -917,12 +1105,46 @@ static int sssvlv_db_open(
        BackendDB               *be,
        ConfigReply             *cr )
 {
-       int rc = overlay_register_control( be, LDAP_CONTROL_SORTREQUEST );
+       slap_overinst   *on = (slap_overinst *)be->bd_info;
+       sssvlv_info *si = on->on_bi.bi_private;
+       int rc;
+
+       /* If not set, default to 1/2 of available threads */
+       if ( !si->svi_max )
+               si->svi_max = connection_pool_max / 2;
+
+       rc = overlay_register_control( be, LDAP_CONTROL_SORTREQUEST );
        if ( rc == LDAP_SUCCESS )
                rc = overlay_register_control( be, LDAP_CONTROL_VLVREQUEST );
        return rc;
 }
 
+static ConfigTable sssvlv_cfg[] = {
+       { "sssvlv-max", "num",
+               2, 2, 0, ARG_INT|ARG_OFFSET,
+                       (void *)offsetof(sssvlv_info, svi_max),
+               "( OLcfgOvAt:21.1 NAME 'olcSssVlvMax' "
+                       "DESC 'Maximum number of concurrent Sort requests' "
+                       "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
+       { "sssvlv-maxkeys", "num",
+               2, 2, 0, ARG_INT|ARG_OFFSET,
+                       (void *)offsetof(sssvlv_info, svi_max_keys),
+               "( OLcfgOvAt:21.2 NAME 'olcSssVlvMaxKeys' "
+                       "DESC 'Maximum number of Keys in a Sort request' "
+                       "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
+       { NULL, NULL, 0, 0, 0, ARG_IGNORED }
+};
+
+static ConfigOCs sssvlv_ocs[] = {
+       { "( OLcfgOvOc:21.1 "
+               "NAME 'olcSssVlvConfig' "
+               "DESC 'SSS VLV configuration' "
+               "SUP olcOverlayConfig "
+               "MAY ( olcSssVlvMax $ olcSssVlvMaxKeys ) )",
+               Cft_Overlay, sssvlv_cfg, NULL, NULL },
+       { NULL, 0, NULL }
+};
+
 static int sssvlv_db_init(
        BackendDB               *be,
        ConfigReply             *cr)
@@ -933,9 +1155,9 @@ static int sssvlv_db_init(
        si = (sssvlv_info *)ch_malloc(sizeof(sssvlv_info));
        on->on_bi.bi_private = si;
 
-       si->svi_max = 5;
+       si->svi_max = 0;
        si->svi_num = 0;
-       si->svi_max_keys = 5;
+       si->svi_max_keys = SSSVLV_DEFAULT_MAX_KEYS;
 
        if ( dtblsize && !sort_conns ) {
                ldap_pvt_thread_mutex_init( &sort_conns_mutex );
@@ -974,6 +1196,12 @@ int sssvlv_initialize()
        sssvlv.on_bi.bi_connection_destroy      = sssvlv_connection_destroy;
        sssvlv.on_bi.bi_op_search                       = sssvlv_op_search;
 
+       sssvlv.on_bi.bi_cf_ocs = sssvlv_ocs;
+
+       rc = config_register_schema( sssvlv_cfg, sssvlv_ocs );
+       if ( rc )
+               return rc;
+
        rc = register_supported_control2( LDAP_CONTROL_SORTREQUEST,
                        SLAP_CTRL_SEARCH,
                        NULL,