📄 syncprov.c
字号:
typedef enum find_csn_t { FIND_MAXCSN = 1, FIND_CSN = 2, FIND_PRESENT = 3} find_csn_t;static intfindmax_cb( Operation *op, SlapReply *rs ){ if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { struct berval *maxcsn = op->o_callback->sc_private; Attribute *a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN ); if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 ) { maxcsn->bv_len = a->a_vals[0].bv_len; strcpy( maxcsn->bv_val, a->a_vals[0].bv_val ); } } return LDAP_SUCCESS;}static intfindcsn_cb( Operation *op, SlapReply *rs ){ slap_callback *sc = op->o_callback; /* We just want to know that at least one exists, so it's OK if * we exceed the unchecked limit. */ if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED || (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) { sc->sc_private = (void *)1; } return LDAP_SUCCESS;}/* Build a list of entryUUIDs for sending in a SyncID set */#define UUID_LEN 16typedef struct fpres_cookie { int num; BerVarray uuids; char *last;} fpres_cookie;static intfindpres_cb( Operation *op, SlapReply *rs ){ slap_callback *sc = op->o_callback; fpres_cookie *pc = sc->sc_private; Attribute *a; int ret = SLAP_CB_CONTINUE; switch ( rs->sr_type ) { case REP_SEARCH: a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID ); if ( a ) { pc->uuids[pc->num].bv_val = pc->last; AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val, pc->uuids[pc->num].bv_len ); pc->num++; pc->last = pc->uuids[pc->num].bv_val; pc->uuids[pc->num].bv_val = NULL; } ret = LDAP_SUCCESS; if ( pc->num != SLAP_SYNCUUID_SET_SIZE ) break; /* FALLTHRU */ case REP_RESULT: ret = rs->sr_err; if ( pc->num ) { ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 0, pc->uuids, 0 ); pc->uuids[pc->num].bv_val = pc->last; pc->num = 0; pc->last = pc->uuids[0].bv_val; } break; default: break; } return ret;}static intsyncprov_findcsn( Operation *op, find_csn_t mode ){ slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; syncprov_info_t *si = on->on_bi.bi_private; slap_callback cb = {0}; Operation fop; SlapReply frs = { REP_RESULT }; char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; struct berval maxcsn; Filter cf;#ifdef LDAP_COMP_MATCH AttributeAssertion eq = { NULL, BER_BVNULL, NULL };#else AttributeAssertion eq = { NULL, BER_BVNULL };#endif fpres_cookie pcookie; sync_control *srs = NULL; struct slap_limits_set fc_limits; int i, rc = LDAP_SUCCESS, findcsn_retry = 1; if ( mode != FIND_MAXCSN ) { srs = op->o_controls[slap_cids.sc_LDAPsync]; if ( srs->sr_state.ctxcsn.bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) { return LDAP_OTHER; } } fop = *op; fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */ /* We want pure entries, not referrals */ fop.o_managedsait = SLAP_CONTROL_CRITICAL; cf.f_ava = &eq; cf.f_av_desc = slap_schema.si_ad_entryCSN; cf.f_next = NULL; fop.o_callback = &cb; fop.ors_limit = NULL; fop.ors_tlimit = SLAP_NO_LIMIT; fop.ors_filter = &cf; fop.ors_filterstr.bv_val = buf;again: switch( mode ) { case FIND_MAXCSN: cf.f_choice = LDAP_FILTER_GE; cf.f_av_value = si->si_ctxcsn; fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN>=%s)", cf.f_av_value.bv_val ); fop.ors_attrsonly = 0; fop.ors_attrs = csn_anlist; fop.ors_slimit = SLAP_NO_LIMIT; cb.sc_private = &maxcsn; cb.sc_response = findmax_cb; strcpy( cbuf, si->si_ctxcsn.bv_val ); maxcsn.bv_val = cbuf; maxcsn.bv_len = si->si_ctxcsn.bv_len; break; case FIND_CSN: cf.f_av_value = srs->sr_state.ctxcsn; /* Look for exact match the first time */ if ( findcsn_retry ) { cf.f_choice = LDAP_FILTER_EQUALITY; fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN=%s)", cf.f_av_value.bv_val ); /* On retry, look for <= */ } else { cf.f_choice = LDAP_FILTER_LE; fop.ors_limit = &fc_limits; fc_limits.lms_s_unchecked = 1; fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN<=%s)", cf.f_av_value.bv_val ); } fop.ors_attrsonly = 1; fop.ors_attrs = slap_anlist_no_attrs; fop.ors_slimit = 1; cb.sc_private = NULL; cb.sc_response = findcsn_cb; break; case FIND_PRESENT: fop.ors_filter = op->ors_filter; fop.ors_filterstr = op->ors_filterstr; fop.ors_attrsonly = 0; fop.ors_attrs = uuid_anlist; fop.ors_slimit = SLAP_NO_LIMIT; cb.sc_private = &pcookie; cb.sc_response = findpres_cb; pcookie.num = 0; /* preallocate storage for a full set */ pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) * sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN, op->o_tmpmemctx ); pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1); pcookie.uuids[0].bv_val = pcookie.last; pcookie.uuids[0].bv_len = UUID_LEN; for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) { pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN; pcookie.uuids[i].bv_len = UUID_LEN; } break; } fop.o_bd->bd_info = on->on_info->oi_orig; fop.o_bd->be_search( &fop, &frs ); fop.o_bd->bd_info = (BackendInfo *)on; switch( mode ) { case FIND_MAXCSN: strcpy( si->si_ctxcsnbuf, maxcsn.bv_val ); si->si_ctxcsn.bv_len = maxcsn.bv_len; break; case FIND_CSN: /* If matching CSN was not found, invalidate the context. */ if ( !cb.sc_private ) { /* If we didn't find an exact match, then try for <= */ if ( findcsn_retry ) { findcsn_retry = 0; goto again; } rc = LDAP_NO_SUCH_OBJECT; } break; case FIND_PRESENT: op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx ); break; } return rc;}static voidsyncprov_free_syncop( syncops *so ){ syncres *sr, *srnext; GroupAssertion *ga, *gnext; ldap_pvt_thread_mutex_lock( &so->s_mutex ); if ( --so->s_inuse > 0 ) { ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return; } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); if ( so->s_flags & PS_IS_DETACHED ) { filter_free( so->s_op->ors_filter ); for ( ga = so->s_op->o_groups; ga; ga=gnext ) { gnext = ga->ga_next; ch_free( ga ); } ch_free( so->s_op ); } ch_free( so->s_base.bv_val ); for ( sr=so->s_res; sr; sr=srnext ) { srnext = sr->s_next; ch_free( sr ); } ldap_pvt_thread_mutex_destroy( &so->s_mutex ); ch_free( so );}/* Send a persistent search response */static intsyncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode ){ slap_overinst *on = opc->son; SlapReply rs = { REP_SEARCH }; LDAPControl *ctrls[2]; struct berval cookie; Entry e_uuid = {0}; Attribute a_uuid = {0}; if ( so->s_op->o_abandon ) return SLAPD_ABANDON; ctrls[1] = NULL; slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid ); e_uuid.e_attrs = &a_uuid; a_uuid.a_desc = slap_schema.si_ad_entryUUID; a_uuid.a_nvals = &opc->suuid; rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, mode, ctrls, 0, 1, &cookie ); op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); rs.sr_ctrls = ctrls; op->o_bd->bd_info = (BackendInfo *)on->on_info; switch( mode ) { case LDAP_SYNC_ADD: rs.sr_entry = *e; if ( rs.sr_entry->e_private ) rs.sr_flags = REP_ENTRY_MUSTRELEASE; if ( opc->sreference ) { rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); rs.sr_err = send_search_reference( op, &rs ); ber_bvarray_free( rs.sr_ref ); if ( !rs.sr_entry ) *e = NULL; break; } /* fallthru */ case LDAP_SYNC_MODIFY: rs.sr_entry = *e; if ( rs.sr_entry->e_private ) rs.sr_flags = REP_ENTRY_MUSTRELEASE; rs.sr_attrs = op->ors_attrs; rs.sr_err = send_search_entry( op, &rs ); if ( !rs.sr_entry ) *e = NULL; break; case LDAP_SYNC_DELETE: e_uuid.e_attrs = NULL; e_uuid.e_name = opc->sdn; e_uuid.e_nname = opc->sndn; rs.sr_entry = &e_uuid; if ( opc->sreference ) { struct berval bv = BER_BVNULL; rs.sr_ref = &bv; rs.sr_err = send_search_reference( op, &rs ); } else { rs.sr_err = send_search_entry( op, &rs ); } break; default: assert(0); } /* In case someone else freed it already? */ if ( rs.sr_ctrls ) { op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx ); rs.sr_ctrls = NULL; } return rs.sr_err;}/* Play back queued responses */static intsyncprov_qplay( Operation *op, slap_overinst *on, syncops *so ){ syncres *sr; Entry *e; opcookie opc; int rc = 0; opc.son = on; op->o_bd->bd_info = (BackendInfo *)on->on_info; for (;;) { ldap_pvt_thread_mutex_lock( &so->s_mutex ); sr = so->s_res; if ( sr ) so->s_res = sr->s_next; if ( !so->s_res ) so->s_restail = NULL; ldap_pvt_thread_mutex_unlock( &so->s_mutex ); if ( !sr || so->s_op->o_abandon ) break; opc.sdn = sr->s_dn; opc.sndn = sr->s_ndn; opc.suuid = sr->s_uuid; opc.sctxcsn = sr->s_csn; opc.sreference = sr->s_isreference; e = NULL; if ( sr->s_mode != LDAP_SYNC_DELETE ) { rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); if ( rc ) { Debug( LDAP_DEBUG_SYNC, "syncprov_qplay: failed to get %s, " "error (%d), ignoring...\n", opc.sndn.bv_val, rc, 0 ); ch_free( sr ); rc = 0; continue; } } rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode ); if ( e ) { be_entry_release_rw( op, e, 0 ); } ch_free( sr ); if ( rc ) break; } op->o_bd->bd_info = (BackendInfo *)on; return rc;}/* runqueue task for playing back queued responses */static void *syncprov_qtask( void *ctx, void *arg ){ struct re_s *rtask = arg; syncops *so = rtask->arg; slap_overinst *on = so->s_op->o_private; OperationBuffer opbuf; Operation *op; BackendDB be; int rc; op = (Operation *) &opbuf; *op = *so->s_op; op->o_hdr = (Opheader *)(op+1); op->o_controls = (void **)(op->o_hdr+1); memset( op->o_controls, 0, SLAP_MAX_CIDS * sizeof(void *)); *op->o_hdr = *so->s_op->o_hdr; op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx); op->o_tmpmfuncs = &slap_sl_mfuncs; op->o_threadctx = ctx; /* syncprov_qplay expects a fake db */ be = *so->s_op->o_bd; be.be_flags |= SLAP_DBFLAG_OVERLAY; op->o_bd = &be; op->o_private = NULL; op->o_callback = NULL; rc = syncprov_qplay( op, on, so ); /* decrement use count... */ syncprov_free_syncop( so ); /* wait until we get explicitly scheduled again */ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); ldap_pvt_runqueue_stoptask( &slapd_rq, rtask ); if ( rc == 0 ) { ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 ); } else { /* bail out on any error */ ldap_pvt_runqueue_remove( &slapd_rq, rtask ); } ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); return NULL;}/* Start the task to play back queued psearch responses */static voidsyncprov_qstart( syncops *so ){ int wake=0; ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); if ( !so->s_qtask ) { so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, syncprov_qtask, so, "syncprov_qtask", so->s_op->o_conn->c_peer_name.bv_val ); ++so->s_inuse; wake = 1; } else { if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && !so->s_qtask->next_sched.tv_sec ) { so->s_qtask->interval.tv_sec = 0; ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; ++so->s_inuse; wake = 1; } } ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); if ( wake ) slap_wake_listener();}/* Queue a persistent search response */static intsyncprov_qresp( opcookie *opc, syncops *so, int mode ){ syncres *sr; sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 + opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 ); sr->s_next = NULL; sr->s_dn.bv_val = (char *)(sr + 1); sr->s_dn.bv_len = opc->sdn.bv_len; sr->s_mode = mode; sr->s_isreference = opc->sreference; sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val ) + 1; sr->s_ndn.bv_len = opc->sndn.bv_len; sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val ) + 1; sr->s_uuid.bv_len = opc->suuid.bv_len; AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1; sr->s_csn.bv_len = opc->sctxcsn.bv_len; strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val ); ldap_pvt_thread_mutex_lock( &so->s_mutex );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -