📄 aiops_win32.c
字号:
if (!ReleaseMutex(done_queue.mutex)) { CloseHandle(cond); return 1; } if (!done_signalled) { done_signalled = 1; FD_WRITE_METHOD(done_fd, "!", 1); } threadp->requests++;/* Relinquish the remainder of thread time slice to any other thread * of equal priority that is ready to run. */ Sleep(0); } /* while forever */ CloseHandle(cond); return 0;} /* squidaio_thread_loop */static voidsquidaio_queue_request(squidaio_request_t * request){ static int high_start = 0; debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n", request, request->request_type, request->resultp); /* Mark it as not executed (failing result, no error) */ request->ret = -1; request->err = 0; /* Internal housekeeping */ request_queue_len += 1; request->resultp->_data = request; /* Play some tricks with the request_queue2 queue */ request->next = NULL; if (!request_queue2.head) { if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) { /* Normal path */ *request_queue.tailp = request; request_queue.tailp = &request->next; if (!SetEvent(request_queue.cond)) fatal("couldn't push queue\n"); if (!ReleaseMutex(request_queue.mutex)) { /* unexpected error */ fatal("couldn't push queue\n"); } } else { /* Oops, the request queue is blocked, use request_queue2 */ *request_queue2.tailp = request; request_queue2.tailp = &request->next; } } else { /* Secondary path. We have blocked requests to deal with */ /* add the request to the chain */ *request_queue2.tailp = request; if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) { /* Ok, the queue is no longer blocked */ *request_queue.tailp = request_queue2.head; request_queue.tailp = &request->next; if (!SetEvent(request_queue.cond)) fatal("couldn't push queue\n"); if (!ReleaseMutex(request_queue.mutex)) { /* unexpected error */ fatal("couldn't push queue\n"); } request_queue2.head = NULL; request_queue2.tailp = &request_queue2.head; } else { /* still blocked, bump the blocked request chain */ request_queue2.tailp = &request->next; } } if (request_queue2.head) { static int filter = 0; static int filter_limit = 8; if (++filter >= filter_limit) { filter_limit += filter; filter = 0; debug(43, 1) ("squidaio_queue_request: WARNING - Queue congestion\n"); } } /* Warn if out of threads */ if (request_queue_len > MAGIC1) { static int last_warn = 0; static int queue_high, queue_low; if (high_start == 0) { high_start = squid_curtime; queue_high = request_queue_len; queue_low = request_queue_len; } if (request_queue_len > queue_high) queue_high = request_queue_len; if (request_queue_len < queue_low) queue_low = request_queue_len; if (squid_curtime >= (last_warn + 15) && squid_curtime >= (high_start + 5)) { debug(43, 1) ("squidaio_queue_request: WARNING - Disk I/O overloading\n"); if (squid_curtime >= (high_start + 15)) debug(43, 1) ("squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n", request_queue_len, queue_high, queue_low, (long int) (squid_curtime - high_start)); last_warn = squid_curtime; } } else { high_start = 0; } /* Warn if seriously overloaded */ if (request_queue_len > RIDICULOUS_LENGTH) { debug(43, 0) ("squidaio_queue_request: Async request queue growing uncontrollably!\n"); debug(43, 0) ("squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n"); squidaio_sync(); debug(43, 0) ("squidaio_queue_request: Synced\n"); }} /* squidaio_queue_request */static voidsquidaio_cleanup_request(squidaio_request_t * requestp){ squidaio_result_t *resultp = requestp->resultp; int cancelled = requestp->cancelled; /* Free allocated structures and copy data back to user space if the */ /* request hasn't been cancelled */ switch (requestp->request_type) { case _AIO_OP_STAT: if (!cancelled && requestp->ret == 0) xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat)); squidaio_xfree(requestp->tmpstatp, sizeof(struct stat)); squidaio_xstrfree(requestp->path); break; case _AIO_OP_OPEN: if (cancelled && requestp->ret >= 0) /* The open() was cancelled but completed */ close(requestp->ret); squidaio_xstrfree(requestp->path); break; case _AIO_OP_CLOSE: if (cancelled && requestp->ret < 0) /* The close() was cancelled and never got executed */ close(requestp->fd); break; case _AIO_OP_UNLINK: case _AIO_OP_TRUNCATE: case _AIO_OP_OPENDIR: squidaio_xstrfree(requestp->path); break; case _AIO_OP_READ: break; case _AIO_OP_WRITE: break; default: break; } if (resultp != NULL && !cancelled) { resultp->aio_return = requestp->ret; resultp->aio_errno = requestp->err; } memPoolFree(squidaio_request_pool, requestp);} /* squidaio_cleanup_request */intsquidaio_cancel(squidaio_result_t * resultp){ squidaio_request_t *request = resultp->_data; if (request && request->resultp == resultp) { debug(43, 9) ("squidaio_cancel: %p type=%d result=%p\n", request, request->request_type, request->resultp); request->cancelled = 1; request->resultp = NULL; resultp->_data = NULL; return 0; } return 1;} /* squidaio_cancel */intsquidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->path = (char *) squidaio_xstrdup(path); requestp->oflag = oflag; requestp->mode = mode; requestp->resultp = resultp; requestp->request_type = _AIO_OP_OPEN; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_open(squidaio_request_t * requestp){ requestp->ret = open(requestp->path, requestp->oflag, requestp->mode); requestp->err = errno;}intsquidaio_read(int fd, char *bufp, int bufs, off_t offset, int whence, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->fd = fd; requestp->bufferp = bufp; requestp->buflen = bufs; requestp->offset = offset; requestp->whence = whence; requestp->resultp = resultp; requestp->request_type = _AIO_OP_READ; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_read(squidaio_request_t * requestp){ lseek(requestp->fd, requestp->offset, requestp->whence); if (!ReadFile((HANDLE) _get_osfhandle(requestp->fd), requestp->bufferp, requestp->buflen, (LPDWORD) & requestp->ret, NULL)) { WIN32_maperror(GetLastError()); requestp->ret = -1; } requestp->err = errno;}intsquidaio_write(int fd, char *bufp, int bufs, off_t offset, int whence, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->fd = fd; requestp->bufferp = bufp; requestp->buflen = bufs; requestp->offset = offset; requestp->whence = whence; requestp->resultp = resultp; requestp->request_type = _AIO_OP_WRITE; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_write(squidaio_request_t * requestp){ assert(requestp->offset >= 0); if (!WriteFile((HANDLE) _get_osfhandle(requestp->fd), requestp->bufferp, requestp->buflen, (LPDWORD) & requestp->ret, NULL)) { WIN32_maperror(GetLastError()); requestp->ret = -1; } requestp->err = errno;}intsquidaio_close(int fd, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->fd = fd; requestp->resultp = resultp; requestp->request_type = _AIO_OP_CLOSE; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_close(squidaio_request_t * requestp){ if ((requestp->ret = close(requestp->fd)) < 0) debug(43, 0) ("squidaio_do_close: FD %d, errno %d\n", requestp->fd, errno); requestp->err = errno;}intsquidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->path = (char *) squidaio_xstrdup(path); requestp->statp = sb; requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat)); requestp->resultp = resultp; requestp->request_type = _AIO_OP_STAT; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_stat(squidaio_request_t * requestp){ requestp->ret = stat(requestp->path, requestp->tmpstatp); requestp->err = errno;}intsquidaio_unlink(const char *path, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->path = squidaio_xstrdup(path); requestp->resultp = resultp; requestp->request_type = _AIO_OP_UNLINK; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_unlink(squidaio_request_t * requestp){ requestp->ret = unlink(requestp->path); requestp->err = errno;}#if USE_TRUNCATEintsquidaio_truncate(const char *path, off_t length, squidaio_result_t * resultp){ squidaio_request_t *requestp; requestp = memPoolAlloc(squidaio_request_pool); requestp->path = (char *) squidaio_xstrdup(path); requestp->offset = length; requestp->resultp = resultp; requestp->request_type = _AIO_OP_TRUNCATE; requestp->cancelled = 0; squidaio_queue_request(requestp); return 0;}static voidsquidaio_do_truncate(squidaio_request_t * requestp){ requestp->ret = truncate(requestp->path, requestp->offset); requestp->err = errno;}#endif#if AIO_OPENDIR/* XXX squidaio_opendir NOT implemented yet.. */intsquidaio_opendir(const char *path, squidaio_result_t * resultp){ squidaio_request_t *requestp; int len; requestp = memPoolAlloc(squidaio_request_pool); return -1;}static voidsquidaio_do_opendir(squidaio_request_t * requestp){ /* NOT IMPLEMENTED */}#endifstatic voidsquidaio_poll_queues(void){ /* kick "overflow" request queue */ if (request_queue2.head && (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0)) { *request_queue.tailp = request_queue2.head; request_queue.tailp = request_queue2.tailp; if (!SetEvent(request_queue.cond)) fatal("couldn't push queue\n"); if (!ReleaseMutex(request_queue.mutex)) { /* unexpected error */ fatal("couldn't push queue\n"); } request_queue2.head = NULL; request_queue2.tailp = &request_queue2.head; } /* Give up the CPU to allow the threads to do their work */ if (done_queue.head || request_queue.head) Sleep(0); /* poll done queue */ if (done_queue.head && (WaitForSingleObject(done_queue.mutex, 0) == WAIT_OBJECT_0)) { struct squidaio_request_t *requests = done_queue.head; done_queue.head = NULL; done_queue.tailp = &done_queue.head; if (!ReleaseMutex(done_queue.mutex)) { /* unexpected error */ fatal("couldn't poll queue\n"); } *done_requests.tailp = requests; request_queue_len -= 1; while (requests->next) { requests = requests->next; request_queue_len -= 1; } done_requests.tailp = &requests->next; }}squidaio_result_t *squidaio_poll_done(void){ squidaio_request_t *request; squidaio_result_t *resultp; int cancelled; int polled = 0; AIO_REPOLL: request = done_requests.head; if (request == NULL && !polled) { if (done_signalled) { char junk[256]; FD_READ_METHOD(done_fd_read, junk, sizeof(junk)); done_signalled = 0; } squidaio_poll_queues(); polled = 1; request = done_requests.head; } if (!request) { return NULL; } debug(43, 9) ("squidaio_poll_done: %p type=%d result=%p\n", request, request->request_type, request->resultp); done_requests.head = request->next; if (!done_requests.head) done_requests.tailp = &done_requests.head; resultp = request->resultp; cancelled = request->cancelled; squidaio_debug(request); debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err); squidaio_cleanup_request(request); if (cancelled) goto AIO_REPOLL; return resultp;} /* squidaio_poll_done */intsquidaio_operations_pending(void){ return request_queue_len + (done_requests.head ? 1 : 0);}intsquidaio_sync(void){ /* XXX This might take a while if the queue is large.. */ do { squidaio_poll_queues(); } while (request_queue_len > 0); return squidaio_operations_pending();}intsquidaio_get_queue_len(void){ return request_queue_len;}static voidsquidaio_debug(squidaio_request_t * request){ switch (request->request_type) { case _AIO_OP_OPEN: debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret); break; case _AIO_OP_READ: debug(43, 5) ("READ on fd: %d\n", request->fd); break; case _AIO_OP_WRITE: debug(43, 5) ("WRITE on fd: %d\n", request->fd); break; case _AIO_OP_CLOSE: debug(43, 5) ("CLOSE of fd: %d\n", request->fd); break; case _AIO_OP_UNLINK: debug(43, 5) ("UNLINK of %s\n", request->path); break; case _AIO_OP_TRUNCATE: debug(43, 5) ("UNLINK of %s\n", request->path); break; default: break; }}voidsquidaio_stats(StoreEntry * sentry){ squidaio_thread_t *threadp; int i; if (!squidaio_initialised) return; storeAppendPrintf(sentry, "\n\nThreads Status:\n"); storeAppendPrintf(sentry, "#\tID\t# Requests\n"); threadp = threads; for (i = 0; i < squidaio_nthreads; i++) { storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests); threadp = threadp->next; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -