📄 ntio.c
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/*
 * The contents of this file are subject to the Mozilla Public
 * License Version 1.1 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS
 * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * The Original Code is the Netscape Portable Runtime (NSPR).
 *
 * The Initial Developer of the Original Code is Netscape
 * Communications Corporation.  Portions created by Netscape are
 * Copyright (C) 1998-2000 Netscape Communications Corporation.  All
 * Rights Reserved.
 *
 * Contributor(s):
 *
 * Alternatively, the contents of this file may be used under the
 * terms of the GNU General Public License Version 2 or later (the
 * "GPL"), in which case the provisions of the GPL are applicable
 * instead of those above.  If you wish to allow use of your
 * version of this file only under the terms of the GPL and not to
 * allow others to use your version of this file under the MPL,
 * indicate your decision by deleting the provisions above and
 * replace them with the notice and other provisions required by
 * the GPL.  If you do not delete the provisions above, a recipient
 * may use your version of this file under either the MPL or the
 * GPL.
 */

/* Windows NT IO module
 *
 * This module handles IO for LOCAL_SCOPE and GLOBAL_SCOPE threads.
 * For LOCAL_SCOPE threads, we're using NT fibers.  For GLOBAL_SCOPE threads
 * we're using NT-native threads.
 *
 * When doing IO, we want to use completion ports for optimal performance
 * with fibers.  But if we use completion ports for all IO, it is difficult
 * to project a blocking model with GLOBAL_SCOPE threads.  To handle this
 * we create an extra thread for completing IO for GLOBAL_SCOPE threads.
 * We don't really want to complete IO on a separate thread for LOCAL_SCOPE
 * threads because it means extra context switches, which are really slow
 * on NT...  Since we're using a single completion port, some IO will
 * be incorrectly completed on the GLOBAL_SCOPE IO thread; this will mean
 * extra context switching; but I don't think there is anything I can do
 * about it.
 */

#include "primpl.h"
#include "pprmwait.h"
#include <direct.h>
#include <mbstring.h>

/* The single I/O completion port shared by all threads, and the dedicated
 * thread that completes I/O on behalf of GLOBAL_SCOPE (native) threads. */
static HANDLE                _pr_completion_port;
static PRThread             *_pr_io_completion_thread;

/* Recycle list of completion keys/handles; ring of RECYCLE_SIZE entries
 * guarded by _pr_recycle_lock.  NOTE(review): exact recycling semantics
 * depend on code outside this view -- confirm against the rest of the file. */
#define RECYCLE_SIZE 512
static struct _MDLock        _pr_recycle_lock;
static PRInt32               _pr_recycle_array[RECYCLE_SIZE];
static PRInt32               _pr_recycle_tail = 0;

__declspec(thread) PRThread *_pr_io_restarted_io = NULL;
DWORD _pr_io_restartedIOIndex;  /* The thread local storage slot for each
                                 * thread is initialized to NULL. */

PRBool                       _nt_version_gets_lockfile_completion;

struct _MDLock               _pr_ioq_lock;

/* Idle (dead) thread bookkeeping, defined elsewhere; drained by
 * _PR_MD_PAUSE_CPU below. */
extern _MDLock               _nt_idleLock;
extern PRCList               _nt_idleList;
extern PRUint32              _nt_idleCount;

#define CLOSE_TIMEOUT   PR_SecondsToInterval(5)

/*
 * NSPR-to-NT access right mapping table for files.
 */
static DWORD fileAccessTable[] = {
    FILE_GENERIC_READ,
    FILE_GENERIC_WRITE,
    FILE_GENERIC_EXECUTE
};

/*
 * NSPR-to-NT access right mapping table for directories.
 */
static DWORD dirAccessTable[] = {
    FILE_GENERIC_READ,
    FILE_GENERIC_WRITE|FILE_DELETE_CHILD,
    FILE_GENERIC_EXECUTE
};

/*
 * The NSPR epoch (00:00:00 1 Jan 1970 UTC) in FILETIME.
 * We store the value in a PRTime variable for convenience.
 * This constant is used by _PR_FileTimeToPRTime().
 */
static const PRTime _pr_filetime_offset = 116444736000000000i64;

#define _NEED_351_FILE_LOCKING_HACK
#ifdef _NEED_351_FILE_LOCKING_HACK
#define _PR_LOCAL_FILE  1
#define _PR_REMOTE_FILE 2
PRBool IsFileLocalInit();
PRInt32 IsFileLocal(HANDLE hFile);
#endif /* _NEED_351_FILE_LOCKING_HACK */

static PRInt32 _md_MakeNonblock(HANDLE);

/* Nonblocking-socket fallbacks, used when a socket cannot be associated
 * with the completion port (see _NT_USE_NB_IO below). */
static PRInt32 _nt_nonblock_accept(PRFileDesc *fd, struct sockaddr *addr, int *addrlen, PRIntervalTime);
static PRInt32 _nt_nonblock_connect(PRFileDesc *fd, struct sockaddr *addr, int addrlen, PRIntervalTime);
static PRInt32 _nt_nonblock_recv(PRFileDesc *fd, char *buf, int len, int flags, PRIntervalTime);
static PRInt32 _nt_nonblock_send(PRFileDesc *fd, char *buf, int len, PRIntervalTime);
static PRInt32 _nt_nonblock_writev(PRFileDesc *fd, const PRIOVec *iov, int size, PRIntervalTime);
static PRInt32 _nt_nonblock_sendto(PRFileDesc *, const char *, int, const struct sockaddr *, int, PRIntervalTime);
static PRInt32 _nt_nonblock_recvfrom(PRFileDesc *, char *, int, struct sockaddr *, int *, PRIntervalTime);

/*
 * We cannot associate a fd (a socket) with an I/O completion port
 * if the fd is nonblocking or inheritable.
 *
 * Nonblocking socket I/O won't work if the socket is associated with
 * an I/O completion port.
 *
 * An inheritable fd cannot be associated with an I/O completion port
 * because the completion notification of async I/O initiated by the
 * child process is still posted to the I/O completion port in the
 * parent process.
 */
#define _NT_USE_NB_IO(fd) \
    ((fd)->secret->nonblocking || (fd)->secret->inheritable == _PR_TRI_TRUE)

/*
 * UDP support
 *
 * UDP is supported on NT by the continuation thread mechanism.
 * The code is borrowed from ptio.c in pthreads nspr, hence the
 * PT and pt prefixes.  This mechanism is in fact general and
 * not limited to UDP.  For now, only UDP's recvfrom and sendto
 * go through the continuation thread if they get WSAEWOULDBLOCK
 * on first try.  Recv and send on a connected UDP socket still
 * goes through asynchronous io.
 */

#define PT_DEFAULT_SELECT_MSEC 100

typedef struct pt_Continuation pt_Continuation;
typedef PRBool (*ContinuationFn)(pt_Continuation *op, PRInt16 revent);

/* Lifecycle states of a continuation operation.
 * (Note: "Contuation"/"sumbitted" misspellings are historical and must be
 * kept -- these names are referenced elsewhere in this file.) */
typedef enum pr_ContuationStatus
{
    pt_continuation_sumbitted,
    pt_continuation_inprogress,
    pt_continuation_abort,
    pt_continuation_done
} pr_ContuationStatus;

struct pt_Continuation
{
    /* These objects are linked in ascending timeout order */
    pt_Continuation *next, *prev;       /* self linked list of these things */

    /* The building of the continuation operation */
    ContinuationFn function;            /* what function to continue */
    union { SOCKET osfd; } arg1;        /* #1 - the op's fd */
    union { void* buffer; } arg2;       /* #2 - primary transfer buffer */
    union { PRIntn amount; } arg3;      /* #3 - size of 'buffer' */
    union { PRIntn flags; } arg4;       /* #4 - read/write flags */
    union { PRNetAddr *addr; } arg5;    /* #5 - send/recv address */

    PRIntervalTime timeout;             /* representation of the timeout */

    PRIntn event;                       /* flags for select()'s events */

    /*
    ** The representation and notification of the results of the operation.
    ** These functions can either return an int return code or a pointer to
    ** some object.
    */
    union { PRIntn code; void *object; } result;

    PRIntn syserrno;                    /* in case it failed, why (errno) */
    pr_ContuationStatus status;         /* the status of the operation */
    PRCondVar *complete;                /* to notify the initiating thread */
};

/* Queue of pending continuation operations, serviced by ContinuationThread. */
static struct pt_TimedQueue
{
    PRLock *ml;                 /* a little protection */
    PRThread *thread;           /* internal thread's identification */
    PRCondVar *new_op;          /* new operation supplied */
    PRCondVar *finish_op;       /* an existing operation finished */
    PRUintn op_count;           /* number of operations in the list */
    pt_Continuation *head, *tail;  /* head/tail of list of operations */

    pt_Continuation *op;        /* timed operation furthest in future */
    PRIntervalTime epoch;       /* the epoch of 'timed' */
} pt_tq;

#if defined(DEBUG)
static struct pt_debug_s
{
    PRIntn predictionsFoiled;
    PRIntn pollingListMax;
    PRIntn continuationsServed;
} pt_debug;
#endif /* DEBUG */

static void ContinuationThread(void *arg);
static PRInt32 pt_SendTo(
    SOCKET osfd, const void *buf,
    PRInt32 amount, PRInt32 flags,
    const PRNetAddr *addr, PRIntn addrlen, PRIntervalTime timeout);
static PRInt32 pt_RecvFrom(SOCKET osfd, void *buf, PRInt32 amount,
    PRInt32 flags, PRNetAddr *addr, PRIntn *addr_len,
    PRIntervalTime timeout);

/* The key returned from GetQueuedCompletionStatus() is used to determine what
 * type of completion we have.  We differentiate between IO completions and
 * CVAR completions.
 */
#define KEY_IO              0xaaaaaaaa
#define KEY_CVAR            0xbbbbbbbb

/*
 * _PR_MD_PAUSE_CPU -- park this CPU for up to 'ticks', completing queued I/O.
 *
 * First reaps any dead threads on the _nt_idleList (deleting their fibers),
 * then loops on GetQueuedCompletionStatus, dispatching each dequeued
 * completion: _MD_MultiWaitIO completions are appended to their wait group's
 * io_ready list (waking one waiter, if any); _MD_BlockingIO completions wake
 * the thread that initiated the I/O (KEY_CVAR = cvar notification from a
 * native thread, KEY_IO = genuine I/O completion).
 *
 * Returns -1 on a GetQueuedCompletionStatus failure other than WAIT_TIMEOUT;
 * returns the number of threads awoken (possibly 0) on timeout.
 */
PRInt32
_PR_MD_PAUSE_CPU(PRIntervalTime ticks)
{
    int awoken = 0;
    unsigned long bytes, key;
    int rv;
    LPOVERLAPPED olp;
    _MDOverlapped *mdOlp;
    PRUint32 timeout;

    if (_nt_idleCount > 0) {
        PRThread *deadThread;

        _MD_LOCK(&_nt_idleLock);
        while( !PR_CLIST_IS_EMPTY(&_nt_idleList) ) {
            deadThread = _PR_THREAD_PTR(PR_LIST_HEAD(&_nt_idleList));
            PR_REMOVE_LINK(&deadThread->links);

            PR_ASSERT(deadThread->state == _PR_DEAD_STATE);

            /* XXXMB - cleanup to do here? */
            if ( !_PR_IS_NATIVE_THREAD(deadThread) ){
                /* Spinlock while user thread is still running.
                 * There is no way to use a condition variable here.  The thread
                 * is dead, and we have to wait until we switch off the dead
                 * thread before we can kill the fiber completely.
                 */
                while ( deadThread->no_sched)
                    ;

                DeleteFiber(deadThread->md.fiber_id);
            }
            memset(deadThread, 0xa, sizeof(PRThread)); /* debugging */
            if (!deadThread->threadAllocatedOnStack)
                PR_DELETE(deadThread);
            _nt_idleCount--;
        }
        _MD_UNLOCK(&_nt_idleLock);
    }

    if (ticks == PR_INTERVAL_NO_TIMEOUT)
#if 0
        timeout = INFINITE;
#else
        /*
         * temporary hack to poll the runq every 5 seconds because of bug in
         * native threads creating user threads and not poking the right cpu.
         *
         * A local thread that was interrupted is bound to its current
         * cpu but there is no easy way for the interrupter to poke the
         * right cpu.  This is a hack to poll the runq every 5 seconds.
         */
        timeout = 5000;
#endif
    else
        timeout = PR_IntervalToMilliseconds(ticks);

    /*
     * The idea of looping here is to complete as many IOs as possible before
     * returning.  This should minimize trips to the idle thread.
     */
    while(1) {
        rv = GetQueuedCompletionStatus(
                _pr_completion_port,
                &bytes,
                &key,
                &olp,
                timeout);
        if (rv == 0 && olp == NULL) {
            /* Error in GetQueuedCompletionStatus */
            if (GetLastError() != WAIT_TIMEOUT) {
                /* ARGH - what can we do here?  Log an error?  XXXMB */
                return -1;
            } else {
                /* If awoken == 0, then we just had a timeout */
                return awoken;
            }
        }

        if (olp == NULL)
            return 0;

        mdOlp = (_MDOverlapped *)olp;

        if (mdOlp->ioModel == _MD_MultiWaitIO) {
            PRRecvWait *desc;
            PRWaitGroup *group;
            PRThread *thred = NULL;
            PRMWStatus mwstatus;

            desc = mdOlp->data.mw.desc;
            PR_ASSERT(desc != NULL);
            mwstatus = rv ? PR_MW_SUCCESS : PR_MW_FAILURE;
            /* Publish the outcome only if we are first (it is still PENDING);
             * NOTE(review): this uses the legacy PVOID-based
             * InterlockedCompareExchange signature -- confirm against the
             * target SDK version. */
            if (InterlockedCompareExchange((PVOID *)&desc->outcome,
                (PVOID)mwstatus, (PVOID)PR_MW_PENDING)
                == (PVOID)PR_MW_PENDING) {
                if (mwstatus == PR_MW_SUCCESS) {
                    desc->bytesRecv = bytes;
                } else {
                    mdOlp->data.mw.error = GetLastError();
                }
            }
            group = mdOlp->data.mw.group;
            PR_ASSERT(group != NULL);

            _PR_MD_LOCK(&group->mdlock);
            PR_APPEND_LINK(&mdOlp->data.mw.links, &group->io_ready);
            PR_ASSERT(desc->fd != NULL);
            NT_HashRemoveInternal(group, desc->fd);
            if (!PR_CLIST_IS_EMPTY(&group->wait_list)) {
                /* Wake up one thread of the wait group. */
                thred = _PR_THREAD_CONDQ_PTR(PR_LIST_HEAD(&group->wait_list));
                PR_REMOVE_LINK(&thred->waitQLinks);
            }
            _PR_MD_UNLOCK(&group->mdlock);

            if (thred) {
                if (!_PR_IS_NATIVE_THREAD(thred)) {
                    int pri = thred->priority;
                    _PRCPU *lockedCPU = _PR_MD_CURRENT_CPU();
                    _PR_THREAD_LOCK(thred);
                    if (thred->flags & _PR_ON_PAUSEQ) {
                        /* Move the waiter from the pause queue onto this
                         * CPU's run queue. */
                        _PR_SLEEPQ_LOCK(thred->cpu);
                        _PR_DEL_SLEEPQ(thred, PR_TRUE);
                        _PR_SLEEPQ_UNLOCK(thred->cpu);
                        _PR_THREAD_UNLOCK(thred);
                        thred->cpu = lockedCPU;
                        thred->state = _PR_RUNNABLE;
                        _PR_RUNQ_LOCK(lockedCPU);
                        _PR_ADD_RUNQ(thred, lockedCPU, pri);
                        _PR_RUNQ_UNLOCK(lockedCPU);
                    } else {
                        /*
                         * The thread was just interrupted and moved
                         * from the pause queue to the run queue.
                         */
                        _PR_THREAD_UNLOCK(thred);
                    }
                } else {
                    /* Native waiter: mark runnable and release its semaphore. */
                    _PR_THREAD_LOCK(thred);
                    thred->state = _PR_RUNNABLE;
                    _PR_THREAD_UNLOCK(thred);
                    ReleaseSemaphore(thred->md.blocked_sema, 1, NULL);
                }
            }
        } else {
            PRThread *completed_io;

            PR_ASSERT(mdOlp->ioModel == _MD_BlockingIO);
            completed_io = _PR_THREAD_MD_TO_PTR(mdOlp->data.mdThread);
            /* Record the completion status for the blocked initiator. */
            completed_io->md.blocked_io_status = rv;
            if (rv == 0)
                completed_io->md.blocked_io_error = GetLastError();
            completed_io->md.blocked_io_bytes = bytes;

            if ( !_PR_IS_NATIVE_THREAD(completed_io) ) {
                int pri = completed_io->priority;
                _PRCPU *lockedCPU = _PR_MD_CURRENT_CPU();

                /* The KEY_CVAR notification only occurs when a native thread
                 * is notifying a user thread.  For user-user notifications
                 * the wakeup occurs by having the notifier place the thread
                 * on the runq directly; for native-native notifications the
                 * wakeup occurs by calling ReleaseSemaphore.
                 */
                if ( key == KEY_CVAR ) {
                    PR_ASSERT(completed_io->io_pending == PR_FALSE);
                    PR_ASSERT(completed_io->io_suspended == PR_FALSE);
                    PR_ASSERT(completed_io->md.thr_bound_cpu == NULL);

                    /* Thread has already been deleted from sleepQ */

                    /* Switch CPU and add to runQ */
                    completed_io->cpu = lockedCPU;
                    completed_io->state = _PR_RUNNABLE;
                    _PR_RUNQ_LOCK(lockedCPU);
                    _PR_ADD_RUNQ(completed_io, lockedCPU, pri);
                    _PR_RUNQ_UNLOCK(lockedCPU);
                } else {
                    PR_ASSERT(key == KEY_IO);
                    PR_ASSERT(completed_io->io_pending == PR_TRUE);

                    _PR_THREAD_LOCK(completed_io);

                    completed_io->io_pending = PR_FALSE;

                    /* If io_suspended is true, then this IO has already resumed.
                     * We don't need to do anything; because the thread is
                     * already running.
                     */
                    if (completed_io->io_suspended == PR_FALSE) {
                        if (completed_io->flags & (_PR_ON_SLEEPQ|_PR_ON_PAUSEQ)) {
                            _PR_SLEEPQ_LOCK(completed_io->cpu);
                            _PR_DEL_SLEEPQ(completed_io, PR_TRUE);
                            _PR_SLEEPQ_UNLOCK(completed_io->cpu);

                            _PR_THREAD_UNLOCK(completed_io);

                            /*
                             * If an I/O operation is suspended, the thread
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -