threngin.cpp

来自「著名的 helix realplayer 基于手机 symbian 系统的 播放」· C++ 代码 · 共 692 行 · 第 1/2 页

CPP
692
字号
	}
	goto exit;
    }

    ndxConn = m_pSockMap->Begin();
    for (; ndxConn != m_pSockMap->End(); ++ndxConn)
    {
	ThreadedConn* pConn = (ThreadedConn*) (*ndxConn);
	pConn->DoNetworkIO();
    }

exit:
    m_pMutex->Unlock();
}
//-----------------------------------------------------------
#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
HX_RESULT
ThreadEngine::WaitforSelect()
{
    // Nothing to wait on unless the local reader exists and exposes an
    // underlying connection; report failure in that case.
    if (!m_pLocalReader || !m_pLocalReader->GetActualConn())
    {
	return HXR_FAIL;
    }

    // The reader itself is handed to the connection-level WaitforSelect()
    // only while the engine is being destroyed; otherwise NULL is passed.
    ThreadedConn* pReaderArg = m_bInDestructor ? m_pLocalReader
					       : (ThreadedConn*) NULL;

    return m_pLocalReader->GetActualConn()->WaitforSelect(this, pReaderArg);
}

#endif // HELIX_FEATURE_NETWORK_USE_SELECT

//-----------------------------------------------------------
// NetworkThreadMainLoop
//
// Entry routine of the engine's network thread.
//
// pArg is cast to the owning ThreadEngine. The routine repeatedly obtains
// an HXThreadMessage and dispatches on msg.m_ulMessage until HXMSG_QUIT
// sets bDone, or until obtaining a message fails. On the way out it kills
// any timer it still owns, signals pEngine->m_pQuitEvent and returns 0.
//
// Without HELIX_FEATURE_NETWORK_USE_SELECT every message comes from
// pThread->GetMessage(). With it, the routine first sets up a loopback
// listener/writer pair of ThreadedConn objects; once both the writer and
// the reader report connection_really_open(), messages are read as raw
// HXThreadMessage structs from m_pLocalReader after calling
// pEngine->WaitforSelect(). Until then (or if setup failed) it falls back
// to the thread's message queue driven by a 20-tick-interval HXAsyncTimer.
void*
NetworkThreadMainLoop(void* pArg)
{
    ThreadEngine*   pEngine = (ThreadEngine*) pArg;
    // NOTE(review): pMutex is not referenced anywhere else in this function.
    HXMutex*	    pMutex  = pEngine->m_pMutex; 
    HXThread*	    pThread = pEngine->m_pNetworkThread;
    HXThreadMessage msg;
    BOOL	    bDone   = FALSE;	// set by HXMSG_QUIT to end the loop
    UINT32	    ulSleepTime = 0;	// interval passed to HXAsyncTimer::SetTimer
    ULONG32	    ulLastTimerCallback = HX_GET_TICKCOUNT(); // tick of last DoNetworkIO() run from the timer
    
#if defined( _WIN32 ) || defined( _UNIX_THREADED_NETWORK_IO ) || defined(THREADS_SUPPORTED)
    // Id of the currently armed HXAsyncTimer; 0 means no timer is armed.
    UINT32  ulTimerId = 0;
#endif /*_WIN32 || _UNIX_THREADED_NETWORK_IO */
    
#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
    unsigned short iSize;		// in/out byte count for m_pLocalReader->read()
    HX_RESULT theErr = HXR_FAIL;	// stays HXR_FAIL if listener/writer allocation failed
    UINT32	ulPlatformData	= 0;
    BOOL	bRWStartup = FALSE;	// TRUE from reader/writer hand-over until HXMSG_ASYNC_START_READERWRITER is seen
    BOOL	bMoreToRead = FALSE;	// TRUE after a full message was read; skips WaitforSelect() next pass
    BOOL	bDebug = FALSE;		// when TRUE, accept() is skipped and the message-queue path is always used

    // Within this function m_pLocalReader is only assigned by the
    // HXMSG_ASYNC_SETREADER_CONNECTION handler further down.
    pEngine->m_pLocalListener  = new ThreadedConn(HX_TCP_SOCKET);
    pEngine->m_pLocalWriter  = new ThreadedConn(HX_TCP_SOCKET);
    pEngine->m_pLocalReader  = NULL;

    pThread->SetPriority(THREAD_PRIORITY_ABOVE_NORMAL);
//    pThread->SetPriority(THREAD_PRIORITY_HIGHEST);

#if defined (_WIN32)
    ulPlatformData = (UINT32)GetModuleHandle(NULL);
#elif defined (_WIN16)
    ulPlatformData = (UINT32)(int)g_hInstance;
#endif

    // Loopback setup: listen, then connect the writer, then accept. Each
    // step runs only if the previous one succeeded.
    if (pEngine->m_pLocalListener && pEngine->m_pLocalWriter)
	theErr = pEngine->m_pLocalListener->listen(
		    LOCAL_LOOPBACK_ADDR
		    , LOCAL_LOOPBACK_PORT
		    , 4 // backlog
		    , 0 // blocking
		    , ulPlatformData // ulPlatform
		    );
    if (SUCCEEDED(theErr))
    // writer connects and reader accepts
	theErr = pEngine->m_pLocalWriter->connect(
		    LOCAL_LOOPBACK_NAME
		    , LOCAL_LOOPBACK_PORT
		    , 0 // blocking
		    , ulPlatformData // ulPlatform
		    );

    if (SUCCEEDED(theErr) && !bDebug)
    // writer connects and reader accepts
	theErr = pEngine->m_pLocalListener->accept(LOCAL_LOOPBACK_ADDR);

    // Any setup failure: detach whatever loopback connections exist and clear
    // the pointers, so the loop below stays on the message-queue path.
    if (FAILED(theErr))
    {
	if (pEngine->m_pLocalListener)
	{
	    pEngine->DetachSocket(pEngine->m_pLocalListener);
	    pEngine->m_pLocalListener = NULL;
	}
	if (pEngine->m_pLocalWriter)
	{
	    pEngine->DetachSocket(pEngine->m_pLocalWriter);
	    pEngine->m_pLocalWriter = NULL;
	}
	if (pEngine->m_pLocalReader)
	{
	    pEngine->DetachSocket(pEngine->m_pLocalReader);
	    pEngine->m_pLocalReader = NULL;
	}
    }

    while (!bDone)
    {
	// Path 1: loopback pair not (yet) usable -> use the thread's message
	// queue, with a timer armed so HXMSG_ASYNC_TIMER keeps arriving.
	if (bDebug || FAILED(theErr) 
	    || !pEngine->m_pLocalWriter 
	    || !pEngine->m_pLocalReader 
	    || !pEngine->m_pLocalWriter->connection_really_open()
	    || !pEngine->m_pLocalReader->connection_really_open()
	    )
	{ // process connect and accept messages, then use select
	    pEngine->m_pNetworkThread->m_bUseReaderWriter = FALSE;
	   // NOTE(review): ulTimerId is declared under the _WIN32 /
	   // _UNIX_THREADED_NETWORK_IO / THREADS_SUPPORTED guard, not under
	   // HELIX_FEATURE_NETWORK_USE_SELECT. It is declared UINT32, so
	   // "<= 0" is presumably just "== 0".
           if (ulTimerId <= 0)
           {
               ulSleepTime = 20;
               ulTimerId = HXAsyncTimer::SetTimer( ulSleepTime, pThread );
           }
	    if (pThread->GetMessage(&msg, 0, 0) != HXR_OK)
		break;
	}
	else
	{
	    // Path 2: hand-over phase. Entered the first time the loopback
	    // pair is open, and again on every pass while bRWStartup is TRUE.
	    if (!pEngine->m_pNetworkThread->m_bUseReaderWriter || bRWStartup)
	    { // process messages in queue
		if (!bRWStartup)
		{
		    // First pass only: stop the polling timer and queue a
		    // START_READERWRITER message; its handler below clears
		    // bRWStartup.
		    if (ulTimerId > 0)
		    {
		       HXAsyncTimer::KillTimer(ulTimerId);
		       ulTimerId = 0;
		    }
		    msg.m_ulMessage = HXMSG_ASYNC_START_READERWRITER;
		    theErr = pEngine->m_pNetworkThread->PostMessage(&msg);
		}
		pEngine->m_pNetworkThread->m_bUseReaderWriter = TRUE;
		pEngine->m_pLocalWriter->set_callback(pEngine->m_pLocalWriter->get_callback()); // for post network msg
		pThread->SetNetworkMessageConnection(pEngine->m_pLocalWriter);
		bRWStartup = TRUE;
		if (pThread->GetMessage(&msg, 0, 0) != HXR_OK)
		    break;
	    }
	    else
	    {
		// Path 3: steady state. Messages are read from the loopback
		// reader; WaitforSelect() is called first unless the previous
		// pass read a complete message.
		if (!bMoreToRead)
		{
		    if (HXR_FAIL == pEngine->WaitforSelect())
			break;
		}
		msg.m_ulMessage = 0;
		iSize = sizeof(HXThreadMessage);
		pEngine->m_pLocalReader->m_bNetworkIOPending = TRUE;
		theErr = pEngine->m_pLocalReader->read(&msg,&iSize);
		if (HXR_OK != theErr)
		{
		    if (HXR_WOULD_BLOCK == theErr)
		    {
			// Nothing available right now: clear the error and
			// go back to WaitforSelect() on the next pass.
			bMoreToRead = FALSE;
			theErr = HXR_OK;
			continue;
		    }
		    // reader failed, must be disconnected, return to msg loop
		    // NOTE(review): this break is not inside the switch; it
		    // leaves the enclosing while loop, so control goes to the
		    // shutdown code after the loop rather than back to the
		    // message-queue path.
		    break;
		}
		// A read that returns a different size than one whole
		// HXThreadMessage is not dispatched.
		if (iSize != sizeof(HXThreadMessage)) // fixme
		{
		    bMoreToRead = FALSE;
		    continue;
		}
		else
		    bMoreToRead = TRUE;
	    }
	}
#else
    while (!bDone && pThread->GetMessage(&msg, 0, 0) == HXR_OK)
    {
#endif // HELIX_FEATURE_NETWORK_USE_SELECT
	// Common dispatch for whichever path produced msg.
	switch (msg.m_ulMessage)
	{
#if defined( _WIN32 ) || defined( _UNIX_THREADED_NETWORK_IO ) || defined(THREADS_SUPPORTED)
           // Timer tick: run DoNetworkIO() for all connections, but only if
           // at least ulSleepTime ticks have elapsed since the last run.
           case HXMSG_ASYNC_TIMER: //Look in hxmsgs.h (WM_TIMER under win32)
           {
#if defined(_UNIX_THREADED_NETWORK_IO) || (defined(THREADS_SUPPORTED) && defined(_UNIX))
               unix_TCP::process_idle();
#endif                    
               ULONG32 ulCurrentTime = HX_GET_TICKCOUNT();
               
               if (CALCULATE_ELAPSED_TICKS(ulLastTimerCallback, 
                                           ulCurrentTime) >= ulSleepTime)
               {
                   ulLastTimerCallback = ulCurrentTime;
                   pEngine->DoNetworkIO(); 
               }
           }
           break;
#endif /* _WIN32 || _UNIX_THREADED_NETWORK_IO */
           // Network I/O requested for the connection carried in m_pParam1.
           case HXMSG_ASYNC_NETWORKIO:
           {
               ThreadedConn* pConn = (ThreadedConn*) msg.m_pParam1; 
               pEngine->DoNetworkIO(pConn);
           }
           break;
           // Callback request: m_pParam1 is the connection, m_pParam2 the
           // IHXCallback handed to DoAsyncCallback().
           case HXMSG_ASYNC_CALLBACK:
           {
               ThreadedConn* pConn = (ThreadedConn*) msg.m_pParam1; 
               IHXCallback* pCallback = (IHXCallback*) msg.m_pParam2;
               pEngine->DoAsyncCallback(pConn, pCallback);
           }
           break;
#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
           // Installs the loopback reader connection (taken from m_pParam2).
           case HXMSG_ASYNC_SETREADER_CONNECTION:
           {
               pEngine->m_pLocalReader = (ThreadedConn*) msg.m_pParam2; 
           }
           break;
           // Posted by the hand-over phase above; clearing bRWStartup lets
           // the loop move on to reading from m_pLocalReader.
           case HXMSG_ASYNC_START_READERWRITER:
           {
		pEngine->m_pNetworkThread->m_bUseReaderWriter = TRUE;
		bRWStartup = FALSE;
           }
           break;
#endif //HELIX_FEATURE_NETWORK_USE_SELECT
           // Detach the connection carried in m_pParam1 from the engine.
           case HXMSG_ASYNC_DETACH:
           {
               ThreadedConn* pConn = (ThreadedConn*) msg.m_pParam1; 
               pEngine->DetachSocket(pConn); 
           }
           break;
           // Request to terminate: the loop condition sees bDone next pass.
           case HXMSG_QUIT:
           {
               bDone	= TRUE;
           }
           
           break;
           // (Re)arm the timer with the interval carried in m_pParam1,
           // killing any timer that is already armed.
           case HXMSG_ASYNC_RESUME:
#if defined( _WIN32 ) || defined( _UNIX_THREADED_NETWORK_IO ) || defined(THREADS_SUPPORTED)
           {
               if (ulTimerId > 0)
               {
                   HXAsyncTimer::KillTimer(ulTimerId);
                   ulTimerId = 0;
               }
               
               ulSleepTime = (UINT32)(PTR_INT)msg.m_pParam1;
               ulTimerId = HXAsyncTimer::SetTimer( ulSleepTime, pThread );
           }
#endif /* _WIN32 || _UNIX_THREADED_NETWORK_IO */
           break;
           
           // Disarm the timer (if any) and reset the interval to 0.
           case HXMSG_ASYNC_STOP:
#if defined( _WIN32 ) || defined( _UNIX_THREADED_NETWORK_IO ) || defined(THREADS_SUPPORTED)
           {
               if (ulTimerId > 0)
               {
                   HXAsyncTimer::KillTimer( ulTimerId );
                   ulTimerId   = 0;
                   ulSleepTime = 0;
               }
           }
#endif /* _WIN32 || _UNIX_THREADED_NETWORK_IO */
           break;
           // Anything else is handed to the thread's own dispatcher.
           default:
               pThread->DispatchMessage(&msg);
               break;
	}
    }
    
    // Loop finished (quit requested or message retrieval failed): release
    // the timer if one is still armed.
#if defined( _WIN32 ) || defined( _UNIX_THREADED_NETWORK_IO ) || defined(THREADS_SUPPORTED)
    if (ulTimerId > 0)
    {
        HXAsyncTimer::KillTimer( ulTimerId );
    }
#endif /* _WIN32 || _UNIX_THREADED_NETWORK_IO */
    
    // Signal the engine's quit event before the thread routine returns.
    pEngine->m_pQuitEvent->SignalEvent();
    
    return (void*) 0;
}


#if defined(_MACINTOSH) && defined(_CARBON) && defined(THREADS_SUPPORTED)

void ThreadEngine::FauxMainAppCarbonTimer(EventLoopTimerRef, void* /* unused */)
{
    // Carbon timer callback. Both arguments are ignored.
    //
    // Drains the main application thread's message queue and forwards each
    // threaded-networking notification to the ThreadedConn carried in the
    // message's m_pParam1.
    //
    // History (xxxbobclark): adapted from Greg Wright's equivalent message
    // handling loop for Unix. It lives in pnio rather than rmacore because
    // other users of threaded networking (e.g. rmacleng, and rnqueue for
    // Auto-Update) need it as well; a Carbon Timer is a way to get
    // periodic time that pnio can implement itself.

    ThreadEngine* pNetEngine = ThreadEngine::GetThreadEngine();
    HX_ASSERT(pNetEngine);
    HXCarbonThread* pAppThread = (HXCarbonThread*)pNetEngine->GetMainAppThread();
    HX_ASSERT(pAppThread);

    HXThreadMessage msg;
    for (;;)
    {
	// Stop as soon as PeekMessage() no longer reports HXR_OK.
	if (pAppThread->PeekMessage(&msg, 0, 0, TRUE) != HXR_OK)
	{
	    break;
	}

	// Messages with id 0 are ignored.
	if (0 == msg.m_ulMessage)
	{
	    continue;
	}

	// Messages without a target connection are ignored as well.
	ThreadedConn* pTarget = (ThreadedConn*)msg.m_pParam1;
	if (!pTarget)
	{
	    continue;
	}

	switch (msg.m_ulMessage)
	{
	    case HXMSG_ASYNC_DNS:
		pTarget->OnAsyncDNS((BOOL)msg.m_pParam2);
		break;

	    case HXMSG_ASYNC_CONNECT:
		pTarget->OnConnect((BOOL)msg.m_pParam2);
		break;

	    case HXMSG_ASYNC_READ:
		pTarget->OnReadNotification();
		break;

	    case HXMSG_ASYNC_WRITE:
		pTarget->OnWriteNotification();
		break;

	    case HXMSG_ASYNC_ACCEPT:
		pTarget->OnAcceptNotification();
		break;

	    default:
		// Any other id is unexpected on this queue.
		HX_ASSERT(!"Unknown message in threaded networking Carbon Timer");
		break;
	}
    }
}

#endif

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?