thrdconn.cpp

来自「symbian 下的helix player源代码」· C++ 代码 · 共 1,964 行 · 第 1/4 页

CPP
1,964
字号
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
                    m_bReadPostPendingWouldBlock = (m_bReadNowPending && theErr == HXR_WOULD_BLOCK);
#endif
                    m_bReadNowPending = (theErr == HXR_WOULD_BLOCK ? TRUE : FALSE);
                }
                if (!theErr && uCount > 0)
                {
                    conn::bytes_to_actualtcpread(this, uCount);
                    m_pReceiveTCP->EnQueue(m_pTempBuffer, uCount);
                }
            }
        }

#ifndef _WINCE
        if (!m_bOutstandingReadNotification && m_pReceiveTCP->GetQueuedItemCount() > 0)
        {
#ifdef HELIX_FEATURE_NETWORK_USE_SELECT
            m_bOutstandingReadNotification  = TRUE;
            HXThreadMessage msg(HXMSG_ASYNC_READ, this, NULL);
            theErr = m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
	    if ( theErr == HXR_NOT_INITIALIZED )
	    {
		theErr = HXR_OK;
	    }
#endif
        }
#endif /* _WINCE */
    }
    else /*if (m_uSocketType == HX_UDP_SOCKET)*/
    {
        UINT32 ulAddress = 0;
        UINT16 ulPort = 0;

        /* Read as much UDP data as possible */
        while (!theErr)
        {
            IHXBuffer* pBuffer = NULL;

            if ( m_bReadPostPendingWouldBlock )
            {
                /* fake a call return */
                theErr = HXR_WOULD_BLOCK;
            }
            else
            {
                /* call read and do heuristinc bookkeeping */
                theErr = m_pActualConn->readfrom(pBuffer, ulAddress, ulPort);
                if( theErr == HXR_OUTOFMEMORY )
                {
                    mLastError = HXR_OUTOFMEMORY;
                }
                /* If this is a single WOULDBLOCK, ReadNowPending gets set.  If this is the second
                   consecutive blocking call, ReadPostPendingWouldBlock gets set.
                   Feel free to suggest better variable names. */
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
                m_bReadPostPendingWouldBlock = (m_bReadNowPending && theErr == HXR_WOULD_BLOCK);
#endif
                m_bReadNowPending = (theErr == HXR_WOULD_BLOCK ? TRUE : FALSE);
            }

            if (!theErr && pBuffer)
            {
                UDP_PACKET* pPacket = new UDP_PACKET;

                if(pPacket)
                {
                    pPacket->pBuffer = pBuffer;
                    pPacket->ulAddress = ulAddress;
                    pPacket->ulPort = ulPort;
                    m_ReadUDPBuffers.AddTail((void*)pPacket);
                }
                else
                {
                    theErr = HXR_OUTOFMEMORY;
                }
            }
        }


#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
        if ( !theErr && !m_bOutstandingReadNotification && m_ReadUDPBuffers.GetCount() > 0)
        {
            m_bOutstandingReadNotification  = TRUE;
            HXThreadMessage msg(HXMSG_ASYNC_READ, this, NULL);
            m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
        }
#endif
    }


    if (!mLastError && theErr)
    {
        mLastError = ConvertNetworkError(theErr);
    }

    /* If there is an error, issue a Read Available message
     * so that error can be reported back on next Read
     */
    if (!m_bOutstandingReadNotification && mLastError && theErr != HXR_OUTOFMEMORY)
    {
        m_bOutstandingReadNotification  = TRUE;
        HXThreadMessage msg(HXMSG_ASYNC_READ, this, NULL);
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
        m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
#else
	m_pNetworkThread->PostMessage(&msg, m_pInternalWindowHandle);
#endif
    }

exit:
    m_pMutex->Unlock();
#ifdef _CARBON
    Release();
#endif
    return theErr;
}

/*
 * ThreadedConn::DoWrite
 *
 * Pushes queued outgoing data to the underlying connection (m_pActualConn).
 *
 *  - TCP: everything currently queued in m_pSendTCP is dequeued into
 *    m_pTempBuffer and handed to write(); whatever write() reports as not
 *    sent is put back on the queue.
 *  - UDP: packets are sent from the head of m_WriteUDPBuffers with writeto()
 *    until one call fails or the list is empty.
 *
 * The first error seen is stored in mLastError (after ConvertNetworkError).
 * In builds without HELIX_FEATURE_NETWORK_USE_SELECT the function also posts
 * HXMSG_ASYNC_WRITE to the main application thread and, when data is still
 * queued, requests another network I/O pass through PostIOMessage().
 *
 * Nothing is returned; a failure is only observable through mLastError.
 */
void
ThreadedConn::DoWrite()
{
    // If we are out of memory, let's just get out of here. Ideally, we should
    // not ever get to this point, but lots of functions here have void return
    // types, so it is possible to lose an OOM error.
    if( mLastError == HXR_OUTOFMEMORY )
    {
        return;
    }
#ifdef _CARBON
    // Hold a reference for the duration of the call; it is dropped by the
    // matching Release() at the end of this function.
    AddRef();
#endif
    HX_RESULT theErr = HXR_OK;
    m_pMutex->Lock();
    if (m_uSocketType == HX_TCP_SOCKET)
    {
        UINT16 uCount = m_pSendTCP->GetQueuedItemCount();
        if (uCount > 0)
        {
            // Drain the whole send queue into the scratch buffer.
            // NOTE(review): assumes m_pTempBuffer can hold uCount items -- confirm.
            m_pSendTCP->DeQueue(m_pTempBuffer, uCount);
            UINT16 uActualCount = uCount;
            // NOTE(review): presumably write() updates uActualCount to the
            // amount actually sent -- verify against conn::write.
            theErr = m_pActualConn->write(m_pTempBuffer, &uActualCount);
            switch(theErr)
            {
                case HXR_AT_INTERRUPT:
                case HXR_WOULD_BLOCK:
                case HXR_OK:
                    // enqueue the data that was not sent
                    if(uActualCount != uCount)
                    {
                        m_pSendTCP->EnQueue(m_pTempBuffer + uActualCount,
                                            uCount - uActualCount);
                    }

                    // mask out these errors
                    theErr = HXR_OK;
                    break;

                default:
                    // Any other result is kept in theErr; the dequeued data
                    // is not put back on the queue.
                    break;
            }
        }

#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
        // Post a write notification to the main thread when we are not done,
        // none is outstanding and GetMaxAvailableElements() reports room in
        // the send queue.
        if (!m_bIsDone && !m_bOutstandingWriteNotification && m_pSendTCP->GetMaxAvailableElements() > 0)
        {
            m_bOutstandingWriteNotification = TRUE;
            HXThreadMessage msg(HXMSG_ASYNC_WRITE, this, NULL);
            m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
        }
#endif
    }
    else /*if (m_uSocketType == HX_UDP_SOCKET)*/
    {
        // Send packets in order; stop at the first writeto() failure so the
        // failed packet stays at the head of the list.
        while (!theErr && m_WriteUDPBuffers.GetCount() > 0)
        {
            UDPPacketInfo* pPacket = (UDPPacketInfo*) m_WriteUDPBuffers.GetHead();
            // NOTE(review): the buffer size is narrowed to UINT16 here --
            // confirm that buffers larger than 65535 bytes cannot be queued.
            UINT16 uLength = (UINT16) pPacket->m_pBuffer->GetSize();
            theErr = m_pActualConn->writeto(pPacket->m_pBuffer->GetBuffer(),   // sendto
                                        &uLength,
                                        pPacket->m_ulAddr,
                                        pPacket->m_uPort);
            if (!theErr)
            {
                // The packet went out: drop the buffer reference, free the
                // packet record and remove it from the list.
                pPacket->m_pBuffer->Release();
                delete pPacket;
                m_WriteUDPBuffers.RemoveHead();
            }
        }

#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
        // For UDP the write notification is only posted once the list is empty.
        if (!m_bIsDone && !m_bOutstandingWriteNotification && m_WriteUDPBuffers.GetCount() == 0)
        {
            m_bOutstandingWriteNotification = TRUE;
            HXThreadMessage msg(HXMSG_ASYNC_WRITE, this, NULL);
            m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
        }
#endif
    }

    // Remember only the first error; ConvertNetworkError() maps results such
    // as HXR_WOULD_BLOCK to HXR_OK, so those do not stick.
    if (!mLastError && theErr)
    {
        mLastError = ConvertNetworkError(theErr);
    }

#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
    // Data is still queued and no network I/O message is pending: ask for
    // another pass. The result lands in theErr but is not used afterwards.
    if (!mLastError && !m_bNetworkIOPending &&
        ((m_uSocketType == HX_TCP_SOCKET && m_pSendTCP->GetQueuedItemCount() > 0) ||
         (m_uSocketType == HX_UDP_SOCKET && m_WriteUDPBuffers.GetCount() > 0)))
    {
        theErr = PostIOMessage();
    }
#endif

    m_pMutex->Unlock();

    // NOTE(review): everything below runs after the mutex has been released,
    // and some of it after Release(); outside _CARBON builds nothing in this
    // function keeps the object alive -- confirm that callers hold a reference.

    // A flush was requested and the queue has drained: clear the flag and
    // drop a reference (presumably the one taken when the flush was
    // requested -- verify against the code that sets m_bWriteFlushPending).
    if (m_bWriteFlushPending &&
        ((m_uSocketType == HX_TCP_SOCKET && m_pSendTCP->GetQueuedItemCount() == 0) ||
         (m_uSocketType == HX_UDP_SOCKET && m_WriteUDPBuffers.GetCount() == 0)))
    {
        m_bWriteFlushPending    = FALSE;
        Release();
    }

    /* We are done and there is no more data pending to be written out */
    /* This is the time socket actually gets destroyed */
    if (m_bIsDone && !m_bWriteFlushPending)
    {
        m_bConnected = FALSE;
        PostDoneAndDetach();
    }
#ifdef _CARBON
    Release();
#endif
}

//--------------------------------------------------
/*
 * ThreadedConn::DoNetworkIO
 *
 * Runs one read pass followed by one write pass on a connected socket and
 * clears the "network I/O pending" flag first so that a new request can be
 * posted.
 *
 * DoRead() has a return value but this function is void, so any error it
 * reports is dropped here. This may not be a good idea.
 */
void
ThreadedConn::DoNetworkIO(void)
{
    m_bNetworkIOPending = FALSE;

    if (!m_bConnected)
    {
        return;
    }

    // In select builds the read pass is skipped when the network thread has
    // m_bUseReaderWriter set; the write pass below always runs.
#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
    if (!m_pNetworkThread->m_bUseReaderWriter)
#endif //defined(HELIX_FEATURE_NETWORK_USE_SELECT)
    {
#if defined( WIN32_PLATFORM_PSPC )
        DoRead(TRUE);
#else
        DoRead();
#endif
    }

    DoWrite();
}
/*
 * ThreadedConn::ConvertNetworkError
 *
 * Maps a raw result code onto the value that is stored as the connection's
 * last error.
 *
 *   theErr  - result code to convert.
 *
 * Returns theErr unchanged when IS_SERVER_ALERT(theErr) holds or when it is
 * one of the explicitly listed error codes, HXR_OK for the codes that are
 * masked out, and HXR_FAIL for everything else.
 */
HX_RESULT
ThreadedConn::ConvertNetworkError(HX_RESULT theErr)
{
    // Server alerts are handed back untouched.
    if (IS_SERVER_ALERT(theErr))
    {
        return theErr;
    }

    // Anything not listed below collapses to a generic failure.
    HX_RESULT lConverted = HXR_FAIL;

    switch (theErr)
    {
        // mask out these errors
        case HXR_AT_INTERRUPT:
        case HXR_WOULD_BLOCK:
        case HXR_NO_DATA:
        case HXR_OK:
            lConverted = HXR_OK;
            break;

        // these codes are reported as they are
        case HXR_DNR:
        case HXR_NET_CONNECT:
        case HXR_DOC_MISSING:
        case HXR_OUTOFMEMORY:
        case HXR_ADVANCED_SERVER:
        case HXR_BAD_SERVER:
        case HXR_OLD_SERVER:
        case HXR_INVALID_FILE:
        case HXR_REDIRECTION:
        case HXR_PROXY:
        case HXR_PROXY_RESPONSE:
        case HXR_ADVANCED_PROXY:
        case HXR_OLD_PROXY:
        case HXR_PERFECTPLAY_NOT_SUPPORTED:
        case HXR_NO_LIVE_PERFECTPLAY:
        case HXR_PERFECTPLAY_NOT_ALLOWED:
            lConverted = theErr;
            break;

        default:
            break;
    }

    return lConverted;
}

/*
 * ThreadedConn::PostIOMessage
 *
 * Marks network I/O as pending and posts an HXMSG_ASYNC_NETWORKIO message
 * for this connection to the network thread.
 *
 * Returns whatever PostMessage() returns.
 *
 * NOTE(review): m_bNetworkIOPending stays TRUE even when the post fails --
 * confirm that this cannot leave the connection waiting for a message that
 * was never queued.
 */
HX_RESULT
ThreadedConn::PostIOMessage(void)
{
    // The flag is raised before the message is posted.
    m_bNetworkIOPending = TRUE;

    HXThreadMessage ioRequest(HXMSG_ASYNC_NETWORKIO, this, NULL);
    const HX_RESULT postResult = m_pNetworkThread->PostMessage(&ioRequest);
    return postResult;
}


/*
 * ThreadedConn::ThrConnSocketCallback::Func
 *
 * Dispatches a socket notification to the owning ThreadedConn (m_pContext).
 *
 *   Type     - which notification is being delivered.
 *   bSuccess - passed on to the connect and DNS handlers; reinterpreted as
 *              a pointer for SEND_BUFFER_NOTIFICATION (see below).
 *   pConn    - passed on to the accept handler.
 *
 * Returns the result of DoRead() for READ_NOTIFICATION and HXR_OK for every
 * other notification, including when there is no context.
 */
HX_RESULT ThreadedConn::ThrConnSocketCallback::Func(NotificationType Type,
                                               BOOL bSuccess, conn* pConn)
{
    ThreadedConn* pContext = m_pContext;
    // It would be nice to set theErr for all of the calls below, but the
    // effects of that are unknown to this developer. XXXJHHB
    HX_RESULT theErr = HXR_OK;
    if(pContext)
    {
        switch (Type)
        {
        case READ_NOTIFICATION:
            theErr = pContext->DoRead(TRUE);
            break;
        case WRITE_NOTIFICATION:
            pContext->DoWrite();
            break;
        case CONNECT_NOTIFICATION:
            pContext->HandleConnectNotification(bSuccess);
            break;
        case DNS_NOTIFICATION:
            pContext->HandleDNSNotification(bSuccess);
            break;
        case ACCEPT_NOTIFICATION:
            pContext->HandleAcceptNotification(pConn);
            // Only select builds break here. In every other build control
            // falls through into CLOSE_NOTIFICATION, so
            // HandleCloseNotification() runs right after the accept handler.
            // NOTE(review): confirm that the fall-through is intentional.
#ifdef HELIX_FEATURE_NETWORK_USE_SELECT
            break;
#endif
        case CLOSE_NOTIFICATION:
            pContext->HandleCloseNotification();
            break;
#ifdef HELIX_FEATURE_NETWORK_USE_SELECT
        case SEND_BUFFER_NOTIFICATION:
            // bSuccess is cast to a char* and sizeof(HXThreadMessage) bytes
            // are written from it; m_bNetworkIOPending is held TRUE around
            // the write()/DoWrite() pair.
            // NOTE(review): presumably the caller smuggles a pointer to an
            // HXThreadMessage through the BOOL parameter -- this would
            // truncate wherever BOOL is narrower than a pointer; confirm.
	    UINT16 len;
	    len = sizeof(HXThreadMessage);
	    pContext->m_bNetworkIOPending = TRUE;
            pContext->write((char *)bSuccess,&len);
            pContext->DoWrite();
	    pContext->m_bNetworkIOPending = FALSE;
            break;
#endif
        default:
            break;
        }
    }

    return theErr;
}



/*
 * ThrdConnGenericCallback constructor
 *
 *   pConn          - connection the callback acts on when Func() runs;
 *                    stored as a plain pointer, no reference is taken here.
 *   uCallbackType  - selects which Actual*() call Func() makes.
 *
 * The reference count starts at 0 and every scalar argument slot is zeroed.
 * m_ulHandle is zeroed as well: Func() passes it to the set-window-handle,
 * connect, accept and listen calls, so it must not hold an indeterminate
 * value if the creator of the callback never assigns it.
 */
ThreadedConn::ThrdConnGenericCallback::ThrdConnGenericCallback(ThreadedConn* pConn, UINT16 uCallbackType)
    : m_lRefCount (0)
    , m_uCallbackType (uCallbackType)
    , m_pConn (pConn)
    , m_bBlocking (FALSE)
    , m_ulLocalAddr (0)
    , m_uPort (0)
    , m_ulBufferSize (0)
    , m_uBacklog(0)
    , m_ulHandle (0)
//    , m_pNewConn(NULL)
{
}

// Destructor. The body is empty: the only statement, a release of
// m_pNewConn, is commented out.
ThreadedConn::ThrdConnGenericCallback::~ThrdConnGenericCallback()
{
//    HX_RELEASE(m_pNewConn);
}

/*
 * IUnknown methods
 */

/////////////////////////////////////////////////////////////////////////
//      Method:
//              IUnknown::QueryInterface
//      Purpose:
//              Hands out the interfaces this callback object supports:
//              IUnknown and IHXCallback.
//      Returns:
//              HXR_OK with *ppvObj set and a reference added when riid is
//              supported; HXR_NOINTERFACE with *ppvObj set to NULL otherwise.
//
STDMETHODIMP ThreadedConn::ThrdConnGenericCallback::QueryInterface(REFIID riid, void** ppvObj)
{
    // Resolve the requested interface first; NULL means "not supported".
    void* pInterface = NULL;

    if (IsEqualIID(riid, IID_IUnknown))
    {
        pInterface = this;
    }
    else if (IsEqualIID(riid, IID_IHXCallback))
    {
        pInterface = (IHXCallback*)this;
    }

    *ppvObj = pInterface;

    if (!pInterface)
    {
        return HXR_NOINTERFACE;
    }

    // The caller now owns one reference to the returned interface.
    AddRef();
    return HXR_OK;
}

/////////////////////////////////////////////////////////////////////////
//      Method:
//              IUnknown::AddRef
//      Purpose:
//              Adds one reference by running InterlockedIncrement() on
//              m_lRefCount.
//      Returns:
//              The value produced by InterlockedIncrement().
//
STDMETHODIMP_(ULONG32) ThreadedConn::ThrdConnGenericCallback::AddRef()
{
    const ULONG32 ulNewCount = InterlockedIncrement(&m_lRefCount);
    return ulNewCount;
}

/////////////////////////////////////////////////////////////////////////
//      Method:
//              IUnknown::Release
//      Purpose:
//              Drops one reference and destroys the object once the count
//              is no longer positive.
//      Returns:
//              The count produced by the decrement while references remain,
//              otherwise 0.
//
STDMETHODIMP_(ULONG32) ThreadedConn::ThrdConnGenericCallback::Release()
{
    // Use the value handed back by InterlockedDecrement() instead of reading
    // m_lRefCount again: once our own reference is gone, another thread may
    // release the last one and delete the object, so touching the member a
    // second time could read freed memory or report a stale count.
    const long lRemaining = InterlockedDecrement(&m_lRefCount);
    if (lRemaining > 0)
    {
        return (ULONG32)lRemaining;
    }

    delete this;
    return 0;
}

/*
 *      IHXCallback methods
 */
// IHXCallback::Func
//
// Runs the deferred operation selected by m_uCallbackType on m_pConn, using
// the argument members (m_HostName, m_uPort, m_bBlocking, ...) stored on this
// callback object. An unknown callback type or a NULL m_pConn is a no-op.
//
// Always returns HXR_OK; results of the Actual*() calls are not propagated.
STDMETHODIMP ThreadedConn::ThrdConnGenericCallback::Func(void)
{
    // Nothing to dispatch to without a connection.
    if (!m_pConn)
    {
        return HXR_OK;
    }

    switch (m_uCallbackType)
    {
    case DNS_CALLBACK_TYPE:
        m_pConn->ActualDnsFindIpAddr(m_HostName, m_bBlocking);
        break;

    case INIT_CALLBACK_TYPE:
        m_pConn->ActualInit(m_ulLocalAddr, m_uPort, m_bBlocking);
        break;

    case SETWINDOWHANDLE_CALLBACK_TYPE:
        // Only Windows builds act on this type; elsewhere it does nothing.
#if defined (_WIN32) || defined (_WINDOWS)
        m_pConn->ActuallSetWindowHandle(m_ulHandle);
#endif /*defined (_WIN32) || defined (_WINDOWS)*/
        break;

    case CONNECT_CALLBACK_TYPE:
        m_pConn->ActualConnect(m_HostName, m_uPort, m_bBlocking, m_ulHandle);
        break;

#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
    case ACCEPT_CALLBACK_TYPE:
        m_pConn->ActualAccept(m_ulLocalAddr, m_ulHandle);
        break;
#endif //defined(HELIX_FEATURE_NETWORK_USE_SELECT)

    case BLOCKING_CALLBACK_TYPE:
        m_pConn->ActualBlocking();
        break;

    case NONBLOCKING_CALLBACK_TYPE:
        m_pConn->ActualNonBlocking();
        break;

    case DONE_CALLBACK_TYPE:
        m_pConn->ActualDone();
        break;

    case SET_BUFFER_SIZE_CALLBACK_TYPE:
        m_pConn->ActualSetReceiveBufSize(m_ulBufferSize);
        break;

    case LISTEN_CALLBACK_TYPE:
        m_pConn->ActualListen(m_ulLocalAddr, m_uPort, m_uBacklog,
                              m_bBlocking, m_ulHandle);
        break;

    default:
        break;
    }

    return HXR_OK;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?