]> git.sur5r.net Git - openldap/blob - servers/slapd/back-asyncmeta/search.c
72aa890b7b59e894fe15ec00d242d4aac8cafc67
[openldap] / servers / slapd / back-asyncmeta / search.c
1 /* search.c - search request handler for back-asyncmeta */
2 /* $OpenLDAP$ */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 2016-2017 The OpenLDAP Foundation.
6  * Portions Copyright 2016 Symas Corporation.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted only as authorized by the OpenLDAP
11  * Public License.
12  *
13  * A copy of this license is available in the file LICENSE in the
14  * top-level directory of the distribution or, alternatively, at
15  * <http://www.OpenLDAP.org/license.html>.
16  */
17
18 /* ACKNOWLEDGEMENTS:
19  * This work was developed by Symas Corporation
20  * based on back-meta module for inclusion in OpenLDAP Software.
21  * This work was sponsored by Ericsson. */
22
23 #include "portable.h"
24
25 #include <stdio.h>
26
27 #include <ac/socket.h>
28 #include <ac/string.h>
29 #include <ac/time.h>
30
31 #include "lutil.h"
32 #include "slap.h"
33 #include "../back-ldap/back-ldap.h"
34 #include "back-asyncmeta.h"
35 #include "../../../libraries/liblber/lber-int.h"
36
37 #include "../../../libraries/libldap/ldap-int.h"
38 #undef  ldap_debug
39 #define ldap_debug      slap_debug
40
41 static void
42 asyncmeta_handle_onerr_stop(Operation *op,
43                             SlapReply *rs,
44                             a_metaconn_t *mc,
45                             bm_context_t *bc,
46                             int candidate,
47                             slap_callback *cb)
48 {
49         a_metainfo_t *mi = mc->mc_info;
50         int j;
51         ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
52         if (bc->bc_active > 0) {
53                 ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
54                 return;
55         }
56         bc->bc_active = 1;
57         asyncmeta_drop_bc(mc, bc);
58         ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
59
60         for (j=0; j<mi->mi_ntargets; j++) {
61                 if (j != candidate && bc->candidates[j].sr_msgid >= 0
62                     && mc->mc_conns[j].msc_ld != NULL) {
63                         asyncmeta_back_abandon_candidate( mc, op,
64                                                 bc->candidates[ j ].sr_msgid, j );
65                 }
66         }
67         if (cb != NULL) {
68                 op->o_callback = cb;
69         }
70         send_ldap_result(op, rs);
71         asyncmeta_clear_bm_context(bc);
72 }
73
74 meta_search_candidate_t
75 asyncmeta_back_search_start(
76                                 Operation *op,
77                                 SlapReply *rs,
78                             a_metaconn_t *mc,
79                             bm_context_t *bc,
80                             int candidate,
81                             struct berval               *prcookie,
82                             ber_int_t           prsize )
83 {
84         SlapReply               *candidates = bc->candidates;
85         a_metainfo_t            *mi = ( a_metainfo_t * )mc->mc_info;
86         a_metatarget_t          *mt = mi->mi_targets[ candidate ];
87         a_metasingleconn_t      *msc = &mc->mc_conns[ candidate ];
88         a_dncookie              dc;
89         struct berval           realbase = op->o_req_dn;
90         int                     realscope = op->ors_scope;
91         struct berval           mbase = BER_BVNULL;
92         struct berval           mfilter = BER_BVNULL;
93         char                    **mapped_attrs = NULL;
94         int                     rc;
95         meta_search_candidate_t retcode;
96         int timelimit;
97         int                     nretries = 1;
98         LDAPControl             **ctrls = NULL;
99         BerElement *ber;
100         ber_int_t       msgid;
101 #ifdef SLAPD_META_CLIENT_PR
102         LDAPControl             **save_ctrls = NULL;
103 #endif /* SLAPD_META_CLIENT_PR */
104
105         /* this should not happen; just in case... */
106         if ( msc->msc_ld == NULL ) {
107                 Debug( LDAP_DEBUG_ANY,
108                         "%s: asyncmeta_back_search_start candidate=%d ld=NULL%s.\n",
109                         op->o_log_prefix, candidate,
110                         META_BACK_ONERR_STOP( mi ) ? "" : " (ignored)" );
111                 candidates[ candidate ].sr_err = LDAP_OTHER;
112                 if ( META_BACK_ONERR_STOP( mi ) ) {
113                         return META_SEARCH_ERR;
114                 }
115                 candidates[ candidate ].sr_msgid = META_MSGID_IGNORE;
116                 return META_SEARCH_NOT_CANDIDATE;
117         }
118
119         Debug( LDAP_DEBUG_TRACE, "%s >>> asyncmeta_back_search_start[%d]\n", op->o_log_prefix, candidate, 0 );
120         /*
121          * modifies the base according to the scope, if required
122          */
123         if ( mt->mt_nsuffix.bv_len > op->o_req_ndn.bv_len ) {
124                 switch ( op->ors_scope ) {
125                 case LDAP_SCOPE_SUBTREE:
126                         /*
127                          * make the target suffix the new base
128                          * FIXME: this is very forgiving, because
129                          * "illegal" searchBases may be turned
130                          * into the suffix of the target; however,
131                          * the requested searchBase already passed
132                          * thru the candidate analyzer...
133                          */
134                         if ( dnIsSuffix( &mt->mt_nsuffix, &op->o_req_ndn ) ) {
135                                 realbase = mt->mt_nsuffix;
136                                 if ( mt->mt_scope == LDAP_SCOPE_SUBORDINATE ) {
137                                         realscope = LDAP_SCOPE_SUBORDINATE;
138                                 }
139
140                         } else {
141                                 /*
142                                  * this target is no longer candidate
143                                  */
144                                 retcode = META_SEARCH_NOT_CANDIDATE;
145                                 goto doreturn;
146                         }
147                         break;
148
149                 case LDAP_SCOPE_SUBORDINATE:
150                 case LDAP_SCOPE_ONELEVEL:
151                 {
152                         struct berval   rdn = mt->mt_nsuffix;
153                         rdn.bv_len -= op->o_req_ndn.bv_len + STRLENOF( "," );
154                         if ( dnIsOneLevelRDN( &rdn )
155                                         && dnIsSuffix( &mt->mt_nsuffix, &op->o_req_ndn ) )
156                         {
157                                 /*
158                                  * if there is exactly one level,
159                                  * make the target suffix the new
160                                  * base, and make scope "base"
161                                  */
162                                 realbase = mt->mt_nsuffix;
163                                 if ( op->ors_scope == LDAP_SCOPE_SUBORDINATE ) {
164                                         if ( mt->mt_scope == LDAP_SCOPE_SUBORDINATE ) {
165                                                 realscope = LDAP_SCOPE_SUBORDINATE;
166                                         } else {
167                                                 realscope = LDAP_SCOPE_SUBTREE;
168                                         }
169                                 } else {
170                                         realscope = LDAP_SCOPE_BASE;
171                                 }
172                                 break;
173                         } /* else continue with the next case */
174                 }
175
176                 case LDAP_SCOPE_BASE:
177                         /*
178                          * this target is no longer candidate
179                          */
180                         retcode = META_SEARCH_NOT_CANDIDATE;
181                         goto doreturn;
182                 }
183         }
184
185         /* check filter expression */
186         if ( mt->mt_filter ) {
187                 metafilter_t *mf;
188                 for ( mf = mt->mt_filter; mf; mf = mf->mf_next ) {
189                         if ( regexec( &mf->mf_regex, op->ors_filterstr.bv_val, 0, NULL, 0 ) == 0 )
190                                 break;
191                 }
192                 /* nothing matched, this target is no longer a candidate */
193                 if ( !mf ) {
194                         retcode = META_SEARCH_NOT_CANDIDATE;
195                         goto doreturn;
196                 }
197         }
198
199         /*
200          * Rewrite the search base, if required
201          */
202         dc.target = mt;
203         dc.ctx = "searchBase";
204         dc.conn = op->o_conn;
205         dc.rs = rs;
206         switch ( asyncmeta_dn_massage( &dc, &realbase, &mbase ) ) {
207         case LDAP_SUCCESS:
208                 break;
209
210         case LDAP_UNWILLING_TO_PERFORM:
211                 rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
212                 rs->sr_text = "Operation not allowed";
213                 retcode = META_SEARCH_ERR;
214                 goto doreturn;
215
216         default:
217
218                 /*
219                  * this target is no longer candidate
220                  */
221                 retcode = META_SEARCH_NOT_CANDIDATE;
222                 goto doreturn;
223         }
224
225         /*
226          * Maps filter
227          */
228         rc = asyncmeta_filter_map_rewrite( &dc, op->ors_filter,
229                         &mfilter, BACKLDAP_MAP, NULL );
230         switch ( rc ) {
231         case LDAP_SUCCESS:
232                 break;
233
234         case LDAP_COMPARE_FALSE:
235         default:
236                 /*
237                  * this target is no longer candidate
238                  */
239                 retcode = META_SEARCH_NOT_CANDIDATE;
240                 goto done;
241         }
242
243         /*
244          * Maps required attributes
245          */
246         rc = asyncmeta_map_attrs( op, &mt->mt_rwmap.rwm_at,
247                         op->ors_attrs, BACKLDAP_MAP, &mapped_attrs );
248         if ( rc != LDAP_SUCCESS ) {
249                 /*
250                  * this target is no longer candidate
251                  */
252                 retcode = META_SEARCH_NOT_CANDIDATE;
253                 goto done;
254         }
255
256         if ( op->ors_tlimit != SLAP_NO_LIMIT ) {
257                 timelimit = op->ors_tlimit > 0 ? op->ors_tlimit : 1;
258         } else {
259                 timelimit = -1; /* no limit */
260         }
261
262 #ifdef SLAPD_META_CLIENT_PR
263         save_ctrls = op->o_ctrls;
264         {
265                 LDAPControl *pr_c = NULL;
266                 int i = 0, nc = 0;
267
268                 if ( save_ctrls ) {
269                         for ( ; save_ctrls[i] != NULL; i++ );
270                         nc = i;
271                         pr_c = ldap_control_find( LDAP_CONTROL_PAGEDRESULTS, save_ctrls, NULL );
272                 }
273
274                 if ( pr_c != NULL ) nc--;
275                 if ( mt->mt_ps > 0 || prcookie != NULL ) nc++;
276
277                 if ( mt->mt_ps > 0 || prcookie != NULL || pr_c != NULL ) {
278                         int src = 0, dst = 0;
279                         BerElementBuffer berbuf;
280                         BerElement *ber = (BerElement *)&berbuf;
281                         struct berval val = BER_BVNULL;
282                         ber_len_t len;
283
284                         len = sizeof( LDAPControl * )*( nc + 1 ) + sizeof( LDAPControl );
285
286                         if ( mt->mt_ps > 0 || prcookie != NULL ) {
287                                 struct berval nullcookie = BER_BVNULL;
288                                 ber_tag_t tag;
289
290                                 if ( prsize == 0 && mt->mt_ps > 0 ) prsize = mt->mt_ps;
291                                 if ( prcookie == NULL ) prcookie = &nullcookie;
292
293                                 ber_init2( ber, NULL, LBER_USE_DER );
294                                 tag = ber_printf( ber, "{iO}", prsize, prcookie );
295                                 if ( tag == LBER_ERROR ) {
296                                         /* error */
297                                         (void) ber_free_buf( ber );
298                                         goto done_pr;
299                                 }
300
301                                 tag = ber_flatten2( ber, &val, 0 );
302                                 if ( tag == LBER_ERROR ) {
303                                         /* error */
304                                         (void) ber_free_buf( ber );
305                                         goto done_pr;
306                                 }
307
308                                 len += val.bv_len + 1;
309                         }
310
311                         op->o_ctrls = op->o_tmpalloc( len, op->o_tmpmemctx );
312                         if ( save_ctrls ) {
313                                 for ( ; save_ctrls[ src ] != NULL; src++ ) {
314                                         if ( save_ctrls[ src ] != pr_c ) {
315                                                 op->o_ctrls[ dst ] = save_ctrls[ src ];
316                                                 dst++;
317                                         }
318                                 }
319                         }
320
321                         if ( mt->mt_ps > 0 || prcookie != NULL ) {
322                                 op->o_ctrls[ dst ] = (LDAPControl *)&op->o_ctrls[ nc + 1 ];
323
324                                 op->o_ctrls[ dst ]->ldctl_oid = LDAP_CONTROL_PAGEDRESULTS;
325                                 op->o_ctrls[ dst ]->ldctl_iscritical = 1;
326
327                                 op->o_ctrls[ dst ]->ldctl_value.bv_val = (char *)&op->o_ctrls[ dst ][ 1 ];
328                                 AC_MEMCPY( op->o_ctrls[ dst ]->ldctl_value.bv_val, val.bv_val, val.bv_len + 1 );
329                                 op->o_ctrls[ dst ]->ldctl_value.bv_len = val.bv_len;
330                                 dst++;
331
332                                 (void)ber_free_buf( ber );
333                         }
334
335                         op->o_ctrls[ dst ] = NULL;
336                 }
337 done_pr:;
338         }
339 #endif /* SLAPD_META_CLIENT_PR */
340
341 retry:;
342         asyncmeta_set_msc_time(msc);
343         ctrls = op->o_ctrls;
344         if (nretries == 0)
345         {
346                 if (rc != LDAP_SUCCESS)
347                 {
348                         rs->sr_err = LDAP_BUSY;
349                         retcode = META_SEARCH_ERR;
350                         candidates[ candidate ].sr_msgid = META_MSGID_IGNORE;
351                         goto done;
352                 }
353         }
354
355         if ( asyncmeta_controls_add( op, rs, mc, candidate, &ctrls )
356                 != LDAP_SUCCESS )
357         {
358                 candidates[ candidate ].sr_msgid = META_MSGID_IGNORE;
359                 retcode = META_SEARCH_NOT_CANDIDATE;
360                 goto done;
361         }
362
363         /*
364          * Starts the search
365          */
366         ber = ldap_build_search_req( msc->msc_ld,
367                         mbase.bv_val, realscope, mfilter.bv_val,
368                         mapped_attrs, op->ors_attrsonly,
369                         ctrls, NULL, timelimit, op->ors_slimit, op->ors_deref,
370                         &msgid );
371         if (ber) {
372                 candidates[ candidate ].sr_msgid = msgid;
373                 rc = ldap_send_initial_request( msc->msc_ld, LDAP_REQ_SEARCH,
374                         mbase.bv_val, ber, msgid );
375                 if (rc == msgid)
376                         rc = LDAP_SUCCESS;
377                 else
378                         rc = LDAP_SERVER_DOWN;
379                 switch ( rc ) {
380                 case LDAP_SUCCESS:
381                         retcode = META_SEARCH_CANDIDATE;
382                         asyncmeta_set_msc_time(msc);
383                         break;
384
385                 case LDAP_SERVER_DOWN:
386                         ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
387                         if (mc->mc_active < 1) {
388                                 asyncmeta_clear_one_msc(NULL, mc, candidate);
389                         }
390                         ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
391                         if ( nretries && asyncmeta_retry( op, rs, &mc, candidate, LDAP_BACK_DONTSEND ) ) {
392                                 nretries = 0;
393                                 /* if the identity changed, there might be need to re-authz */
394                                 (void)mi->mi_ldap_extra->controls_free( op, rs, &ctrls );
395                                 goto retry;
396                         }
397                         rs->sr_err = LDAP_UNAVAILABLE;
398                         retcode = META_SEARCH_ERR;
399                         break;
400                 default:
401                         candidates[ candidate ].sr_msgid = META_MSGID_IGNORE;
402                         retcode = META_SEARCH_NOT_CANDIDATE;
403                 }
404         }
405
406 done:;
407         (void)mi->mi_ldap_extra->controls_free( op, rs, &ctrls );
408 #ifdef SLAPD_META_CLIENT_PR
409         if ( save_ctrls != op->o_ctrls ) {
410                 op->o_tmpfree( op->o_ctrls, op->o_tmpmemctx );
411                 op->o_ctrls = save_ctrls;
412         }
413 #endif /* SLAPD_META_CLIENT_PR */
414
415         if ( mapped_attrs ) {
416                 ber_memfree_x( mapped_attrs, op->o_tmpmemctx );
417         }
418         if ( mfilter.bv_val != op->ors_filterstr.bv_val ) {
419                 ber_memfree_x( mfilter.bv_val, NULL );
420         }
421         if ( mbase.bv_val != realbase.bv_val ) {
422                 free( mbase.bv_val );
423         }
424
425 doreturn:;
426         Debug( LDAP_DEBUG_TRACE, "%s <<< asyncmeta_back_search_start[%p]=%d\n", op->o_log_prefix, msc, candidates[candidate].sr_msgid );
427         return retcode;
428 }
429
430 int
431 asyncmeta_back_search( Operation *op, SlapReply *rs )
432 {
433         a_metainfo_t    *mi = ( a_metainfo_t * )op->o_bd->be_private;
434         struct timeval  save_tv = { 0, 0 },
435                         tv;
436         time_t          stoptime = (time_t)(-1),
437                         lastres_time = slap_get_time(),
438                         timeout = 0;
439         int             rc = 0, sres = LDAP_SUCCESS;
440         char            *matched = NULL;
441         int             last = 0, ncandidates = 0,
442                         initial_candidates = 0, candidate_match = 0,
443                         needbind = 0;
444         ldap_back_send_t        sendok = LDAP_BACK_SENDERR;
445         long            i,j;
446         int             is_ok = 0;
447         void            *savepriv;
448         SlapReply       *candidates = NULL;
449         int             do_taint = 0;
450         bm_context_t *bc;
451         a_metaconn_t *mc;
452         slap_callback *cb = op->o_callback;
453
454         rs_assert_ready( rs );
455         rs->sr_flags &= ~REP_ENTRY_MASK; /* paranoia, we can set rs = non-entry */
456
457         /*
458          * controls are set in ldap_back_dobind()
459          *
460          * FIXME: in case of values return filter, we might want
461          * to map attrs and maybe rewrite value
462          */
463
464         asyncmeta_new_bm_context(op, rs, &bc, mi->mi_ntargets );
465         if (bc == NULL) {
466                 rs->sr_err = LDAP_OTHER;
467                 send_ldap_result(op, rs);
468                 return rs->sr_err;
469         }
470
471         candidates = bc->candidates;
472         mc = asyncmeta_getconn( op, rs, candidates, NULL, LDAP_BACK_DONTSEND, 0);
473         if ( !mc || rs->sr_err != LDAP_SUCCESS) {
474                 op->o_callback = cb;
475                 send_ldap_result(op, rs);
476                 asyncmeta_clear_bm_context(bc);
477                 return rs->sr_err;
478         }
479
480         /*
481          * Inits searches
482          */
483
484         for ( i = 0; i < mi->mi_ntargets; i++ ) {
485                 /* reset sr_msgid; it is used in most loops
486                  * to check if that target is still to be considered */
487                 candidates[i].sr_msgid = META_MSGID_UNDEFINED;
488                 /* a target is marked as candidate by asyncmeta_getconn();
489                  * if for any reason (an error, it's over or so) it is
490                  * no longer active, sr_msgid is set to META_MSGID_IGNORE
491                  * but it remains candidate, which means it has been active
492                  * at some point during the operation.  This allows to
493                  * use its response code and more to compute the final
494                  * response */
495                 if ( !META_IS_CANDIDATE( &candidates[ i ] ) ) {
496                         continue;
497                 }
498
499                 candidates[ i ].sr_matched = NULL;
500                 candidates[ i ].sr_text = NULL;
501                 candidates[ i ].sr_ref = NULL;
502                 candidates[ i ].sr_ctrls = NULL;
503                 candidates[ i ].sr_nentries = 0;
504                 candidates[ i ].sr_type = -1;
505
506                 /* get largest timeout among candidates */
507                 if ( mi->mi_targets[ i ]->mt_timeout[ SLAP_OP_SEARCH ]
508                         && mi->mi_targets[ i ]->mt_timeout[ SLAP_OP_SEARCH ] > timeout )
509                 {
510                         timeout = mi->mi_targets[ i ]->mt_timeout[ SLAP_OP_SEARCH ];
511                 }
512         }
513
514         bc->timeout = timeout;
515         bc->stoptime = op->o_time + bc->timeout;
516
517         if ( op->ors_tlimit != SLAP_NO_LIMIT ) {
518                 stoptime = op->o_time + op->ors_tlimit;
519                 if (stoptime < bc->stoptime) {
520                         bc->stoptime = stoptime;
521                         bc->searchtime = 1;
522                         bc->timeout = op->ors_tlimit;
523                 }
524         }
525
526         ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
527         rc = asyncmeta_add_message_queue(mc, bc);
528         ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
529
530         if (rc != LDAP_SUCCESS) {
531                 rs->sr_err = LDAP_BUSY;
532                 rs->sr_text = "Maximum pending ops limit exceeded";
533                 asyncmeta_clear_bm_context(bc);
534                 op->o_callback = cb;
535                 send_ldap_result(op, rs);
536                 goto finish;
537         }
538
539         for ( i = 0; i < mi->mi_ntargets; i++ ) {
540                 if ( !META_IS_CANDIDATE( &candidates[ i ] )
541                         || candidates[ i ].sr_err != LDAP_SUCCESS )
542                 {
543                         continue;
544                 }
545
546                 rc = asyncmeta_dobind_init_with_retry(op, rs, bc, mc, i);
547                 switch (rc)
548                 {
549                 case META_SEARCH_CANDIDATE:
550                         /* target is already bound, just send the search request */
551                         ncandidates++;
552                         Debug( LDAP_DEBUG_TRACE, "%s asyncmeta_back_search: IS_CANDIDATE "
553                                "cnd=\"%ld\"\n", op->o_log_prefix, i , 0);
554
555                         rc = asyncmeta_back_search_start( op, rs, mc, bc, i,  NULL, 0 );
556                         if (rc == META_SEARCH_ERR) {
557                                 META_CANDIDATE_CLEAR(&candidates[i]);
558                                 candidates[ i ].sr_msgid = META_MSGID_IGNORE;
559                                 if ( META_BACK_ONERR_STOP( mi ) ) {
560                                         asyncmeta_handle_onerr_stop(op,rs,mc,bc,i,cb);
561                                         goto finish;
562                                 }
563                                 else {
564                                         continue;
565                                 }
566                         }
567                         break;
568                 case META_SEARCH_NOT_CANDIDATE:
569                         Debug( LDAP_DEBUG_TRACE, "%s asyncmeta_back_search: NOT_CANDIDATE "
570                                "cnd=\"%ld\"\n", op->o_log_prefix, i , 0);
571                         candidates[ i ].sr_msgid = META_MSGID_IGNORE;
572                         break;
573
574                 case META_SEARCH_NEED_BIND:
575                 case META_SEARCH_CONNECTING:
576                         Debug( LDAP_DEBUG_TRACE, "%s asyncmeta_back_search: NEED_BIND "
577                                "cnd=\"%ld\" %p\n", op->o_log_prefix, i , &mc->mc_conns[i]);
578                         ncandidates++;
579                         rc = asyncmeta_dobind_init(op, rs, bc, mc, i);
580                         if (rc == META_SEARCH_ERR) {
581                                 candidates[ i ].sr_msgid = META_MSGID_IGNORE;
582                                 if ( META_BACK_ONERR_STOP( mi ) ) {
583                                         asyncmeta_handle_onerr_stop(op,rs,mc,bc,i,cb);
584                                         goto finish;
585                                 }
586                                 else {
587                                         continue;
588                                 }
589                         }
590                         break;
591                 case META_SEARCH_BINDING:
592                         Debug( LDAP_DEBUG_TRACE, "%s asyncmeta_back_search: BINDING "
593                                "cnd=\"%ld\" %p\n", op->o_log_prefix, i , &mc->mc_conns[i]);
594                         ncandidates++;
595                         /* Todo add the context to the message queue but do not send the request
596                          the receiver must send this when we are done binding */
597                         /* question - how would do receiver know to which targets??? */
598                         break;
599
600                 case META_SEARCH_ERR:
601                         Debug( LDAP_DEBUG_TRACE, "%s asyncmeta_back_search: SEARCH_ERR "
602                                "cnd=\"%ldd\"\n", op->o_log_prefix, i , 0);
603                         candidates[ i ].sr_msgid = META_MSGID_IGNORE;
604                         candidates[ i ].sr_type = REP_RESULT;
605
606                         if ( META_BACK_ONERR_STOP( mi ) ) {
607                                 asyncmeta_handle_onerr_stop(op,rs,mc,bc,i,cb);
608                                 goto finish;
609                         }
610                         else {
611                                 continue;
612                         }
613                         break;
614
615                 default:
616                         assert( 0 );
617                         break;
618                 }
619         }
620
621         initial_candidates = ncandidates;
622
623         if ( LogTest( LDAP_DEBUG_TRACE ) ) {
624                 char    cnd[ SLAP_TEXT_BUFLEN ];
625                 int     c;
626
627                 for ( c = 0; c < mi->mi_ntargets; c++ ) {
628                         if ( META_IS_CANDIDATE( &candidates[ c ] ) ) {
629                                 cnd[ c ] = '*';
630                         } else {
631                                 cnd[ c ] = ' ';
632                         }
633                 }
634                 cnd[ c ] = '\0';
635
636                 Debug( LDAP_DEBUG_TRACE, "%s asyncmeta_back_search: ncandidates=%d "
637                         "cnd=\"%s\"\n", op->o_log_prefix, ncandidates, cnd );
638         }
639
640         if ( initial_candidates == 0 ) {
641                 /* NOTE: here we are not sending any matchedDN;
642                  * this is intended, because if the back-meta
643                  * is serving this search request, but no valid
644                  * candidate could be looked up, it means that
645                  * there is a hole in the mapping of the targets
646                  * and thus no knowledge of any remote superior
647                  * is available */
648                 Debug( LDAP_DEBUG_ANY, "%s asyncmeta_back_search: "
649                         "base=\"%s\" scope=%d: "
650                         "no candidate could be selected\n",
651                         op->o_log_prefix, op->o_req_dn.bv_val,
652                         op->ors_scope );
653
654                 /* FIXME: we're sending the first error we encounter;
655                  * maybe we should pick the worst... */
656                 rc = LDAP_NO_SUCH_OBJECT;
657                 for ( i = 0; i < mi->mi_ntargets; i++ ) {
658                         if ( META_IS_CANDIDATE( &candidates[ i ] )
659                                 && candidates[ i ].sr_err != LDAP_SUCCESS )
660                         {
661                                 rc = candidates[ i ].sr_err;
662                                 break;
663                         }
664                 }
665                 rs->sr_err = rc;
666                 ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
667                 asyncmeta_drop_bc(mc, bc);
668                 ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
669                 op->o_callback = cb;
670                 send_ldap_result(op, rs);
671                 asyncmeta_clear_bm_context(bc);
672                 goto finish;
673         }
674         ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
675         asyncmeta_start_listeners(mc, candidates, bc);
676         ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
677 finish:
678         return rs->sr_err;
679 }