]> git.sur5r.net Git - openldap/commitdiff
Always queue psearch responses (ITS#3671 revisited)
authorHoward Chu <hyc@openldap.org>
Sun, 2 Oct 2005 03:58:00 +0000 (03:58 +0000)
committerHoward Chu <hyc@openldap.org>
Sun, 2 Oct 2005 03:58:00 +0000 (03:58 +0000)
servers/slapd/overlays/syncprov.c

index 05f56c447de41e0a5df8e2a814d1524c0ccdc816..3d71305deec2f9b4b0fdda4c36ca1967598e5318 100644 (file)
@@ -26,6 +26,7 @@
 #include "lutil.h"
 #include "slap.h"
 #include "config.h"
+#include "ldap_rq.h"
 
 /* A modify request on a particular entry */
 typedef struct modinst {
@@ -63,6 +64,7 @@ typedef struct syncops {
        int             s_inuse;        /* reference count */
        struct syncres *s_res;
        struct syncres *s_restail;
+       void    *s_qtask;       /* task for playing psearch responses */
        ldap_pvt_thread_mutex_t s_mutex;
 } syncops;
 
@@ -680,90 +682,9 @@ again:
        return rc;
 }
 
-/* Queue a persistent search response */
-static int
-syncprov_qresp( opcookie *opc, syncops *so, int mode )
-{
-       syncres *sr;
-
-       sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
-               opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
-       sr->s_next = NULL;
-       sr->s_dn.bv_val = (char *)(sr + 1);
-       sr->s_dn.bv_len = opc->sdn.bv_len;
-       sr->s_mode = mode;
-       sr->s_isreference = opc->sreference;
-       sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val );
-       sr->s_ndn.bv_len = opc->sndn.bv_len;
-       *(sr->s_ndn.bv_val++) = '\0';
-       sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val );
-       sr->s_uuid.bv_len = opc->suuid.bv_len;
-       *(sr->s_uuid.bv_val++) = '\0';
-       sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val );
-       sr->s_csn.bv_len = opc->sctxcsn.bv_len;
-       strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
-
-       if ( !so->s_res ) {
-               so->s_res = sr;
-       } else {
-               so->s_restail->s_next = sr;
-       }
-       so->s_restail = sr;
-       ldap_pvt_thread_mutex_unlock( &so->s_mutex );
-       return LDAP_SUCCESS;
-}
-
-/* Play back queued responses */
-static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue );
-
-static int
-syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
-{
-       syncres *sr, *srnext;
-       Entry *e;
-       opcookie opc;
-       int rc;
-
-       opc.son = on;
-       op->o_bd->bd_info = (BackendInfo *)on->on_info;
-       for (sr = so->s_res; sr; sr=srnext) {
-               srnext = sr->s_next;
-               opc.sdn = sr->s_dn;
-               opc.sndn = sr->s_ndn;
-               opc.suuid = sr->s_uuid;
-               opc.sctxcsn = sr->s_csn;
-               opc.sreference = sr->s_isreference;
-               e = NULL;
-
-               if ( sr->s_mode != LDAP_SYNC_DELETE ) {
-                       rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
-                       if ( rc ) {
-                               ch_free( sr );
-                               so->s_res = srnext;
-                               continue;
-                       }
-               }
-               rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode, 0 );
-
-               if ( e ) {
-                       be_entry_release_rw( op, e, 0 );
-               }
-               if ( rc )
-                       break;
-
-               ch_free( sr );
-               so->s_res = srnext;
-       }
-       op->o_bd->bd_info = (BackendInfo *)on;
-       if ( !so->s_res )
-               so->s_restail = NULL;
-       return rc;
-}
-
 /* Send a persistent search response */
 static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue )
+syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode)
 {
        slap_overinst *on = opc->son;
 
@@ -772,56 +693,17 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod
        struct berval cookie;
        Entry e_uuid = {0};
        Attribute a_uuid = {0};
-       Operation sop = *so->s_op;
-       Opheader ohdr;
 
        if ( so->s_op->o_abandon )
                return SLAPD_ABANDON;
 
-       ohdr = *sop.o_hdr;
-       sop.o_hdr = &ohdr;
-       sop.o_tmpmemctx = op->o_tmpmemctx;
-       sop.o_bd = op->o_bd;
-       sop.o_controls = op->o_controls;
-       sop.o_private = op->o_private;
-       sop.o_callback = NULL;
-
-       /* If queueing is allowed */
-       if ( queue ) {
-               ldap_pvt_thread_mutex_lock( &so->s_mutex );
-               /* If we're still in refresh mode, must queue */
-               if (so->s_flags & PS_IS_REFRESHING) {
-                       return syncprov_qresp( opc, so, mode );
-               }
-               /* If connection is free but queue is non-empty,
-                * try to flush the queue.
-                */
-               if ( so->s_res ) {
-                       rs.sr_err = syncprov_qplay( &sop, on, so );
-               }
-               /* If the connection is busy, must queue */
-               if ( sop.o_conn->c_writewaiter || rs.sr_err == LDAP_BUSY ) {
-                       return syncprov_qresp( opc, so, mode );
-               }
-               ldap_pvt_thread_mutex_unlock( &so->s_mutex );
-
-               /* If syncprov_qplay returned any other error, bail out. */
-               if ( rs.sr_err ) {
-                       return rs.sr_err;
-               }
-       } else {
-               /* Queueing not allowed and conn is busy, give up */
-               if ( sop.o_conn->c_writewaiter )
-                       return LDAP_BUSY;
-       }
-
        ctrls[1] = NULL;
        slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid );
 
        e_uuid.e_attrs = &a_uuid;
        a_uuid.a_desc = slap_schema.si_ad_entryUUID;
        a_uuid.a_nvals = &opc->suuid;
-       rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid,
+       rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
                mode, ctrls, 0, 1, &cookie );
 
        rs.sr_ctrls = ctrls;
@@ -832,8 +714,8 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod
                if ( rs.sr_entry->e_private )
                        rs.sr_flags = REP_ENTRY_MUSTRELEASE;
                if ( opc->sreference ) {
-                       rs.sr_ref = get_entry_referrals( &sop, rs.sr_entry );
-                       send_search_reference( &sop, &rs );
+                       rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
+                       send_search_reference( op, &rs );
                        ber_bvarray_free( rs.sr_ref );
                        if ( !rs.sr_entry )
                                *e = NULL;
@@ -844,8 +726,8 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod
                rs.sr_entry = *e;
                if ( rs.sr_entry->e_private )
                        rs.sr_flags = REP_ENTRY_MUSTRELEASE;
-               rs.sr_attrs = sop.ors_attrs;
-               send_search_entry( &sop, &rs );
+               rs.sr_attrs = op->ors_attrs;
+               send_search_entry( op, &rs );
                if ( !rs.sr_entry )
                        *e = NULL;
                break;
@@ -857,30 +739,159 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod
                if ( opc->sreference ) {
                        struct berval bv = BER_BVNULL;
                        rs.sr_ref = &bv;
-                       send_search_reference( &sop, &rs );
+                       send_search_reference( op, &rs );
                } else {
-                       send_search_entry( &sop, &rs );
+                       send_search_entry( op, &rs );
                }
                break;
        default:
                assert(0);
        }
        op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
-       op->o_private = sop.o_private;
        rs.sr_ctrls = NULL;
-       /* Check queue again here; if we were hanging in a send and eventually
-        * recovered, there may be more to send now. But don't check if the
-        * original psearch has been abandoned.
-        */
-       if ( so->s_op->o_abandon )
-               return SLAPD_ABANDON;
 
-       if ( rs.sr_err == LDAP_SUCCESS && queue && so->s_res ) {
+       return rs.sr_err;
+}
+
+/* Play back queued responses */
+static int
+syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
+{
+       syncres *sr;
+       Entry *e;
+       opcookie opc;
+       int rc;
+
+       opc.son = on;
+       op->o_bd->bd_info = (BackendInfo *)on->on_info;
+
+       for (;;) {
                ldap_pvt_thread_mutex_lock( &so->s_mutex );
-               rs.sr_err = syncprov_qplay( &sop, on, so );
+               sr = so->s_res;
+               if ( sr )
+                       so->s_res = sr->s_next;
+               if ( !so->s_res )
+                       so->s_restail = NULL;
                ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+
+               if ( !sr )
+                       break;
+
+               if ( !so->s_op->o_abandon ) {
+                       opc.sdn = sr->s_dn;
+                       opc.sndn = sr->s_ndn;
+                       opc.suuid = sr->s_uuid;
+                       opc.sctxcsn = sr->s_csn;
+                       opc.sreference = sr->s_isreference;
+                       e = NULL;
+
+                       if ( sr->s_mode != LDAP_SYNC_DELETE ) {
+                               rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
+                               if ( rc ) {
+                                       ch_free( sr );
+                                       continue;
+                               }
+                       }
+                       rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
+
+                       if ( e ) {
+                               be_entry_release_rw( op, e, 0 );
+                       }
+                       if ( rc )
+                               break;
+               }
+
+               ch_free( sr );
        }
-       return rs.sr_err;
+       op->o_bd->bd_info = (BackendInfo *)on;
+       return rc;
+}
+
+/* runqueue task for playing back queued responses */
+static void *
+syncprov_qtask( void *ctx, void *arg )
+{
+       struct re_s *rtask = arg;
+       syncops *so = rtask->arg;
+       slap_overinst *on = so->s_op->o_private;
+       char opbuf[OPERATION_BUFFER_SIZE];
+       Operation *op;
+       BackendDB be;
+
+       op = (Operation *)opbuf;
+       memset( op, 0, sizeof(opbuf));
+       op->o_hdr = (Opheader *)(op+1);
+       op->o_controls = (void **)(op->o_hdr+1);
+
+       *op->o_hdr = *so->s_op->o_hdr;
+
+       op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx);
+       op->o_tmpmfuncs = &slap_sl_mfuncs;
+       op->o_threadctx = ctx;
+
+       /* syncprov_qplay expects a fake db */
+       be = *so->s_op->o_bd;
+       be.be_flags |= SLAP_DBFLAG_OVERLAY;
+       op->o_bd = &be;
+       op->o_private = NULL;
+       op->o_callback = NULL;
+
+       syncprov_qplay( op, on, so );
+
+       /* wait until we get explicitly scheduled again */
+       ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+       ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
+       ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 );
+       ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
+       return NULL;
+}
+
+/* Queue a persistent search response */
+static int
+syncprov_qresp( opcookie *opc, syncops *so, int mode )
+{
+       syncres *sr;
+
+       ldap_pvt_thread_mutex_lock( &so->s_mutex );
+       sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
+               opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
+       sr->s_next = NULL;
+       sr->s_dn.bv_val = (char *)(sr + 1);
+       sr->s_dn.bv_len = opc->sdn.bv_len;
+       sr->s_mode = mode;
+       sr->s_isreference = opc->sreference;
+       sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val );
+       sr->s_ndn.bv_len = opc->sndn.bv_len;
+       *(sr->s_ndn.bv_val++) = '\0';
+       sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val );
+       sr->s_uuid.bv_len = opc->suuid.bv_len;
+       *(sr->s_uuid.bv_val++) = '\0';
+       sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val );
+       sr->s_csn.bv_len = opc->sctxcsn.bv_len;
+       strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
+
+       if ( !so->s_res ) {
+               so->s_res = sr;
+       } else {
+               so->s_restail->s_next = sr;
+       }
+       so->s_restail = sr;
+       if ( so->s_flags & PS_IS_DETACHED ) {
+               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+               if ( !so->s_qtask ) {
+                       so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, 0,
+                               syncprov_qtask, so, "syncprov_qtask",
+                               so->s_op->o_conn->c_peer_name.bv_val );
+               } else {
+                       if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask )) {
+                               ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
+                       }
+               }
+               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+       }
+       ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+       return LDAP_SUCCESS;
 }
 
 static void
@@ -1019,12 +1030,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                e = op->ora_e;
        }
 
-       if ( saveit ) {
+       if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
                ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
                ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
                opc->sreference = is_entry_referral( e );
-       }
-       if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
                a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
                if ( a )
                        ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
@@ -1082,15 +1091,15 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                                /* if found send UPDATE else send ADD */
                                ss->s_inuse++;
                                ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
-                               syncprov_sendresp( op, opc, ss, &e,
-                                       found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 );
+                               syncprov_qresp( opc, ss,
+                                       found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
                                ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
                                ss->s_inuse--;
                        }
                } else if ( !saveit && found ) {
                        /* send DELETE */
                        ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
-                       syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 );
+                       syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
                        ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
                }
        }
@@ -1444,8 +1453,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
                                        if ( sm->sm_op->s_op->o_abandon )
                                                continue;
-                                       syncprov_sendresp( op, opc, sm->sm_op, NULL,
-                                               LDAP_SYNC_DELETE, 1 );
+                                       syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
                                }
                                break;
                        }
@@ -1633,7 +1641,7 @@ syncprov_search_cleanup( Operation *op, SlapReply *rs )
 }
 
 static void
-syncprov_detach_op( Operation *op, syncops *so )
+syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
 {
        Operation *op2;
        int i, alen = 0;
@@ -1659,8 +1667,9 @@ syncprov_detach_op( Operation *op, syncops *so )
        *op2->o_hdr = *op->o_hdr;
        op2->o_tag = op->o_tag;
        op2->o_time = op->o_time;
-       op2->o_bd = op->o_bd;
+       op2->o_bd = on->on_info->oi_origdb;
        op2->o_request = op->o_request;
+       op2->o_private = on;
 
        if ( i ) {
                op2->ors_attrs = (AttributeName *)(op2->o_hdr + 1);
@@ -1761,25 +1770,23 @@ syncprov_search_response( Operation *op, SlapReply *rs )
                                0, 1, &cookie, ss->ss_present ?  LDAP_SYNC_REFRESH_PRESENTS :
                                        LDAP_SYNC_REFRESH_DELETES );
                } else {
-                       int locked = 0;
                /* It's RefreshAndPersist, transition to Persist phase */
                        syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
                                LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
                                &cookie, 1, NULL, 0 );
                        /* Flush any queued persist messages */
                        if ( ss->ss_so->s_res ) {
-                               ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
-                               locked = 1;
                                syncprov_qplay( op, on, ss->ss_so );
                        }
 
+                       /* Detach this Op from frontend control */
+                       ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
+
                        /* Turn off the refreshing flag */
                        ss->ss_so->s_flags ^= PS_IS_REFRESHING;
-                       if ( locked )
-                               ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
 
-                       /* Detach this Op from frontend control */
-                       syncprov_detach_op( op, ss->ss_so );
+                       syncprov_detach_op( op, ss->ss_so, on );
+                       ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
 
                        return LDAP_SUCCESS;
                }