⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aiops_win32.c

📁 一个功能非常全面的代理服务器源代码程序,
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * $Id: aiops_win32.c,v 1.3 2006/09/23 10:16:40 serassio Exp $ * * DEBUG: section 43    Windows AIOPS * AUTHOR: Stewart Forster <slf@connect.com.au> * AUTHOR: Robert Collins <robertc@squid-cache.org> * AUTHOR: Guido Serassio <serassio@squid-cache.org> * * SQUID Web Proxy Cache          http://www.squid-cache.org/ * ---------------------------------------------------------- * *  Squid is the result of efforts by numerous individuals from *  the Internet community; see the CONTRIBUTORS file for full *  details.   Many organizations have provided support for Squid's *  development; see the SPONSORS file for full details.  Squid is *  Copyrighted (C) 2001 by the Regents of the University of *  California; see the COPYRIGHT file for full details.  Squid *  incorporates software developed and/or copyrighted by other *  sources; see the CREDITS file for full details. * *  This program is free software; you can redistribute it and/or modify *  it under the terms of the GNU General Public License as published by *  the Free Software Foundation; either version 2 of the License, or *  (at your option) any later version. *   *  This program is distributed in the hope that it will be useful, *  but WITHOUT ANY WARRANTY; without even the implied warranty of *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *  GNU General Public License for more details. *   *  You should have received a copy of the GNU General Public License *  along with this program; if not, write to the Free Software *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */#include "squid.h"#include <windows.h>#include "async_io.h"#include	<stdio.h>#include	<sys/types.h>#include	<sys/stat.h>#include	<fcntl.h>#include	<errno.h>#include	<dirent.h>#include	<signal.h>#define RIDICULOUS_LENGTH	4096#ifdef AUFS_IO_THREADSint squidaio_nthreads = AUFS_IO_THREADS;#elseint squidaio_nthreads = 0;#endifint squidaio_magic1 = 1;	/* dummy initializer value */int squidaio_magic2 = 1;	/* real value set in aiops.c */enum _squidaio_thread_status {    _THREAD_STARTING = 0,    _THREAD_WAITING,    _THREAD_BUSY,    _THREAD_FAILED,    _THREAD_DONE};typedef enum _squidaio_thread_status squidaio_thread_status;enum _squidaio_request_type {    _AIO_OP_NONE = 0,    _AIO_OP_OPEN,    _AIO_OP_READ,    _AIO_OP_WRITE,    _AIO_OP_CLOSE,    _AIO_OP_UNLINK,    _AIO_OP_TRUNCATE,    _AIO_OP_OPENDIR,    _AIO_OP_STAT};typedef enum _squidaio_request_type squidaio_request_type;typedef struct squidaio_request_t {    struct squidaio_request_t *next;    squidaio_request_type request_type;    int cancelled;    char *path;    int oflag;    mode_t mode;    int fd;    char *bufferp;    int buflen;    off_t offset;    int whence;    int ret;    int err;    struct stat *tmpstatp;    struct stat *statp;    squidaio_result_t *resultp;} squidaio_request_t;typedef struct squidaio_request_queue_t {    HANDLE mutex;    HANDLE cond;		/* See Event objects */    squidaio_request_t *volatile head;    squidaio_request_t *volatile *volatile tailp;    unsigned long requests;    unsigned long blocked;	/* main failed to lock the queue */} squidaio_request_queue_t;typedef struct squidaio_thread_t squidaio_thread_t;struct squidaio_thread_t {    squidaio_thread_t *next;    HANDLE thread;    DWORD dwThreadId;		/* thread ID */    squidaio_thread_status status;    struct squidaio_request_t *current_req;    unsigned long requests;    int volatile exit;};static void squidaio_queue_request(squidaio_request_t *);static void squidaio_cleanup_request(squidaio_request_t *);static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam);static void squidaio_do_open(squidaio_request_t *);static void squidaio_do_read(squidaio_request_t *);static void squidaio_do_write(squidaio_request_t *);static void squidaio_do_close(squidaio_request_t *);static void squidaio_do_stat(squidaio_request_t *);static void squidaio_do_unlink(squidaio_request_t *);#if USE_TRUNCATEstatic void squidaio_do_truncate(squidaio_request_t *);#endif#if AIO_OPENDIRstatic void *squidaio_do_opendir(squidaio_request_t *);#endifstatic void squidaio_debug(squidaio_request_t *);static void squidaio_poll_queues(void);static squidaio_thread_t *threads = NULL;static int squidaio_initialised = 0;#define AIO_LARGE_BUFS  16384#define AIO_MEDIUM_BUFS	AIO_LARGE_BUFS >> 1#define AIO_SMALL_BUFS	AIO_LARGE_BUFS >> 2#define AIO_TINY_BUFS	AIO_LARGE_BUFS >> 3#define AIO_MICRO_BUFS	128static MemPool *squidaio_large_bufs = NULL;	/* 16K */static MemPool *squidaio_medium_bufs = NULL;	/* 8K */static MemPool *squidaio_small_bufs = NULL;	/* 4K */static MemPool *squidaio_tiny_bufs = NULL;	/* 2K */static MemPool *squidaio_micro_bufs = NULL;	/* 128K */static int request_queue_len = 0;static MemPool *squidaio_request_pool = NULL;static MemPool *squidaio_thread_pool = NULL;static squidaio_request_queue_t request_queue;static struct {    squidaio_request_t *head, **tailp;} request_queue2 = {    NULL, &request_queue2.head};static squidaio_request_queue_t done_queue;static struct {    squidaio_request_t *head, **tailp;} done_requests = {    NULL, &done_requests.head};static int done_fd = 0;static int done_fd_read = 0;static int done_signalled = 0;static HANDLE main_thread;static MemPool *squidaio_get_pool(int size){    MemPool *p;    if (size <= AIO_LARGE_BUFS) {	if (size <= AIO_MICRO_BUFS)	    p = squidaio_micro_bufs;	else if (size <= AIO_TINY_BUFS)	    p = squidaio_tiny_bufs;	else if (size <= AIO_SMALL_BUFS)	    p = squidaio_small_bufs;	else if (size <= AIO_MEDIUM_BUFS)	    p = squidaio_medium_bufs;	else	    p = squidaio_large_bufs;    } else	p = NULL;    return p;}void *squidaio_xmalloc(int size){    void *p;    MemPool *pool;    if ((pool = squidaio_get_pool(size)) != NULL) {	p = memPoolAlloc(pool);    } else	p = xmalloc(size);    return p;}static char *squidaio_xstrdup(const char *str){    char *p;    int len = strlen(str) + 1;    p = squidaio_xmalloc(len);    strncpy(p, str, len);    return p;}voidsquidaio_xfree(void *p, int size){    MemPool *pool;    if ((pool = squidaio_get_pool(size)) != NULL) {	memPoolFree(pool, p);    } else	xfree(p);}static voidsquidaio_xstrfree(char *str){    MemPool *pool;    int len = strlen(str) + 1;    if ((pool = squidaio_get_pool(len)) != NULL) {	memPoolFree(pool, str);    } else	xfree(str);}static voidsquidaio_fdhandler(int fd, void *data){    char junk[256];    FD_READ_METHOD(done_fd_read, junk, sizeof(junk));    commSetSelect(fd, COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);}voidsquidaio_init(void){    int i;    int done_pipe[2];    squidaio_thread_t *threadp;    if (squidaio_initialised)	return;    if (!DuplicateHandle(GetCurrentProcess(),	/* pseudo handle, don't close */	    GetCurrentThread(),	/* pseudo handle to copy */	    GetCurrentProcess(),	/* pseudo handle, don't close */	    &main_thread,	    0,			/* required access */	    FALSE,		/* child process's don't inherit the handle */	    DUPLICATE_SAME_ACCESS)) {	/* spit errors */	fatal("couldn't get current thread handle\n");    }    /* Initialize request queue */    if ((request_queue.mutex = CreateMutex(NULL,	/* no inheritance */		FALSE,		/* start unowned (as per mutex_init) */		NULL)		/* no name */	) == NULL) {	fatal("failed to create mutex\n");    }    if ((request_queue.cond = CreateEvent(NULL,		/* no inheritance */		FALSE,		/* auto signal reset - which I think is pthreads like ? */		FALSE,		/* start non signaled */		NULL)		/* no name */	) == NULL) {	fatal("failed to create condition event variable.\n");    }    request_queue.head = NULL;    request_queue.tailp = &request_queue.head;    request_queue.requests = 0;    request_queue.blocked = 0;    /* Initialize done queue */    if ((done_queue.mutex = CreateMutex(NULL,	/* no inheritance */		FALSE,		/* start unowned (as per mutex_init) */		NULL)		/* no name */	) == NULL) {	fatal("failed to create mutex\n");    }    if ((done_queue.cond = CreateEvent(NULL,	/* no inheritance */		TRUE,		/* manually signaled - which I think is pthreads like ? */		FALSE,		/* start non signaled */		NULL)		/* no name */	) == NULL) {	fatal("failed to create condition event variable.\n");    }    done_queue.head = NULL;    done_queue.tailp = &done_queue.head;    done_queue.requests = 0;    done_queue.blocked = 0;    /* Initialize done pipe signal */    pipe(done_pipe);    done_fd = done_pipe[1];    done_fd_read = done_pipe[0];    fd_open(done_fd_read, FD_PIPE, "async-io completion event: main");    fd_open(done_fd, FD_PIPE, "async-io completion event: threads");    commSetNonBlocking(done_pipe[0]);    commSetNonBlocking(done_pipe[1]);    commSetCloseOnExec(done_pipe[0]);    commSetCloseOnExec(done_pipe[1]);    commSetSelect(done_pipe[0], COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);    /* Create threads and get them to sit in their wait loop */    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));    if (squidaio_nthreads == 0) {	int j = 16;	for (i = 0; i < n_asyncufs_dirs; i++) {	    squidaio_nthreads += j;	    j = j * 2 / 3;	    if (j < 4)		j = 4;	}#if USE_AUFSOPS	j = 6;	for (i = 0; i < n_coss_dirs; i++) {	    squidaio_nthreads += j;	    j = 3;	}#endif    }    if (squidaio_nthreads == 0)	squidaio_nthreads = 16;    squidaio_magic1 = squidaio_nthreads * MAGIC1_FACTOR;    squidaio_magic2 = squidaio_nthreads * MAGIC2_FACTOR;    for (i = 0; i < squidaio_nthreads; i++) {	threadp = memPoolAlloc(squidaio_thread_pool);	threadp->status = _THREAD_STARTING;	threadp->current_req = NULL;	threadp->requests = 0;	threadp->next = threads;	threads = threadp;	if ((threadp->thread = CreateThread(NULL,	/* no security attributes */		    0,		/* use default stack size */		    squidaio_thread_loop,	/* thread function */		    threadp,	/* argument to thread function */		    0,		/* use default creation flags */		    &(threadp->dwThreadId))	/* returns the thread identifier */	    ) == NULL) {	    fprintf(stderr, "Thread creation failed\n");	    threadp->status = _THREAD_FAILED;	    continue;	}	/* Set the new thread priority above parent process */	SetThreadPriority(threadp->thread, THREAD_PRIORITY_ABOVE_NORMAL);    }    /* Create request pool */    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);    squidaio_initialised = 1;}voidsquidaio_shutdown(void){    squidaio_thread_t *threadp;    int i;    HANDLE *hthreads;    if (!squidaio_initialised)	return;    /* This is the same as in squidaio_sync */    do {	squidaio_poll_queues();    } while (request_queue_len > 0);    hthreads = (HANDLE *) xcalloc(squidaio_nthreads, sizeof(HANDLE));    threadp = threads;    for (i = 0; i < squidaio_nthreads; i++) {	threadp->exit = 1;	hthreads[i] = threadp->thread;	threadp = threadp->next;    }    ReleaseMutex(request_queue.mutex);    ResetEvent(request_queue.cond);    ReleaseMutex(done_queue.mutex);    ResetEvent(done_queue.cond);    Sleep(0);    WaitForMultipleObjects(squidaio_nthreads, hthreads, TRUE, 2000);    for (i = 0; i < squidaio_nthreads; i++) {	CloseHandle(hthreads[i]);    }    CloseHandle(main_thread);    xfree(hthreads);    fd_close(done_fd);    fd_close(done_fd_read);    close(done_fd);    close(done_fd_read);}static DWORD WINAPIsquidaio_thread_loop(LPVOID lpParam){    squidaio_thread_t *threadp = lpParam;    squidaio_request_t *request;    HANDLE cond;		/* local copy of the event queue because win32 event handles				 * don't atomically release the mutex as cond variables do. */    /* lock the thread info */    if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {	fatal("Can't get ownership of mutex\n");    }    /* duplicate the handle */    if (!DuplicateHandle(GetCurrentProcess(),	/* pseudo handle, don't close */	    request_queue.cond,	/* handle to copy */	    GetCurrentProcess(),	/* pseudo handle, don't close */	    &cond,	    0,			/* required access */	    FALSE,		/* child process's don't inherit the handle */	    DUPLICATE_SAME_ACCESS))	fatal("Can't duplicate mutex handle\n");    if (!ReleaseMutex(request_queue.mutex)) {	CloseHandle(cond);	fatal("Can't release mutex\n");    }    while (1) {	DWORD rv;	threadp->current_req = request = NULL;	request = NULL;	/* Get a request to process */	threadp->status = _THREAD_WAITING;	if (threadp->exit) {	    CloseHandle(request_queue.mutex);	    CloseHandle(cond);	    return 0;	}	rv = WaitForSingleObject(request_queue.mutex, INFINITE);	if (rv == WAIT_FAILED) {	    CloseHandle(cond);	    return 1;	}	while (!request_queue.head) {	    if (!ReleaseMutex(request_queue.mutex)) {		CloseHandle(cond);		threadp->status = _THREAD_FAILED;		return 1;	    }	    rv = WaitForSingleObject(cond, INFINITE);	    if (rv == WAIT_FAILED) {		CloseHandle(cond);		return 1;	    }	    rv = WaitForSingleObject(request_queue.mutex, INFINITE);	    if (rv == WAIT_FAILED) {		CloseHandle(cond);		return 1;	    }	}	request = request_queue.head;	if (request)	    request_queue.head = request->next;	if (!request_queue.head)	    request_queue.tailp = &request_queue.head;	if (!ReleaseMutex(request_queue.mutex)) {	    CloseHandle(cond);	    return 1;	}	/* process the request */	threadp->status = _THREAD_BUSY;	request->next = NULL;	threadp->current_req = request;	errno = 0;	if (!request->cancelled) {	    switch (request->request_type) {	    case _AIO_OP_OPEN:		squidaio_do_open(request);		break;	    case _AIO_OP_READ:		squidaio_do_read(request);		break;	    case _AIO_OP_WRITE:		squidaio_do_write(request);		break;	    case _AIO_OP_CLOSE:		squidaio_do_close(request);		break;	    case _AIO_OP_UNLINK:		squidaio_do_unlink(request);		break;#if USE_TRUNCATE	    case _AIO_OP_TRUNCATE:		squidaio_do_truncate(request);		break;#endif#if AIO_OPENDIR			/* Opendir not implemented yet */	    case _AIO_OP_OPENDIR:		squidaio_do_opendir(request);		break;#endif	    case _AIO_OP_STAT:		squidaio_do_stat(request);		break;	    default:		request->ret = -1;		request->err = EINVAL;		break;	    }	} else {		/* cancelled */	    request->ret = -1;	    request->err = EINTR;	}	threadp->status = _THREAD_DONE;	/* put the request in the done queue */	rv = WaitForSingleObject(done_queue.mutex, INFINITE);	if (rv == WAIT_FAILED) {	    CloseHandle(cond);	    return 1;	}	*done_queue.tailp = request;	done_queue.tailp = &request->next;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -