📄 rtptran.cpp
字号:
// Npt time can be computed (ulHXTime)
ULONG32 ulHXTime = pStreamData->m_pTSConverter->rtp2hxa(ulRTPTime);
retVal = HXR_OK;
if ((!m_pSyncServer) && m_pResp)
{
m_pResp->QueryInterface(IID_IHXTransportSyncServer,
(void**) &m_pSyncServer);
}
if (m_bNTPtoHXOffsetSet)
{
// We can sync - NTP to NPT offset is known
ULONG32 ulExpectedHXTime = ulNtpHX + m_lNTPtoHXOffset;
LONG32 lSyncOffsetHX = ulExpectedHXTime - ulHXTime;
LONG32 lSyncOffsetChange = lSyncOffsetHX - m_lSyncOffsetHX;
if ((lSyncOffsetChange > ACCEPTABLE_SYNC_NOISE) ||
(lSyncOffsetChange < (-ACCEPTABLE_SYNC_NOISE)))
{
if (m_bIsSyncMaster && m_pSyncServer)
{
#ifdef RTP_MESSAGE_DEBUG
messageFormatDebugFileOut("RTCP-SYNC: Distribute Master Sync NPTTime=%u SyncOffset=%d",
ulHXTime, -lSyncOffsetHX);
#endif // RTP_MESSAGE_DEBUG
m_pSyncServer->DistributeSync(ulHXTime, -lSyncOffsetHX);
}
else
{
m_lSyncOffsetHX = lSyncOffsetHX;
if (lSyncOffsetHX >= 0)
{
m_lSyncOffsetRTP = (LONG32)
(pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lSyncOffsetHX));
}
else
{
m_lSyncOffsetRTP = (LONG32)
(-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lSyncOffsetHX)));
}
#ifdef RTP_MESSAGE_DEBUG
messageFormatDebugFileOut("RTCP-SYNC: Self-Sync SyncOffset=%d SyncOffsetRTP=%d",
m_lSyncOffsetHX, m_lSyncOffsetRTP);
#endif // RTP_MESSAGE_DEBUG
}
}
}
else
{
// This is the first RTCP sync across all streams; anchor the sync
if (m_pSyncServer)
{
#ifdef RTP_MESSAGE_DEBUG
messageFormatDebugFileOut("RTCP-SYNC: Distribute NTP-NPT Mapping NTPTime=%u NPTTime=%u",
ulNtpHX, ulHXTime);
#endif // RTP_MESSAGE_DEBUG
m_pSyncServer->DistributeSyncAnchor(ulHXTime, ulNtpHX);
}
}
}
}
return retVal;
}
/*
 * Stores the NTP -> HX (NPT) time offset computed from one matching pair of
 * timestamps and marks the offset as known.
 *
 * ulHXTime  - HX time of the anchor point
 * ulNTPTime - NTP-derived time of the same anchor point
 *
 * Always returns HXR_OK.
 */
HX_RESULT
RTPBaseTransport::anchorSync(ULONG32 ulHXTime, ULONG32 ulNTPTime)
{
    // Unsigned subtraction, stored in the (signed) offset member.
    m_lNTPtoHXOffset = ulHXTime - ulNTPTime;
    m_bNTPtoHXOffsetSet = TRUE;

#ifdef RTP_MESSAGE_DEBUG
    messageFormatDebugFileOut("RTCP-SYNC: Received NTP-NPT Mapping NTPTime=%u NPTTime=%u NTPtoNPTOffset=%d",
                              ulNTPTime, ulHXTime, m_lNTPtoHXOffset);
#endif // RTP_MESSAGE_DEBUG

    return HXR_OK;
}
/*
 * Records this stream's offset to the sync master, both in HX time units
 * and converted to RTP units through the stream's timestamp converter.
 *
 * ulHXTime          - HX time the sync refers to (only used for debug output)
 * lHXOffsetToMaster - signed offset to the master, in HX time units
 *
 * Returns HXR_IGNORE when there is no stream data for this stream or when
 * this transport is itself the sync master; HXR_OK otherwise.
 */
HX_RESULT
RTPBaseTransport::handleMasterSync(ULONG32 ulHXTime, LONG32 lHXOffsetToMaster)
{
    RTSPStreamData* pStream = m_pStreamHandler->getStreamData(m_streamNumber);

    if (!pStream || m_bIsSyncMaster)
    {
        return HXR_IGNORE;
    }

    m_lOffsetToMasterHX = lHXOffsetToMaster;

    // The converter takes an unsigned value, so convert the magnitude and
    // re-apply the sign afterwards.
    if (lHXOffsetToMaster < 0)
    {
        m_lOffsetToMasterRTP = (LONG32)
            (-pStream->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lHXOffsetToMaster)));
    }
    else
    {
        m_lOffsetToMasterRTP = (LONG32)
            (pStream->m_pTSConverter->hxa2rtp_raw((ULONG32) lHXOffsetToMaster));
    }

#ifdef RTP_MESSAGE_DEBUG
    messageFormatDebugFileOut("RTCP-SYNC: Master-Sync NPTTime=%u MasterSyncOffset=%d MasterSyncOffsetRTP=%d",
                              ulHXTime, m_lOffsetToMasterHX, m_lOffsetToMasterRTP);
#endif // RTP_MESSAGE_DEBUG

    return HXR_OK;
}
void
RTPBaseTransport::resetStartInfoWaitQueue(void)
{
IHXBuffer* pStoredBuffer;
while (!m_StartInfoWaitQueue.IsEmpty())
{
pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead();
HX_RELEASE(pStoredBuffer);
}
}
/*
 * Signals end-of-stream for this transport.
 *
 *  - inactive stream : nothing is done, HXR_OK is returned
 *  - source side     : the RTCP transport's streamDone() result is returned
 *  - receiving side  : the RTCP transport is told the stream is done (BYE)
 *                      and the response object's OnStreamDone() result is
 *                      returned
 *
 * The RTCP transport and the response object are checked for NULL before
 * use. Returns HXR_FAIL on the source side when no RTCP transport is
 * attached.
 */
HX_RESULT
RTPBaseTransport::streamDone(UINT16 streamNumber)
{
    // Validate the pointer before any assert dereferences it.
    HX_ASSERT(m_pRTCPTran);
    HX_ASSERT(m_streamNumber == streamNumber);
    HX_ASSERT(!m_pRTCPTran || (m_streamNumber == m_pRTCPTran->m_streamNumber));

    HX_RESULT hresult = HXR_OK;

    if (!m_bActive)
    {
        // this stream is not active, don't do anything.
    }
    else if (m_bIsSource)
    {
        hresult = m_pRTCPTran ? m_pRTCPTran->streamDone(streamNumber) : HXR_FAIL;
    }
    else
    {
        // send BYE pkt
        if (m_pRTCPTran)
        {
            m_pRTCPTran->streamDone(streamNumber);
        }

        if (m_pResp)
        {
            hresult = m_pResp->OnStreamDone(HXR_OK, streamNumber);
        }
    }

    return hresult;
}
/*
 * Installs the bandwidth manager input used by this transport, replacing
 * (and releasing) any previously installed one.
 *
 * pBwMgr - new bandwidth manager input; NULL clears the current one.
 *
 * Always returns HXR_OK.
 */
STDMETHODIMP
RTPBaseTransport::InitBw(IHXBandwidthManagerInput* pBwMgr)
{
    // Take the new reference BEFORE dropping the old one: if the caller
    // passes the object we already hold, releasing first could destroy it
    // and leave us storing a dangling pointer.
    if (pBwMgr)
    {
        pBwMgr->AddRef();
    }

    HX_RELEASE(m_pBwMgrInput);
    m_pBwMgrInput = pBwMgr;

    return HXR_OK;
}
/*
 * Accepts a transmit rate request. The rate is not used by this
 * implementation; the call always succeeds.
 */
STDMETHODIMP
RTPBaseTransport::SetTransmitRate(UINT32 ulBitRate)
{
    const HX_RESULT status = HXR_OK;
    return status;
}
/*
 * XXXMC
 * Special-case handling for PV clients: stores the flag that switches
 * PV session emulation on or off for this transport.
 */
void
RTPBaseTransport::setPVEmulationMode(BOOL bPVSessionFlag)
{
    this->m_bEmulatePVSession = bPVSessionFlag;
}
/*
 * Attaches the companion RTCP transport and makes it share this
 * transport's report handler.
 *
 * pRTCPTran - RTCP transport to attach; expected to be non-NULL and to be
 *             set only once (both are asserted). In builds where asserts
 *             are compiled out, a NULL argument is ignored and a previously
 *             attached transport is released instead of being leaked.
 */
void
RTPBaseTransport::setRTCPTransport(RTCPBaseTransport* pRTCPTran)
{
    HX_ASSERT(pRTCPTran);
    HX_ASSERT(!m_pRTCPTran);
    HX_ASSERT(m_pReportHandler);

    if (!pRTCPTran)
    {
        return;
    }

    // AddRef the new transport before releasing the old one so that passing
    // the already-attached object cannot destroy it.
    pRTCPTran->AddRef();
    HX_RELEASE(m_pRTCPTran);
    m_pRTCPTran = pRTCPTran;

    // pointing to the same instance
    m_pRTCPTran->m_pReportHandler = m_pReportHandler;
}
/*
 * Derives the RTP marker bit from the packet itself.
 *
 * bMBit    - [out] TRUE only when the packet exposes IHXRTPPacketInfo and
 *            GetMarkerBit() succeeds and reports a set marker; FALSE in
 *            every other case
 * pPkt     - packet to inspect
 * unRuleNo - unused by this variant
 */
void
RTPBaseTransport::MBitRTPPktInfo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo)
{
    UINT8 uMarker = FALSE;
    IHXRTPPacketInfo* pInfo = NULL;

    if (HXR_OK == pPkt->QueryInterface(IID_IHXRTPPacketInfo, (void**) &pInfo))
    {
        BOOL bMarkerSet = FALSE;

        if ((HXR_OK == pInfo->GetMarkerBit(bMarkerSet)) && bMarkerSet)
        {
            uMarker = TRUE;
        }

        pInfo->Release();
    }

    bMBit = uMarker;
}
/*
 * Derives the RTP marker bit from the ASM rule number.
 *
 * bMBit    - [out] set when a marker rule is configured and the low bit
 *            of unRuleNo equals m_markerRuleNumber; cleared otherwise
 * pPkt     - unused by this variant
 * unRuleNo - ASM rule number of the packet
 */
void
RTPBaseTransport::MBitASMRuleNo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo)
{
    if (m_bHasMarkerRule && ((unRuleNo & 0x1) == m_markerRuleNumber))
    {
        bMBit = TRUE;
    }
    else
    {
        bMBit = FALSE;
    }
}
#ifdef RTP_MESSAGE_DEBUG
/*
 * Appends one printf-style formatted line to the per-stream debug file
 * "<m_messageDebugFileName>.<m_streamNumber>".
 *
 * Does nothing unless m_bMessageDebug is set. The message is formatted
 * directly into the file, so its length is not limited by a stack buffer
 * (the earlier vsprintf() into a fixed 4096-byte array could overflow).
 *
 * fmt - printf-style format string, followed by its arguments
 */
void
RTPBaseTransport::messageFormatDebugFileOut(const char* fmt, ...)
{
    if (!m_bMessageDebug || !fmt)
    {
        return;
    }

    // This buffer only ever holds the file name; SafeSprintf bounds the write.
    char szFileName[4096]; /* Flawfinder: ignore */
    SafeSprintf(szFileName, 4096, "%s.%d", (const char*) m_messageDebugFileName,
                m_streamNumber);

    FILE* fp = fopen(szFileName, "a");
    if (fp)
    {
        va_list args;
        va_start(args, fmt);
        vfprintf(fp, fmt, args);
        va_end(args);

        fputc('\n', fp);
        fclose(fp);
    }
}
#endif // RTP_MESSAGE_DEBUG
/*
* RTP UDP
*/
/*
 * Constructs a UDP RTP transport with no sockets attached, no foreign
 * address/port and no multicast group.
 *
 * bIsSource - forwarded unchanged to the RTPBaseTransport constructor
 */
RTPUDPTransport::RTPUDPTransport(BOOL bIsSource)
: RTPBaseTransport(bIsSource)
, m_pUDPSocket(NULL)
, m_foreignAddr(0)
, m_foreignPort(0)
// Initial keep-alive sequence number: low 16 bits of random32(0)
, m_keepAliveSeq((UINT16)(random32(0) & 0xffff))
, m_ulCurrentMulticastAddress(0)
, m_ulCurrentMulticastPort(0)
, m_pMCastUDPSocket(NULL)
{
}
/*
 * Destructor: runs the normal shutdown path.
 */
RTPUDPTransport::~RTPUDPTransport()
{
    this->Done();
}
/*
 * Identifies this transport's type: always RTSP_TR_RTP_UDP.
 */
RTSPTransportTypeEnum
RTPUDPTransport::tag()
{
    const RTSPTransportTypeEnum transportType = RTSP_TR_RTP_UDP;
    return transportType;
}
void
RTPUDPTransport::Done()
{
m_keepAlive.reset();
if (m_pMCastUDPSocket)
{
m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
}
HX_RELEASE(m_pMCastUDPSocket);
HX_RELEASE(m_pUDPSocket);
RTPBaseTransport::Done();
}
/*
 * Binds the transport to its response object and UDP socket, applies the
 * media DiffServ class to the socket when both the socket-option and the
 * DiffServ-configurator interfaces are available, and then runs the common
 * initialization.
 *
 * pContext - context passed to Init() and queried for
 *            IHXQoSDiffServConfigurator
 * pSocket  - UDP socket to send on (AddRef'd)
 * pResp    - transport response object (AddRef'd)
 *
 * Returns the result of Init() when it is not HXR_OK, otherwise HXR_OK.
 */
HX_RESULT
RTPUDPTransport::init(IUnknown* pContext,
                      IHXUDPSocket* pSocket,
                      IHXRTSPTransportResponse* pResp)
{
    m_pResp = pResp;
    m_pResp->AddRef();

    m_pUDPSocket = pSocket;
    m_pUDPSocket->AddRef();

    /* Set DiffServ Code Point */
    IHXSetSocketOption* pSockOpt = NULL;
    if (SUCCEEDED(m_pUDPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pSockOpt)))
    {
        IHXQoSDiffServConfigurator* pDiffServ = NULL;
        if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pDiffServ)))
        {
            pDiffServ->ConfigureSocket(pSockOpt, HX_QOS_DIFFSERV_CLASS_MEDIA);
            pDiffServ->Release();
        }

        pSockOpt->Release();
    }

    const HX_RESULT initResult = Init(pContext);
    if (HXR_OK == initResult)
    {
#ifdef DEBUG
        if (debug_func_level() & DF_DROP_PACKETS)
        {
            m_drop_packets = TRUE;
        }
#endif /* DEBUG */

        RTPBaseTransport::init();
        return HXR_OK;
    }

    return initResult;
}
/*
 * Records the peer's address and port. On the receiving side, when a NAT
 * timeout is configured, it also starts the keep-alive and sends the
 * first keep-alive packet immediately.
 */
void
RTPUDPTransport::setForeignAddress(UINT32 foreignAddr, UINT16 foreignPort)
{
    m_foreignAddr = foreignAddr;
    m_foreignPort = foreignPort;

    UINT32 ulNatTimeout = GetNATTimeout(m_pContext);
    if (m_bIsSource || !ulNatTimeout)
    {
        // Keep-alive is only used by non-source transports with a timeout set.
        return;
    }

    // Initialize keepalive object
    m_keepAlive.Init(m_pScheduler, ulNatTimeout, new KeepAliveCB(this));

    // Do initial "poke" through the NAT
    onNATKeepAlive();
}
/*
 * Notes activity on the keep-alive object, then hands the buffer to the
 * base-class packet handler and returns its result.
 */
HX_RESULT RTPUDPTransport::handlePacket(IHXBuffer* pBuffer)
{
    m_keepAlive.OnActivity();

    const HX_RESULT result = RTPBaseTransport::handlePacket(pBuffer);
    return result;
}
/*
 * Joins the multicast group ulAddress.
 *
 * When a group is already joined on a held multicast socket, that group
 * is left first and the same socket is reused (pUDP is ignored, as
 * before). Otherwise pUDP becomes the multicast socket: it is AddRef'd
 * and any socket still held is released rather than leaked.
 *
 * A NULL pUDP when a socket is needed is asserted and the call returns
 * without changing any state. After a successful join, every stream's
 * transport buffer is switched to multicast mode.
 *
 * ulAddress - multicast group address to join
 * ulPort    - multicast port (recorded only)
 * pUDP      - socket to use when none is currently in use
 */
void
RTPUDPTransport::JoinMulticast(UINT32 ulAddress, UINT32 ulPort, IHXUDPSocket* pUDP)
{
    // Test the socket as well as the address: the address alone does not
    // guarantee that a socket is still held.
    if (m_pMCastUDPSocket && m_ulCurrentMulticastAddress)
    {
        m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY);
    }
    else
    {
        HX_ASSERT(pUDP);
        if (!pUDP)
        {
            return;
        }

        // AddRef before release so re-passing the held socket is safe.
        pUDP->AddRef();
        HX_RELEASE(m_pMCastUDPSocket);
        m_pMCastUDPSocket = pUDP;
    }

    m_pMCastUDPSocket->JoinMulticastGroup(ulAddress, HXR_INADDR_ANY);

    m_bMulticast = TRUE;
    m_ulCurrentMulticastAddress = ulAddress;
    m_ulCurrentMulticastPort = ulPort;

    if (m_pStreamHandler)
    {
        RTSPStreamData* pStreamData = m_pStreamHandler->firstStreamData();
        ASSERT(pStreamData);

        while (pStreamData)
        {
            pStreamData->m_pTransportBuffer->SetMulticast();
            pStreamData = m_pStreamHandler->nextStreamData();
        }
    }
}
/*
 * Sends a NAT keep-alive: an RTP packet with payload type 0 and no
 * payload data, written to the foreign address via writePacket().
 *
 * Best effort: when no class factory is set, or the buffer cannot be
 * created or sized, nothing is sent. Always returns HXR_OK.
 */
HX_RESULT RTPUDPTransport::onNATKeepAlive()
{
    DPRINTF(D_INFO, ("RTP : onNATKeepAlive()\n"));

    // Send an RTP packet with PT=0 and no payload
    IHXBuffer* pKeepAliveBuf = NULL;

    if (m_pCommonClassFactory &&
        (HXR_OK == m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**)&pKeepAliveBuf)))
    {
        RTPPacket keepAlive;

        // Fixed header flags
        keepAlive.version_flag = 2;
        keepAlive.padding_flag = 0;
        keepAlive.csrc_len = 0;
        keepAlive.marker_flag = 0;
        keepAlive.extension_flag = 0;

        // Empty payload
        keepAlive.data.data = 0;
        keepAlive.data.len = 0;

        // Identification and timing
        keepAlive.ssrc = m_pReportHandler->GetSSRC();
        keepAlive.payload = 0;
        keepAlive.seq_no = m_keepAliveSeq++;
        keepAlive.timestamp = HX_GET_TICKCOUNT() * 8; // Timestamp in 1/8000 sec

        UINT32 ulWireLen = keepAlive.static_size() + keepAlive.data.len;

        if (HXR_OK == pKeepAliveBuf->SetSize(ulWireLen))
        {
            // Pack the data into the buffer
            keepAlive.pack(pKeepAliveBuf->GetBuffer(), ulWireLen);
            pKeepAliveBuf->SetSize(ulWireLen); // Update the packet size
            writePacket(pKeepAliveBuf);
        }
    }

    HX_RELEASE(pKeepAliveBuf);

    return HXR_OK;
}
/*
 * Writes a buffer to the foreign address/port over the UDP socket and
 * notes the activity on the keep-alive object.
 *
 * Returns HXR_FAIL when no UDP socket is attached, otherwise the result
 * of the socket's WriteTo().
 */
HX_RESULT
RTPUDPTransport::writePacket(IHXBuffer* pSendBuffer)
{
    if (m_pUDPSocket)
    {
        m_keepAlive.OnActivity();
        return m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuffer);
    }

    return HXR_FAIL;
}
/*
 * XXXMC
 * Special-case handling for PV clients
 *
 * Sends the first 8 bytes at pPktPayload to the foreign address as a
 * handshake ("poke") response.
 *
 * Best effort: nothing is sent when pPktPayload is NULL, when no class
 * factory is available, when the buffer cannot be created, or when the
 * payload cannot be copied into it. Always returns HXR_OK.
 *
 * pPktPayload - payload to send; must point to at least 8 readable bytes
 */
HX_RESULT
RTPUDPTransport::sendPVHandshakeResponse(UINT8* pPktPayload)
{
    const ULONG32 ulResponseSize = 8;

    // Same factory guard as onNATKeepAlive(); also reject a NULL payload.
    if (!pPktPayload || !m_pCommonClassFactory)
    {
        return HXR_OK;
    }

    IHXBuffer* pPktPayloadBuff = NULL;
    m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**) &pPktPayloadBuff);

    if (pPktPayloadBuff)
    {
        DPRINTF(D_INFO, ("RTP: Sending POKE PKT RESPONSE\n"));

        // Only send when the payload was actually copied into the buffer.
        if (SUCCEEDED(pPktPayloadBuff->Set((UCHAR*) pPktPayload, ulResponseSize)))
        {
            writePacket(pPktPayloadBuff);
        }

        pPktPayloadBuff->Release();
    }

    return HXR_OK;
}
/*
 * Sends one packet over UDP.
 *
 * When m_ulPayloadWirePacket is non-zero the packet is reflected
 * (reflectPacket) and written as-is; HXR_IGNORE from reflectPacket is
 * reported as HXR_OK. Otherwise the packet is built with makePacket and
 * written, and, if the write succeeded and the RTCP transport has a
 * report pending with RTCP sending enabled, a sender report is sent and
 * the next one is scheduled.
 *
 * The RTCP transport is checked for NULL before use so that a missing
 * RTCP transport cannot crash the send path after the data packet has
 * already gone out.
 *
 * Returns the first error from reflectPacket/makePacket/writePacket, or
 * HXR_OK.
 */
HX_RESULT
RTPUDPTransport::sendPacket(BasePacket* pPacket)
{
    HX_ASSERT(m_bActive);

    HX_RESULT theErr;

    if (m_ulPayloadWirePacket != 0)
    {
        IHXBuffer* pSendBuf = NULL;
        theErr = reflectPacket(pPacket, pSendBuf);

        if (HXR_OK == theErr)
        {
            theErr = writePacket(pSendBuf);
            HX_RELEASE(pSendBuf); // null-safe release
        }
        else if (HXR_IGNORE == theErr)
        {
            return HXR_OK;
        }

        return theErr;
    }

    IHXBuffer* pPacketBuf = NULL;
    theErr = makePacket(pPacket, pPacketBuf);

    if (HXR_OK == theErr)
    {
        theErr = writePacket(pPacketBuf);

        /* send SR if necessary */
        if (HXR_OK == theErr && m_pRTCPTran &&
            m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP)
        {
            m_pRTCPTran->sendSenderReport();
            m_pRTCPTran->m_bSendReport = FALSE;
            m_pRTCPTran->scheduleNextReport();
        }
    }

    HX_RELEASE(pPacketBuf);

    return theErr;
}
/*
 * Creates a keep-alive callback bound to pTransport. The callback starts
 * with a reference count of zero and holds a reference on the transport
 * (when one is given).
 */
RTPUDPTransport::KeepAliveCB::KeepAliveCB(RTPUDPTransport* pTransport)
    : m_pTransport(pTransport)
    , m_lRefCount(0)
{
    if (pTransport != NULL)
    {
        pTransport->AddRef();
    }
}
/*
 * Destructor: drops the reference on the transport that the constructor
 * took with AddRef().
 */
RTPUDPTransport::KeepAliveCB::~KeepAliveCB()
{
HX_RELEASE(m_pTransport);
}
STDMETHODIMP
RTPUDPTransport::KeepAliveCB::QueryInterface(REFIID riid, void** ppvObj)
{
if (IsEqualIID(riid, IID_IUnknown))
{
AddRef();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -