⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aiops.c

📁 一个功能非常全面的代理服务器源代码程序,
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * $Id: aiops.c,v 1.31 2006/09/23 10:16:40 serassio Exp $ * * DEBUG: section 43    AIOPS * AUTHOR: Stewart Forster <slf@connect.com.au> * * SQUID Web Proxy Cache          http://www.squid-cache.org/ * ---------------------------------------------------------- * *  Squid is the result of efforts by numerous individuals from *  the Internet community; see the CONTRIBUTORS file for full *  details.   Many organizations have provided support for Squid's *  development; see the SPONSORS file for full details.  Squid is *  Copyrighted (C) 2001 by the Regents of the University of *  California; see the COPYRIGHT file for full details.  Squid *  incorporates software developed and/or copyrighted by other *  sources; see the CREDITS file for full details. * *  This program is free software; you can redistribute it and/or modify *  it under the terms of the GNU General Public License as published by *  the Free Software Foundation; either version 2 of the License, or *  (at your option) any later version. *   *  This program is distributed in the hope that it will be useful, *  but WITHOUT ANY WARRANTY; without even the implied warranty of *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *  GNU General Public License for more details. *   *  You should have received a copy of the GNU General Public License *  along with this program; if not, write to the Free Software *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */#ifndef _REENTRANT#error "_REENTRANT MUST be defined to build squid async io support."#endif#include "squid.h"#include "async_io.h"#include	<stdio.h>#include	<sys/types.h>#include	<sys/stat.h>#include	<fcntl.h>#include	<pthread.h>#include	<errno.h>#include	<dirent.h>#include	<signal.h>#if HAVE_SCHED_H#include	<sched.h>#endif#define RIDICULOUS_LENGTH	4096#ifdef AUFS_IO_THREADSint squidaio_nthreads = AUFS_IO_THREADS;#elseint squidaio_nthreads = 0;#endifint squidaio_magic1 = 1;	/* dummy initializer value */int squidaio_magic2 = 1;	/* real value set in aiops.c */enum _squidaio_thread_status {    _THREAD_STARTING = 0,    _THREAD_WAITING,    _THREAD_BUSY,    _THREAD_FAILED,    _THREAD_DONE};typedef enum _squidaio_thread_status squidaio_thread_status;enum _squidaio_request_type {    _AIO_OP_NONE = 0,    _AIO_OP_OPEN,    _AIO_OP_READ,    _AIO_OP_WRITE,    _AIO_OP_CLOSE,    _AIO_OP_UNLINK,    _AIO_OP_TRUNCATE,    _AIO_OP_OPENDIR,    _AIO_OP_STAT};typedef enum _squidaio_request_type squidaio_request_type;typedef struct squidaio_request_t {    struct squidaio_request_t *next;    squidaio_request_type request_type;    int cancelled;    char *path;    int oflag;    mode_t mode;    int fd;    char *bufferp;    int buflen;    off_t offset;    int whence;    int ret;    int err;    struct stat *tmpstatp;    struct stat *statp;    squidaio_result_t *resultp;} squidaio_request_t;typedef struct squidaio_request_queue_t {    pthread_mutex_t mutex;    pthread_cond_t cond;    squidaio_request_t *volatile head;    squidaio_request_t *volatile *volatile tailp;    unsigned long requests;    unsigned long blocked;	/* main failed to lock the queue */} squidaio_request_queue_t;typedef struct squidaio_thread_t squidaio_thread_t;struct squidaio_thread_t {    squidaio_thread_t *next;    pthread_t thread;    squidaio_thread_status status;    struct squidaio_request_t *current_req;    unsigned long requests;};static void squidaio_queue_request(squidaio_request_t *);static void squidaio_cleanup_request(squidaio_request_t *);static void *squidaio_thread_loop(void *);static void squidaio_do_open(squidaio_request_t *);static void squidaio_do_read(squidaio_request_t *);static void squidaio_do_write(squidaio_request_t *);static void squidaio_do_close(squidaio_request_t *);static void squidaio_do_stat(squidaio_request_t *);static void squidaio_do_unlink(squidaio_request_t *);#if USE_TRUNCATEstatic void squidaio_do_truncate(squidaio_request_t *);#endif#if AIO_OPENDIRstatic void *squidaio_do_opendir(squidaio_request_t *);#endifstatic void squidaio_debug(squidaio_request_t *);static void squidaio_poll_queues(void);static squidaio_thread_t *threads = NULL;static int squidaio_initialised = 0;#define AIO_LARGE_BUFS  16384#define AIO_MEDIUM_BUFS	AIO_LARGE_BUFS >> 1#define AIO_SMALL_BUFS	AIO_LARGE_BUFS >> 2#define AIO_TINY_BUFS	AIO_LARGE_BUFS >> 3#define AIO_MICRO_BUFS	128static MemPool *squidaio_large_bufs = NULL;	/* 16K */static MemPool *squidaio_medium_bufs = NULL;	/* 8K */static MemPool *squidaio_small_bufs = NULL;	/* 4K */static MemPool *squidaio_tiny_bufs = NULL;	/* 2K */static MemPool *squidaio_micro_bufs = NULL;	/* 128K */static int request_queue_len = 0;static MemPool *squidaio_request_pool = NULL;static MemPool *squidaio_thread_pool = NULL;static squidaio_request_queue_t request_queue;static struct {    squidaio_request_t *head, **tailp;} request_queue2 = {    NULL, &request_queue2.head};static squidaio_request_queue_t done_queue;static struct {    squidaio_request_t *head, **tailp;} done_requests = {    NULL, &done_requests.head};static int done_fd = 0;static int done_fd_read = 0;static int done_signalled = 0;static pthread_attr_t globattr;#if HAVE_SCHED_Hstatic struct sched_param globsched;#endifstatic pthread_t main_thread;static MemPool *squidaio_get_pool(int size){    MemPool *p;    if (size <= AIO_LARGE_BUFS) {	if (size <= AIO_MICRO_BUFS)	    p = squidaio_micro_bufs;	else if (size <= AIO_TINY_BUFS)	    p = squidaio_tiny_bufs;	else if (size <= AIO_SMALL_BUFS)	    p = squidaio_small_bufs;	else if (size <= AIO_MEDIUM_BUFS)	    p = squidaio_medium_bufs;	else	    p = squidaio_large_bufs;    } else	p = NULL;    return p;}void *squidaio_xmalloc(int size){    void *p;    MemPool *pool;    if ((pool = squidaio_get_pool(size)) != NULL) {	p = memPoolAlloc(pool);    } else	p = xmalloc(size);    return p;}static char *squidaio_xstrdup(const char *str){    char *p;    int len = strlen(str) + 1;    p = squidaio_xmalloc(len);    strncpy(p, str, len);    return p;}voidsquidaio_xfree(void *p, int size){    MemPool *pool;    if ((pool = squidaio_get_pool(size)) != NULL) {	memPoolFree(pool, p);    } else	xfree(p);}static voidsquidaio_xstrfree(char *str){    MemPool *pool;    int len = strlen(str) + 1;    if ((pool = squidaio_get_pool(len)) != NULL) {	memPoolFree(pool, str);    } else	xfree(str);}static voidsquidaio_fdhandler(int fd, void *data){    char junk[256];    FD_READ_METHOD(done_fd_read, junk, sizeof(junk));    commSetSelect(fd, COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);}voidsquidaio_init(void){    int i;    int done_pipe[2];    squidaio_thread_t *threadp;    if (squidaio_initialised)	return;    pthread_attr_init(&globattr);#if HAVE_PTHREAD_ATTR_SETSCOPE    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);#endif#if HAVE_SCHED_H    globsched.sched_priority = 1;#endif    main_thread = pthread_self();#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);#endif#if HAVE_SCHED_H    globsched.sched_priority = 2;#endif#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM    pthread_attr_setschedparam(&globattr, &globsched);#endif    /* Give each thread a smaller 256KB stack, should be more than sufficient */    pthread_attr_setstacksize(&globattr, 256 * 1024);    /* Initialize request queue */    if (pthread_mutex_init(&(request_queue.mutex), NULL))	fatal("Failed to create mutex");    if (pthread_cond_init(&(request_queue.cond), NULL))	fatal("Failed to create condition variable");    request_queue.head = NULL;    request_queue.tailp = &request_queue.head;    request_queue.requests = 0;    request_queue.blocked = 0;    /* Initialize done queue */    if (pthread_mutex_init(&(done_queue.mutex), NULL))	fatal("Failed to create mutex");    if (pthread_cond_init(&(done_queue.cond), NULL))	fatal("Failed to create condition variable");    done_queue.head = NULL;    done_queue.tailp = &done_queue.head;    done_queue.requests = 0;    done_queue.blocked = 0;    /* Initialize done pipe signal */    pipe(done_pipe);    done_fd = done_pipe[1];    done_fd_read = done_pipe[0];    fd_open(done_fd_read, FD_PIPE, "async-io completion event: main");    fd_open(done_fd, FD_PIPE, "async-io completion event: threads");    commSetNonBlocking(done_pipe[0]);    commSetNonBlocking(done_pipe[1]);    commSetCloseOnExec(done_pipe[0]);    commSetCloseOnExec(done_pipe[1]);    commSetSelect(done_pipe[0], COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);    /* Create threads and get them to sit in their wait loop */    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));    if (squidaio_nthreads == 0) {	int j = 16;	for (i = 0; i < n_asyncufs_dirs; i++) {	    squidaio_nthreads += j;	    j = j * 2 / 3;	    if (j < 4)		j = 4;	}#if USE_AUFSOPS	j = 6;	for (i = 0; i < n_coss_dirs; i++) {	    squidaio_nthreads += j;	    j = 3;	}#endif    }    if (squidaio_nthreads == 0)	squidaio_nthreads = 16;    squidaio_magic1 = squidaio_nthreads * MAGIC1_FACTOR;    squidaio_magic2 = squidaio_nthreads * MAGIC2_FACTOR;    for (i = 0; i < squidaio_nthreads; i++) {	threadp = memPoolAlloc(squidaio_thread_pool);	threadp->status = _THREAD_STARTING;	threadp->current_req = NULL;	threadp->requests = 0;	threadp->next = threads;	threads = threadp;	if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {	    fprintf(stderr, "Thread creation failed\n");	    threadp->status = _THREAD_FAILED;	    continue;	}    }    /* Create request pool */    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);    squidaio_initialised = 1;}voidsquidaio_shutdown(void){    if (!squidaio_initialised)	return;    /* This is the same as in squidaio_sync */    do {	squidaio_poll_queues();    } while (request_queue_len > 0);    fd_close(done_fd);    fd_close(done_fd_read);    close(done_fd);    close(done_fd_read);}static void *squidaio_thread_loop(void *ptr){    squidaio_thread_t *threadp = ptr;    squidaio_request_t *request;    sigset_t new;    /*     * Make sure to ignore signals which may possibly get sent to     * the parent squid thread.  Causes havoc with mutex's and     * condition waits otherwise     */    sigemptyset(&new);    sigaddset(&new, SIGPIPE);    sigaddset(&new, SIGCHLD);#ifdef _SQUID_LINUX_THREADS_    sigaddset(&new, SIGQUIT);    sigaddset(&new, SIGTRAP);#else    sigaddset(&new, SIGUSR1);    sigaddset(&new, SIGUSR2);#endif    sigaddset(&new, SIGHUP);    sigaddset(&new, SIGTERM);    sigaddset(&new, SIGINT);    sigaddset(&new, SIGALRM);    pthread_sigmask(SIG_BLOCK, &new, NULL);    while (1) {	threadp->current_req = request = NULL;	request = NULL;	/* Get a request to process */	threadp->status = _THREAD_WAITING;	pthread_mutex_lock(&request_queue.mutex);	while (!request_queue.head) {	    pthread_cond_wait(&request_queue.cond, &request_queue.mutex);	}	request = request_queue.head;	if (request)	    request_queue.head = request->next;	if (!request_queue.head)	    request_queue.tailp = &request_queue.head;	pthread_mutex_unlock(&request_queue.mutex);	/* process the request */	threadp->status = _THREAD_BUSY;	request->next = NULL;	threadp->current_req = request;	errno = 0;	if (!request->cancelled) {	    switch (request->request_type) {	    case _AIO_OP_OPEN:		squidaio_do_open(request);		break;	    case _AIO_OP_READ:		squidaio_do_read(request);		break;	    case _AIO_OP_WRITE:		squidaio_do_write(request);		break;	    case _AIO_OP_CLOSE:		squidaio_do_close(request);		break;	    case _AIO_OP_UNLINK:		squidaio_do_unlink(request);		break;#if USE_TRUNCATE	    case _AIO_OP_TRUNCATE:		squidaio_do_truncate(request);		break;#endif#if AIO_OPENDIR			/* Opendir not implemented yet */	    case _AIO_OP_OPENDIR:		squidaio_do_opendir(request);		break;#endif	    case _AIO_OP_STAT:		squidaio_do_stat(request);		break;	    default:		request->ret = -1;		request->err = EINVAL;		break;	    }	} else {		/* cancelled */	    request->ret = -1;	    request->err = EINTR;	}	threadp->status = _THREAD_DONE;	/* put the request in the done queue */	pthread_mutex_lock(&done_queue.mutex);	*done_queue.tailp = request;	done_queue.tailp = &request->next;	pthread_mutex_unlock(&done_queue.mutex);	if (!done_signalled) {	    done_signalled = 1;	    FD_WRITE_METHOD(done_fd, "!", 1);	}	threadp->requests++;    }				/* while forever */    return NULL;}				/* squidaio_thread_loop */static voidsquidaio_queue_request(squidaio_request_t * request){    static int high_start = 0;    debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n",	request, request->request_type, request->resultp);    /* Mark it as not executed (failing result, no error) */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -