⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prmwait.c

📁 Netscape NSPR库源码
💻 C
📖 第 1 页 / 共 3 页
字号:
                PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);                PR_Lock(group->ml);                goto failed_alloc;            }            if (NULL != old_polling_list)                PR_DELETE(old_polling_list);            PR_Lock(group->ml);            if (_prmw_running != group->state)            {                PR_SetError(PR_INVALID_STATE_ERROR, 0);                goto aborted;            }            group->polling_list = poll_list;            group->polling_count = new_count;        }        now = PR_IntervalNow();        polling_interval = max_polling_interval;        since_last_poll = now - group->last_poll;        waiter = &group->waiter->recv_wait;        poll_list = group->polling_list;        for (count = 0; count < group->waiter->count; ++waiter)        {            PR_ASSERT(waiter < &group->waiter->recv_wait                + group->waiter->length);            if (NULL != *waiter)  /* a live one! */            {                if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)                && (since_last_poll >= (*waiter)->timeout))                    _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);                else                {                    if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)                    {                        (*waiter)->timeout -= since_last_poll;                        if ((*waiter)->timeout < polling_interval)                            polling_interval = (*waiter)->timeout;                    }                    PR_ASSERT(poll_list < group->polling_list                        + group->polling_count);                    poll_list->fd = (*waiter)->fd;                    poll_list->in_flags = PR_POLL_READ;                    poll_list->out_flags = 0;#if 0                    printf(                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",                        poll_list, count, poll_list->fd, (*waiter)->timeout);#endif                    poll_list += 1;                    count += 1;         
       }            }        }         PR_ASSERT(count == group->waiter->count);        /*        ** If there are no more threads waiting for completion,        ** we need to return.        */        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))        && (1 == group->waiting_threads)) break;        if (0 == count) continue;  /* wait for new business */        group->last_poll = now;        PR_Unlock(group->ml);        count_ready = PR_Poll(group->polling_list, count, polling_interval);        PR_Lock(group->ml);        if (_prmw_running != group->state)        {            PR_SetError(PR_INVALID_STATE_ERROR, 0);            goto aborted;        }        if (-1 == count_ready)        {            goto failed_poll;  /* that's a shame */        }        else if (0 < count_ready)        {            for (poll_list = group->polling_list; count > 0;            poll_list++, count--)            {                PR_ASSERT(                    poll_list < group->polling_list + group->polling_count);                if (poll_list->out_flags != 0)                {                    waiter = _MW_LookupInternal(group, poll_list->fd);                    /*                    ** If 'waiter' is NULL, that means the wait receive                    ** descriptor has been canceled.                    */                    if (NULL != waiter)                        _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);                }            }        }        /*        ** If there are no more threads waiting for completion,        ** we need to return.        ** This thread was "borrowed" to do the polling, but it really        ** belongs to the client.        
*/
        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
        && (1 == group->waiting_threads)) break;
    }

    rv = PR_SUCCESS;

aborted:
failed_poll:
failed_alloc:
    group->poller = NULL;  /* we were that, now we ain't */
    if ((_prmw_running == group->state) && (group->waiting_threads > 1))
    {
        /* Wake up one thread to become the new poller. */
        PR_NotifyCondVar(group->io_complete);
    }
    return rv;  /* we return with the lock held */
}  /* _MW_PollInternal */
#endif /* !WINNT */

/*
** MW_TestForShutdownInternal
**
** Report the wait group's current state and, when the group is
** stopping and no thread is waiting on it any longer, complete the
** shutdown: the state becomes _prmw_stopped and the 'mw_manage'
** condition variable is notified.
**
**  group   the wait group to examine (dereferenced unconditionally)
**
** Returns the group's state after any transition made here.
**
** NOTE(review): PR_NotifyCondVar() is called here, so the caller
** presumably holds the lock bound to 'mw_manage' -- confirm at the
** call sites, which are not visible in this part of the file.
*/
static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
{
    PRMWGroupState rv = group->state;
    /*
    ** Looking at the group's fields is safe because
    ** once the group's state is no longer running, it
    ** cannot revert and there is a safe check on entry
    ** to make sure no more threads are made to wait.
    */
    if ((_prmw_stopping == rv)
    && (0 == group->waiting_threads))
    {
        /* Last waiter is gone: finish the stop and tell the manager. */
        rv = group->state = _prmw_stopped;
        PR_NotifyCondVar(group->mw_manage);
    }
    return rv;
}  /* MW_TestForShutdownInternal */

#ifndef WINNT
/*
** _MW_InitialRecv
**
** Perform the first receive for a descriptor that was reported ready.
**
**  io_ready    list link of the ready descriptor. It is cast straight
**              to a PRRecvWait*, so it is presumably the leading
**              member of that structure -- TODO confirm against the
**              PRRecvWait declaration.
**
** If the descriptor carries no buffer (NULL start or zero length) the
** byte count is simply set to zero. Otherwise the file descriptor's
** recv method is invoked with the descriptor's buffer and timeout, and
** a negative result marks the descriptor's outcome as PR_MW_FAILURE.
*/
static void _MW_InitialRecv(PRCList *io_ready)
{
    PRRecvWait *desc = (PRRecvWait*)io_ready;
    if ((NULL == desc->buffer.start)
    || (0 == desc->buffer.length))
        desc->bytesRecv = 0;  /* nothing to read into */
    else
    {
        desc->bytesRecv = desc->fd->methods->recv(
            desc->fd, desc->buffer.start,
            desc->buffer.length, 0, desc->timeout);
        if (desc->bytesRecv < 0)  /* SetError should already be there */
            desc->outcome = PR_MW_FAILURE;
    }
}  /* _MW_InitialRecv */
#endif

#ifdef WINNT
/*
** NT_TimeProc
**
** Timer callback for a receive-wait descriptor whose timeout expired.
** PR_AddWaitFileDesc() registers it through CreateTimer() with the
** descriptor's _MDOverlapped as the argument.
**
**  arg     the _MDOverlapped that was handed to CreateTimer()
**
** The outcome is switched from PR_MW_PENDING to PR_MW_TIMEOUT with an
** interlocked compare-exchange. If the outcome was no longer pending,
** somebody else already completed the descriptor and nothing more is
** done. Otherwise the bottom (NSPR) layer is marked closed and its OS
** socket is closed to abort the outstanding asynchronous request.
*/
static void NT_TimeProc(void *arg)
{
    _MDOverlapped *overlapped = (_MDOverlapped *)arg;
    PRRecvWait *desc =  overlapped->data.mw.desc;
    PRFileDesc *bottom;

    /* Only the party that moves the outcome off PENDING owns completion. */
    if (InterlockedCompareExchange((PVOID *)&desc->outcome,
        (PVOID)PR_MW_TIMEOUT, (PVOID)PR_MW_PENDING) != (PVOID)PR_MW_PENDING)
    {
        /* This wait recv descriptor has already completed. */
        return;
    }

    /* close the osfd to abort the outstanding async io request */
    /* $$$$
    ** Little late to be checking if NSPR's on the bottom of stack,
    ** but if we don't check, we can't assert that the private data
    ** is what we think it is.
    ** $$$$
    */
    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
    PR_ASSERT(NULL != bottom);
    if (NULL != bottom)  /* now what!?!?! */
    {
        bottom->secret->state = _PR_FILEDESC_CLOSED;
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
        {
            /* No recovery path exists here; report and trip the assert. */
            fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
            PR_ASSERT(!"What shall I do?");
        }
    }
    return;
}  /* NT_TimeProc */

/*
** NT_HashRemove
**
** Remove the receive-wait descriptor registered for 'fd' from the
** group's waiter table, taking the group's 'mdlock' around the lookup
** and the update.
**
**  group   the wait group whose table is searched
**  fd      the file descriptor whose entry is to be removed
**
** Returns PR_SUCCESS if an entry was found (its slot is cleared and the
** table's count decremented), PR_FAILURE if there was no such entry.
*/
static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
{
    PRRecvWait **waiter;

    _PR_MD_LOCK(&group->mdlock);
    waiter = _MW_LookupInternal(group, fd);
    if (NULL != waiter)
    {
        group->waiter->count -= 1;
        *waiter = NULL;  /* vacate the slot */
    }
    _PR_MD_UNLOCK(&group->mdlock);
    return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
}

PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
{
    PRRecvWait **waiter;

    waiter = _MW_LookupInternal(group, fd);
    if (NULL != waiter)
    {
        group->waiter->count -= 1;
        *waiter = NULL;
    }
    return (NULL != waiter) ?
PR_SUCCESS : PR_FAILURE;}#endif /* WINNT *//******************************************************************//******************************************************************//********************** The public API portion ********************//******************************************************************//******************************************************************/PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(    PRWaitGroup *group, PRRecvWait *desc){    _PR_HashStory hrv;    PRStatus rv = PR_FAILURE;#ifdef WINNT    _MDOverlapped *overlapped;    HANDLE hFile;    BOOL bResult;    DWORD dwError;    PRFileDesc *bottom;#endif    if (!_pr_initialized) _PR_ImplicitInitialization();    if ((NULL == group) && (NULL == (group = MW_Init2())))    {        return rv;    }    PR_ASSERT(NULL != desc->fd);    desc->outcome = PR_MW_PENDING;  /* nice, well known value */    desc->bytesRecv = 0;  /* likewise, though this value is ambiguious */    PR_Lock(group->ml);    if (_prmw_running != group->state)    {        /* Not allowed to add after cancelling the group */        desc->outcome = PR_MW_INTERRUPT;        PR_SetError(PR_INVALID_STATE_ERROR, 0);        PR_Unlock(group->ml);        return rv;    }#ifdef WINNT    _PR_MD_LOCK(&group->mdlock);#endif    /*    ** If the waiter count is zero at this point, there's no telling    ** how long we've been idle. Therefore, initialize the beginning    ** of the timing interval. As long as the list doesn't go empty,    ** it will maintain itself.    */    if (0 == group->waiter->count)        group->last_poll = PR_IntervalNow();    do    {        hrv = MW_AddHashInternal(desc, group->waiter);        if (_prmw_rehash != hrv) break;        hrv = MW_ExpandHashInternal(group);  /* gruesome */        if (_prmw_success != hrv) break;    } while (PR_TRUE);#ifdef WINNT    _PR_MD_UNLOCK(&group->mdlock);#endif    PR_NotifyCondVar(group->new_business);  /* tell the world */    rv = (_prmw_success == hrv) ? 
PR_SUCCESS : PR_FAILURE;    PR_Unlock(group->ml);#ifdef WINNT    overlapped = PR_NEWZAP(_MDOverlapped);    if (NULL == overlapped)    {        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);        NT_HashRemove(group, desc->fd);        return rv;    }    overlapped->ioModel = _MD_MultiWaitIO;    overlapped->data.mw.desc = desc;    overlapped->data.mw.group = group;    if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)    {        overlapped->data.mw.timer = CreateTimer(            desc->timeout,            NT_TimeProc,            overlapped);        if (0 == overlapped->data.mw.timer)        {            NT_HashRemove(group, desc->fd);            PR_DELETE(overlapped);            /*             * XXX It appears that a maximum of 16 timer events can             * be outstanding. GetLastError() returns 0 when I try it.             */            PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());            return PR_FAILURE;        }    }    /* Reach to the bottom layer to get the OS fd */    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);    PR_ASSERT(NULL != bottom);    if (NULL == bottom)    {        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);        return PR_FAILURE;    }    hFile = (HANDLE)bottom->secret->md.osfd;     if (!bottom->secret->md.io_model_committed)    {        PRInt32 st;        st = _md_Associate(hFile);        PR_ASSERT(0 != st);        bottom->secret->md.io_model_committed = PR_TRUE;    }    bResult = ReadFile(hFile,        desc->buffer.start,        (DWORD)desc->buffer.length,        NULL,        &overlapped->overlapped);    if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)    {        if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)        {            if (InterlockedCompareExchange((PVOID *)&desc->outcome,                (PVOID)PR_MW_FAILURE, (PVOID)PR_MW_PENDING)                == (PVOID)PR_MW_PENDING)            {                CancelTimer(overlapped->data.mw.timer);            }            NT_HashRemove(group, 
desc->fd);            PR_DELETE(overlapped);        }        _PR_MD_MAP_READ_ERROR(dwError);        rv = PR_FAILURE;    }#endif    return rv;}  /* PR_AddWaitFileDesc */PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group){    PRCList *io_ready = NULL;#ifdef WINNT    PRThread *me = _PR_MD_CURRENT_THREAD();    _MDOverlapped *overlapped;    #endif    if (!_pr_initialized) _PR_ImplicitInitialization();    if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;    PR_Lock(group->ml);    if (_prmw_running != group->state)    {        PR_SetError(PR_INVALID_STATE_ERROR, 0);        goto invalid_state;    }    group->waiting_threads += 1;  /* the polling thread is counted */#ifdef WINNT    _PR_MD_LOCK(&group->mdlock);    while (PR_CLIST_IS_EMPTY(&group->io_ready))    {        _PR_THREAD_LOCK(me);        me->state = _PR_IO_WAIT;        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);        if (!_PR_IS_NATIVE_THREAD(me))        {            _PR_SLEEPQ_LOCK(me->cpu);            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);            _PR_SLEEPQ_UNLOCK(me->cpu);        }        _PR_THREAD_UNLOCK(me);        _PR_MD_UNLOCK(&group->mdlock);        PR_Unlock(group->ml);        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);        me->state = _PR_RUNNING;        PR_Lock(group->ml);        _PR_MD_LOCK(&group->mdlock);        if (_PR_PENDING_INTERRUPT(me)) {            PR_REMOVE_LINK(&me->waitQLinks);            _PR_MD_UNLOCK(&group->mdlock);            me->flags &= ~_PR_INTERRUPT;            me->io_suspended = PR_FALSE;            PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);            goto aborted;        }    }    io_ready = PR_LIST_HEAD(&group->io_ready);    PR_ASSERT(io_ready != NULL);    PR_REMOVE_LINK(io_ready);    _PR_MD_UNLOCK(&group->mdlock);    overlapped = (_MDOverlapped *)        ((char *)io_ready - offsetof(_MDOverlapped, data));    io_ready = &overlapped->data.mw.desc->internal;#else    do    {        /*        ** If the I/O ready list isn't empty, 
have this thread        ** return with the first receive wait object that's available.        */        if (PR_CLIST_IS_EMPTY(&group->io_ready))        {            /*            ** Is there a polling thread yet? If not, grab this thread            ** and use it.            */            if (NULL == group->poller)            {                /*                ** This thread will stay do polling until it becomes the only one                ** left to service a completion. Then it will return and there will                ** be none left to actually poll or to run completions.                **                ** The polling function should only return w/ failure or                ** with some I/O ready.                */                if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;            }            else            {                /*                ** There are four reasons a thread can be awakened from                ** a wait on the io_complete condition variable.                ** 1. Some I/O has completed, i.e., the io_ready list                **    is nonempty.                ** 2. The wait group is canceled.                ** 3. The thread is interrupted.                ** 4. The current polling thread has to leave and needs                **    a replacement.                ** The logic to find a new polling thread is made more                ** complicated by all the other possible events.                ** I tried my best to write the logic clearly, but                ** it is still full of if's with continue and goto.                
*/                PRStatus st;                do                 {                    st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);                    if (_prmw_running != group->state)                    {                        PR_SetError(PR_INVALID_STATE_ERROR, 0);                        goto aborted;                    }                    if (_MW_ABORTED(st) || (NULL == group->poller)) break;                } while (PR_CLIST_IS_EMPTY(&group->io_ready));                /*                ** The thread is interrupted and has to leave.  It might                ** have also been awakened to process ready i/o or be the                ** new poller.  To be safe, if either condition is true,                ** we awaken another thread to take its place.                */                if (_MW_ABORTED(st))                {                    if ((NULL == group->poller                    || !PR_CLIST_IS_EMPTY(&group->io_ready))                    && group->waiting_threads > 1)                        PR_NotifyCondVar(group->io_complete);                    goto aborted;                }                /*                ** A new poller is needed, but can I be the new poller?                ** If there is no i/o ready, sure.  But if there is any                ** i/o ready, it has a higher priority.  I want to

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -