📄 prmwait.c
字号:
** process the ready i/o first and wake up another ** thread to be the new poller. */ if (NULL == group->poller) { if (PR_CLIST_IS_EMPTY(&group->io_ready)) continue; if (group->waiting_threads > 1) PR_NotifyCondVar(group->io_complete); } } PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready)); } io_ready = PR_LIST_HEAD(&group->io_ready); PR_NotifyCondVar(group->io_taken); PR_ASSERT(io_ready != NULL); PR_REMOVE_LINK(io_ready); } while (NULL == io_ready);failed_poll:#endifaborted: group->waiting_threads -= 1;invalid_state: (void)MW_TestForShutdownInternal(group); PR_Unlock(group->ml);failed_init: if (NULL != io_ready) { /* If the operation failed, record the reason why */ switch (((PRRecvWait*)io_ready)->outcome) { case PR_MW_PENDING: PR_ASSERT(0); break; case PR_MW_SUCCESS:#ifndef WINNT _MW_InitialRecv(io_ready);#endif break;#ifdef WINNT case PR_MW_FAILURE: _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error); break;#endif case PR_MW_TIMEOUT: PR_SetError(PR_IO_TIMEOUT_ERROR, 0); break; case PR_MW_INTERRUPT: PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); break; default: break; }#ifdef WINNT if (NULL != overlapped->data.mw.timer) { PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout); CancelTimer(overlapped->data.mw.timer); } else { PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout); } PR_DELETE(overlapped);#endif } return (PRRecvWait*)io_ready;} /* PR_WaitRecvReady */PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc){#if !defined(WINNT) PRRecvWait **recv_wait;#endif PRStatus rv = PR_SUCCESS; if (NULL == group) group = mw_state->group; PR_ASSERT(NULL != group); if (NULL == group) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return PR_FAILURE; } PR_Lock(group->ml); if (_prmw_running != group->state) { PR_SetError(PR_INVALID_STATE_ERROR, 0); rv = PR_FAILURE; goto unlock; }#ifdef WINNT if (InterlockedCompareExchange((PVOID *)&desc->outcome, (PVOID)PR_MW_INTERRUPT, (PVOID)PR_MW_PENDING) == (PVOID)PR_MW_PENDING) { PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); PR_ASSERT(NULL != bottom); if (NULL == bottom) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); goto unlock; } bottom->secret->state = _PR_FILEDESC_CLOSED;#if 0 fprintf(stderr, "cancel wait recv: closing socket\n");#endif if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) { fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); exit(1); } }#else if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) { /* it was in the wait table */ _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT); goto unlock; } if (!PR_CLIST_IS_EMPTY(&group->io_ready)) { /* is it already complete? */ PRCList *head = PR_LIST_HEAD(&group->io_ready); do { PRRecvWait *done = (PRRecvWait*)head; if (done == desc) goto unlock; head = PR_NEXT_LINK(head); } while (head != &group->io_ready); } PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); rv = PR_FAILURE;#endifunlock: PR_Unlock(group->ml); return rv;} /* PR_CancelWaitFileDesc */PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group){ PRRecvWait **desc; PRRecvWait *recv_wait = NULL;#ifdef WINNT _MDOverlapped *overlapped; PRRecvWait **end; PRThread *me = _PR_MD_CURRENT_THREAD();#endif if (NULL == group) group = mw_state->group; PR_ASSERT(NULL != group); if (NULL == group) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return NULL; } PR_Lock(group->ml); if (_prmw_stopped != group->state) { if (_prmw_running == group->state) group->state = _prmw_stopping; /* so nothing new comes in */ if (0 == group->waiting_threads) /* is there anybody else? */ group->state = _prmw_stopped; /* we can stop right now */ else { PR_NotifyAllCondVar(group->new_business); PR_NotifyAllCondVar(group->io_complete); } while (_prmw_stopped != group->state) (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT); }#ifdef WINNT _PR_MD_LOCK(&group->mdlock);#endif /* make all the existing descriptors look done/interrupted */#ifdef WINNT end = &group->waiter->recv_wait + group->waiter->length; for (desc = &group->waiter->recv_wait; desc < end; ++desc) { if (NULL != *desc) { if (InterlockedCompareExchange((PVOID *)&(*desc)->outcome, (PVOID)PR_MW_INTERRUPT, (PVOID)PR_MW_PENDING) == (PVOID)PR_MW_PENDING) { PRFileDesc *bottom = PR_GetIdentitiesLayer( (*desc)->fd, PR_NSPR_IO_LAYER); PR_ASSERT(NULL != bottom); if (NULL == bottom) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); goto invalid_arg; } bottom->secret->state = _PR_FILEDESC_CLOSED;#if 0 fprintf(stderr, "cancel wait group: closing socket\n");#endif if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) { fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); exit(1); } } } } while (group->waiter->count > 0) { _PR_THREAD_LOCK(me); me->state = _PR_IO_WAIT; PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); if (!_PR_IS_NATIVE_THREAD(me)) { _PR_SLEEPQ_LOCK(me->cpu); _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); _PR_SLEEPQ_UNLOCK(me->cpu); } _PR_THREAD_UNLOCK(me); _PR_MD_UNLOCK(&group->mdlock); PR_Unlock(group->ml); _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); me->state = _PR_RUNNING; PR_Lock(group->ml); _PR_MD_LOCK(&group->mdlock); }#else for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) { PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length); if (NULL != *desc) _MW_DoneInternal(group, desc, PR_MW_INTERRUPT); }#endif /* take first element of finished list and return it or NULL */ if (PR_CLIST_IS_EMPTY(&group->io_ready)) PR_SetError(PR_GROUP_EMPTY_ERROR, 0); else { PRCList *head = PR_LIST_HEAD(&group->io_ready); PR_REMOVE_AND_INIT_LINK(head);#ifdef WINNT overlapped = (_MDOverlapped *) ((char *)head - offsetof(_MDOverlapped, data)); head = &overlapped->data.mw.desc->internal; if (NULL != overlapped->data.mw.timer) { PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout); CancelTimer(overlapped->data.mw.timer); } else { PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout); } PR_DELETE(overlapped);#endif recv_wait = (PRRecvWait*)head; }#ifdef WINNTinvalid_arg: _PR_MD_UNLOCK(&group->mdlock);#endif PR_Unlock(group->ml); return recv_wait;} /* PR_CancelWaitGroup */PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */){#ifdef XP_MAC#pragma unused (size)#endif PRWaitGroup *wg; if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) { PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); goto failed; } /* the wait group itself */ wg->ml = PR_NewLock(); if (NULL == wg->ml) goto failed_lock; wg->io_taken = PR_NewCondVar(wg->ml); if (NULL == wg->io_taken) goto failed_cvar0; wg->io_complete = PR_NewCondVar(wg->ml); if (NULL == wg->io_complete) goto failed_cvar1; wg->new_business = PR_NewCondVar(wg->ml); if (NULL == wg->new_business) goto failed_cvar2; wg->mw_manage = PR_NewCondVar(wg->ml); if (NULL == wg->mw_manage) goto failed_cvar3; PR_INIT_CLIST(&wg->group_link); PR_INIT_CLIST(&wg->io_ready); /* the waiters sequence */ wg->waiter = (_PRWaiterHash*)PR_CALLOC( sizeof(_PRWaiterHash) + (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*))); if (NULL == wg->waiter) { PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); goto failed_waiter; } wg->waiter->count = 0; wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;#ifdef WINNT _PR_MD_NEW_LOCK(&wg->mdlock); PR_INIT_CLIST(&wg->wait_list);#endif /* WINNT */ PR_Lock(mw_lock); PR_APPEND_LINK(&wg->group_link, &mw_state->group_list); PR_Unlock(mw_lock); return wg;failed_waiter: PR_DestroyCondVar(wg->mw_manage);failed_cvar3: PR_DestroyCondVar(wg->new_business);failed_cvar2: PR_DestroyCondVar(wg->io_complete);failed_cvar1: PR_DestroyCondVar(wg->io_taken);failed_cvar0: PR_DestroyLock(wg->ml);failed_lock: PR_DELETE(wg); wg = NULL;failed: return wg;} /* MW_CreateWaitGroup */PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group){ PRStatus rv = PR_SUCCESS; if (NULL == group) group = mw_state->group; PR_ASSERT(NULL != group); if (NULL != group) { PR_Lock(group->ml); if ((group->waiting_threads == 0) && (group->waiter->count == 0) && PR_CLIST_IS_EMPTY(&group->io_ready)) { group->state = _prmw_stopped; } else { PR_SetError(PR_INVALID_STATE_ERROR, 0); rv = PR_FAILURE; } PR_Unlock(group->ml); if (PR_FAILURE == rv) return rv; PR_Lock(mw_lock); PR_REMOVE_LINK(&group->group_link); PR_Unlock(mw_lock);#ifdef WINNT /* * XXX make sure wait_list is empty and waiter is empty. * These must be checked while holding mdlock. */ _PR_MD_FREE_LOCK(&group->mdlock);#endif PR_DELETE(group->waiter); PR_DELETE(group->polling_list); PR_DestroyCondVar(group->mw_manage); PR_DestroyCondVar(group->new_business); PR_DestroyCondVar(group->io_complete); PR_DestroyCondVar(group->io_taken); PR_DestroyLock(group->ml); if (group == mw_state->group) mw_state->group = NULL; PR_DELETE(group); } else { /* The default wait group is not created yet. */ PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); rv = PR_FAILURE; } return rv;} /* PR_DestroyWaitGroup *//***************************************************************************************************************************************************************** Wait group enumerations ***********************************************************************************************************************************************************************/PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group){ PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator); if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); else { enumerator->group = group; enumerator->seal = _PR_ENUM_SEALED; } return enumerator;} /* PR_CreateMWaitEnumerator */PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator){ PR_ASSERT(NULL != enumerator); PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return PR_FAILURE; } enumerator->seal = _PR_ENUM_UNSEALED; PR_Free(enumerator); return PR_SUCCESS;} /* PR_DestroyMWaitEnumerator */PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup( PRMWaitEnumerator *enumerator, const PRRecvWait *previous){ PRRecvWait *result = NULL; /* entry point sanity checking */ PR_ASSERT(NULL != enumerator); PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument; /* beginning of enumeration */ if (NULL == previous) { if (NULL == enumerator->group) { enumerator->group = mw_state->group; if (NULL == enumerator->group) { PR_SetError(PR_GROUP_EMPTY_ERROR, 0); return NULL; } } enumerator->waiter = &enumerator->group->waiter->recv_wait; enumerator->p_timestamp = enumerator->group->p_timestamp; enumerator->thread = PR_GetCurrentThread(); enumerator->index = 0; } /* continuing an enumeration */ else { PRThread *me = PR_GetCurrentThread(); PR_ASSERT(me == enumerator->thread); if (me != enumerator->thread) goto bad_argument; /* need to restart the enumeration */ if (enumerator->p_timestamp != enumerator->group->p_timestamp) return PR_EnumerateWaitGroup(enumerator, NULL); } /* actually progress the enumeration */#if defined(WINNT) _PR_MD_LOCK(&enumerator->group->mdlock);#else PR_Lock(enumerator->group->ml);#endif while (enumerator->index++ < enumerator->group->waiter->length) { if (NULL != (result = *(enumerator->waiter)++)) break; }#if defined(WINNT) _PR_MD_UNLOCK(&enumerator->group->mdlock);#else PR_Unlock(enumerator->group->ml);#endif return result; /* what we live for */bad_argument: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return NULL; /* probably ambiguous */} /* PR_EnumerateWaitGroup *//* prmwait.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -