📄 aiops_win32.c
字号:
/*
 * $Id: aiops_win32.c,v 1.3 2006/09/23 10:16:40 serassio Exp $
 *
 * DEBUG: section 43 Windows AIOPS
 * AUTHOR: Stewart Forster <slf@connect.com.au>
 * AUTHOR: Robert Collins <robertc@squid-cache.org>
 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
 *
 * SQUID Web Proxy Cache http://www.squid-cache.org/
 * ----------------------------------------------------------
 *
 * Squid is the result of efforts by numerous individuals from
 * the Internet community; see the CONTRIBUTORS file for full
 * details. Many organizations have provided support for Squid's
 * development; see the SPONSORS file for full details. Squid is
 * Copyrighted (C) 2001 by the Regents of the University of
 * California; see the COPYRIGHT file for full details. Squid
 * incorporates software developed and/or copyrighted by other
 * sources; see the CREDITS file for full details.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 */

#include "squid.h"
#include <windows.h>
#include "async_io.h"

#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <dirent.h>
#include <signal.h>

#define RIDICULOUS_LENGTH 4096

/*
 * Number of worker threads.  When AUFS_IO_THREADS is not defined this starts
 * at 0 and squidaio_init() derives a value from the configured directories.
 */
#ifdef AUFS_IO_THREADS
int squidaio_nthreads = AUFS_IO_THREADS;
#else
int squidaio_nthreads = 0;
#endif
int squidaio_magic1 = 1;    /* dummy initializer value; overwritten by squidaio_init() */
int squidaio_magic2 = 1;    /* dummy initializer value; overwritten by squidaio_init() */

/* Life-cycle state recorded by each worker thread in its squidaio_thread_t. */
enum _squidaio_thread_status {
    _THREAD_STARTING = 0,
    _THREAD_WAITING,
    _THREAD_BUSY,
    _THREAD_FAILED,
    _THREAD_DONE
};
typedef enum _squidaio_thread_status squidaio_thread_status;

/* Operation selector used by the worker loop to dispatch a request. */
enum _squidaio_request_type {
    _AIO_OP_NONE = 0,
    _AIO_OP_OPEN,
    _AIO_OP_READ,
    _AIO_OP_WRITE,
    _AIO_OP_CLOSE,
    _AIO_OP_UNLINK,
    _AIO_OP_TRUNCATE,
    _AIO_OP_OPENDIR,
    _AIO_OP_STAT
};
typedef enum _squidaio_request_type squidaio_request_type;

/* One queued I/O operation together with its arguments and result slots. */
typedef struct squidaio_request_t {
    struct squidaio_request_t *next;    /* singly linked queue link */
    squidaio_request_type request_type;
    int cancelled;                      /* non-zero: worker skips the operation and reports EINTR */
    char *path;
    int oflag;
    mode_t mode;
    int fd;
    char *bufferp;
    int buflen;
    off_t offset;
    int whence;
    int ret;                            /* operation return value (-1 on failure) */
    int err;                            /* errno-style error code */
    struct stat *tmpstatp;
    struct stat *statp;
    squidaio_result_t *resultp;
} squidaio_request_t;

/* A mutex-protected singly linked request queue with an associated event. */
typedef struct squidaio_request_queue_t {
    HANDLE mutex;
    HANDLE cond;                /* See Event objects */
    squidaio_request_t *volatile head;
    squidaio_request_t *volatile *volatile tailp;   /* points at the link to append to */
    unsigned long requests;
    unsigned long blocked;      /* main failed to lock the queue */
} squidaio_request_queue_t;

typedef struct squidaio_thread_t squidaio_thread_t;

/* Per-worker bookkeeping; all workers are chained through `next`. */
struct squidaio_thread_t {
    squidaio_thread_t *next;
    HANDLE thread;              /* NULL when CreateThread() failed */
    DWORD dwThreadId;           /* thread ID */
    squidaio_thread_status status;
    struct squidaio_request_t *current_req;
    unsigned long requests;
    int volatile exit;          /* set non-zero by squidaio_shutdown() to ask the worker to return */
};

static void squidaio_queue_request(squidaio_request_t *);
static void squidaio_cleanup_request(squidaio_request_t *);
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam);
static void squidaio_do_open(squidaio_request_t *);
static void squidaio_do_read(squidaio_request_t *);
static void squidaio_do_write(squidaio_request_t *);
static void squidaio_do_close(squidaio_request_t *);
static void squidaio_do_stat(squidaio_request_t *);
static void squidaio_do_unlink(squidaio_request_t *);
#if USE_TRUNCATE
static void squidaio_do_truncate(squidaio_request_t *);
#endif
#if AIO_OPENDIR
static void *squidaio_do_opendir(squidaio_request_t *);
#endif
static void squidaio_debug(squidaio_request_t *);
static void squidaio_poll_queues(void);

static squidaio_thread_t *threads = NULL;
static int squidaio_initialised = 0;

/*
 * Buffer size classes.  The shifted values are parenthesized so the macros
 * expand safely inside any surrounding expression.
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128

static MemPool *squidaio_large_bufs = NULL;     /* 16K */
static MemPool *squidaio_medium_bufs = NULL;    /* 8K */
static MemPool *squidaio_small_bufs = NULL;     /* 4K */
static MemPool *squidaio_tiny_bufs = NULL;      /* 2K */
static MemPool *squidaio_micro_bufs = NULL;     /* 128 bytes */

static int request_queue_len = 0;
static MemPool *squidaio_request_pool = NULL;
static MemPool *squidaio_thread_pool = NULL;
static squidaio_request_queue_t request_queue;
static struct {
    squidaio_request_t *head, **tailp;
} request_queue2 = {
    NULL, &request_queue2.head
};
static squidaio_request_queue_t done_queue;
static struct {
    squidaio_request_t *head, **tailp;
} done_requests = {
    NULL, &done_requests.head
};
static int done_fd = 0;         /* write end of the completion pipe */
static int done_fd_read = 0;    /* read end of the completion pipe */
static int done_signalled = 0;
static HANDLE main_thread;

/*
 * Select the buffer pool whose size class is the smallest one that can hold
 * `size` bytes.  Returns NULL when `size` exceeds AIO_LARGE_BUFS, in which
 * case callers fall back to the general allocator.
 */
static MemPool *
squidaio_get_pool(int size)
{
    if (size > AIO_LARGE_BUFS)
        return NULL;
    if (size <= AIO_MICRO_BUFS)
        return squidaio_micro_bufs;
    if (size <= AIO_TINY_BUFS)
        return squidaio_tiny_bufs;
    if (size <= AIO_SMALL_BUFS)
        return squidaio_small_bufs;
    if (size <= AIO_MEDIUM_BUFS)
        return squidaio_medium_bufs;
    return squidaio_large_bufs;
}

/*
 * Allocate `size` bytes, from a size-class pool when one fits and from
 * xmalloc() otherwise.  Release with squidaio_xfree() using the same size.
 */
void *
squidaio_xmalloc(int size)
{
    MemPool *pool = squidaio_get_pool(size);

    if (pool != NULL)
        return memPoolAlloc(pool);
    return xmalloc(size);
}

/*
 * Duplicate a NUL-terminated string into squidaio_xmalloc() storage.
 * Release the copy with squidaio_xstrfree().
 */
static char *
squidaio_xstrdup(const char *str)
{
    char *p;
    int len = strlen(str) + 1;  /* includes the terminating NUL */

    p = squidaio_xmalloc(len);
    /* The length is exact, so a plain copy carries the terminator along. */
    memcpy(p, str, (size_t) len);
    return p;
}

/*
 * Release memory obtained from squidaio_xmalloc().  `size` selects the pool,
 * so it must be the size that was passed when the block was allocated.
 */
void
squidaio_xfree(void *p, int size)
{
    MemPool *pool = squidaio_get_pool(size);

    if (pool != NULL)
        memPoolFree(pool, p);
    else
        xfree(p);
}

/*
 * Release a string created by squidaio_xstrdup().  The pool is chosen from
 * the string's current length, so the string must still have a length in the
 * same size class as when it was duplicated.
 */
static void
squidaio_xstrfree(char *str)
{
    int len = strlen(str) + 1;
    MemPool *pool = squidaio_get_pool(len);

    if (pool != NULL)
        memPoolFree(pool, str);
    else
        xfree(str);
}

/*
 * Read handler for the completion pipe: discards up to sizeof(junk) pending
 * bytes and registers itself again for the next read event.  `data` is unused.
 */
static void
squidaio_fdhandler(int fd, void *data)
{
    char junk[256];

    FD_READ_METHOD(done_fd_read, junk, sizeof(junk));
    commSetSelect(fd, COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);
}

/*
 * One-time initialisation of the async I/O subsystem: duplicates the main
 * thread handle, creates the request/done queues and the completion pipe,
 * starts the worker threads and creates the memory pools.  Calling it again
 * after a successful initialisation is a no-op.  Unrecoverable failures are
 * reported through fatal().
 */
void
squidaio_init(void)
{
    int i;
    int done_pipe[2];
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    if (!DuplicateHandle(GetCurrentProcess(),   /* pseudo handle, don't close */
            GetCurrentThread(),                 /* pseudo handle to copy */
            GetCurrentProcess(),                /* pseudo handle, don't close */
            &main_thread,
            0,                                  /* required access */
            FALSE,                              /* child processes don't inherit the handle */
            DUPLICATE_SAME_ACCESS)) {
        /* spit errors */
        fatal("couldn't get current thread handle\n");
    }

    /* Initialize request queue */
    if ((request_queue.mutex = CreateMutex(NULL,    /* no inheritance */
                FALSE,                              /* start unowned (as per mutex_init) */
                NULL)                               /* no name */
        ) == NULL) {
        fatal("failed to create mutex\n");
    }
    if ((request_queue.cond = CreateEvent(NULL,     /* no inheritance */
                FALSE,                              /* auto-reset event */
                FALSE,                              /* start non signaled */
                NULL)                               /* no name */
        ) == NULL) {
        fatal("failed to create condition event variable.\n");
    }
    request_queue.head = NULL;
    request_queue.tailp = &request_queue.head;
    request_queue.requests = 0;
    request_queue.blocked = 0;

    /* Initialize done queue */
    if ((done_queue.mutex = CreateMutex(NULL,       /* no inheritance */
                FALSE,                              /* start unowned (as per mutex_init) */
                NULL)                               /* no name */
        ) == NULL) {
        fatal("failed to create mutex\n");
    }
    if ((done_queue.cond = CreateEvent(NULL,        /* no inheritance */
                TRUE,                               /* manual-reset event */
                FALSE,                              /* start non signaled */
                NULL)                               /* no name */
        ) == NULL) {
        fatal("failed to create condition event variable.\n");
    }
    done_queue.head = NULL;
    done_queue.tailp = &done_queue.head;
    done_queue.requests = 0;
    done_queue.blocked = 0;

    /*
     * Initialize done pipe signal.  Without a valid pipe the descriptors
     * below would be uninitialized, so a failure here is fatal.
     */
    if (pipe(done_pipe) != 0)
        fatal("failed to create async-io completion pipe\n");
    done_fd = done_pipe[1];
    done_fd_read = done_pipe[0];
    fd_open(done_fd_read, FD_PIPE, "async-io completion event: main");
    fd_open(done_fd, FD_PIPE, "async-io completion event: threads");
    commSetNonBlocking(done_pipe[0]);
    commSetNonBlocking(done_pipe[1]);
    commSetCloseOnExec(done_pipe[0]);
    commSetCloseOnExec(done_pipe[1]);
    commSetSelect(done_pipe[0], COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
    if (squidaio_nthreads == 0) {
        /*
         * No compile-time thread count: add 16 threads for the first
         * asyncufs directory and two thirds of the previous increment for
         * each further one, never fewer than 4 per directory.
         */
        int j = 16;
        for (i = 0; i < n_asyncufs_dirs; i++) {
            squidaio_nthreads += j;
            j = j * 2 / 3;
            if (j < 4)
                j = 4;
        }
#if USE_AUFSOPS
        /* 6 threads for the first COSS directory, 3 for each further one */
        j = 6;
        for (i = 0; i < n_coss_dirs; i++) {
            squidaio_nthreads += j;
            j = 3;
        }
#endif
    }
    if (squidaio_nthreads == 0)
        squidaio_nthreads = 16;
    squidaio_magic1 = squidaio_nthreads * MAGIC1_FACTOR;
    squidaio_magic2 = squidaio_nthreads * MAGIC2_FACTOR;
    for (i = 0; i < squidaio_nthreads; i++) {
        threadp = memPoolAlloc(squidaio_thread_pool);
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        /* The worker polls this flag, so it must be defined before it starts. */
        threadp->exit = 0;
        threadp->next = threads;
        threads = threadp;
        if ((threadp->thread = CreateThread(NULL,   /* no security attributes */
                    0,                              /* use default stack size */
                    squidaio_thread_loop,           /* thread function */
                    threadp,                        /* argument to thread function */
                    0,                              /* use default creation flags */
                    &(threadp->dwThreadId))         /* returns the thread identifier */
            ) == NULL) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
        /* Set the new thread priority above parent process */
        SetThreadPriority(threadp->thread, THREAD_PRIORITY_ABOVE_NORMAL);
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}

/*
 * Drain outstanding requests, ask every worker to exit, wait up to two
 * seconds per batch of threads for them to finish, then release the thread
 * handles and the completion pipe.  Does nothing when the subsystem was never
 * initialised.
 */
void
squidaio_shutdown(void)
{
    squidaio_thread_t *threadp;
    int i;
    int nhandles = 0;
    HANDLE *hthreads;

    if (!squidaio_initialised)
        return;

    /* This is the same as in squidaio_sync */
    do {
        squidaio_poll_queues();
    } while (request_queue_len > 0);

    hthreads = xcalloc(squidaio_nthreads, sizeof(HANDLE));
    for (threadp = threads; threadp != NULL && nhandles < squidaio_nthreads; threadp = threadp->next) {
        threadp->exit = 1;
        /*
         * Threads whose creation failed have a NULL handle; passing one to
         * WaitForMultipleObjects() would make the whole wait fail.
         */
        if (threadp->thread != NULL)
            hthreads[nhandles++] = threadp->thread;
    }
    ReleaseMutex(request_queue.mutex);
    ResetEvent(request_queue.cond);
    ReleaseMutex(done_queue.mutex);
    ResetEvent(done_queue.cond);
    Sleep(0);

    /* A single wait accepts at most MAXIMUM_WAIT_OBJECTS handles. */
    for (i = 0; i < nhandles; i += MAXIMUM_WAIT_OBJECTS) {
        int batch = nhandles - i;

        if (batch > MAXIMUM_WAIT_OBJECTS)
            batch = MAXIMUM_WAIT_OBJECTS;
        WaitForMultipleObjects((DWORD) batch, hthreads + i, TRUE, 2000);
    }
    for (i = 0; i < nhandles; i++)
        CloseHandle(hthreads[i]);
    CloseHandle(main_thread);
    xfree(hthreads);

    fd_close(done_fd);
    fd_close(done_fd_read);
    close(done_fd);
    close(done_fd_read);
}

/*
 * Worker thread entry point.  `lpParam` is the thread's squidaio_thread_t.
 * Each iteration takes the head of request_queue, runs the matching
 * squidaio_do_*() handler (or reports EINTR for a cancelled request) and
 * appends the request to done_queue.  Returns 0 when asked to exit and 1
 * when a wait or mutex release fails.
 *
 * NOTE(review): the exit flag is only tested at the top of the outer loop,
 * before the thread blocks on the queue event - confirm that shutdown can
 * actually wake an idle worker.
 * NOTE(review): the exit path closes the shared request_queue.mutex handle
 * from every worker - confirm this is intended.
 * NOTE(review): this listing ends before the end of the function; the
 * statements below are unchanged up to that point.
 */
static DWORD WINAPI
squidaio_thread_loop(LPVOID lpParam)
{
    squidaio_thread_t *threadp = lpParam;
    squidaio_request_t *request;
    HANDLE cond;                /* local copy of the event queue because win32 event handles
                                 * don't atomically release the mutex as cond variables do. */

    /* lock the thread info */
    if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
        fatal("Can't get ownership of mutex\n");
    }
    /* duplicate the handle */
    if (!DuplicateHandle(GetCurrentProcess(),   /* pseudo handle, don't close */
            request_queue.cond,                 /* handle to copy */
            GetCurrentProcess(),                /* pseudo handle, don't close */
            &cond,
            0,                                  /* required access */
            FALSE,                              /* child processes don't inherit the handle */
            DUPLICATE_SAME_ACCESS))
        fatal("Can't duplicate mutex handle\n");
    if (!ReleaseMutex(request_queue.mutex)) {
        CloseHandle(cond);
        fatal("Can't release mutex\n");
    }
    while (1) {
        DWORD rv;
        threadp->current_req = request = NULL;
        request = NULL;
        /* Get a request to process */
        threadp->status = _THREAD_WAITING;
        if (threadp->exit) {
            CloseHandle(request_queue.mutex);
            CloseHandle(cond);
            return 0;
        }
        rv = WaitForSingleObject(request_queue.mutex, INFINITE);
        if (rv == WAIT_FAILED) {
            CloseHandle(cond);
            return 1;
        }
        /* Queue empty: drop the mutex, sleep on the event, then re-lock. */
        while (!request_queue.head) {
            if (!ReleaseMutex(request_queue.mutex)) {
                CloseHandle(cond);
                threadp->status = _THREAD_FAILED;
                return 1;
            }
            rv = WaitForSingleObject(cond, INFINITE);
            if (rv == WAIT_FAILED) {
                CloseHandle(cond);
                return 1;
            }
            rv = WaitForSingleObject(request_queue.mutex, INFINITE);
            if (rv == WAIT_FAILED) {
                CloseHandle(cond);
                return 1;
            }
        }
        request = request_queue.head;
        if (request)
            request_queue.head = request->next;
        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;
        if (!ReleaseMutex(request_queue.mutex)) {
            CloseHandle(cond);
            return 1;
        }
        /* process the request */
        threadp->status = _THREAD_BUSY;
        request->next = NULL;
        threadp->current_req = request;
        errno = 0;
        if (!request->cancelled) {
            switch (request->request_type) {
            case _AIO_OP_OPEN:
                squidaio_do_open(request);
                break;
            case _AIO_OP_READ:
                squidaio_do_read(request);
                break;
            case _AIO_OP_WRITE:
                squidaio_do_write(request);
                break;
            case _AIO_OP_CLOSE:
                squidaio_do_close(request);
                break;
            case _AIO_OP_UNLINK:
                squidaio_do_unlink(request);
                break;
#if USE_TRUNCATE
            case _AIO_OP_TRUNCATE:
                squidaio_do_truncate(request);
                break;
#endif
#if AIO_OPENDIR                 /* Opendir not implemented yet */
            case _AIO_OP_OPENDIR:
                squidaio_do_opendir(request);
                break;
#endif
            case _AIO_OP_STAT:
                squidaio_do_stat(request);
                break;
            default:
                request->ret = -1;
                request->err = EINVAL;
                break;
            }
        } else {                /* cancelled */
            request->ret = -1;
            request->err = EINTR;
        }
        threadp->status = _THREAD_DONE;
        /* put the request in the done queue */
        rv = WaitForSingleObject(done_queue.mutex, INFINITE);
        if (rv == WAIT_FAILED) {
            CloseHandle(cond);
            return 1;
        }
        *done_queue.tailp = request;
        done_queue.tailp = &request->next;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -