// thrdconn.cpp
// From "Helix player source code for Symbian" - C++ code - 1,964 lines total - page 1 of 4
// CPP
// 1,964 lines
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
m_bReadPostPendingWouldBlock = (m_bReadNowPending && theErr == HXR_WOULD_BLOCK);
#endif
m_bReadNowPending = (theErr == HXR_WOULD_BLOCK ? TRUE : FALSE);
}
if (!theErr && uCount > 0)
{
conn::bytes_to_actualtcpread(this, uCount);
m_pReceiveTCP->EnQueue(m_pTempBuffer, uCount);
}
}
}
#ifndef _WINCE
if (!m_bOutstandingReadNotification && m_pReceiveTCP->GetQueuedItemCount() > 0)
{
#ifdef HELIX_FEATURE_NETWORK_USE_SELECT
m_bOutstandingReadNotification = TRUE;
HXThreadMessage msg(HXMSG_ASYNC_READ, this, NULL);
theErr = m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
if ( theErr == HXR_NOT_INITIALIZED )
{
theErr = HXR_OK;
}
#endif
}
#endif /* _WINCE */
}
else /*if (m_uSocketType == HX_UDP_SOCKET)*/
{
UINT32 ulAddress = 0;
UINT16 ulPort = 0;
/* Read as much UDP data as possible */
while (!theErr)
{
IHXBuffer* pBuffer = NULL;
if ( m_bReadPostPendingWouldBlock )
{
/* fake a call return */
theErr = HXR_WOULD_BLOCK;
}
else
{
/* call read and do heuristinc bookkeeping */
theErr = m_pActualConn->readfrom(pBuffer, ulAddress, ulPort);
if( theErr == HXR_OUTOFMEMORY )
{
mLastError = HXR_OUTOFMEMORY;
}
/* If this is a single WOULDBLOCK, ReadNowPending gets set. If this is the second
consecutive blocking call, ReadPostPendingWouldBlock gets set.
Feel free to suggest better variable names. */
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
m_bReadPostPendingWouldBlock = (m_bReadNowPending && theErr == HXR_WOULD_BLOCK);
#endif
m_bReadNowPending = (theErr == HXR_WOULD_BLOCK ? TRUE : FALSE);
}
if (!theErr && pBuffer)
{
UDP_PACKET* pPacket = new UDP_PACKET;
if(pPacket)
{
pPacket->pBuffer = pBuffer;
pPacket->ulAddress = ulAddress;
pPacket->ulPort = ulPort;
m_ReadUDPBuffers.AddTail((void*)pPacket);
}
else
{
theErr = HXR_OUTOFMEMORY;
}
}
}
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
if ( !theErr && !m_bOutstandingReadNotification && m_ReadUDPBuffers.GetCount() > 0)
{
m_bOutstandingReadNotification = TRUE;
HXThreadMessage msg(HXMSG_ASYNC_READ, this, NULL);
m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
}
#endif
}
if (!mLastError && theErr)
{
mLastError = ConvertNetworkError(theErr);
}
/* If there is an error, issue a Read Available message
* so that error can be reported back on next Read
*/
if (!m_bOutstandingReadNotification && mLastError && theErr != HXR_OUTOFMEMORY)
{
m_bOutstandingReadNotification = TRUE;
HXThreadMessage msg(HXMSG_ASYNC_READ, this, NULL);
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
#else
m_pNetworkThread->PostMessage(&msg, m_pInternalWindowHandle);
#endif
}
exit:
m_pMutex->Unlock();
#ifdef _CARBON
Release();
#endif
return theErr;
}
/*
 * ThreadedConn::DoWrite
 *
 * Pushes queued outgoing data to the underlying connection (m_pActualConn).
 *
 *  - TCP: drains the whole send queue into m_pTempBuffer, writes it, and puts
 *    back whatever was not accepted by the socket.
 *  - UDP: sends queued packets from the head of m_WriteUDPBuffers until the
 *    list is empty or a send reports an error.
 *
 * The first error seen is recorded in mLastError (after ConvertNetworkError).
 * Once the mutex has been dropped, a pending write-flush is completed and,
 * if the connection has been marked done, PostDoneAndDetach() is called.
 *
 * The function returns void, so errors are only visible through mLastError.
 */
void
ThreadedConn::DoWrite()
{
// If we are out of memory, let's just get out of here. Ideally, we should
// not ever get to this point, but lots of functions here have void return
// types, so it is possible to lose an OOM error.
if( mLastError == HXR_OUTOFMEMORY )
{
return;
}
#ifdef _CARBON
// Carbon builds hold an extra reference for the duration of the call;
// it is dropped again at the bottom of the function.
AddRef();
#endif
HX_RESULT theErr = HXR_OK;
m_pMutex->Lock();
if (m_uSocketType == HX_TCP_SOCKET)
{
UINT16 uCount = m_pSendTCP->GetQueuedItemCount();
if (uCount > 0)
{
// Pull everything that is queued into the scratch buffer and try to
// send it in a single write; uActualCount is in/out (requested/sent).
// NOTE(review): assumes m_pTempBuffer can hold uCount bytes - confirm.
m_pSendTCP->DeQueue(m_pTempBuffer, uCount);
UINT16 uActualCount = uCount;
theErr = m_pActualConn->write(m_pTempBuffer, &uActualCount);
switch(theErr)
{
case HXR_AT_INTERRUPT:
case HXR_WOULD_BLOCK:
case HXR_OK:
// enqueue the data that was not sent
if(uActualCount != uCount)
{
m_pSendTCP->EnQueue(m_pTempBuffer + uActualCount,
uCount - uActualCount);
}
// mask out these errors
theErr = HXR_OK;
break;
default:
// Any other error is kept in theErr. The bytes dequeued above are
// not put back on the queue in this case.
break;
}
}
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
// Tell the main app thread it may write again, at most one notification
// outstanding at a time.
// NOTE(review): GetMaxAvailableElements() > 0 presumably means "queue has
// free space" - confirm.
if (!m_bIsDone && !m_bOutstandingWriteNotification && m_pSendTCP->GetMaxAvailableElements() > 0)
{
m_bOutstandingWriteNotification = TRUE;
HXThreadMessage msg(HXMSG_ASYNC_WRITE, this, NULL);
m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
}
#endif
}
else /*if (m_uSocketType == HX_UDP_SOCKET)*/
{
// Send packets in FIFO order; stop at the first error so the failing
// packet stays at the head of the list.
while (!theErr && m_WriteUDPBuffers.GetCount() > 0)
{
UDPPacketInfo* pPacket = (UDPPacketInfo*) m_WriteUDPBuffers.GetHead();
UINT16 uLength = (UINT16) pPacket->m_pBuffer->GetSize();
theErr = m_pActualConn->writeto(pPacket->m_pBuffer->GetBuffer(), // sendto
&uLength,
pPacket->m_ulAddr,
pPacket->m_uPort);
if (!theErr)
{
// Packet sent: drop the buffer reference, free the wrapper and
// unlink it from the list.
pPacket->m_pBuffer->Release();
delete pPacket;
m_WriteUDPBuffers.RemoveHead();
}
}
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
// For UDP the write notification is only posted once the list is empty.
if (!m_bIsDone && !m_bOutstandingWriteNotification && m_WriteUDPBuffers.GetCount() == 0)
{
m_bOutstandingWriteNotification = TRUE;
HXThreadMessage msg(HXMSG_ASYNC_WRITE, this, NULL);
m_pMainAppThread->PostMessage(&msg, m_pInternalWindowHandle);
}
#endif
}
// Remember only the first error; ConvertNetworkError maps the benign codes
// (would-block, interrupt, no-data) to HXR_OK.
if (!mLastError && theErr)
{
mLastError = ConvertNetworkError(theErr);
}
#ifndef HELIX_FEATURE_NETWORK_USE_SELECT
// Data is still queued and no network I/O message is in flight: ask the
// network thread for another pass. The result is stored in theErr, which
// is not read again before the function returns.
if (!mLastError && !m_bNetworkIOPending &&
((m_uSocketType == HX_TCP_SOCKET && m_pSendTCP->GetQueuedItemCount() > 0) ||
(m_uSocketType == HX_UDP_SOCKET && m_WriteUDPBuffers.GetCount() > 0)))
{
theErr = PostIOMessage();
}
#endif
m_pMutex->Unlock();
// NOTE(review): the queue checks below run after the mutex was released,
// and members are read again after Release(). Presumably the flush-pending
// reference is never the last one - confirm against the code that sets
// m_bWriteFlushPending.
if (m_bWriteFlushPending &&
((m_uSocketType == HX_TCP_SOCKET && m_pSendTCP->GetQueuedItemCount() == 0) ||
(m_uSocketType == HX_UDP_SOCKET && m_WriteUDPBuffers.GetCount() == 0)))
{
m_bWriteFlushPending = FALSE;
Release();
}
/* We are done and there is no more data pending to be written out */
/* This is the time socket actually gets destroyed */
if (m_bIsDone && !m_bWriteFlushPending)
{
m_bConnected = FALSE;
PostDoneAndDetach();
}
#ifdef _CARBON
Release();
#endif
}
//--------------------------------------------------
void
ThreadedConn::DoNetworkIO(void)
{
m_bNetworkIOPending = FALSE;
if (m_bConnected)
{
// DoRead now has a return type, but since DoNetworkIO does not we
// are going to ignore any errors returned. This may not be a good idea.
#if !defined( WIN32_PLATFORM_PSPC ) /*&& !defined( _UNIX )*/
#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
if (!m_pNetworkThread->m_bUseReaderWriter)
#endif //defined(HELIX_FEATURE_NETWORK_USE_SELECT)
DoRead();
DoWrite();
#else
#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
if (!m_pNetworkThread->m_bUseReaderWriter)
#endif //defined(HELIX_FEATURE_NETWORK_USE_SELECT)
DoRead(TRUE);
DoWrite();
#endif
}
}
/*
 * ThreadedConn::ConvertNetworkError
 *
 * Normalises a raw result code before it is stored as the connection's
 * last error.
 *
 *   - server alerts (IS_SERVER_ALERT) are returned unchanged;
 *   - HXR_OK, HXR_AT_INTERRUPT, HXR_WOULD_BLOCK and HXR_NO_DATA become HXR_OK;
 *   - the explicitly listed connection/server/proxy codes are returned
 *     unchanged;
 *   - everything else collapses to HXR_FAIL.
 */
HX_RESULT
ThreadedConn::ConvertNetworkError(HX_RESULT theErr)
{
    // Server alerts are handed back exactly as received.
    if (IS_SERVER_ALERT(theErr))
    {
        return theErr;
    }

    switch (theErr)
    {
        // Codes that are masked out: report success.
        case HXR_OK:
        case HXR_AT_INTERRUPT:
        case HXR_WOULD_BLOCK:
        case HXR_NO_DATA:
            return HXR_OK;

        // Codes that are passed through untouched.
        case HXR_DNR:
        case HXR_NET_CONNECT:
        case HXR_DOC_MISSING:
        case HXR_OUTOFMEMORY:
        case HXR_ADVANCED_SERVER:
        case HXR_BAD_SERVER:
        case HXR_OLD_SERVER:
        case HXR_INVALID_FILE:
        case HXR_REDIRECTION:
        case HXR_PROXY:
        case HXR_PROXY_RESPONSE:
        case HXR_ADVANCED_PROXY:
        case HXR_OLD_PROXY:
        case HXR_PERFECTPLAY_NOT_SUPPORTED:
        case HXR_NO_LIVE_PERFECTPLAY:
        case HXR_PERFECTPLAY_NOT_ALLOWED:
            return theErr;

        // Anything unrecognised is reported as a generic failure.
        default:
            return HXR_FAIL;
    }
}
/*
 * ThreadedConn::PostIOMessage
 *
 * Marks a network I/O request as pending and posts HXMSG_ASYNC_NETWORKIO
 * for this connection to the network thread. DoNetworkIO() clears the
 * pending flag again when it runs.
 *
 * Returns whatever the network thread's PostMessage() returns.
 */
HX_RESULT
ThreadedConn::PostIOMessage(void)
{
    m_bNetworkIOPending = TRUE;

    HXThreadMessage ioRequest(HXMSG_ASYNC_NETWORKIO, this, NULL);
    const HX_RESULT postResult = m_pNetworkThread->PostMessage(&ioRequest);
    return postResult;
}
/*
 * ThreadedConn::ThrConnSocketCallback::Func
 *
 * Socket notification entry point: forwards the notification to the owning
 * ThreadedConn (m_pContext). Does nothing when there is no context.
 *
 *   Type     - which socket event occurred.
 *   bSuccess - success flag for connect/DNS notifications (see the note on
 *              SEND_BUFFER_NOTIFICATION for its other use).
 *   pConn    - connection passed on to HandleAcceptNotification().
 *
 * Returns the result of DoRead() for READ_NOTIFICATION, HXR_OK otherwise.
 */
HX_RESULT ThreadedConn::ThrConnSocketCallback::Func(NotificationType Type,
BOOL bSuccess, conn* pConn)
{
ThreadedConn* pContext = m_pContext;
// It would be nice to set theErr for all of the calls below, but the
// effects of that are unknown to this developer. XXXJHHB
HX_RESULT theErr = HXR_OK;
if(pContext)
{
switch (Type)
{
case READ_NOTIFICATION:
theErr = pContext->DoRead(TRUE);
break;
case WRITE_NOTIFICATION:
pContext->DoWrite();
break;
case CONNECT_NOTIFICATION:
pContext->HandleConnectNotification(bSuccess);
break;
case DNS_NOTIFICATION:
pContext->HandleDNSNotification(bSuccess);
break;
case ACCEPT_NOTIFICATION:
pContext->HandleAcceptNotification(pConn);
// NOTE(review): the break below only exists in select-based builds. In
// all other builds an accept notification falls through and also runs
// HandleCloseNotification(). Confirm whether that is intentional.
#ifdef HELIX_FEATURE_NETWORK_USE_SELECT
break;
#endif
case CLOSE_NOTIFICATION:
pContext->HandleCloseNotification();
break;
#ifdef HELIX_FEATURE_NETWORK_USE_SELECT
case SEND_BUFFER_NOTIFICATION:
// NOTE(review): bSuccess is reinterpreted as a buffer pointer here and
// sizeof(HXThreadMessage) bytes are queued from it; presumably the
// caller smuggles a message pointer through the BOOL argument. This
// would truncate wherever a pointer is wider than BOOL - confirm.
UINT16 len;
len = sizeof(HXThreadMessage);
// m_bNetworkIOPending is held TRUE around write()/DoWrite() and cleared
// again afterwards.
pContext->m_bNetworkIOPending = TRUE;
pContext->write((char *)bSuccess,&len);
pContext->DoWrite();
pContext->m_bNetworkIOPending = FALSE;
break;
#endif
default:
break;
}
}
return theErr;
}
/*
 * ThrdConnGenericCallback constructor
 *
 *   pConn         - connection whose Actual*() method Func() will invoke.
 *   uCallbackType - one of the *_CALLBACK_TYPE values selecting that method.
 *
 * The reference count starts at zero; all call arguments start out as
 * zero/FALSE and are expected to be filled in by the caller before the
 * callback fires.
 */
ThreadedConn::ThrdConnGenericCallback::ThrdConnGenericCallback(ThreadedConn* pConn, UINT16 uCallbackType)
    : m_lRefCount (0)
    , m_uCallbackType (uCallbackType)
    , m_pConn (pConn)
    , m_bBlocking (FALSE)
    , m_ulLocalAddr (0)
    , m_uPort (0)
    , m_ulBufferSize (0)
    , m_uBacklog(0)
//  , m_pNewConn(NULL)
{
    // m_ulHandle is passed on by Func() for the set-window-handle, connect,
    // accept and listen callbacks. Give it a defined value so a caller that
    // does not set it never forwards indeterminate data. It is assigned here
    // rather than in the initializer list so that the list's order does not
    // depend on where the member is declared.
    m_ulHandle = 0;
}
/*
 * ThrdConnGenericCallback destructor
 *
 * Nothing is released here: the only cleanup (m_pNewConn) is commented out,
 * matching the commented-out initialiser in the constructor.
 */
ThreadedConn::ThrdConnGenericCallback::~ThrdConnGenericCallback()
{
// HX_RELEASE(m_pNewConn);
}
/*
* IUnknown methods
*/
/////////////////////////////////////////////////////////////////////////
//  Method:
//      IUnknown::QueryInterface
//  Purpose:
//      Hands out the interfaces this callback object supports:
//      IUnknown and IHXCallback. On success the object is AddRef'd and
//      *ppvObj receives the interface pointer; for any other IID
//      *ppvObj is set to NULL and HXR_NOINTERFACE is returned.
//
STDMETHODIMP ThreadedConn::ThrdConnGenericCallback::QueryInterface(REFIID riid, void** ppvObj)
{
    if (IsEqualIID(riid, IID_IUnknown))
    {
        *ppvObj = this;
    }
    else if (IsEqualIID(riid, IID_IHXCallback))
    {
        *ppvObj = (IHXCallback*)this;
    }
    else
    {
        // Unsupported interface: no reference is taken.
        *ppvObj = NULL;
        return HXR_NOINTERFACE;
    }

    // Every successful query hands out one reference.
    AddRef();
    return HXR_OK;
}
/////////////////////////////////////////////////////////////////////////
//  Method:
//      IUnknown::AddRef
//  Purpose:
//      Takes one more reference on this callback object using an
//      interlocked increment and returns the resulting count.
//
STDMETHODIMP_(ULONG32) ThreadedConn::ThrdConnGenericCallback::AddRef()
{
    const ULONG32 ulNewCount = InterlockedIncrement(&m_lRefCount);
    return ulNewCount;
}
/////////////////////////////////////////////////////////////////////////
//  Method:
//      IUnknown::Release
//  Purpose:
//      Drops one reference on this callback object. When the count
//      reaches zero (or below) the object deletes itself and 0 is
//      returned; otherwise the remaining count is returned.
//
STDMETHODIMP_(ULONG32) ThreadedConn::ThrdConnGenericCallback::Release()
{
    // Keep the value produced by the interlocked decrement. Re-reading
    // m_lRefCount afterwards is not safe: once our reference is gone another
    // thread may change the count, or release the last reference and destroy
    // the object, before the member is read again.
    const long lRemaining = InterlockedDecrement(&m_lRefCount);
    if (lRemaining > 0)
    {
        return (ULONG32)lRemaining;
    }

    delete this;
    return 0;
}
/*
* IHXCallback methods
*/
/*
 * IHXCallback::Func
 *
 * Runs the deferred operation selected by m_uCallbackType on the owning
 * connection, passing along the arguments stored in this object. Unknown
 * callback types, and a missing connection, are ignored.
 *
 * Always returns HXR_OK; results of the Actual*() calls are not propagated.
 */
STDMETHODIMP ThreadedConn::ThrdConnGenericCallback::Func(void)
{
    // No connection to act on: nothing to do.
    if (!m_pConn)
    {
        return HXR_OK;
    }

    switch (m_uCallbackType)
    {
        case DNS_CALLBACK_TYPE:
            m_pConn->ActualDnsFindIpAddr(m_HostName, m_bBlocking);
            break;

        case INIT_CALLBACK_TYPE:
            m_pConn->ActualInit(m_ulLocalAddr, m_uPort, m_bBlocking);
            break;

        case SETWINDOWHANDLE_CALLBACK_TYPE:
            // Only Windows builds forward the window handle; elsewhere
            // this callback type is accepted but does nothing.
#if defined (_WIN32) || defined (_WINDOWS)
            m_pConn->ActuallSetWindowHandle(m_ulHandle);
#endif /*defined (_WIN32) || defined (_WINDOWS)*/
            break;

        case CONNECT_CALLBACK_TYPE:
            m_pConn->ActualConnect(m_HostName, m_uPort, m_bBlocking,
                                   m_ulHandle);
            break;

#if defined(HELIX_FEATURE_NETWORK_USE_SELECT)
        case ACCEPT_CALLBACK_TYPE:
            m_pConn->ActualAccept(m_ulLocalAddr,
                                  m_ulHandle);
            break;
#endif //defined(HELIX_FEATURE_NETWORK_USE_SELECT)

        case BLOCKING_CALLBACK_TYPE:
            m_pConn->ActualBlocking();
            break;

        case NONBLOCKING_CALLBACK_TYPE:
            m_pConn->ActualNonBlocking();
            break;

        case DONE_CALLBACK_TYPE:
            m_pConn->ActualDone();
            break;

        case SET_BUFFER_SIZE_CALLBACK_TYPE:
            m_pConn->ActualSetReceiveBufSize(m_ulBufferSize);
            break;

        case LISTEN_CALLBACK_TYPE:
            m_pConn->ActualListen(m_ulLocalAddr, m_uPort, m_uBacklog,
                                  m_bBlocking, m_ulHandle);
            break;

        default:
            break;
    }

    return HXR_OK;
}
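// Keyboard shortcuts (code viewer help text, not part of the source file):
//   Copy code: Ctrl + C
//   Search code: Ctrl + F
//   Full-screen mode: F11
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -
//   Show shortcuts: ?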