📄 rtptran.cpp
字号:
RTPBaseTransport::reflectPacket(BasePacket* pBasePacket, REF(IHXBuffer*)pSendBuf)
{
    /*
     * Forwards one packet whose buffer already holds a wire-format RTP or
     * RTCP packet.
     *
     * pBasePacket - wrapper supplying the IHXPacket and the sequence number
     *               recorded on the stream.
     * pSendBuf    - out: for RTP packets, the buffer to send (AddRef'd, or
     *               newly created by FixRTPHeader).  For RTCP packets the
     *               buffer is handed to m_pRTCPTran->reflectRTCP() and
     *               released here, so nothing is returned through it on
     *               success.
     *
     * Returns:
     *   HXR_UNEXPECTED         no packet, no buffer, or no stream data
     *   HXR_IGNORE             lost packet; RTCP packet that was handled here;
     *                          RTCP packet seen before the first RTP packet
     *   HXR_INVALID_PARAMETER  RTP packet shorter than the fixed RTP header
     *   otherwise              HXR_OK or the FixRTPHeader/FixRTCPSR/reflectRTCP
     *                          result
     *
     * The references obtained from GetPacket()/GetBuffer() are released on
     * every exit path.
     */
    HX_ASSERT(pBasePacket);
    HX_ASSERT(m_bHasRTCPRule);
    HX_ASSERT(m_ulPayloadWirePacket==1);

    // Fixed RTP header: V/P/X/CC, M/PT, seq(2), timestamp(4), SSRC(4).
    const UINT32 ulRTPFixedHeaderSize = 12;
    // RTCP SR: header(4) + SSRC(4) + NTP seconds(4) + NTP fraction(4).
    const UINT32 ulRTCPSRThroughNTP = 16;

    HX_RESULT theErr = HXR_OK;
    IHXPacket* pPacket = pBasePacket->GetPacket();
    IHXBuffer* pBuffer = NULL;
    UINT32 ulLen = 0;

    /*
     * Sanity check
     */
    if (!pPacket)
    {
        return HXR_UNEXPECTED;
    }
    else if (pPacket->IsLost())
    {
        pPacket->Release();
        return HXR_IGNORE;
    }
    else
    {
        pBuffer = pPacket->GetBuffer();
        if (!pBuffer)
        {
            pPacket->Release();
            return HXR_UNEXPECTED;
        }
    }

    ulLen = pBuffer->GetSize();

    HX_ASSERT(pPacket->GetStreamNumber() == m_streamNumber);
    HX_ASSERT(pPacket->GetASMFlags());

    UINT16 streamNumber = pPacket->GetStreamNumber();
    RTSPStreamData* pStreamData =
        m_pStreamHandler->getStreamData(streamNumber);

    // Both branches below dereference pStreamData; bail out cleanly instead
    // of crashing if the stream is unknown.
    HX_ASSERT(pStreamData != NULL);
    if (!pStreamData)
    {
        pPacket->Release();
        pBuffer->Release();
        return HXR_UNEXPECTED;
    }

    if (isRTCPRule(pPacket->GetASMRuleNumber()))
    {
        /*
         * RTCP packet
         */
        if (!pStreamData->m_bFirstPacket)
        {
            // NOTE(review): the rewrite is skipped unless BOTH offsets are
            // non-zero (same test as the RTP path below) - confirm that
            // "&&" rather than "||" is intended.
            if (m_reflectorInfo.m_unSeqNoOffset && m_reflectorInfo.m_ulRTPTSOffset)
            {
                theErr = FixRTCPSR(m_pCommonClassFactory,
                                   pBuffer,
                                   pSendBuf,
                                   m_reflectorInfo.m_ulRTPTSOffset);
            }
            else
            {
                theErr = HXR_OK;
                pSendBuf = pBuffer;
                pSendBuf->AddRef();
            }
        }
        else
        {
            // No RTP packet has established the offsets yet.
            theErr = HXR_IGNORE;
        }

        // Packet type 200 is a Sender Report.  Record the middle 32 bits of
        // its NTP timestamp next to the same 32 bits of the current
        // scheduler time so MapLSR() can translate between the two later.
        // The length check keeps the two dword reads inside the buffer.
        BYTE* pReport = pBuffer->GetBuffer();
        if (pReport && (ulLen >= ulRTCPSRThroughNTP) && (pReport[1] == 200))
        {
            BYTE* pNTP = pReport + 8;
            UINT32 ulSourceSec = GetDwordFromBufAndInc(pNTP);
            UINT32 ulSourceFract = GetDwordFromBufAndInc(pNTP);

            HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
            Timeval tvNow((INT32) rmatv.tv_sec, (INT32)rmatv.tv_usec);
            NTPTime ntpNow(tvNow);

            m_LSRHistory [m_cLSRWrite].m_ulSourceLSR = ulSourceSec << 16;
            m_LSRHistory [m_cLSRWrite].m_ulSourceLSR |= (ulSourceFract >> 16);
            m_LSRHistory [m_cLSRWrite].m_ulServerLSR = ntpNow.m_ulSecond << 16;
            m_LSRHistory [m_cLSRWrite].m_ulServerLSR |= (ntpNow.m_ulFraction >> 16);
            (++m_cLSRWrite) %= LSR_HIST_SZ;
        }

        if (HXR_OK == theErr)
        {
            theErr = m_pRTCPTran->reflectRTCP(pSendBuf);
            HX_RELEASE(pSendBuf);
        }

        pPacket->Release();
        pBuffer->Release();

        // The RTCP packet was dealt with here; the caller has nothing to send.
        if (HXR_OK == theErr)
        {
            return HXR_IGNORE;
        }
        else
        {
            return theErr;
        }
    }

    /*
     * RTP packet
     */

    // Everything below reads the fixed RTP header, so reject anything
    // shorter before touching any per-stream state.
    if (ulLen < ulRTPFixedHeaderSize)
    {
        pPacket->Release();
        pBuffer->Release();
        return HXR_INVALID_PARAMETER;
    }

    if (!pStreamData->m_packetSent)
    {
        pStreamData->m_packetSent = TRUE;
    }

    if (pStreamData->m_bFirstPacket)
    {
        pStreamData->m_bFirstPacket = FALSE;

        // Pull seq no (bytes 2-3), timestamp (bytes 4-7) and SSRC
        // (bytes 8-11) out of the first packet's header.
        BYTE* pcOrig = pBuffer->GetBuffer();
        pcOrig += 2;
        UINT16 unFirstSeqNo = *pcOrig++<<8;
        unFirstSeqNo |= *pcOrig++;
        UINT32 ulFirstRTPTS = GetDwordFromBufAndInc(pcOrig);

        if (m_pReportHandler)
        {
            m_pReportHandler->SetSSRC(GetDwordFromBufAndInc(pcOrig));
        }

        // get an offset for reflector: adding these to the first packet's
        // values yields 0 (modulo the field width).
        m_reflectorInfo.m_unSeqNoOffset = 0 - unFirstSeqNo;
        m_reflectorInfo.m_ulRTPTSOffset = 0 - ulFirstRTPTS;
    }

    if (m_reflectorInfo.m_unSeqNoOffset && m_reflectorInfo.m_ulRTPTSOffset)
    {
        theErr = FixRTPHeader(m_pCommonClassFactory,
                              pBuffer,
                              pSendBuf,
                              m_reflectorInfo.m_unSeqNoOffset ,
                              m_reflectorInfo.m_ulRTPTSOffset);
    }
    else
    {
        theErr = HXR_OK;
        pSendBuf = pBuffer;
        pSendBuf->AddRef();
    }

    // forever increasing
    pStreamData->m_seqNo = pBasePacket->m_uSequenceNumber;
    pStreamData->m_lastTimestamp = pPacket->GetTime();

    // We only want to count the payload, not the RTP headers.
    BYTE* pRawPkt = (BYTE*)pBuffer->GetBuffer();
    UINT32 ulRTPHeaderSize = ulRTPFixedHeaderSize;       // fixed header, no CSRCs
    ulRTPHeaderSize += 4 * (UINT32)(pRawPkt[0] & 0x0F);  // CSRCs

    // Extension header present.  In the first octet (V:2 P:1 X:1 CC:4) the
    // X bit is 0x10; 0x20 is the padding bit.
    if (pRawPkt[0] & 0x10)
    {
        ulRTPHeaderSize += 2; // 16-bit profile-defined field

        // Overrun prevention.
        if (ulLen > ulRTPHeaderSize + 1)
        {
            // Extension length is last 16 bits of first word.
            UINT32 ulExtensionLength =
                (pRawPkt[ulRTPHeaderSize] << 8) + pRawPkt[ulRTPHeaderSize + 1];
            ulRTPHeaderSize += 2;                       // 16-bit length field.
            ulRTPHeaderSize += (ulExtensionLength * 4); // Rest of extension header.
        }
    }

    // Clamp instead of letting the unsigned subtraction wrap when the
    // declared header is larger than the packet.
    HX_ASSERT(ulLen >= ulRTPHeaderSize);
    UINT32 ulPayloadLen = (ulLen > ulRTPHeaderSize) ? (ulLen - ulRTPHeaderSize) : 0;
    updateQoSInfo(ulPayloadLen);

    /*
     * clean up
     */
    pPacket->Release();
    pBuffer->Release();
    return theErr;
}
/*
 * Accounts for one sent packet of ulBytesSent bytes: bumps this transport's
 * own packet/byte counters and, when a QoS info object is attached, the
 * counters it holds as well.
 */
void
RTPBaseTransport::updateQoSInfo(UINT32 ulBytesSent)
{
    // Transport-local totals are maintained unconditionally.
    m_lBytesSent += ulBytesSent;
    ++m_ulPacketsSent;

    if (m_pQoSInfo)
    {
        // Read-modify-write the byte total held by the QoS info object.
        UINT64 ulTotalBytes = m_pQoSInfo->GetBytesSent();
        m_pQoSInfo->SetBytesSent(ulTotalBytes + ulBytesSent);

        // Same for its packet total.
        UINT32 ulTotalPackets = m_pQoSInfo->GetPacketsSent();
        m_pQoSInfo->SetPacketsSent(ulTotalPackets + 1);
    }
}
/*
 * Looks ulSourceLSR up in the LSR history ring and returns the server LSR
 * stored alongside it.
 *
 * When m_ulPayloadWirePacket is 0 the value is returned unchanged.
 * Otherwise the ring is scanned from the read cursor up to (not including)
 * the write cursor; on a hit the read cursor is moved to the matching entry.
 * Returns 0 when no entry matches.
 */
UINT32
RTPBaseTransport::MapLSR(UINT32 ulSourceLSR)
{
    if (m_ulPayloadWirePacket != 0)
    {
        for (UINT8 cIdx = m_cLSRRead;
             cIdx != m_cLSRWrite;
             ++cIdx, cIdx %= LSR_HIST_SZ)
        {
            if (m_LSRHistory [cIdx].m_ulSourceLSR != ulSourceLSR)
            {
                continue;
            }

            // Remember where we found it so the next search starts here.
            m_cLSRRead = cIdx;
            return m_LSRHistory [cIdx].m_ulServerLSR;
        }

        return 0;
    }

    return ulSourceLSR;
}
/*
 * Creates a copy of an RTP packet with its sequence number and timestamp
 * shifted by the given offsets.
 *
 * pCCF          - class factory used to create the new IHXBuffer.
 * pOrigBuf      - original packet; must be at least 8 bytes so that the
 *                 sequence number (bytes 2-3) and timestamp (bytes 4-7)
 *                 are present.  It is not modified.
 * pNewBuf       - out: newly created buffer holding the patched copy.  Only
 *                 meaningful when HXR_OK is returned; if the copy fails the
 *                 new buffer is released here rather than left for the
 *                 caller.
 * unSeqNoOffset - added to the sequence number (16-bit wrap-around).
 * ulRTPTSOffset - added to the timestamp (32-bit wrap-around).
 *
 * Returns HXR_INVALID_PARAMETER for a NULL argument or a too-short buffer,
 * the CreateInstance()/Set() error on failure, HXR_OK otherwise.
 */
HX_RESULT
FixRTPHeader(IHXCommonClassFactory* pCCF,
             IHXBuffer* pOrigBuf,
             REF(IHXBuffer*) pNewBuf,
             UINT16 unSeqNoOffset,
             UINT32 ulRTPTSOffset)
{
    if (!pCCF || !pOrigBuf || pOrigBuf->GetSize() < 8)
    {
        return HXR_INVALID_PARAMETER;
    }

    HX_RESULT theErr = pCCF->CreateInstance(IID_IHXBuffer, (void**) &pNewBuf);
    if (HXR_OK != theErr)
    {
        return theErr;
    }

    theErr = pNewBuf->Set(pOrigBuf->GetBuffer(), pOrigBuf->GetSize());
    if (HXR_OK != theErr)
    {
        // Don't hand a half-initialised buffer (and its reference) back to
        // a caller that will only look at the error code.
        HX_RELEASE(pNewBuf);
        return theErr;
    }

    // Read the current values from the original packet.
    BYTE* pcOrig = pOrigBuf->GetBuffer();
    pcOrig += 2;
    UINT16 unSeqNo = *pcOrig++<<8;
    unSeqNo |= *pcOrig++;
    UINT32 ulRTPTS = GetDwordFromBufAndInc(pcOrig);

    // update
    unSeqNo += unSeqNoOffset;
    ulRTPTS += ulRTPTSOffset;

    // Write them into the copy, most-significant byte first.
    BYTE* pc = pNewBuf->GetBuffer();
    pc += 2;
    *pc++ = (UINT8)(unSeqNo>>8);
    *pc++ = (UINT8)(unSeqNo);
    *pc++ = (UINT8)(ulRTPTS>>24);
    *pc++ = (UINT8)(ulRTPTS>>16);
    *pc++ = (UINT8)(ulRTPTS>>8);
    *pc++ = (UINT8)(ulRTPTS);

    return theErr;
}
/*
 * Creates a copy of an RTCP Sender Report with its RTP timestamp field
 * (bytes 16-19) shifted by ulRTPTSOffset.
 *
 * pCCF          - class factory used to create the new IHXBuffer.
 * pOrigBuf      - original RTCP packet; must be at least 20 bytes.  It is
 *                 not modified.
 * pNewBuf       - out: newly created buffer holding the patched copy.  Only
 *                 meaningful when HXR_OK is returned; if the copy fails the
 *                 new buffer is released here rather than left for the
 *                 caller.
 * ulRTPTSOffset - added to the RTP timestamp (32-bit wrap-around).
 *
 * Returns HXR_INVALID_PARAMETER for a NULL argument or a too-short buffer,
 * HXR_IGNORE when the packet type (byte 1) is not RTCP_SR, the
 * CreateInstance()/Set() error on failure, HXR_OK otherwise.
 */
HX_RESULT
FixRTCPSR(IHXCommonClassFactory* pCCF,
          IHXBuffer* pOrigBuf,
          REF(IHXBuffer*) pNewBuf,
          UINT32 ulRTPTSOffset)
{
    if (!pCCF || !pOrigBuf || pOrigBuf->GetSize() < 20)
    {
        return HXR_INVALID_PARAMETER;
    }

    BYTE* pcOrig = pOrigBuf->GetBuffer();

    // make sure it's SR
    if (RTCP_SR != *(pcOrig+1))
    {
        return HXR_IGNORE;
    }

    HX_RESULT theErr = pCCF->CreateInstance(IID_IHXBuffer, (void**) &pNewBuf);
    if (HXR_OK != theErr)
    {
        return theErr;
    }

    theErr = pNewBuf->Set(pOrigBuf->GetBuffer(), pOrigBuf->GetSize());
    if (HXR_OK != theErr)
    {
        // Don't hand a half-initialised buffer (and its reference) back to
        // a caller that will only look at the error code.
        HX_RELEASE(pNewBuf);
        return theErr;
    }

    // Read the current RTP timestamp from the original packet.
    pcOrig += 16;
    UINT32 ulRTPTS = GetDwordFromBufAndInc(pcOrig);

    // update
    ulRTPTS += ulRTPTSOffset;

    // Write it into the copy, most-significant byte first.
    BYTE* pc = pNewBuf->GetBuffer();
    pc += 16;
    //RTP Timestamp
    *pc++ = (UINT8)(ulRTPTS>>24);
    *pc++ = (UINT8)(ulRTPTS>>16);
    *pc++ = (UINT8)(ulRTPTS>>8);
    *pc++ = (UINT8)(ulRTPTS);

    return theErr;
}
/*
 * Computes m_lTimeOffsetRTP for this transport: the signed distance between
 * this packet's time and the session's initial reference timestamp.
 *
 * The reference is obtained through the IHXTimeStampSync interface of
 * m_pResp.  If the session still needs an initial timestamp, this packet's
 * time is registered as the reference; otherwise the stored one is used.
 * When the stream has a timestamp converter the distance is passed through
 * hxa2rtp() before being stored.  Nothing is stored if the interface or the
 * stream data is unavailable.
 */
void
RTPBaseTransport::SyncTimestamp(IHXPacket* pPacket)
{
    IHXTimeStampSync* pSync = NULL;
    const HX_RESULT qiRes =
        m_pResp->QueryInterface(IID_IHXTimeStampSync, (void**)&pSync);
    if (FAILED(qiRes))
    {
        // this shouldn't happen...
        HX_ASSERT(!"IHXTimeStampSync not implemented");
        return;
    }

    UINT32 ulRefTime = 0;
    UINT32 ulThisTime = pPacket->GetTime();

    if (!pSync->NeedInitialTS(m_sessionID))
    {
        // A reference already exists for this session.
        ulRefTime = pSync->GetInitialTS(m_sessionID);
    }
    else
    {
        // This packet becomes the session's reference.
        pSync->SetInitialTS(m_sessionID, pPacket->GetTime());
        ulRefTime = ulThisTime;
    }
    HX_RELEASE(pSync);

    RTSPStreamData* pStream =
        m_pStreamHandler->getStreamData(pPacket->GetStreamNumber());
    HX_ASSERT(pStream != NULL);
    if (!pStream)
    {
        return;
    }

    // calc the difference b/n reference stream: take the magnitude first,
    // apply the sign afterwards.
    const bool bAtOrAfterRef = (ulThisTime >= ulRefTime);
    UINT32 ulDistance = 0;
    if (bAtOrAfterRef)
    {
        ulDistance = ulThisTime - ulRefTime;
    }
    else
    {
        ulDistance = ulRefTime - ulThisTime;
    }

    // we want RTP time
    if (pStream->m_pTSConverter)
    {
        m_lTimeOffsetRTP = pStream->m_pTSConverter->hxa2rtp(ulDistance);
    }
    else
    {
        m_lTimeOffsetRTP = ulDistance;
    }

    if (!bAtOrAfterRef)
    {
        // This stream starts before the reference.
        m_lTimeOffsetRTP *= -1;
    }
}
// The pPacketBuf is returned with an AddRef(), as it must.
HX_RESULT
RTPBaseTransport::makePacket(BasePacket* pBasePacket,
REF(IHXBuffer*) pPacketBuf)
{
if(!m_bIsSource)
{
HX_ASSERT(!"Player shouldn't be making pkt");
return HXR_UNEXPECTED;
}
IHXPacket* pPacket = pBasePacket->GetPacket();
if (!pPacket)
{
return HXR_UNEXPECTED;
}
else if (pPacket->IsLost())
{
pPacket->Release();
return HXR_OK;
}
IHXBuffer* pBuffer = pPacket->GetBuffer();
UINT32 bufLen = pBuffer->GetSize();
UINT16 streamNumber = pPacket->GetStreamNumber();
UINT16 ruleNumber = pPacket->GetASMRuleNumber();
UINT8 ruleFlags = pPacket->GetASMFlags();
// it better be the same
HX_ASSERT(m_streamNumber == streamNumber);
RTSPStreamData* pStreamData =
m_pStreamHandler->getStreamData(streamNumber);
//XXXBAB
if (!pStreamData->m_packetSent)
{
pStreamData->m_packetSent = TRUE;
}
pStreamData->m_seqNo = pBasePacket->m_uSequenceNumber;
/*
* Make RTP Packet
*/
RTPPacket pkt;
HX_RESULT hresult = HXR_OK;
BOOL bCompressed = FALSE; //XXXBAB don't compress anything yet...
UINT32 packetLen = 0;
pkt.version_flag = 2;
pkt.padding_flag = 0;
pkt.csrc_len = 0;
/*
* Basics
*/
pkt.seq_no = pStreamData->m_seqNo;
pkt.data.data = (INT8*)pBuffer->GetBuffer();
pkt.data.len = HX_SAFEINT(pBuffer->GetSize());
pkt.ssrc = m_pReportHandler->GetSSRC();
pkt.extension_flag = 0;
pkt.payload = m_rtpPayloadType;
/*
* IHXRTPPacket support
*/
if (pStreamData->m_bFirstPacket)
{
// figure out pkt type
IHXRTPPacket* pRTPPacket = NULL;
pStreamData->m_bUsesRTPPackets = (pPacket->QueryInterface(
IID_IHXRTPPacket,
(void**) &pRTPPacket)
== HXR_OK);
if (pStreamData->m_bUsesRTPPackets)
{
HX_ASSERT(pRTPPacket == pPacket);
if (pRTPPacket != pPacket)
{
return HXR_INVALID_PARAMETER;
}
}
HX_RELEASE(pRTPPacket);
// figure out marker bit handling routine
if (NULL == m_pMBitHandler)
{
IHXRTPPacketInfo* pRTPPacketInfo = NULL;
if (pPacket->QueryInterface(IID_IHXRTPPacketInfo, (void**) &pRTPPacketInfo) == HXR_OK)
{
m_pMBitHandler = &RTPBaseTransport::MBitRTPPktInfo;
pRTPPacketInfo->Release();
}
else
{
m_pMBitHandler = &RTPBaseTransport::MBitASMRuleNo;
}
}
}
/*
* Marker Bit
*/
(this->*m_pMBitHandler)(pkt.marker_flag, pPacket, ruleNumber);
if (m_bRTPTimeSet)
{
SyncTimestamp(pPacket);
}
/*
* Timestamp
*/
if (pStreamData->m_bUsesRTPPackets)
{
pkt.timestamp = ((IHXRTPPacket*) pPacket)->GetRTPTime();
}
else if (pStreamData->m_pTSConverter)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -