⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer_digest.c

📁 -
💻 C
📖 第 1 页 / 共 2 页
字号:
{    DigestFetchState *fetch = data;    if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))	return;    if (size >= StoreDigestCBlockSize) {	PeerDigest *pd = fetch->pd;	HttpReply *rep = fetch->entry->mem_obj->reply;	const int seen = fetch->offset + size;	assert(pd && rep);	if (peerDigestSetCBlock(pd, buf)) {	    /* XXX: soon we will have variable header size */	    fetch->offset += StoreDigestCBlockSize;	    /* switch to CD buffer and fetch digest guts */	    memFree(buf, MEM_4K_BUF);	    buf = NULL;	    assert(pd->cd->mask);	    storeClientCopy(fetch->entry,		seen,		fetch->offset,		pd->cd->mask_size,		pd->cd->mask,		peerDigestSwapInMask, fetch);	} else {	    peerDigestFetchAbort(fetch, buf, "invalid digest cblock");	}    } else {	/* need more data, do we have space? */	if (size >= SM_PAGE_SIZE)	    peerDigestFetchAbort(fetch, buf, "digest cblock too big");	else	    storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,		peerDigestSwapInCBlock, fetch);    }}static voidpeerDigestSwapInMask(void *data, char *buf, ssize_t size){    DigestFetchState *fetch = data;    PeerDigest *pd;    /* NOTE! buf points to the middle of pd->cd->mask! */    if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))	return;    pd = fetch->pd;    assert(pd->cd && pd->cd->mask);    fetch->offset += size;    fetch->mask_offset += size;    if (fetch->mask_offset >= pd->cd->mask_size) {	debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n",	    fetch->mask_offset, pd->cd->mask_size);	assert(fetch->mask_offset == pd->cd->mask_size);	assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));    } else {	const size_t buf_sz = pd->cd->mask_size - fetch->mask_offset;	assert(buf_sz > 0);	storeClientCopy(fetch->entry,	    fetch->offset,	    fetch->offset,	    buf_sz,	    pd->cd->mask + fetch->mask_offset,	    peerDigestSwapInMask, fetch);    }}static intpeerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name){    PeerDigest *pd = NULL;    const char *host = "<unknown>";	/* peer host */    const char *reason = NULL;	/* reason for completion */    const char *no_bug = NULL;	/* successful completion if set */    const int fcb_valid = cbdataValid(fetch);    const int pdcb_valid = fcb_valid && cbdataValid(fetch->pd);    const int pcb_valid = pdcb_valid && cbdataValid(fetch->pd->peer);    /* test possible exiting conditions (the same for most steps!)     * cases marked with '?!' should not happen */    if (!reason) {	if (!fcb_valid)	    reason = "fetch aborted?!";	else if (!(pd = fetch->pd))	    reason = "peer digest disappeared?!";#if DONT	else if (!cbdataValid(pd))	    reason = "invalidated peer digest?!";#endif	else	    host = strBuf(pd->host);    }    debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n",	step_name, host, fcb_valid ? fetch->offset : -1, size);    /* continue checking (with pd and host known and valid) */    if (!reason) {	if (!cbdataValid(pd->peer))	    reason = "peer disappeared";	else if (size < 0)	    reason = "swap failure";	else if (!fetch->entry)	    reason = "swap aborted?!";	else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))	    reason = "swap aborted";    }    /* continue checking (maybe-successful eof case) */    if (!reason && !size) {	if (!pd->cd)	    reason = "null digest?!";	else if (fetch->mask_offset != pd->cd->mask_size)	    reason = "premature end of digest?!";	else if (!peerDigestUseful(pd))	    reason = "useless digest";	else	    reason = no_bug = "success";    }    /* finish if we have a reason */    if (reason) {	const int level = strstr(reason, "?!") ? 1 : 3;	debug(72, level) ("%s: peer %s, exiting after '%s'\n",	    step_name, host, reason);	peerDigestReqFinish(fetch, buf,	    fcb_valid, pdcb_valid, pcb_valid, reason, !no_bug);    } else {	/* paranoid check */	assert(fcb_valid && pdcb_valid && pcb_valid);    }    return reason != NULL;}/* call this when all callback data is valid and fetch must be stopped but * no error has occurred (e.g. we received 304 reply and reuse old digest) */static voidpeerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason){    assert(reason);    debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n",	strBuf(fetch->pd->host), reason);    peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);}/* call this when all callback data is valid but something bad happened */static voidpeerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason){    assert(reason);    debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n",	strBuf(fetch->pd->host), reason);    peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);}/* complete the digest transfer, update stats, unlock/release everything */static voidpeerDigestReqFinish(DigestFetchState * fetch, char *buf,    int fcb_valid, int pdcb_valid, int pcb_valid,    const char *reason, int err){    assert(reason);    /* must go before peerDigestPDFinish */    if (pdcb_valid) {	fetch->pd->flags.requested = 0;	fetch->pd->req_result = reason;    }    /* schedule next check if peer is still out there */    if (pcb_valid) {	PeerDigest *pd = fetch->pd;	if (err) {	    pd->times.retry_delay = peerDigestIncDelay(pd);	    peerDigestSetCheck(pd, pd->times.retry_delay);	} else {	    pd->times.retry_delay = 0;	    peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));	}    }    /* note: order is significant */    if (fcb_valid)	peerDigestFetchSetStats(fetch);    if (pdcb_valid)	peerDigestPDFinish(fetch, pcb_valid, err);    if (fcb_valid)	peerDigestFetchFinish(fetch, err);    if (buf)	memFree(buf, MEM_4K_BUF);}/* destroys digest if peer disappeared * must be called only when fetch and pd cbdata are valid */static voidpeerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err){    PeerDigest *pd = fetch->pd;    const char *host = strBuf(pd->host);    pd->times.received = squid_curtime;    pd->times.req_delay = fetch->resp_time;    kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);    kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);    pd->stats.sent.msgs += fetch->sent.msg;    pd->stats.recv.msgs += fetch->recv.msg;    if (err) {	debug(72, 1) ("%sdisabling (%s) digest from %s\n",	    pcb_valid ? "temporary " : "",	    pd->req_result, host);	if (pd->cd) {	    cacheDigestDestroy(pd->cd);	    pd->cd = NULL;	}	pd->flags.usable = 0;	if (!pcb_valid)	    peerDigestNotePeerGone(pd);    } else {	assert(pcb_valid);	pd->flags.usable = 1;	/* XXX: ugly condition, but how? */	if (fetch->entry->store_status == STORE_OK)	    debug(72, 2) ("re-used old digest from %s\n", host);	else	    debug(72, 2) ("received valid digest from %s\n", host);    }    fetch->pd = NULL;    cbdataUnlock(pd);}/* free fetch state structures * must be called only when fetch cbdata is valid */static voidpeerDigestFetchFinish(DigestFetchState * fetch, int err){    assert(fetch->entry && fetch->request);    if (fetch->old_entry) {	debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");	storeUnregister(fetch->old_entry, fetch);	storeReleaseRequest(fetch->old_entry);	storeUnlockObject(fetch->old_entry);	fetch->old_entry = NULL;    }    /* update global stats */    kb_incr(&Counter.cd.kbytes_sent, (size_t) fetch->sent.bytes);    kb_incr(&Counter.cd.kbytes_recv, (size_t) fetch->recv.bytes);    Counter.cd.msgs_sent += fetch->sent.msg;    Counter.cd.msgs_recv += fetch->recv.msg;    /* unlock everything */    storeUnregister(fetch->entry, fetch);    storeUnlockObject(fetch->entry);    requestUnlink(fetch->request);    fetch->entry = NULL;    fetch->request = NULL;    assert(fetch->pd == NULL);    cbdataUnlock(fetch);    cbdataFree(fetch);}/* calculate fetch stats after completion */static voidpeerDigestFetchSetStats(DigestFetchState * fetch){    MemObject *mem;    assert(fetch->entry && fetch->request);    mem = fetch->entry->mem_obj;    assert(mem);    /* XXX: outgoing numbers are not precise */    /* XXX: we must distinguish between 304 hits and misses here */    fetch->sent.bytes = httpRequestPrefixLen(fetch->request);    fetch->recv.bytes = fetch->entry->store_status == STORE_PENDING ?	mem->inmem_hi : mem->object_sz;    fetch->sent.msg = fetch->recv.msg = 1;    fetch->expires = fetch->entry->expires;    fetch->resp_time = squid_curtime - fetch->start_time;    debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n",	fetch->recv.bytes, fetch->resp_time);    debug(72, 3) ("peerDigestFetchFinish: expires: %d (%+d), lmt: %d (%+d)\n",	fetch->expires, fetch->expires - squid_curtime,	fetch->entry->lastmod, fetch->entry->lastmod - squid_curtime);}static intpeerDigestSetCBlock(PeerDigest * pd, const char *buf){    StoreDigestCBlock cblock;    int freed_size = 0;    const char *host = strBuf(pd->host);    xmemcpy(&cblock, buf, sizeof(cblock));    /* network -> host conversions */    cblock.ver.current = ntohs(cblock.ver.current);    cblock.ver.required = ntohs(cblock.ver.required);    cblock.capacity = ntohl(cblock.capacity);    cblock.count = ntohl(cblock.count);    cblock.del_count = ntohl(cblock.del_count);    cblock.mask_size = ntohl(cblock.mask_size);    debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",	host, (int) cblock.ver.current, (int) cblock.ver.required);    debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",	cblock.mask_size, cblock.count,	xpercentInt(cblock.count, cblock.capacity));    /* check version requirements (both ways) */    if (cblock.ver.required > CacheDigestVer.current) {	debug(72, 1) ("%s digest requires version %d; have: %d\n",	    host, cblock.ver.required, CacheDigestVer.current);	return 0;    }    if (cblock.ver.current < CacheDigestVer.required) {	debug(72, 1) ("%s digest is version %d; we require: %d\n",	    host, cblock.ver.current, CacheDigestVer.required);	return 0;    }    /* check consistency */    if (cblock.ver.required > cblock.ver.current ||	cblock.mask_size <= 0 || cblock.capacity <= 0 ||	cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {	debug(72, 0) ("%s digest cblock is corrupted.\n", host);	return 0;    }    /* check consistency further */    if (cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {	debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n",	    host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));	return 0;    }    /* there are some things we cannot do yet */    if (cblock.hash_func_count != CacheDigestHashFuncCount) {	debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",	    host, cblock.hash_func_count, CacheDigestHashFuncCount);	return 0;    }    /*     * no cblock bugs below this point     */    /* check size changes */    if (pd->cd && cblock.mask_size != pd->cd->mask_size) {	debug(72, 2) ("%s digest changed size: %d -> %d\n",	    host, cblock.mask_size, pd->cd->mask_size);	freed_size = pd->cd->mask_size;	cacheDigestDestroy(pd->cd);	pd->cd = NULL;    }    if (!pd->cd) {	debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",	    host, cblock.mask_size, (int) (cblock.mask_size - freed_size));	pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);	if (cblock.mask_size >= freed_size)	    kb_incr(&Counter.cd.memory, cblock.mask_size - freed_size);    }    assert(pd->cd);    /* these assignments leave us in an inconsistent state until we finish reading the digest */    pd->cd->count = cblock.count;    pd->cd->del_count = cblock.del_count;    return 1;}static intpeerDigestUseful(const PeerDigest * pd){    /* TODO: we should calculate the prob of a false hit instead of bit util */    const int bit_util = cacheDigestBitUtil(pd->cd);    if (bit_util > 65) {	debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",	    strBuf(pd->host), bit_util);	return 0;    }    return 1;}static intsaneDiff(time_t diff){    return abs(diff) > squid_curtime / 2 ? 0 : diff;}voidpeerDigestStatsReport(const PeerDigest * pd, StoreEntry * e){#define f2s(flag) (pd->flags.flag ? "yes" : "no")#define appendTime(tm) storeAppendPrintf(e, "%s\t %10d\t %+d\t %+d\n", \    ""#tm, pd->times.tm, \    saneDiff(pd->times.tm - squid_curtime), \    saneDiff(pd->times.tm - pd->times.initialized))    const char *host = pd ? strBuf(pd->host) : NULL;    assert(pd);    storeAppendPrintf(e, "\npeer digest from %s\n", host);    cacheDigestGuessStatsReport(&pd->stats.guess, e, host);    storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");    appendTime(initialized);    appendTime(needed);    appendTime(requested);    appendTime(received);    appendTime(next_check);    storeAppendPrintf(e, "peer digest state:\n");    storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",	f2s(needed), f2s(usable), f2s(requested));    storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",	pd->times.retry_delay);    storeAppendPrintf(e, "\tlast request response time: %d secs\n",	pd->times.req_delay);    storeAppendPrintf(e, "\tlast request result: %s\n",	pd->req_result ? pd->req_result : "(none)");    storeAppendPrintf(e, "\npeer digest traffic:\n");    storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",	pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);    storeAppendPrintf(e, "\treplies recv:  %d, volume: %d KB\n",	pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);    storeAppendPrintf(e, "\npeer digest structure:\n");    if (pd->cd)	cacheDigestReport(pd->cd, host, e);    else	storeAppendPrintf(e, "\tno in-memory copy\n");}#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -