⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aiops.c

📁 -
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 * $Id: aiops.c,v 1.25 1999/01/24 02:26:20 wessels Exp $
 *
 * DEBUG: section 43    AIOPS
 * AUTHOR: Stewart Forster <slf@connect.com.au>
 *
 * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
 * ----------------------------------------------------------
 *
 *  Squid is the result of efforts by numerous individuals from the
 *  Internet community.  Development is led by Duane Wessels of the
 *  National Laboratory for Applied Network Research and funded by the
 *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
 *  Duane Wessels and the University of California San Diego.  Please
 *  see the COPYRIGHT file for full details.  Squid incorporates
 *  software developed and/or copyrighted by other sources.  Please see
 *  the CREDITS file for full details.
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 */

#include "squid.h"

#include	<stdio.h>
#include	<sys/types.h>
#include	<sys/stat.h>
#include	<fcntl.h>
#include	<pthread.h>
#include	<errno.h>
#include	<dirent.h>
#include	<signal.h>
#if HAVE_SCHED_H
#include	<sched.h>
#endif

/* Number of worker threads created by aio_init(); may be overridden at build time */
#ifndef NUMTHREADS
#define	NUMTHREADS		16
#endif

/* aio_queue_request() aborts once the pending queue grows past this length */
#define RIDICULOUS_LENGTH	4096

/* State of one worker thread, stored in aio_thread_t.status */
enum _aio_thread_status {
    _THREAD_STARTING = 0,
    _THREAD_WAITING,
    _THREAD_BUSY,
    _THREAD_FAILED,
    _THREAD_DONE
};

/* Operation selector; aio_thread_loop() dispatches on this value */
enum _aio_request_type {
    _AIO_OP_NONE = 0,
    _AIO_OP_OPEN,
    _AIO_OP_READ,
    _AIO_OP_WRITE,
    _AIO_OP_CLOSE,
    _AIO_OP_UNLINK,
    _AIO_OP_OPENDIR,
    _AIO_OP_STAT
};

/* One queued I/O request */
typedef struct aio_request_t {
    enum _aio_request_type request_type;
    int cancelled;		/* non-zero: worker skips the operation and reports EINTR */
    char *path;
    int oflag;
    mode_t mode;
    int fd;
    char *bufferp;
    char *tmpbufp;
    int buflen;
    off_t offset;
    int whence;
    int ret;			/* preset to -1 by aio_queue_request() */
    int err;			/* preset to 0 by aio_queue_request() */
    struct stat *tmpstatp;
    struct stat *statp;
    aio_result_t *resultp;
    struct aio_request_t *next;	/* link in the pending request queue */
} aio_request_t;

/* Per-worker bookkeeping; the array of these is allocated in aio_init() */
typedef struct aio_thread_t {
    pthread_t thread;
    enum _aio_thread_status status;
    pthread_mutex_t mutex;	/* Mutex for testing condition variable */
    pthread_cond_t cond;	/* Condition variable */
    struct aio_request_t *volatile req;		/* set by main, cleared by thread */
    struct aio_request_t *processed_req;	/* reminder to main */
    struct aio_thread_t *next;	/* link in the wait or busy thread list */
} aio_thread_t;

int aio_cancel(aio_result_t *);
int aio_open(const char *, int, mode_t, aio_result_t *);
int aio_read(int, char *, int, off_t, int, aio_result_t *);
int aio_write(int, char *, int, off_t, int, aio_result_t *);
int aio_close(int, aio_result_t *);
int aio_unlink(const char *, aio_result_t *);
int aio_opendir(const char *, aio_result_t *);
aio_result_t *aio_poll_done(void);

static void aio_init(void);
static void aio_queue_request(aio_request_t *);
static void aio_process_request_queue(void);
static void aio_cleanup_request(aio_request_t *);
static void *aio_thread_loop(void *);
static void aio_do_open(aio_request_t *);
static void aio_do_read(aio_request_t *);
static void aio_do_write(aio_request_t *);
static void aio_do_close(aio_request_t *);
static void aio_do_stat(aio_request_t *);
static void aio_do_unlink(aio_request_t *);
#if AIO_OPENDIR
static void *aio_do_opendir(aio_request_t *);
#endif
static void aio_debug(aio_request_t *);
static void aio_poll_threads(void);

static aio_thread_t *threads;
static int aio_initialised = 0;
static int request_queue_len = 0;
static MemPool *aio_request_pool = NULL;
static aio_request_t *request_queue_head = NULL;
static aio_request_t *request_queue_tail = NULL;
static aio_request_t *request_done_head = NULL;
static aio_request_t *request_done_tail = NULL;
static aio_thread_t *wait_threads = NULL;
static aio_thread_t *busy_threads_head = NULL;
static aio_thread_t *busy_threads_tail = NULL;
static pthread_attr_t globattr;
static struct sched_param globsched;
static pthread_t main_thread;

/*
 * One-time set-up of the async I/O machinery; returns immediately if it
 * has already run.
 *
 * Prepares the shared thread attributes, applies scheduling priority 1 to
 * the calling thread and priority 2 to the worker attributes (each step
 * only where the matching HAVE_* macro is set), then creates NUMTHREADS
 * workers running aio_thread_loop().  A worker whose mutex, condition
 * variable or thread cannot be created is marked _THREAD_FAILED and is
 * not placed on the wait list.  Finally the request memory pool is
 * created and the module is flagged as initialised.
 */
static void
aio_init(void)
{
    int i;
    aio_thread_t *threadp;

    if (aio_initialised)
	return;

    pthread_attr_init(&globattr);
#if HAVE_PTHREAD_ATTR_SETSCOPE
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
#endif
    globsched.sched_priority = 1;
    main_thread = pthread_self();
#if HAVE_PTHREAD_SETSCHEDPARAM
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
#endif
    globsched.sched_priority = 2;
#if HAVE_PTHREAD_ATTR_SETSCHEDPARAM
    pthread_attr_setschedparam(&globattr, &globsched);
#endif

    /* Create threads and get them to sit in their wait loop */
    threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t));

    for (i = 0; i < NUMTHREADS; i++) {
	threadp = &threads[i];
	threadp->status = _THREAD_STARTING;
	if (pthread_mutex_init(&(threadp->mutex), NULL)) {
	    threadp->status = _THREAD_FAILED;
	    continue;
	}
	if (pthread_cond_init(&(threadp->cond), NULL)) {
	    threadp->status = _THREAD_FAILED;
	    continue;
	}
	threadp->req = NULL;
	threadp->processed_req = NULL;
	if (pthread_create(&(threadp->thread), &globattr, aio_thread_loop, threadp)) {
	    fprintf(stderr, "Thread creation failed\n");
	    threadp->status = _THREAD_FAILED;
	    continue;
	}
	/* Push the new worker onto the front of the idle list */
	threadp->next = wait_threads;
	wait_threads = threadp;
#if AIO_PROPER_MUTEX
	/* Released again by aio_process_request_queue() after it signals the worker */
	pthread_mutex_lock(&threadp->mutex);
#endif
    }

    /* Create request pool */
    aio_request_pool = memPoolCreate("aio_request", sizeof(aio_request_t));

    aio_initialised = 1;
}


/*
 * Worker thread entry point.  `ptr' is the worker's own aio_thread_t.
 *
 * Blocks a set of signals in this thread, locks the worker's mutex, then
 * loops forever: wait until ->req is non-NULL, run the requested
 * operation (or report EINTR if the request was cancelled), and clear
 * ->req to tell the main thread the work is finished.  The loop has no
 * exit, so the function never returns.
 */
static void *
aio_thread_loop(void *ptr)
{
    aio_thread_t *threadp = (aio_thread_t *) ptr;
    aio_request_t *request;
    sigset_t new;
#if !AIO_PROPER_MUTEX
    struct timespec wait_time;
#endif

    /* Make sure to ignore signals which may possibly get sent to the parent */
    /* squid thread.  Causes havoc with mutex's and condition waits otherwise */

    sigemptyset(&new);
    sigaddset(&new, SIGPIPE);
    sigaddset(&new, SIGCHLD);
#ifdef _SQUID_LINUX_THREADS_
    sigaddset(&new, SIGQUIT);
    sigaddset(&new, SIGTRAP);
#else
    sigaddset(&new, SIGUSR1);
    sigaddset(&new, SIGUSR2);
#endif
    sigaddset(&new, SIGHUP);
    sigaddset(&new, SIGTERM);
    sigaddset(&new, SIGINT);
    sigaddset(&new, SIGALRM);
    pthread_sigmask(SIG_BLOCK, &new, NULL);

    pthread_mutex_lock(&threadp->mutex);
    while (1) {
#if AIO_PROPER_MUTEX
	while (threadp->req == NULL) {
	    threadp->status = _THREAD_WAITING;
	    pthread_cond_wait(&(threadp->cond), &(threadp->mutex));
	}
#else
	/* The timeout is used to unlock the race condition where
	 * ->req is set between the check and pthread_cond_wait.
	 * The thread steps its own clock on each timeout, to avoid a CPU
	 * spin situation if the main thread is suspended (paging), and
	 * squid_curtime is not being updated timely.
	 */
	wait_time.tv_sec = squid_curtime + 1;	/* little quicker first time */
	wait_time.tv_nsec = 0;
	while (threadp->req == NULL) {
	    threadp->status = _THREAD_WAITING;
	    pthread_cond_timedwait(&(threadp->cond), &(threadp->mutex),
		&wait_time);
	    wait_time.tv_sec += 3;	/* then wait 3 seconds between each check */
	}
#endif
	request = threadp->req;
	errno = 0;
	if (!request->cancelled) {
	    switch (request->request_type) {
	    case _AIO_OP_OPEN:
		aio_do_open(request);
		break;
	    case _AIO_OP_READ:
		aio_do_read(request);
		break;
	    case _AIO_OP_WRITE:
		aio_do_write(request);
		break;
	    case _AIO_OP_CLOSE:
		aio_do_close(request);
		break;
	    case _AIO_OP_UNLINK:
		aio_do_unlink(request);
		break;
#if AIO_OPENDIR			/* Opendir not implemented yet */
	    case _AIO_OP_OPENDIR:
		aio_do_opendir(request);
		break;
#endif
	    case _AIO_OP_STAT:
		aio_do_stat(request);
		break;
	    default:
		request->ret = -1;
		request->err = EINVAL;
		break;
	    }
	} else {		/* cancelled */
	    request->ret = -1;
	    request->err = EINTR;
	}
	threadp->req = NULL;	/* tells main thread that we are done */
    }				/* while */
}				/* aio_thread_loop */


/*
 * Submit a request for execution.  If both the idle list and the busy
 * list are empty there is no worker at all, so the process prints a
 * panic message and exits; otherwise the request is handed to
 * aio_queue_request().
 */
static void
aio_do_request(aio_request_t * requestp)
{
    if (wait_threads == NULL && busy_threads_head == NULL) {
	fprintf(stderr, "PANIC: No threads to service requests with!\n");
	exit(-1);
    }
    aio_queue_request(requestp);
}				/* aio_do_request */


/*
 * Append `requestp' to the pending queue and try to dispatch work.
 *
 * The request's result is preset to ret = -1 / err = 0.  If no worker is
 * idle, finished workers are polled first; then
 * aio_process_request_queue() hands queued requests to idle workers.
 *
 * While the queue stays longer than NUMTHREADS / 2 the function tracks
 * the high/low queue lengths and, at most once per 15 seconds (and not
 * within the first second of the episode), logs a warning together with
 * up to NUMTHREADS entries from the head of the queue.  If the queue
 * exceeds RIDICULOUS_LENGTH the process is aborted.
 */
static void
aio_queue_request(aio_request_t * requestp)
{
    aio_request_t *rp;
    static int last_warn = 0;
    static int high_start = 0;
    static int queue_high, queue_low;
    int i;

    /* Mark it as not executed (failing result, no error) */
    requestp->ret = -1;
    requestp->err = 0;

    /* Queue it on the request queue */
    if (request_queue_head == NULL) {
	request_queue_head = requestp;
	request_queue_tail = requestp;
    } else {
	request_queue_tail->next = requestp;
	request_queue_tail = requestp;
    }
    requestp->next = NULL;
    request_queue_len += 1;

    /* Poll done threads if needed */
    if (wait_threads == NULL)
	aio_poll_threads();

    /* Kick it rolling */
    aio_process_request_queue();

    /* Warn if out of threads */
    if (request_queue_len > (NUMTHREADS >> 1)) {
	if (high_start == 0) {
	    /* Start of a new overload episode */
	    high_start = squid_curtime;
	    queue_high = request_queue_len;
	    queue_low = request_queue_len;
	}
	if (request_queue_len > queue_high)
	    queue_high = request_queue_len;
	if (request_queue_len < queue_low)
	    queue_low = request_queue_len;
	if (squid_curtime >= (last_warn + 15) &&
	    squid_curtime >= (high_start + 1)) {
	    debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O theads\n");
	    /* The cast keeps the argument type in step with the %d conversion */
	    debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
		request_queue_len, queue_high, queue_low,
		(int) (squid_curtime - high_start));
	    debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS\n");
	    debug(43, 1) ("aio_queue_request: Or install more disks to share the load\n");
	    debug(43, 3) ("aio_queue_request: First %d items on request queue\n", NUMTHREADS);
	    /* Stop at the end of the queue or after NUMTHREADS entries, whichever comes first */
	    for (i = 1, rp = request_queue_head;
		rp != NULL && i <= NUMTHREADS;
		i++, rp = rp->next) {
		switch (rp->request_type) {
		case _AIO_OP_OPEN:
		    debug(43, 3) ("aio_queue_request: %d : open -> %s\n", i, rp->path);
		    break;
		case _AIO_OP_READ:
		    debug(43, 3) ("aio_queue_request: %d : read -> FD = %d\n", i, rp->fd);
		    break;
		case _AIO_OP_WRITE:
		    debug(43, 3) ("aio_queue_request: %d : write -> FD = %d\n", i, rp->fd);
		    break;
		case _AIO_OP_CLOSE:
		    debug(43, 3) ("aio_queue_request: %d : close -> FD = %d\n", i, rp->fd);
		    break;
		case _AIO_OP_UNLINK:
		    debug(43, 3) ("aio_queue_request: %d : unlink -> %s\n", i, rp->path);
		    break;
		case _AIO_OP_STAT:
		    debug(43, 3) ("aio_queue_request: %d : stat -> %s\n", i, rp->path);
		    break;
		default:
		    debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %d\n", i, rp->request_type);
		    break;
		}
	    }
	    last_warn = squid_curtime;
	}
    } else {
	/* Queue is back under the threshold: end the overload episode */
	high_start = 0;
    }
    if (request_queue_len > RIDICULOUS_LENGTH) {
	debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n");
	debug(43, 0) ("aio_queue_request: Possible infinite loop somewhere in squid. Restarting...\n");
	abort();
    }
}				/* aio_queue_request */


/*
 * Hand pending requests to idle workers until either the idle list or
 * the pending queue is empty.
 *
 * Each request is unlinked from the head of the queue.  A request that
 * was cancelled while queued is passed to aio_cleanup_request() instead
 * of being dispatched.  Otherwise the first idle worker is moved to the
 * tail of the busy list, given the request through ->req (with a copy in
 * ->processed_req) and woken through its condition variable.
 */
static void
aio_process_request_queue(void)
{
    aio_thread_t *threadp;
    aio_request_t *requestp;

    for (;;) {
	if (wait_threads == NULL || request_queue_head == NULL)
	    return;

	/* Pop the head of the pending queue */
	requestp = request_queue_head;
	if ((request_queue_head = requestp->next) == NULL)
	    request_queue_tail = NULL;
	requestp->next = NULL;
	request_queue_len--;

	if (requestp->cancelled) {
	    aio_cleanup_request(requestp);
	    continue;
	}

	/* Move the first idle worker to the tail of the busy list */
	threadp = wait_threads;
	wait_threads = threadp->next;
	threadp->next = NULL;

	if (busy_threads_head != NULL)
	    busy_threads_tail->next = threadp;
	else
	    busy_threads_head = threadp;
	busy_threads_tail = threadp;

	threadp->status = _THREAD_BUSY;
	threadp->req = threadp->processed_req = requestp;
	pthread_cond_signal(&(threadp->cond));
#if AIO_PROPER_MUTEX
	pthread_mutex_unlock(&threadp->mutex);
#endif
    }
}				/* aio_process_request_queue */


static void
aio_cleanup_request(aio_request_t * requestp)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -