⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pump.c

📁 -
💻 C
字号:
/*
 * $Id: pump.c,v 1.71.2.1 1999/02/19 20:18:13 wessels Exp $
 *
 * DEBUG: section 61    PUMP handler
 * AUTHOR: Kostas Anagnostakis
 *
 * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
 * ----------------------------------------------------------
 *
 *  Squid is the result of efforts by numerous individuals from the
 *  Internet community.  Development is led by Duane Wessels of the
 *  National Laboratory for Applied Network Research and funded by the
 *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
 *  Duane Wessels and the University of California San Diego.  Please
 *  see the COPYRIGHT file for full details.  Squid incorporates
 *  software developed and/or copyrighted by other sources.  Please see
 *  the CREDITS file for full details.
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 */

#include "squid.h"

/*
 * Maximum number of request-body bytes that may be received from the
 * client but not yet sent to the server before pumpReadDefer() asks
 * for reads to be deferred.  Parenthesized so the macro expands safely
 * inside any expression.
 */
#define PUMP_MAXBUFFER (2*SQUID_UDP_SO_SNDBUF)

/*
 * Per-request state for pumping a request body (POST/PUT, see
 * pumpMethod()) from a client connection to a server connection.
 * Instances are created by pumpInit(), completed by pumpStart() and
 * kept on the singly linked list headed by pump_head until pumpFree()
 * unlinks them.
 */
struct _PumpStateData {
    FwdState *fwd;
    request_t *req;
    int c_fd;			/* client fd */
    int s_fd;			/* server end */
    int rcvd;			/* bytes received from client */
    int sent;			/* bytes sent to server */
    int cont_len;		/* Content-Length header */
    StoreEntry *request_entry;	/* the request entry */
    StoreEntry *reply_entry;	/* the reply entry */
    CWCB *callback;		/* what to do when we finish sending */
    void *cbdata;		/* callback data passed to callback func */
    struct {
	/* unsigned: a plain-int 1-bit field has implementation-defined sign */
	unsigned int closing:1;	/* set once pumpClose() has started */
    } flags;
    struct _PumpStateData *next;
};

#define PUMP_FLAG_CLOSING	0x01

typedef struct _PumpStateData PumpStateData;

/* head of the list of all live pump states */
static PumpStateData *pump_head = NULL;

static PF pumpReadFromClient;
static STCB pumpServerCopy;
static CWCB pumpServerCopyComplete;
static PF pumpFree;
static PF pumpTimeout;
static PF pumpServerClosed;
static DEFER pumpReadDefer;
static void pumpClose(void *data);

/*
 * Create the pump state for a request whose body will arrive on the
 * client connection.
 *
 *   fd   client file descriptor; must be > -1
 *   r    the request; must be non-NULL and carry a Content-Length >= 0
 *   uri  request URI; must be non-NULL, used to build the store key
 *
 * A private StoreEntry keyed "<uri>|Pump" is created to buffer the
 * body, the new state is pushed on pump_head, and pumpFree() is
 * registered as the close handler of the client FD.  No read handler
 * is installed here (the select handler is cleared); pumpStart() does
 * that once the server side is known.
 */
void
pumpInit(int fd, request_t * r, char *uri)
{
    request_flags flags;
    LOCAL_ARRAY(char, new_key, MAX_URL + 8);
    int clen = 0;
    PumpStateData *p = NULL;
    /*
     * Validate the arguments before anything is logged or allocated,
     * so a NULL uri never reaches the "%s" conversion below.
     */
    assert(fd > -1);
    assert(uri != NULL);
    assert(r != NULL);
    p = xcalloc(1, sizeof(*p));
    debug(61, 3) ("pumpInit: FD %d, uri=%s\n", fd, uri);
    /*
     * create a StoreEntry which will buffer the data 
     * to be pumped
     */
    clen = httpHeaderGetInt(&r->header, HDR_CONTENT_LENGTH);
    /* we shouldn't have gotten this far if content-length is invalid */
    assert(clen >= 0);
    debug(61, 4) ("pumpInit: Content-Length=%d.\n", clen);
    flags = null_request_flags;
    flags.nocache = 1;
    snprintf(new_key, MAX_URL + 5, "%s|Pump", uri);
    p->request_entry = storeCreateEntry(new_key, new_key, flags, r->method);
    storeClientListAdd(p->request_entry, p);
#if DELAY_POOLS
    delaySetStoreClient(p->request_entry, p, delayClient(r));
#endif
    /*
     * initialize data structure
     */
    p->c_fd = fd;
    p->s_fd = -1;
    p->cont_len = clen;
    p->req = requestLink(r);
    p->callback = NULL;
    p->cbdata = NULL;
    p->next = pump_head;
    pump_head = p;
    cbdataAdd(p, cbdataXfree, 0);
    comm_add_close_handler(p->c_fd, pumpFree, p);
    commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
    debug(61, 4) ("pumpInit: FD %d, Created %p\n", fd, p);
}

/*
 * Begin pumping the request body to the server.
 *
 *   s_fd      server file descriptor to write the body to
 *   fwd       forwarding state; fwd->request must match a state
 *             previously created by pumpInit()
 *   callback  invoked from pumpServerCopyComplete() when the whole
 *             body has been sent
 *   cbdata    data handed to callback; locked here and unlocked when
 *             the callback has been dispatched
 *
 * Any part of the body that already arrived with the request headers
 * (r->body) is appended to the request entry first.  If more body is
 * still expected, a read handler, timeout and defer check are set on
 * the client FD.  Finally the first copy from the request entry to the
 * server is scheduled, or completion is signalled immediately when the
 * Content-Length is zero.
 */
void
pumpStart(int s_fd, FwdState * fwd, CWCB * callback, void *cbdata)
{
    PumpStateData *p = NULL;
    request_t *r = fwd->request;
    size_t copy_sz;
    debug(61, 3) ("pumpStart: FD %d, key %s\n",
	s_fd, storeKeyText(fwd->entry->key));
    /*
     * find state data generated by pumpInit in linked list
     */
    for (p = pump_head; p && p->req != r; p = p->next);
    assert(p != NULL);
    assert(p->request_entry);
    assert(p->c_fd > -1);
    assert(r == p->req);
    /*
     * fill in the rest of data needed by the pump
     */
    p->fwd = fwd;
    p->s_fd = s_fd;
    p->reply_entry = fwd->entry;
    p->callback = callback;
    p->cbdata = cbdata;
    cbdataLock(p->cbdata);
    storeLockObject(p->reply_entry);
    comm_add_close_handler(p->s_fd, pumpServerClosed, p);
    /*
     * see if part of the body is in the request
     */
    if (p->rcvd < p->cont_len && r->body_sz > 0) {
	assert(p->request_entry->store_status == STORE_PENDING);
	assert(r->body != NULL);
	assert(r->body_sz <= p->cont_len);
	copy_sz = XMIN(r->body_sz, p->cont_len);
	/* copy_sz is a size_t; cast to match the %d conversion */
	debug(61, 3) ("pumpStart: Appending %d bytes from r->body\n",
	    (int) copy_sz);
	storeAppend(p->request_entry, r->body, copy_sz);
	p->rcvd = copy_sz;
    }
    /*
     * Do we need to read more data from the client?
     */
    if (p->rcvd < p->cont_len) {
	assert(p->request_entry->store_status == STORE_PENDING);
	commSetSelect(p->c_fd, COMM_SELECT_READ, pumpReadFromClient, p, 0);
	commSetTimeout(p->c_fd, Config.Timeout.read, pumpTimeout, p);
	commSetDefer(p->c_fd, pumpReadDefer, p);
    }
    p->sent = 0;
    if (p->sent == p->cont_len) {
	/* empty body: nothing to copy, report completion right away */
	pumpServerCopyComplete(p->s_fd, NULL, 0, DISK_OK, p);
    } else {
	storeClientCopy(p->request_entry, p->sent, p->sent, 4096,
	    memAllocate(MEM_4K_BUF),
	    pumpServerCopy, p);
    }
}

/*
 * Store-client callback: a chunk of the buffered request body is
 * available in buf (a MEM_4K_BUF allocated by the caller of
 * storeClientCopy()).
 *
 *   size < 0   the buffer is released and nothing else happens
 *   size == 0  completion is signalled and the buffer is released
 *   size > 0   the chunk is written to the server FD; memFree4K is
 *              passed to comm_write() as the buffer's free function
 */
static void
pumpServerCopy(void *data, char *buf, ssize_t size)
{
    PumpStateData *p = data;
    debug(61, 5) ("pumpServerCopy: called with size=%d\n", (int) size);
    if (size < 0) {
	debug(61, 5) ("pumpServerCopy: freeing and returning\n");
	memFree(buf, MEM_4K_BUF);
	return;
    }
    if (size == 0) {
	debug(61, 5) ("pumpServerCopy: done, finishing\n");
	pumpServerCopyComplete(p->s_fd, NULL, 0, DISK_OK, p);
	memFree(buf, MEM_4K_BUF);
	return;
    }
    debug(61, 5) ("pumpServerCopy: to FD %d, %d bytes\n", p->s_fd, (int) size);
    comm_write(p->s_fd, buf, size, pumpServerCopyComplete, p, memFree4K);
}

/*
 * Write-completion callback for a chunk sent to the server (also
 * called directly to signal an empty or finished body).
 *
 * On COMM_ERR_CLOSING nothing is done.  On any other error, or if the
 * request entry has been aborted, the pump is closed.  Otherwise the
 * sent counter is advanced; if body bytes remain, the next copy is
 * scheduled.  When everything has been sent, the server close handler
 * is removed, the user callback is invoked (if its cbdata is still
 * valid) with the server FD and the total byte count, and the locks
 * taken in pumpStart() on cbdata and the reply entry are released.
 */
static void
pumpServerCopyComplete(int fd, char *bufnotused, size_t size, int errflag, void *data)
{
    PumpStateData *p = data;
    int sfd;
    /* size is a size_t; cast the size-derived values to match %d */
    debug(61, 5) ("pumpServerCopyComplete: called with size=%d (%d,%d)\n",
	(int) size, (int) (p->sent + size), p->cont_len);
    if (errflag == COMM_ERR_CLOSING)
	return;
    if (errflag != 0) {
	debug(61, 5) ("pumpServerCopyComplete: aborted, errflag %d\n", errflag);
	pumpClose(p);
	return;
    }
    if (EBIT_TEST(p->request_entry->flags, ENTRY_ABORTED)) {
	debug(61, 5) ("pumpServerCopyComplete: ENTRY_ABORTED\n");
	pumpClose(p);
	return;
    }
    p->sent += size;
    assert(p->sent <= p->cont_len);
    if (p->sent < p->cont_len) {
	storeClientCopy(p->request_entry, p->sent, p->sent, 4096,
	    memAllocate(MEM_4K_BUF),
	    pumpServerCopy, p);
	return;
    }
    debug(61, 5) ("pumpServerCopyComplete: Done!\n");
    /*
     * we don't care what happens on the server side now
     */
    sfd = p->s_fd;
    comm_remove_close_handler(p->s_fd, pumpServerClosed, p);
    p->s_fd = -1;
    if (cbdataValid(p->cbdata))
	p->callback(sfd, NULL, p->sent, 0, p->cbdata);
    cbdataUnlock(p->cbdata);
    storeUnlockObject(p->reply_entry);
    p->reply_entry = NULL;
}

/*
 * Read handler for the client FD: read up to the number of body bytes
 * still outstanding (bounded by SQUID_TCP_SO_RCVBUF) and append them
 * to the request entry.
 *
 *   read error, ignorable errno   re-arm the read handler and return
 *   read error, other errno       close the pump
 *   EOF with nothing buffered     close the pump
 *   EOF before Content-Length     close the pump (incomplete request)
 *   more body expected            re-arm the read handler
 *   body complete                 mark the request entry complete and
 *                                 clear the defer check and timeout
 */
static void
pumpReadFromClient(int fd, void *data)
{
    PumpStateData *p = data;
    StoreEntry *req = p->request_entry;
    LOCAL_ARRAY(char, buf, SQUID_TCP_SO_RCVBUF);
    int bytes_to_read = XMIN(p->cont_len - p->rcvd, SQUID_TCP_SO_RCVBUF);
    int len = 0;
    int xerrno;
    errno = 0;
    Counter.syscalls.sock.reads++;
    len = read(fd, buf, bytes_to_read);
    /*
     * fd_bytes() and debug() run between read() and the error checks
     * below; capture errno now so those calls cannot disturb it.
     */
    xerrno = errno;
    fd_bytes(fd, len, FD_READ);
    debug(61, 5) ("pumpReadFromClient: FD %d: len %d.\n", fd, len);
    if (len > 0) {
	(void) 0;		/* continue */
    } else if (len < 0) {
	/* presumably xstrerror() describes the current errno; restore it */
	errno = xerrno;
	debug(61, 2) ("pumpReadFromClient: FD %d: read failure: %s.\n",
	    fd, xstrerror());
	if (ignoreErrno(xerrno)) {
	    debug(61, 5) ("pumpReadFromClient: FD %d: len %d and ignore!\n",
		fd, len);
	    commSetSelect(fd,
		COMM_SELECT_READ,
		pumpReadFromClient,
		p,
		Config.Timeout.read);
	} else {
	    debug(61, 2) ("pumpReadFromClient: aborted.\n");
	    pumpClose(p);
	}
	return;
    } else if (req->mem_obj->inmem_hi == 0) {
	/* len == 0 (EOF) and nothing has been stored for this request */
	debug(61, 2) ("pumpReadFromClient: FD %d: failed.\n", fd);
	pumpClose(p);
	return;
    } else if (p->rcvd < p->cont_len) {
	/* len == 0 (EOF) before the announced Content-Length arrived */
	debug(61, 4) ("pumpReadFromClient: FD %d, incomplete request\n", fd);
	pumpClose(p);
	return;
    }
    if (len > 0) {
	int delta = p->rcvd + len - p->cont_len;
	if (delta > 0 && p->req->flags.proxy_keepalive) {
	    debug(61, delta == 2 ? 3 : 1) ("pumpReadFromClient: Warning: read %d bytes past content-length, truncating\n", delta);
	    len = p->cont_len - p->rcvd;
	}
	storeAppend(req, buf, len);
	p->rcvd += len;
    }
    if (p->rcvd < p->cont_len) {
	/* We need more data */
	commSetSelect(fd, COMM_SELECT_READ, pumpReadFromClient,
	    p, Config.Timeout.read);
	return;
    }
    /* all done! */
    if (p->req->flags.proxy_keepalive)
	assert(p->rcvd == p->cont_len);
    debug(61, 2) ("pumpReadFromClient: finished!\n");
    storeComplete(req);
    commSetDefer(p->c_fd, NULL, NULL);
    commSetTimeout(p->c_fd, -1, NULL, NULL);
}

/*
 * Defer check for the client FD.  Returns 1 (defer reading) when the
 * number of bytes received but not yet sent to the server has reached
 * PUMP_MAXBUFFER, and 0 otherwise.
 */
static int
pumpReadDefer(int fd, void *data)
{
    PumpStateData *p = data;
    assert(p->rcvd >= p->sent);
    if ((p->rcvd - p->sent) < PUMP_MAXBUFFER)
	return 0;
    debug(61, 5) ("pumpReadDefer: deferring, rcvd=%d, sent=%d\n",
	p->rcvd, p->sent);
    return 1;
}

/*
 * Abort the pump: unregister from a still-pending request entry, fail
 * the forward with ERR_READ_ERROR if the reply entry is still pending,
 * and close the server and client FDs.  Must be called at most once
 * per state (asserted via flags.closing).
 *
 * The state is cbdata-locked for the duration of the call.  Closing
 * the client FD is expected to run pumpFree(), registered as its close
 * handler in pumpInit(); the final assert checks that the state is no
 * longer valid afterwards.
 */
static void
pumpClose(void *data)
{
    PumpStateData *p = data;
    StoreEntry *req = p->request_entry;
    StoreEntry *rep = p->reply_entry;
    cbdataLock(p);
    debug(61, 3) ("pumpClose: %p Server FD %d, Client FD %d\n",
	p, p->s_fd, p->c_fd);
    /* double-call detection */
    assert(!p->flags.closing);
    p->flags.closing = 1;
    if (req != NULL && req->store_status == STORE_PENDING) {
	storeUnregister(req, p);
    }
    if (rep != NULL && rep->store_status == STORE_PENDING) {
	ErrorState *err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR);
	fwdFail(p->fwd, err);
    }
    if (p->s_fd > -1) {
	comm_close(p->s_fd);
	p->s_fd = -1;
    }
    if (p->c_fd > -1) {
	comm_close(p->c_fd);
    }
    /* This tests that pumpFree() got called somewhere */
    assert(0 == cbdataValid(p));
    cbdataUnlock(p);
}

/*
 * Close handler for the client FD: unlink the state from pump_head and
 * release everything it holds (request entry registration and lock,
 * reply entry lock, request link), then free the state via cbdata.
 * If the state is not on the list, a message is logged and nothing is
 * released.
 */
static void
pumpFree(int fd, void *data)
{
    PumpStateData *p = NULL;
    PumpStateData *q = NULL;
    StoreEntry *req;
    StoreEntry *rep;
    debug(61, 3) ("pumpFree: FD %d, releasing %p!\n", fd, data);
    /* q trails p so the list can be re-linked around p */
    for (p = pump_head; p && p != data; q = p, p = p->next);
    if (p == NULL) {
	/* p is NULL here; report the pointer that was searched for */
	debug(61, 1) ("pumpFree: p=%p not found?\n", data);
	return;
    }
    if (q)
	q->next = p->next;
    else
	pump_head = p->next;
    assert(fd == p->c_fd);
    p->c_fd = -1;
    req = p->request_entry;
    rep = p->reply_entry;
    if (req != NULL) {
	storeUnregister(req, p);
	storeUnlockObject(req);
	p->request_entry = NULL;
    }
    if (rep != NULL) {
	debug(61, 3) ("pumpFree: did the server-side FD (%d) get closed?\n", p->s_fd);
	storeUnlockObject(rep);
	p->reply_entry = NULL;
    }
    requestUnlink(p->req);
    if (p->s_fd > -1) {
	assert(!fd_table[p->s_fd].flags.open);
	p->s_fd = -1;
    }
    cbdataFree(p);
}

/*
 * Timeout handler for the client FD (installed by pumpStart()):
 * closes the pump.
 */
static void
pumpTimeout(int fd, void *data)
{
    PumpStateData *p = data;
    debug(61, 3) ("pumpTimeout: FD %d\n", p->c_fd);
    pumpClose(p);
}

/*
 * This is called only if the server connection closes unexpectedly
 * (the handler is removed again once the whole body has been sent).
 */
static void
pumpServerClosed(int fd, void *data)
{
    PumpStateData *p = data;
    debug(61, 3) ("pumpServerClosed: FD %d\n", fd);
    /*
     * we have been called from comm_close for the server side, so
     * just need to clean up the client side
     */
    assert(p->s_fd == fd);
    p->s_fd = -1;
    /* pumpClose() is already running and closes the client FD itself */
    if (p->flags.closing)
	return;
    if (p->c_fd > -1)
	comm_close(p->c_fd);
}

/*
 * This function returns true for the request methods handled
 * by this module
 */
int
pumpMethod(method_t method)
{
    switch (method) {
    case METHOD_POST:
    case METHOD_PUT:
	return 1;
    default:
	return 0;
    }
}

/*
 * This function returns True if we can submit this request again.
 * The request may have been pipelined, but the connection got
 * closed before we got a reply.  If we still have the whole
 * request in memory then we can send it again.  If we want to
 * be able to restart very large requests, then we'll have to
 * swap them out to disk.
 *
 * Returns 0 when no pump state exists for r, when the request entry
 * has no mem_obj, or when mem->inmem_lo > 0; returns 1 otherwise.
 */
int
pumpRestart(request_t * r)
{
    PumpStateData *p;
    MemObject *mem;
    for (p = pump_head; p && p->req != r; p = p->next);
    if (p == NULL) {
	debug(61, 3) ("pumpRestart: NO: Can't find pumpState!\n");
	return 0;
    }
    mem = p->request_entry->mem_obj;
    if (mem == NULL) {
	debug(61, 3) ("pumpRestart: NO: request_entry->mem_obj == NULL!\n");
	return 0;
    }
    if (mem->inmem_lo > 0) {
	debug(61, 3) ("pumpRestart: NO: mem->inmem_lo == %d\n",
	    (int) mem->inmem_lo);
	return 0;
    }
    debug(61, 3) ("pumpRestart: YES!\n");
    return 1;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -