📄 prmwait.c
字号:
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); PR_Lock(group->ml); goto failed_alloc; } if (NULL != old_polling_list) PR_DELETE(old_polling_list); PR_Lock(group->ml); if (_prmw_running != group->state) { PR_SetError(PR_INVALID_STATE_ERROR, 0); goto aborted; } group->polling_list = poll_list; group->polling_count = new_count; } now = PR_IntervalNow(); polling_interval = max_polling_interval; since_last_poll = now - group->last_poll; waiter = &group->waiter->recv_wait; poll_list = group->polling_list; for (count = 0; count < group->waiter->count; ++waiter) { PR_ASSERT(waiter < &group->waiter->recv_wait + group->waiter->length); if (NULL != *waiter) /* a live one! */ { if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) && (since_last_poll >= (*waiter)->timeout)) _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT); else { if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) { (*waiter)->timeout -= since_last_poll; if ((*waiter)->timeout < polling_interval) polling_interval = (*waiter)->timeout; } PR_ASSERT(poll_list < group->polling_list + group->polling_count); poll_list->fd = (*waiter)->fd; poll_list->in_flags = PR_POLL_READ; poll_list->out_flags = 0;#if 0 printf( "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n", poll_list, count, poll_list->fd, (*waiter)->timeout);#endif poll_list += 1; count += 1; } } } PR_ASSERT(count == group->waiter->count); /* ** If there are no more threads waiting for completion, ** we need to return. */ if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) && (1 == group->waiting_threads)) break; if (0 == count) continue; /* wait for new business */ group->last_poll = now; PR_Unlock(group->ml); count_ready = PR_Poll(group->polling_list, count, polling_interval); PR_Lock(group->ml); if (_prmw_running != group->state) { PR_SetError(PR_INVALID_STATE_ERROR, 0); goto aborted; } if (-1 == count_ready) { goto failed_poll; /* that's a shame */ } else if (0 < count_ready) { for (poll_list = group->polling_list; count > 0; poll_list++, count--) { PR_ASSERT( poll_list < group->polling_list + group->polling_count); if (poll_list->out_flags != 0) { waiter = _MW_LookupInternal(group, poll_list->fd); /* ** If 'waiter' is NULL, that means the wait receive ** descriptor has been canceled. */ if (NULL != waiter) _MW_DoneInternal(group, waiter, PR_MW_SUCCESS); } } } /* ** If there are no more threads waiting for completion, ** we need to return. ** This thread was "borrowed" to do the polling, but it really ** belongs to the client. */ if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) && (1 == group->waiting_threads)) break; } rv = PR_SUCCESS;aborted:failed_poll:failed_alloc: group->poller = NULL; /* we were that, not we ain't */ if ((_prmw_running == group->state) && (group->waiting_threads > 1)) { /* Wake up one thread to become the new poller. */ PR_NotifyCondVar(group->io_complete); } return rv; /* we return with the lock held */} /* _MW_PollInternal */#endif /* !WINNT */static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group){ PRMWGroupState rv = group->state; /* ** Looking at the group's fields is safe because ** once the group's state is no longer running, it ** cannot revert and there is a safe check on entry ** to make sure no more threads are made to wait. */ if ((_prmw_stopping == rv) && (0 == group->waiting_threads)) { rv = group->state = _prmw_stopped; PR_NotifyCondVar(group->mw_manage); } return rv;} /* MW_TestForShutdownInternal */#ifndef WINNTstatic void _MW_InitialRecv(PRCList *io_ready){ PRRecvWait *desc = (PRRecvWait*)io_ready; if ((NULL == desc->buffer.start) || (0 == desc->buffer.length)) desc->bytesRecv = 0; else { desc->bytesRecv = desc->fd->methods->recv( desc->fd, desc->buffer.start, desc->buffer.length, 0, desc->timeout); if (desc->bytesRecv < 0) /* SetError should already be there */ desc->outcome = PR_MW_FAILURE; }} /* _MW_InitialRecv */#endif#ifdef WINNTstatic void NT_TimeProc(void *arg){ _MDOverlapped *overlapped = (_MDOverlapped *)arg; PRRecvWait *desc = overlapped->data.mw.desc; PRFileDesc *bottom; if (InterlockedCompareExchange((PVOID *)&desc->outcome, (PVOID)PR_MW_TIMEOUT, (PVOID)PR_MW_PENDING) != (PVOID)PR_MW_PENDING) { /* This wait recv descriptor has already completed. */ return; } /* close the osfd to abort the outstanding async io request */ /* $$$$ ** Little late to be checking if NSPR's on the bottom of stack, ** but if we don't check, we can't assert that the private data ** is what we think it is. ** $$$$ */ bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); PR_ASSERT(NULL != bottom); if (NULL != bottom) /* now what!?!?! */ { bottom->secret->state = _PR_FILEDESC_CLOSED; if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) { fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); PR_ASSERT(!"What shall I do?"); } } return;} /* NT_TimeProc */static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd){ PRRecvWait **waiter; _PR_MD_LOCK(&group->mdlock); waiter = _MW_LookupInternal(group, fd); if (NULL != waiter) { group->waiter->count -= 1; *waiter = NULL; } _PR_MD_UNLOCK(&group->mdlock); return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;}PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd){ PRRecvWait **waiter; waiter = _MW_LookupInternal(group, fd); if (NULL != waiter) { group->waiter->count -= 1; *waiter = NULL; } return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;}#endif /* WINNT *//******************************************************************//******************************************************************//********************** The public API portion ********************//******************************************************************//******************************************************************/PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc( PRWaitGroup *group, PRRecvWait *desc){ _PR_HashStory hrv; PRStatus rv = PR_FAILURE;#ifdef WINNT _MDOverlapped *overlapped; HANDLE hFile; BOOL bResult; DWORD dwError; PRFileDesc *bottom;#endif if (!_pr_initialized) _PR_ImplicitInitialization(); if ((NULL == group) && (NULL == (group = MW_Init2()))) { return rv; } PR_ASSERT(NULL != desc->fd); desc->outcome = PR_MW_PENDING; /* nice, well known value */ desc->bytesRecv = 0; /* likewise, though this value is ambiguious */ PR_Lock(group->ml); if (_prmw_running != group->state) { /* Not allowed to add after cancelling the group */ desc->outcome = PR_MW_INTERRUPT; PR_SetError(PR_INVALID_STATE_ERROR, 0); PR_Unlock(group->ml); return rv; }#ifdef WINNT _PR_MD_LOCK(&group->mdlock);#endif /* ** If the waiter count is zero at this point, there's no telling ** how long we've been idle. Therefore, initialize the beginning ** of the timing interval. As long as the list doesn't go empty, ** it will maintain itself. */ if (0 == group->waiter->count) group->last_poll = PR_IntervalNow(); do { hrv = MW_AddHashInternal(desc, group->waiter); if (_prmw_rehash != hrv) break; hrv = MW_ExpandHashInternal(group); /* gruesome */ if (_prmw_success != hrv) break; } while (PR_TRUE);#ifdef WINNT _PR_MD_UNLOCK(&group->mdlock);#endif PR_NotifyCondVar(group->new_business); /* tell the world */ rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE; PR_Unlock(group->ml);#ifdef WINNT overlapped = PR_NEWZAP(_MDOverlapped); if (NULL == overlapped) { PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); NT_HashRemove(group, desc->fd); return rv; } overlapped->ioModel = _MD_MultiWaitIO; overlapped->data.mw.desc = desc; overlapped->data.mw.group = group; if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) { overlapped->data.mw.timer = CreateTimer( desc->timeout, NT_TimeProc, overlapped); if (0 == overlapped->data.mw.timer) { NT_HashRemove(group, desc->fd); PR_DELETE(overlapped); /* * XXX It appears that a maximum of 16 timer events can * be outstanding. GetLastError() returns 0 when I try it. */ PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError()); return PR_FAILURE; } } /* Reach to the bottom layer to get the OS fd */ bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); PR_ASSERT(NULL != bottom); if (NULL == bottom) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return PR_FAILURE; } hFile = (HANDLE)bottom->secret->md.osfd; if (!bottom->secret->md.io_model_committed) { PRInt32 st; st = _md_Associate(hFile); PR_ASSERT(0 != st); bottom->secret->md.io_model_committed = PR_TRUE; } bResult = ReadFile(hFile, desc->buffer.start, (DWORD)desc->buffer.length, NULL, &overlapped->overlapped); if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) { if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) { if (InterlockedCompareExchange((PVOID *)&desc->outcome, (PVOID)PR_MW_FAILURE, (PVOID)PR_MW_PENDING) == (PVOID)PR_MW_PENDING) { CancelTimer(overlapped->data.mw.timer); } NT_HashRemove(group, desc->fd); PR_DELETE(overlapped); } _PR_MD_MAP_READ_ERROR(dwError); rv = PR_FAILURE; }#endif return rv;} /* PR_AddWaitFileDesc */PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group){ PRCList *io_ready = NULL;#ifdef WINNT PRThread *me = _PR_MD_CURRENT_THREAD(); _MDOverlapped *overlapped; #endif if (!_pr_initialized) _PR_ImplicitInitialization(); if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init; PR_Lock(group->ml); if (_prmw_running != group->state) { PR_SetError(PR_INVALID_STATE_ERROR, 0); goto invalid_state; } group->waiting_threads += 1; /* the polling thread is counted */#ifdef WINNT _PR_MD_LOCK(&group->mdlock); while (PR_CLIST_IS_EMPTY(&group->io_ready)) { _PR_THREAD_LOCK(me); me->state = _PR_IO_WAIT; PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); if (!_PR_IS_NATIVE_THREAD(me)) { _PR_SLEEPQ_LOCK(me->cpu); _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); _PR_SLEEPQ_UNLOCK(me->cpu); } _PR_THREAD_UNLOCK(me); _PR_MD_UNLOCK(&group->mdlock); PR_Unlock(group->ml); _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); me->state = _PR_RUNNING; PR_Lock(group->ml); _PR_MD_LOCK(&group->mdlock); if (_PR_PENDING_INTERRUPT(me)) { PR_REMOVE_LINK(&me->waitQLinks); _PR_MD_UNLOCK(&group->mdlock); me->flags &= ~_PR_INTERRUPT; me->io_suspended = PR_FALSE; PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); goto aborted; } } io_ready = PR_LIST_HEAD(&group->io_ready); PR_ASSERT(io_ready != NULL); PR_REMOVE_LINK(io_ready); _PR_MD_UNLOCK(&group->mdlock); overlapped = (_MDOverlapped *) ((char *)io_ready - offsetof(_MDOverlapped, data)); io_ready = &overlapped->data.mw.desc->internal;#else do { /* ** If the I/O ready list isn't empty, have this thread ** return with the first receive wait object that's available. */ if (PR_CLIST_IS_EMPTY(&group->io_ready)) { /* ** Is there a polling thread yet? If not, grab this thread ** and use it. */ if (NULL == group->poller) { /* ** This thread will stay do polling until it becomes the only one ** left to service a completion. Then it will return and there will ** be none left to actually poll or to run completions. ** ** The polling function should only return w/ failure or ** with some I/O ready. */ if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll; } else { /* ** There are four reasons a thread can be awakened from ** a wait on the io_complete condition variable. ** 1. Some I/O has completed, i.e., the io_ready list ** is nonempty. ** 2. The wait group is canceled. ** 3. The thread is interrupted. ** 4. The current polling thread has to leave and needs ** a replacement. ** The logic to find a new polling thread is made more ** complicated by all the other possible events. ** I tried my best to write the logic clearly, but ** it is still full of if's with continue and goto. */ PRStatus st; do { st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT); if (_prmw_running != group->state) { PR_SetError(PR_INVALID_STATE_ERROR, 0); goto aborted; } if (_MW_ABORTED(st) || (NULL == group->poller)) break; } while (PR_CLIST_IS_EMPTY(&group->io_ready)); /* ** The thread is interrupted and has to leave. It might ** have also been awakened to process ready i/o or be the ** new poller. To be safe, if either condition is true, ** we awaken another thread to take its place. */ if (_MW_ABORTED(st)) { if ((NULL == group->poller || !PR_CLIST_IS_EMPTY(&group->io_ready)) && group->waiting_threads > 1) PR_NotifyCondVar(group->io_complete); goto aborted; } /* ** A new poller is needed, but can I be the new poller? ** If there is no i/o ready, sure. But if there is any ** i/o ready, it has a higher priority. I want to
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -