📄 store_client.c
字号:
sc->flags.disk_io_pending = 0; sc->callback = NULL; callback(sc->callback_data, sc->copy_buf, -1); return; } sc->swapin_fd = fd; storeClientFileRead(sc);}static voidstoreClientFileRead(store_client * sc){ MemObject *mem = sc->entry->mem_obj; assert(sc->callback != NULL);#ifdef OPTIMISTIC_IO sc->flags.disk_io_pending = 1;#endif if (mem->swap_hdr_sz == 0) { file_read(sc->swapin_fd, sc->copy_buf, sc->copy_size, 0, storeClientReadHeader, sc); } else { if (sc->entry->swap_status == SWAPOUT_WRITING) assert(mem->swapout.done_offset > sc->copy_offset + mem->swap_hdr_sz); file_read(sc->swapin_fd, sc->copy_buf, sc->copy_size, sc->copy_offset + mem->swap_hdr_sz, storeClientReadBody, sc); }#ifndef OPTIMISTIC_IO sc->flags.disk_io_pending = 1;#endif}static voidstoreClientReadBody(int fd, const char *buf, int len, int flagnotused, void *data){ store_client *sc = data; MemObject *mem = sc->entry->mem_obj; STCB *callback = sc->callback; assert(sc->flags.disk_io_pending); sc->flags.disk_io_pending = 0; assert(sc->callback != NULL); debug(20, 3) ("storeClientReadBody: FD %d, len %d\n", fd, len); if (sc->copy_offset == 0 && len > 0 && mem->reply->sline.status == 0) httpReplyParse(mem->reply, sc->copy_buf); sc->callback = NULL; callback(sc->callback_data, sc->copy_buf, len);}static voidstoreClientReadHeader(int fd, const char *buf, int len, int flagnotused, void *data){ store_client *sc = data; StoreEntry *e = sc->entry; MemObject *mem = e->mem_obj; STCB *callback = sc->callback; int swap_hdr_sz = 0; size_t body_sz; size_t copy_sz; tlv *tlv_list; assert(sc->flags.disk_io_pending); sc->flags.disk_io_pending = 0; assert(sc->callback != NULL); debug(20, 3) ("storeClientReadHeader: FD %d, len %d\n", fd, len); if (len < 0) { debug(20, 3) ("storeClientReadHeader: FD %d: %s\n", fd, xstrerror()); sc->callback = NULL; callback(sc->callback_data, sc->copy_buf, len); return; } tlv_list = storeSwapMetaUnpack(buf, &swap_hdr_sz); if (tlv_list == NULL) { debug(20, 1) ("storeClientReadHeader: failed to unpack meta data\n"); sc->callback = NULL; callback(sc->callback_data, sc->copy_buf, -1); return; } /* * XXX Here we should check the meta data and make sure we got * the right object. */ storeSwapTLVFree(tlv_list); mem->swap_hdr_sz = swap_hdr_sz; mem->object_sz = e->swap_file_sz - swap_hdr_sz; /* * If our last read got some data the client wants, then give * it to them, otherwise schedule another read. */ body_sz = len - swap_hdr_sz; if (sc->copy_offset < body_sz) { /* * we have (part of) what they want */ copy_sz = XMIN(sc->copy_size, body_sz); debug(20, 3) ("storeClientReadHeader: copying %d bytes of body\n", copy_sz); xmemmove(sc->copy_buf, sc->copy_buf + swap_hdr_sz, copy_sz); if (sc->copy_offset == 0 && len > 0 && mem->reply->sline.status == 0) httpReplyParse(mem->reply, sc->copy_buf); sc->callback = NULL; callback(sc->callback_data, sc->copy_buf, copy_sz); return; } /* * we don't have what the client wants, but at least we now * know the swap header size. */ storeClientFileRead(sc);}intstoreClientCopyPending(StoreEntry * e, void *data){ /* return 1 if there is a callback registered for this client */ store_client *sc = storeClientListSearch(e->mem_obj, data); if (sc == NULL) return 0; if (sc->callback == NULL) return 0; return 1;}intstoreUnregister(StoreEntry * e, void *data){ MemObject *mem = e->mem_obj; store_client *sc; store_client **S; STCB *callback; if (mem == NULL) return 0; debug(20, 3) ("storeUnregister: called for '%s'\n", storeKeyText(e->key)); for (S = &mem->clients; (sc = *S) != NULL; S = &(*S)->next) { if (sc->callback_data == data) break; } if (sc == NULL) return 0; if (sc == mem->clients) { /* * If we are unregistering the _first_ client for this * entry, then we have to reset the client FD to -1. */ mem->fd = -1; } *S = sc->next; mem->nclients--; sc->flags.disk_io_pending = 0; if (e->store_status == STORE_OK && e->swap_status != SWAPOUT_DONE) storeCheckSwapOut(e); if (sc->swapin_fd > -1) { file_close(sc->swapin_fd); store_open_disk_fd--; }#if USE_ASYNC_IO else aioCancel(-1, sc);#endif if ((callback = sc->callback) != NULL) { /* callback with ssize = -1 to indicate unexpected termination */ debug(20, 3) ("storeUnregister: store_client for %s has a callback\n", mem->url); sc->callback = NULL; callback(sc->callback_data, sc->copy_buf, -1); }#if DELAY_POOLS delayUnregisterDelayIdPtr(&sc->delay_id);#endif cbdataFree(sc); assert(e->lock_count > 0); if (mem->nclients == 0) CheckQuickAbort(e); return 1;}off_tstoreLowestMemReaderOffset(const StoreEntry * entry){ const MemObject *mem = entry->mem_obj; off_t lowest = mem->inmem_hi; store_client *sc; store_client *nx = NULL; for (sc = mem->clients; sc; sc = nx) { nx = sc->next; if (sc->callback_data == NULL) /* open slot */ continue; if (sc->type != STORE_MEM_CLIENT) continue; if (sc->copy_offset < lowest) lowest = sc->copy_offset; } return lowest;}/* Call handlers waiting for data to be appended to E. */voidInvokeHandlers(StoreEntry * e){ int i = 0; MemObject *mem = e->mem_obj; store_client *sc; store_client *nx = NULL; assert(mem->clients != NULL || mem->nclients == 0); debug(20, 3) ("InvokeHandlers: %s\n", storeKeyText(e->key)); /* walk the entire list looking for valid callbacks */ for (sc = mem->clients; sc; sc = nx) { nx = sc->next; debug(20, 3) ("InvokeHandlers: checking client #%d\n", i++); if (sc->callback_data == NULL) continue; if (sc->callback == NULL) continue; storeClientCopy2(e, sc); }}intstorePendingNClients(const StoreEntry * e){ MemObject *mem = e->mem_obj; int npend = NULL == mem ? 0 : mem->nclients; debug(20, 3) ("storePendingNClients: returning %d\n", npend); return npend;}/* return 1 if the request should be aborted */static intCheckQuickAbort2(StoreEntry * entry){ int curlen; int minlen; int expectlen; MemObject *mem = entry->mem_obj; assert(mem); debug(20, 3) ("CheckQuickAbort2: entry=%p, mem=%p\n", entry, mem); if (mem->request && !mem->request->flags.cachable) { debug(20, 3) ("CheckQuickAbort2: YES !mem->request->flags.cachable\n"); return 1; } if (EBIT_TEST(entry->flags, KEY_PRIVATE)) { debug(20, 3) ("CheckQuickAbort2: YES KEY_PRIVATE\n"); return 1; } expectlen = mem->reply->content_length + mem->reply->hdr_sz; curlen = (int) mem->inmem_hi; minlen = (int) Config.quickAbort.min << 10; if (minlen < 0) { debug(20, 3) ("CheckQuickAbort2: NO disabled\n"); return 0; } if (curlen > expectlen) { debug(20, 3) ("CheckQuickAbort2: YES bad content length\n"); return 1; } if ((expectlen - curlen) < minlen) { debug(20, 3) ("CheckQuickAbort2: NO only little more left\n"); return 0; } if ((expectlen - curlen) > (Config.quickAbort.max << 10)) { debug(20, 3) ("CheckQuickAbort2: YES too much left to go\n"); return 1; } if (expectlen < 100) { debug(20, 3) ("CheckQuickAbort2: NO avoid FPE\n"); return 0; } if ((curlen / (expectlen / 100)) > Config.quickAbort.pct) { debug(20, 3) ("CheckQuickAbort2: NO past point of no return\n"); return 0; } debug(20, 3) ("CheckQuickAbort2: YES default, returning 1\n"); return 1;}static voidCheckQuickAbort(StoreEntry * entry){ if (entry == NULL) return; if (storePendingNClients(entry) > 0) return; if (entry->store_status != STORE_PENDING) return; if (EBIT_TEST(entry->flags, ENTRY_SPECIAL)) return; if (CheckQuickAbort2(entry) == 0) return; Counter.aborted_requests++; storeAbort(entry);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -