📄 peer_digest.c
字号:
{ DigestFetchState *fetch = data; if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock")) return; if (size >= StoreDigestCBlockSize) { PeerDigest *pd = fetch->pd; HttpReply *rep = fetch->entry->mem_obj->reply; const int seen = fetch->offset + size; assert(pd && rep); if (peerDigestSetCBlock(pd, buf)) { /* XXX: soon we will have variable header size */ fetch->offset += StoreDigestCBlockSize; /* switch to CD buffer and fetch digest guts */ memFree(buf, MEM_4K_BUF); buf = NULL; assert(pd->cd->mask); storeClientCopy(fetch->entry, seen, fetch->offset, pd->cd->mask_size, pd->cd->mask, peerDigestSwapInMask, fetch); } else { peerDigestFetchAbort(fetch, buf, "invalid digest cblock"); } } else { /* need more data, do we have space? */ if (size >= SM_PAGE_SIZE) peerDigestFetchAbort(fetch, buf, "digest cblock too big"); else storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf, peerDigestSwapInCBlock, fetch); }}static voidpeerDigestSwapInMask(void *data, char *buf, ssize_t size){ DigestFetchState *fetch = data; PeerDigest *pd; /* NOTE! buf points to the middle of pd->cd->mask! */ if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask")) return; pd = fetch->pd; assert(pd->cd && pd->cd->mask); fetch->offset += size; fetch->mask_offset += size; if (fetch->mask_offset >= pd->cd->mask_size) { debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n", fetch->mask_offset, pd->cd->mask_size); assert(fetch->mask_offset == pd->cd->mask_size); assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask")); } else { const size_t buf_sz = pd->cd->mask_size - fetch->mask_offset; assert(buf_sz > 0); storeClientCopy(fetch->entry, fetch->offset, fetch->offset, buf_sz, pd->cd->mask + fetch->mask_offset, peerDigestSwapInMask, fetch); }}static intpeerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name){ PeerDigest *pd = NULL; const char *host = "<unknown>"; /* peer host */ const char *reason = NULL; /* reason for completion */ const char *no_bug = NULL; /* successful completion if set */ const int fcb_valid = cbdataValid(fetch); const int pdcb_valid = fcb_valid && cbdataValid(fetch->pd); const int pcb_valid = pdcb_valid && cbdataValid(fetch->pd->peer); /* test possible exiting conditions (the same for most steps!) * cases marked with '?!' should not happen */ if (!reason) { if (!fcb_valid) reason = "fetch aborted?!"; else if (!(pd = fetch->pd)) reason = "peer digest disappeared?!";#if DONT else if (!cbdataValid(pd)) reason = "invalidated peer digest?!";#endif else host = strBuf(pd->host); } debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n", step_name, host, fcb_valid ? fetch->offset : -1, size); /* continue checking (with pd and host known and valid) */ if (!reason) { if (!cbdataValid(pd->peer)) reason = "peer disappeared"; else if (size < 0) reason = "swap failure"; else if (!fetch->entry) reason = "swap aborted?!"; else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED)) reason = "swap aborted"; } /* continue checking (maybe-successful eof case) */ if (!reason && !size) { if (!pd->cd) reason = "null digest?!"; else if (fetch->mask_offset != pd->cd->mask_size) reason = "premature end of digest?!"; else if (!peerDigestUseful(pd)) reason = "useless digest"; else reason = no_bug = "success"; } /* finish if we have a reason */ if (reason) { const int level = strstr(reason, "?!") ? 1 : 3; debug(72, level) ("%s: peer %s, exiting after '%s'\n", step_name, host, reason); peerDigestReqFinish(fetch, buf, fcb_valid, pdcb_valid, pcb_valid, reason, !no_bug); } else { /* paranoid check */ assert(fcb_valid && pdcb_valid && pcb_valid); } return reason != NULL;}/* call this when all callback data is valid and fetch must be stopped but * no error has occurred (e.g. we received 304 reply and reuse old digest) */static voidpeerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason){ assert(reason); debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n", strBuf(fetch->pd->host), reason); peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);}/* call this when all callback data is valid but something bad happened */static voidpeerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason){ assert(reason); debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n", strBuf(fetch->pd->host), reason); peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);}/* complete the digest transfer, update stats, unlock/release everything */static voidpeerDigestReqFinish(DigestFetchState * fetch, char *buf, int fcb_valid, int pdcb_valid, int pcb_valid, const char *reason, int err){ assert(reason); /* must go before peerDigestPDFinish */ if (pdcb_valid) { fetch->pd->flags.requested = 0; fetch->pd->req_result = reason; } /* schedule next check if peer is still out there */ if (pcb_valid) { PeerDigest *pd = fetch->pd; if (err) { pd->times.retry_delay = peerDigestIncDelay(pd); peerDigestSetCheck(pd, pd->times.retry_delay); } else { pd->times.retry_delay = 0; peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry)); } } /* note: order is significant */ if (fcb_valid) peerDigestFetchSetStats(fetch); if (pdcb_valid) peerDigestPDFinish(fetch, pcb_valid, err); if (fcb_valid) peerDigestFetchFinish(fetch, err); if (buf) memFree(buf, MEM_4K_BUF);}/* destroys digest if peer disappeared * must be called only when fetch and pd cbdata are valid */static voidpeerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err){ PeerDigest *pd = fetch->pd; const char *host = strBuf(pd->host); pd->times.received = squid_curtime; pd->times.req_delay = fetch->resp_time; kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes); kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes); pd->stats.sent.msgs += fetch->sent.msg; pd->stats.recv.msgs += fetch->recv.msg; if (err) { debug(72, 1) ("%sdisabling (%s) digest from %s\n", pcb_valid ? "temporary " : "", pd->req_result, host); if (pd->cd) { cacheDigestDestroy(pd->cd); pd->cd = NULL; } pd->flags.usable = 0; if (!pcb_valid) peerDigestNotePeerGone(pd); } else { assert(pcb_valid); pd->flags.usable = 1; /* XXX: ugly condition, but how? */ if (fetch->entry->store_status == STORE_OK) debug(72, 2) ("re-used old digest from %s\n", host); else debug(72, 2) ("received valid digest from %s\n", host); } fetch->pd = NULL; cbdataUnlock(pd);}/* free fetch state structures * must be called only when fetch cbdata is valid */static voidpeerDigestFetchFinish(DigestFetchState * fetch, int err){ assert(fetch->entry && fetch->request); if (fetch->old_entry) { debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n"); storeUnregister(fetch->old_entry, fetch); storeReleaseRequest(fetch->old_entry); storeUnlockObject(fetch->old_entry); fetch->old_entry = NULL; } /* update global stats */ kb_incr(&Counter.cd.kbytes_sent, (size_t) fetch->sent.bytes); kb_incr(&Counter.cd.kbytes_recv, (size_t) fetch->recv.bytes); Counter.cd.msgs_sent += fetch->sent.msg; Counter.cd.msgs_recv += fetch->recv.msg; /* unlock everything */ storeUnregister(fetch->entry, fetch); storeUnlockObject(fetch->entry); requestUnlink(fetch->request); fetch->entry = NULL; fetch->request = NULL; assert(fetch->pd == NULL); cbdataUnlock(fetch); cbdataFree(fetch);}/* calculate fetch stats after completion */static voidpeerDigestFetchSetStats(DigestFetchState * fetch){ MemObject *mem; assert(fetch->entry && fetch->request); mem = fetch->entry->mem_obj; assert(mem); /* XXX: outgoing numbers are not precise */ /* XXX: we must distinguish between 304 hits and misses here */ fetch->sent.bytes = httpRequestPrefixLen(fetch->request); fetch->recv.bytes = fetch->entry->store_status == STORE_PENDING ? mem->inmem_hi : mem->object_sz; fetch->sent.msg = fetch->recv.msg = 1; fetch->expires = fetch->entry->expires; fetch->resp_time = squid_curtime - fetch->start_time; debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n", fetch->recv.bytes, fetch->resp_time); debug(72, 3) ("peerDigestFetchFinish: expires: %d (%+d), lmt: %d (%+d)\n", fetch->expires, fetch->expires - squid_curtime, fetch->entry->lastmod, fetch->entry->lastmod - squid_curtime);}static intpeerDigestSetCBlock(PeerDigest * pd, const char *buf){ StoreDigestCBlock cblock; int freed_size = 0; const char *host = strBuf(pd->host); xmemcpy(&cblock, buf, sizeof(cblock)); /* network -> host conversions */ cblock.ver.current = ntohs(cblock.ver.current); cblock.ver.required = ntohs(cblock.ver.required); cblock.capacity = ntohl(cblock.capacity); cblock.count = ntohl(cblock.count); cblock.del_count = ntohl(cblock.del_count); cblock.mask_size = ntohl(cblock.mask_size); debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n", host, (int) cblock.ver.current, (int) cblock.ver.required); debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n", cblock.mask_size, cblock.count, xpercentInt(cblock.count, cblock.capacity)); /* check version requirements (both ways) */ if (cblock.ver.required > CacheDigestVer.current) { debug(72, 1) ("%s digest requires version %d; have: %d\n", host, cblock.ver.required, CacheDigestVer.current); return 0; } if (cblock.ver.current < CacheDigestVer.required) { debug(72, 1) ("%s digest is version %d; we require: %d\n", host, cblock.ver.current, CacheDigestVer.required); return 0; } /* check consistency */ if (cblock.ver.required > cblock.ver.current || cblock.mask_size <= 0 || cblock.capacity <= 0 || cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) { debug(72, 0) ("%s digest cblock is corrupted.\n", host); return 0; } /* check consistency further */ if (cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) { debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n", host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)); return 0; } /* there are some things we cannot do yet */ if (cblock.hash_func_count != CacheDigestHashFuncCount) { debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n", host, cblock.hash_func_count, CacheDigestHashFuncCount); return 0; } /* * no cblock bugs below this point */ /* check size changes */ if (pd->cd && cblock.mask_size != pd->cd->mask_size) { debug(72, 2) ("%s digest changed size: %d -> %d\n", host, cblock.mask_size, pd->cd->mask_size); freed_size = pd->cd->mask_size; cacheDigestDestroy(pd->cd); pd->cd = NULL; } if (!pd->cd) { debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n", host, cblock.mask_size, (int) (cblock.mask_size - freed_size)); pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry); if (cblock.mask_size >= freed_size) kb_incr(&Counter.cd.memory, cblock.mask_size - freed_size); } assert(pd->cd); /* these assignments leave us in an inconsistent state until we finish reading the digest */ pd->cd->count = cblock.count; pd->cd->del_count = cblock.del_count; return 1;}static intpeerDigestUseful(const PeerDigest * pd){ /* TODO: we should calculate the prob of a false hit instead of bit util */ const int bit_util = cacheDigestBitUtil(pd->cd); if (bit_util > 65) { debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n", strBuf(pd->host), bit_util); return 0; } return 1;}static intsaneDiff(time_t diff){ return abs(diff) > squid_curtime / 2 ? 0 : diff;}voidpeerDigestStatsReport(const PeerDigest * pd, StoreEntry * e){#define f2s(flag) (pd->flags.flag ? "yes" : "no")#define appendTime(tm) storeAppendPrintf(e, "%s\t %10d\t %+d\t %+d\n", \ ""#tm, pd->times.tm, \ saneDiff(pd->times.tm - squid_curtime), \ saneDiff(pd->times.tm - pd->times.initialized)) const char *host = pd ? strBuf(pd->host) : NULL; assert(pd); storeAppendPrintf(e, "\npeer digest from %s\n", host); cacheDigestGuessStatsReport(&pd->stats.guess, e, host); storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n"); appendTime(initialized); appendTime(needed); appendTime(requested); appendTime(received); appendTime(next_check); storeAppendPrintf(e, "peer digest state:\n"); storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n", f2s(needed), f2s(usable), f2s(requested)); storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n", pd->times.retry_delay); storeAppendPrintf(e, "\tlast request response time: %d secs\n", pd->times.req_delay); storeAppendPrintf(e, "\tlast request result: %s\n", pd->req_result ? pd->req_result : "(none)"); storeAppendPrintf(e, "\npeer digest traffic:\n"); storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n", pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb); storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n", pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb); storeAppendPrintf(e, "\npeer digest structure:\n"); if (pd->cd) cacheDigestReport(pd->cd, host, e); else storeAppendPrintf(e, "\tno in-memory copy\n");}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -