📄 hxsmstr.cpp
字号:
/*
* Add up the bitrate of all of the rules we are subscribed to
* and check if any of the rules put us in time stamped delivery mode.
*/
ulRecvBitRate = 0;
m_bTimeStampDeliveryMode = FALSE;
for (UINT16 i = 0; i < m_nNumRules; i++)
{
if (m_pSubInfo[i])
{
ulRecvBitRate += m_ulRuleBw[i];
if(m_bRuleTimeStampDelivery[i])
{
m_bTimeStampDeliveryMode = TRUE;
}
}
}
bTimeStampDelivery = m_bTimeStampDeliveryMode;
return HXR_OK;
}
/*
 * Asks the ASM source to thin this stream by dropping data, limiting it to
 * 80% of ulRecvBitRate.
 *
 * The thinning path is only compiled in when XXXSMP evaluates to non-zero;
 * otherwise this method does nothing.
 *
 * ulRecvBitRate - bitrate to compare against the last limit requested and
 *                 to derive the new limit from.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::HandleSlowSource(UINT32 ulRecvBitRate)
{
#if XXXSMP
    /* Whoa! This code is totally buggy. We need to fix this!
     *
     * This code was NEVER executed, since day 1. Once the TS delivery flag
     * was fixed it started executing (and it doesn't work).
     *
     * Besides, not executing it is a good thing - this mode really sucks.
     */
    const BOOL bThinningEnabled = m_bTimeStampDeliveryMode;
#else
    const BOOL bThinningEnabled = FALSE;
#endif

    if (!bThinningEnabled)
    {
        return HXR_OK;
    }

    /*
     * Time stamp delivery mode. No amount of ASM can help us now.
     *
     * If we are at or above what we previously asked the server to limit
     * to, then either the slower data has not started arriving yet or the
     * server is ignoring us. Either way, sending the message again will
     * not help.
     */
    if (ulRecvBitRate >= m_ulLastLimitBandwidth)
    {
        return HXR_OK;
    }

    IHXThinnableSource* pThinnable = NULL;
    const HX_RESULT qiResult =
        m_pASMSource->QueryInterface(IID_IHXThinnableSource,
                                     (void **)&pThinnable);
    if (HXR_OK != qiResult)
    {
        return HXR_OK;
    }

    /* Tell the source to thin down to 80% of what we were getting. */
    m_ulLastLimitBandwidth = (UINT32)(0.8 * ulRecvBitRate);
    pThinnable->LimitBandwidthByDropping(m_uStreamNumber,
                                         m_ulLastLimitBandwidth);
    pThinnable->Release();

    return HXR_OK;
}
/*
 * Fills in the "Bandwidth" thresholds for this stream's rulebook.
 *
 * pThreshold     - caller-supplied array; index 0 is written when there is
 *                  no rulebook, otherwise the rulebook fills it.
 * ulNumThreshold - [out] number of thresholds; set to 1 when there is no
 *                  rulebook.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::GetThresholdInfo(float* pThreshold, REF(UINT32) ulNumThreshold)
{
    // XXXSMP Fix this. Unify all construction of buffers & values
    // to one place. Don't reconstruct unless you need to and stop wasting
    // time, space, memory!
    if (m_pRuleBook)
    {
        HXSMUpdateSubscriptionVars(m_pSubscriptionVariables,
                                   0,
                                   TRUE,
                                   ComputeLost());

        m_pRuleBook->GetPreEvaluate(pThreshold,
                                    ulNumThreshold,
                                    m_pSubscriptionVariables,
                                    "Bandwidth");
    }
    else
    {
        // ASM always expects the number of thresholds to be >= 1, so
        // report a single zero threshold when there is no rulebook.
        ulNumThreshold = 1;
        pThreshold[0] = 0.0f;
    }

    return HXR_OK;
}
/*
 * Stops further recalculation (Recalc() returns immediately once
 * m_bStartRecalc is FALSE) and removes the pending loss-check callback
 * from the scheduler, if there is one.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::UnRegister(void)
{
    m_bStartRecalc = FALSE;

    /* Nothing scheduled, or nothing to unschedule it with. */
    if (!m_ulLossCBHandle || !m_pLossCB || !m_pScheduler)
    {
        return HXR_OK;
    }

    m_pScheduler->Remove(m_ulLossCBHandle);
    m_ulLossCBHandle = 0;

    return HXR_OK;
}
/*
 * Returns the maximum number of thresholds reported by this stream's
 * rulebook, or 0 when the stream has no rulebook.
 */
STDMETHODIMP_(ULONG32)
HXASMStream::GetNumThresholds(void)
{
    if (!m_pRuleBook)
    {
        return 0;
    }

    return m_pRuleBook->GetNumThresholds();
}
/*
 * Reports the fixed bandwidth of this stream, if one is set.
 *
 * ulBitRate - [out] receives m_ulFixedBandwidth when it is non-zero;
 *             left untouched otherwise.
 *
 * Returns HXR_OK when a fixed bandwidth is set, HXR_FAIL when it is zero.
 */
STDMETHODIMP
HXASMStream::GetFixedBandwidth(REF(UINT32) ulBitRate)
{
    if (!m_ulFixedBandwidth)
    {
        return HXR_FAIL;
    }

    ulBitRate = m_ulFixedBandwidth;
    return HXR_OK;
}
/*
 * Reports the bias factor currently stored for this stream.
 *
 * lBias - [out] receives the value of m_lBias.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::GetBiasFactor(REF(INT32) lBias)
{
lBias = m_lBias;
return HXR_OK;
}
/*
 * Stores a new bias factor for this stream.
 *
 * lBias - value copied into m_lBias; no validation is performed here.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::SetBiasFactor(INT32 lBias)
{
m_lBias = lBias;
return HXR_OK;
}
/*
 * Builds a loss-check callback bound to the given stream.
 *
 * pASMStream - stream whose Recalc() is driven by this callback; the
 *              pointer is stored as-is (no AddRef is taken here).
 *
 * The reference count starts at zero.
 */
HXASMStream::LossCheckCallback::LossCheckCallback(HXASMStream* pASMStream)
    : m_pASMStream(pASMStream)
    , m_lRefCount(0)
{
}
/*
 * Increments the callback's reference count with InterlockedIncrement and
 * returns the value that call yields.
 */
STDMETHODIMP_(UINT32)
HXASMStream::LossCheckCallback::AddRef(void)
{
return InterlockedIncrement(&m_lRefCount);
}
/*
 * Drops one reference to the callback and destroys it when the count
 * reaches zero.
 *
 * Returns the reference count remaining after the decrement, or 0 when
 * the object has been deleted.
 */
STDMETHODIMP_(UINT32)
HXASMStream::LossCheckCallback::Release(void)
{
    /*
     * Use the value produced by the decrement itself. Re-reading
     * m_lRefCount afterwards is not safe: once our reference is gone,
     * another thread may change the count or destroy the object before
     * the member is read again.
     */
    const INT32 lRemaining = InterlockedDecrement(&m_lRefCount);
    if (lRemaining > 0)
    {
        return (UINT32)lRemaining;
    }

    delete this;
    return 0;
}
/*
 * Looks up interfaceID in the table of interfaces this callback exposes
 * (IHXCallback and IUnknown) and delegates the result to QIFind.
 *
 * interfaceID    - interface being requested.
 * ppInterfaceObj - [out] passed through to QIFind.
 *
 * Returns whatever QIFind returns.
 */
STDMETHODIMP
HXASMStream::LossCheckCallback::QueryInterface
(
    REFIID interfaceID,
    void** ppInterfaceObj
)
{
    QInterfaceList supportedInterfaces[] =
    {
        { GET_IIDHANDLE(IID_IHXCallback), (IHXCallback*)this },
        { GET_IIDHANDLE(IID_IUnknown), (IUnknown*)(IHXCallback*)this },
    };

    return ::QIFind(supportedInterfaces,
                    QILISTSIZE(supportedInterfaces),
                    interfaceID,
                    ppInterfaceObj);
}
/*
 * Reports the pre-data total last computed by RecalcCurrentProps().
 *
 * ulPreData - [out] receives m_ulCurrentPreData (0 when none is set).
 *
 * Returns HXR_OK when the value is non-zero, HXR_FAIL otherwise.
 */
STDMETHODIMP
HXASMStream::GetPreData(REF(UINT32) ulPreData)
{
    if (!m_ulCurrentPreData)
    {
        ulPreData = 0;
        return HXR_FAIL;
    }

    ulPreData = m_ulCurrentPreData;
    return HXR_OK;
}
/*
 * Reports the stream bandwidth, preferring the current value and falling
 * back to the last non-zero value recorded by RecalcCurrentProps().
 *
 * ulBandwidth - [out] m_ulCurrentBandwidth if non-zero, else
 *               m_ulLastBandwidth if non-zero, else 0.
 *
 * Returns HXR_OK when a non-zero bandwidth was reported, HXR_FAILED
 * otherwise.
 */
STDMETHODIMP
HXASMStream::GetBandwidth(REF(UINT32) ulBandwidth)
{
    if (m_ulCurrentBandwidth)
    {
        ulBandwidth = m_ulCurrentBandwidth;
        return HXR_OK;
    }

    if (m_ulLastBandwidth)
    {
        ulBandwidth = m_ulLastBandwidth;
        return HXR_OK;
    }

    ulBandwidth = 0;
    return HXR_FAILED;
}
void
HXASMStream::RecalcCurrentProps()
{
m_ulCurrentBandwidth = 0;
m_ulCurrentPreData = 0;
for (UINT16 i = 0; i < m_nNumRules; i++)
{
if (m_pSubInfo[i])
{
m_ulCurrentPreData += m_ulRulePreData[i];
m_ulCurrentBandwidth += m_ulRuleBw[i];
}
}
if (m_pSource->IsActive())
{
INT32 lLastBandwidth = 0;
if( m_pRegistry )
{
m_pRegistry->GetIntById(m_ulIDClipBandwidth, lLastBandwidth);
}
/* Do not override bandwidth if the current bandwidth has either
* not changed or is zero.
*/
if (m_ulCurrentBandwidth != 0 &&
(UINT32) lLastBandwidth != m_ulCurrentBandwidth &&
m_pRegistry)
{
m_pRegistry->SetIntById(m_ulIDClipBandwidth, m_ulCurrentBandwidth);
/* A hack to tell the top level client that a stream switch occured */
m_pRegistry->SetIntByName("Statistics.StreamSwitchOccured", 1);
}
}
if (m_ulCurrentBandwidth > 0)
{
m_ulLastBandwidth = m_ulCurrentBandwidth;
}
//DEBUG_OUT(this, (s, "New PreData %p %d New Bandwidth %d\n", this, m_ulCurrentPreData, m_ulCurrentBandwidth));
}
void
HXASMStream::Recalc()
{
if (!m_bStartRecalc)
return;
HXSMUpdateSubscriptionVars(m_pSubscriptionVariables,
m_ulBandwidthAllocation,
TRUE, ComputeLost());
BOOL* pCurrentSubInfo = new BOOL[m_nNumRules];
if (m_pRuleBook)
{
HX_RESULT lResult = m_pRuleBook->GetSubscription(pCurrentSubInfo, m_pSubscriptionVariables);
HX_ASSERT(lResult == HXR_OK);
}
CHXSimpleList SubChange;
CHXSimpleList* pSubList = &SubChange;
if (m_pSubList)
{
pSubList = m_pSubList;
}
for (UINT16 i = 0; i < m_nNumRules; i++)
{
if (pCurrentSubInfo[i] != m_pSubInfo[i])
{
RTSPSubscription* pEntry = NULL;
if (m_pAtomicRuleChange)
{
pEntry = new RTSPSubscription;
pEntry->m_streamNumber = m_uStreamNumber;
pEntry->m_ruleNumber = i;
pEntry->m_bIsSubscribe = (pCurrentSubInfo[i]) ? TRUE : FALSE;
pSubList->AddTail(pEntry);
}
if (pCurrentSubInfo[i])
{
Subscribe(i);
}
else
{
Unsubscribe(i);
}
m_pSubInfo[i] = pCurrentSubInfo[i];
}
}
if (m_pAtomicRuleChange && (!m_pSubList) && (!SubChange.IsEmpty()))
{
m_pAtomicRuleChange->RuleChange(SubChange);
}
RecalcCurrentProps();
delete [] pCurrentSubInfo;
/* Reschedule callback if this was invoked by the scheduler callback */
if (!m_ulLossCBHandle && m_pLossCB)
{
m_ulLossCBHandle = m_pScheduler->RelativeEnter(m_pLossCB, 1000);
}
}
float HXASMStream::ComputeLost()
{
INT32 ulRecv = 0, ulLost = 0;
float lost = 0.0;
if( m_pRegistry )
{
m_pRegistry->GetIntById(m_ulIDRecv, ulRecv);
m_pRegistry->GetIntById(m_ulIDLost, ulLost);
}
if (ulRecv != 0)
{
lost = ((float)ulLost) / ((float)ulRecv) * 100;
}
return lost;
}
/*
 * Scheduler entry point for the loss check.
 *
 * Clears the owning stream's callback handle first, so that Recalc() sees
 * no callback pending and schedules the next one, then runs Recalc().
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::LossCheckCallback::Func()
{
    HXASMStream* const pOwner = m_pASMStream;

    pOwner->m_ulLossCBHandle = 0;
    pOwner->Recalc();

    return HXR_OK;
}
/*
 * Rebinds this stream to a new ASM source.
 *
 * Releases the previous source interfaces, re-queries them from
 * pASMSource, marks every rule as unsubscribed and refreshes the cached
 * registry IDs from their stored names.
 *
 * pASMSource - new source; may be NULL, in which case the stream is left
 *              without a source and without an atomic-rule-change
 *              interface.
 *
 * Always returns HXR_OK.
 */
HX_RESULT
HXASMStream::ResetASMSource(IHXASMSource* pASMSource)
{
    HX_RESULT hr = HXR_OK;

    HX_RELEASE(m_pASMSource);
    if (pASMSource)
    {
        pASMSource->QueryInterface(IID_IHXASMSource, (void **)&m_pASMSource);
    }

    /* The atomic-rule-change interface is only re-acquired if the stream
     * was using one before the reset.
     */
    if (m_pAtomicRuleChange)
    {
        HX_RELEASE(m_pAtomicRuleChange);

        /* pASMSource is allowed to be NULL (see above); it must not be
         * dereferenced here either.
         */
        if (pASMSource)
        {
            pASMSource->QueryInterface(IID_IHXAtomicRuleChange,
                                       (void **)&m_pAtomicRuleChange);
        }
    }

    // reset substream info.
    if (m_pRuleBook && m_pSubInfo)
    {
        for (UINT16 i = 0; i < m_nNumRules; i++)
        {
            m_pSubInfo[i] = 0;
        }
    }

    // reset Registry IDs; each is looked up only when its name is non-empty
    if (strlen(m_szRecv) && m_pRegistry)
    {
        m_ulIDRecv = m_pRegistry->GetId(m_szRecv);
    }

    if (strlen(m_szLost) && m_pRegistry)
    {
        m_ulIDLost = m_pRegistry->GetId(m_szLost);
    }

    if (strlen(m_szClipBandwidth) && m_pRegistry)
    {
        m_ulIDClipBandwidth = m_pRegistry->GetId(m_szClipBandwidth);
    }

    return hr;
}
/*
 * Installs (or, with NULL, removes) the list into which Recalc() gathers
 * subscription changes.
 *
 * While m_pSubList is non-NULL, Recalc() appends its RTSPSubscription
 * entries to that list and does not call RuleChange() itself.
 *
 * pList - list to gather into; stored as-is, not copied.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
HXASMStream::RuleGather(CHXSimpleList* pList)
{
/* Truly disgusting... */
m_pSubList = pList;
return HXR_OK;
}
/*
 * Hands a list of gathered subscription changes to the source's
 * atomic-rule-change interface.
 *
 * pList - list of changes to submit.
 *
 * Returns HXR_OK after the list has been submitted, or HXR_FAIL when
 * there is nothing to submit to (no atomic-rule-change interface) or no
 * list was supplied. The result of RuleChange() itself is not propagated.
 */
STDMETHODIMP
HXASMStream::RuleFlush(CHXSimpleList* pList)
{
    /* m_pAtomicRuleChange is optional (Recalc() checks it before every
     * use), so it cannot be dereferenced unconditionally here.
     */
    if (!m_pAtomicRuleChange || !pList)
    {
        return HXR_FAIL;
    }

    m_pAtomicRuleChange->RuleChange(*pList);
    return HXR_OK;
}
/*
 * Decides what to do with a packet that arrives after the stream's end
 * time: whether it should still be passed on, and whether the stream can
 * be ended.
 *
 * pPacket - packet to examine; asserted not to be a lost packet.
 * bSentMe - [out] TRUE when the packet should be passed on, FALSE when it
 *           should be withheld. Defaults to TRUE.
 * bEndMe  - [out] TRUE when the stream should be ended. Defaults to FALSE.
 *
 * On first use the method builds m_pASMRuleState from the "ASMRuleBook"
 * property of the stream header and marks every currently subscribed rule
 * as "unsubscribe pending". Later calls complete those pending
 * unsubscribes as packets flagged HX_ASM_SWITCH_OFF arrive; once none are
 * pending the stream is reported as done.
 *
 * NOTE(review): nRuleNumber comes straight from the packet and is used to
 * index m_pRuleSubscribeStatus without being compared to m_nNumRules -
 * confirm that callers guarantee it is in range.
 */
void
HXASMStream::PostEndTimePacket(IHXPacket* pPacket, BOOL& bSentMe, BOOL& bEndMe)
{
// All locals are declared up front so that "goto cleanup" never jumps
// over an initialization.
int i = 0;
UINT8 nRuleFlags = 0;
UINT16 nRuleNumber = 0;
IHXBuffer* pRuleBook = NULL;
bSentMe = TRUE;
bEndMe = FALSE;
// assume the packet is NOT lost
HX_ASSERT(pPacket->IsLost() == FALSE);
// This is the first packet whose timestamp > endtime
// Initialize ASMRuleState to determine whether to pass it to
// the renderer or not
if (!m_pASMRuleState)
{
// Extract RuleBook from the stream header
m_pHeader->GetPropertyCString("ASMRuleBook", pRuleBook);
if (pRuleBook != NULL)
{
m_pASMRuleState = new CASMRuleState(m_nNumRules, (char*)pRuleBook->GetBuffer(), (UINT16)pRuleBook->GetSize());
}
if (m_pASMRuleState)
{
// initialize the ASMRuleState based on the
// current subscription status of all the rules
for (i = 0; i < m_nNumRules; i++)
{
if (m_pRuleSubscribeStatus[i])
{
// Mark the rule subscribed, then immediately start waiting
// for its unsubscribe to become possible.
m_pASMRuleState->CompleteSubscribe(i);
m_pASMRuleState->StartUnsubscribePending(i);
}
}
}
else
{
// No rule state could be created (no rulebook in the header, or
// the allocation yielded NULL), so ending the stream now is fine.
bSentMe = FALSE;
bEndMe = TRUE;
goto cleanup;
}
}
nRuleNumber = pPacket->GetASMRuleNumber();
nRuleFlags = pPacket->GetASMFlags();
// if the rule is unsubscribed, then
// we just pass it to the renderer since
// the server will stop sending them anyway
// i.e. during the stream switching
if (!m_pRuleSubscribeStatus[nRuleNumber] &&
m_pASMRuleState->AnyPendingUnsubscribes())
{
// bSentMe/bEndMe keep their defaults (TRUE/FALSE).
goto cleanup;
}
else if (nRuleFlags & HX_ASM_SWITCH_OFF)
{
// The packet marks a switch-off point: finish the pending
// unsubscribe for this rule if the rule state allows it now.
if (m_pASMRuleState->IsUnsubscribePending(nRuleNumber) &&
m_pASMRuleState->CanUnsubscribeNow(nRuleNumber))
{
m_pASMRuleState->CompleteUnsubscribe(nRuleNumber);
}
}
// Pass the packet on only while its rule still counts as subscribed.
bSentMe = m_pASMRuleState->IsRuleSubscribed(nRuleNumber);
// if all the rules are unsubscribed, then
// we are DONE!
if (!m_pASMRuleState->AnyPendingUnsubscribes())
{
bSentMe = FALSE;
bEndMe = TRUE;
}
else if (bSentMe == FALSE && m_bEndOneRuleEndAll)
{
// One rule has ended and the stream is configured to end as soon
// as any rule does.
bEndMe = TRUE;
}
cleanup:
// Drop the reference on the rulebook buffer, if one was fetched.
HX_RELEASE(pRuleBook);
return;
}
/*
 * Discards the rule state built by PostEndTimePacket(), so that the next
 * call to it rebuilds the state from the stream header.
 */
void
HXASMStream::ResetASMRuleState(void)
{
    HX_DELETE(m_pASMRuleState);
}
/*
 * Forwards an "enable rule" request to the rulebook.
 *
 * nRule - rule number passed through to the rulebook's Enable().
 *
 * Always returns HXR_OK; without a rulebook the call is a no-op.
 */
STDMETHODIMP_(HX_RESULT)
HXASMStream::Enable( UINT16 nRule )
{
    if( !m_pRuleBook )
    {
        return HXR_OK;
    }

    m_pRuleBook->Enable( nRule );
    return HXR_OK;
}
/*
 * Forwards a "disable rule" request to the rulebook.
 *
 * nRule - rule number passed through to the rulebook's Disable().
 *
 * Always returns HXR_OK; without a rulebook the call is a no-op.
 */
STDMETHODIMP_(HX_RESULT)
HXASMStream::Disable( UINT16 nRule )
{
    if( !m_pRuleBook )
    {
        return HXR_OK;
    }

    m_pRuleBook->Disable( nRule );
    return HXR_OK;
}
/*
 * Asks the rulebook to recompute itself and then verifies that at least
 * one rule is still subscribable.
 *
 * If no rule is subscribable, every rule is re-enabled, the rulebook is
 * recomputed again and the call fails.
 *
 * Returns HXR_OK when there is no rulebook or at least one rule is
 * subscribable; HXR_FAIL when the scratch array could not be allocated or
 * when no rule was subscribable.
 */
STDMETHODIMP_(HX_RESULT)
HXASMStream::ReCompute()
{
    if( !m_pRuleBook )
    {
        return HXR_OK;
    }

    m_pRuleBook->ReCompute();

    // Now we must check that there is at least 1 playable substream.
    BOOL* pSubscribable = new BOOL[m_nNumRules];
    if( pSubscribable == NULL )
    {
        return HXR_FAIL;
    }

    HXSMUpdateSubscriptionVars(m_pSubscriptionVariables, 0, FALSE, 0);

    // GetSubscription does not actually do any subscribing. Rather, it
    // populates the boolean array such that subscribable streams are set
    // to TRUE.
    m_pRuleBook->GetSubscription( pSubscribable, m_pSubscriptionVariables );

    // One hit is enough, so stop scanning at the first subscribable rule.
    BOOL bAnySubscribable = FALSE;
    for( int nRule = 0; nRule < m_nNumRules; nRule++ )
    {
        if( pSubscribable[nRule] == TRUE )
        {
            bAnySubscribable = TRUE;
            break;
        }
    }
    HX_VECTOR_DELETE(pSubscribable);

    if( bAnySubscribable )
    {
        return HXR_OK;
    }

    // We can't subscribe to any stream, so reset and fail.
    for( int nReset = 0; nReset < m_nNumRules; nReset++ )
    {
        m_pRuleBook->Enable( nReset );
    }
    m_pRuleBook->ReCompute();

    return HXR_FAIL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -