From: Howard Chu Date: Fri, 26 Nov 2004 21:24:40 +0000 (+0000) Subject: Added response queuing for updates occurring during a refresh X-Git-Tag: OPENLDAP_REL_ENG_2_3_0ALPHA~215 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=2620b4084deb2edee02dab58160efe33a0d8d2b9;p=openldap Added response queuing for updates occurring during a refresh --- diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 0484dc6ade..dd4caa1613 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -28,9 +28,12 @@ /* A queued result of a persistent search */ typedef struct syncres { struct syncres *s_next; + struct berval s_dn; struct berval s_ndn; struct berval s_uuid; - int result; + struct berval s_csn; + char s_mode; + char s_isreference; } syncres; /* Record of a persistent search */ @@ -393,8 +396,37 @@ syncprov_findcsn( Operation *op, int mode ) return LDAP_NO_SUCH_OBJECT; } +/* Queue a persistent search response if still in Refresh stage */ static int -syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode ) +syncprov_qresp( opcookie *opc, syncops *so, int mode ) +{ + syncres *sr; + + sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 + + opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 ); + sr->s_next = NULL; + sr->s_dn.bv_val = (char *)(sr + 1); + sr->s_mode = mode; + sr->s_isreference = opc->sreference; + sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val ); + *(sr->s_ndn.bv_val++) = '\0'; + sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val ); + *(sr->s_uuid.bv_val++) = '\0'; + sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val ); + + if ( !so->s_res ) { + so->s_res = sr; + } else { + so->s_restail->s_next = sr; + } + so->s_restail = sr; + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + return LDAP_SUCCESS; +} + +/* Send a persistent search response */ +static int +syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode, int queue ) { slap_overinst *on = opc->son; syncprov_info_t *si = on->on_bi.bi_private; @@ -412,6 +444,13 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode sop.o_hdr = &ohdr; sop.o_tmpmemctx = op->o_tmpmemctx; + if ( queue && (so->s_flags & PS_IS_REFRESHING) ) { + ldap_pvt_thread_mutex_lock( &so->s_mutex ); + if ( so->s_flags & PS_IS_REFRESHING ) + return syncprov_qresp( opc, so, mode ); + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + } + ctrls[1] = NULL; slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, srs->sr_state.sid, srs->sr_state.rid ); @@ -539,11 +578,11 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) } else { /* if found send UPDATE else send ADD */ syncprov_sendresp( op, opc, ss, e, - found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD ); + found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 ); } } else if ( !saveit && found ) { /* send DELETE */ - syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE ); + syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 ); } } ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); @@ -612,7 +651,7 @@ syncprov_op_response( Operation *op, SlapReply *rs ) */ for ( sm = opc->smatches; sm; sm=sm->sm_next ) { syncprov_sendresp( op, opc, sm->sm_op, NULL, - LDAP_SYNC_DELETE ); + LDAP_SYNC_DELETE, 1 ); } break; } @@ -770,16 +809,55 @@ syncprov_search_response( Operation *op, SlapReply *rs ) rs->sr_err = slap_build_sync_done_ctrl( op, rs, rs->sr_ctrls, 0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS ); } else { + int locked = 0; /* It's RefreshAndPersist, transition to Persist phase */ rs->sr_rspoid = LDAP_SYNC_INFO; slap_send_syncinfo( op, rs, rs->sr_nentries ? LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, &cookie, 1, NULL, 0 ); /* Flush any queued persist messages */ - ; + if ( ss->ss_so->s_res ) { + syncres *sr, *srnext; + Entry *e; + opcookie opc; + + opc.son = on; + ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex ); + locked = 1; + for (sr = ss->ss_so->s_res; sr; sr=srnext) { + int rc = LDAP_SUCCESS; + srnext = sr->s_next; + opc.sdn = sr->s_dn; + opc.sndn = sr->s_ndn; + opc.suuid = sr->s_uuid; + opc.sctxcsn = sr->s_csn; + opc.sreference = sr->s_isreference; + e = NULL; + + if ( sr->s_mode != LDAP_SYNC_DELETE ) { + op->o_bd->bd_info = (BackendInfo *)on->on_info; + rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); + op->o_bd->bd_info = (BackendInfo *)on; + } + if ( rc == LDAP_SUCCESS ) + syncprov_sendresp( op, &opc, ss->ss_so, e, + sr->s_mode, 0 ); + + if ( e ) { + op->o_bd->bd_info = (BackendInfo *)on->on_info; + be_entry_release_r( op, e ); + op->o_bd->bd_info = (BackendInfo *)on; + } + ch_free( sr ); + } + ss->ss_so->s_res = NULL; + ss->ss_so->s_restail = NULL; + } /* Turn off the refreshing flag */ - ss->ss_so->s_flags ^= PS_IS_REFRESHING; + ss->ss_so->s_flags ^= PS_IS_REFRESHING; + if ( locked ) + ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex ); /* Detach this Op from frontend control */ ss->ss_done = 1;