📄 rtptran.cpp
字号:
m_lSyncOffsetRTP = (LONG32) (pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lSyncOffsetHX)); } else { m_lSyncOffsetRTP = (LONG32) (-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lSyncOffsetHX))); }#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("RTCP-SYNC: Self-Sync SyncOffset=%d SyncOffsetRTP=%d", m_lSyncOffsetHX, m_lSyncOffsetRTP);#endif // RTP_MESSAGE_DEBUG } } } else { // This the first RTCP sync accross all streams, anchor sync if (m_pSyncServer) {#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("RTCP-SYNC: Distribute NTP-NPT Mapping NTPTime=%u NPTTime=%u", ulNtpHX, ulHXTime);#endif // RTP_MESSAGE_DEBUG m_pSyncServer->DistributeSyncAnchor(ulHXTime, ulNtpHX); } } } } return retVal;}HX_RESULTRTPBaseTransport::anchorSync(ULONG32 ulHXTime, ULONG32 ulNTPTime){ HX_RESULT retVal = HXR_OK; m_lNTPtoHXOffset = ulHXTime - ulNTPTime; m_bNTPtoHXOffsetSet = TRUE;#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("RTCP-SYNC: Received NTP-NPT Mapping NTPTime=%u NPTTime=%u NTPtoNPTOffset=%d", ulNTPTime, ulHXTime, m_lNTPtoHXOffset);#endif // RTP_MESSAGE_DEBUG return retVal;}HX_RESULT RTPBaseTransport::handleMasterSync(ULONG32 ulHXTime, LONG32 lHXOffsetToMaster){ HX_RESULT retVal = HXR_IGNORE; RTSPStreamData* pStreamData = m_pStreamHandler->getStreamData(m_streamNumber); if (pStreamData && (!m_bIsSyncMaster)) { retVal = HXR_OK; m_lOffsetToMasterHX = lHXOffsetToMaster; if (lHXOffsetToMaster >= 0) { m_lOffsetToMasterRTP = (LONG32) (pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) lHXOffsetToMaster)); } else { m_lOffsetToMasterRTP = (LONG32) (-pStreamData->m_pTSConverter->hxa2rtp_raw((ULONG32) (-lHXOffsetToMaster))); }#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("RTCP-SYNC: Master-Sync NPTTime=%u MasterSyncOffset=%d MasterSyncOffsetRTP=%d", ulHXTime, m_lOffsetToMasterHX, m_lOffsetToMasterRTP);#endif // RTP_MESSAGE_DEBUG } return retVal;}voidRTPBaseTransport::resetStartInfoWaitQueue(void){ IHXBuffer* pStoredBuffer; while (!m_StartInfoWaitQueue.IsEmpty()) { pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead(); HX_RELEASE(pStoredBuffer); }}HX_RESULTRTPBaseTransport::streamDone(UINT16 streamNumber){ HX_ASSERT(m_streamNumber == streamNumber); HX_ASSERT(m_streamNumber == m_pRTCPTran->m_streamNumber); HX_RESULT hresult = HXR_OK; HX_ASSERT(m_pRTCPTran); if (!m_bActive) { // this stream is not active, don't do anything. } else if (m_bIsSource) { hresult= m_pRTCPTran->streamDone(streamNumber); } else { // send BYE pkt m_pRTCPTran->streamDone(streamNumber); hresult = m_pResp->OnStreamDone(HXR_OK, streamNumber); } return hresult;}STDMETHODIMPRTPBaseTransport::InitBw(IHXBandwidthManagerInput* pBwMgr){ HX_RELEASE(m_pBwMgrInput); m_pBwMgrInput = pBwMgr; pBwMgr->AddRef(); return HXR_OK;}STDMETHODIMPRTPBaseTransport::SetTransmitRate(UINT32 ulBitRate){ return HXR_OK;}/* * XXXMC * Special-case handling for PV clients */voidRTPBaseTransport::setPVEmulationMode(BOOL bPVSessionFlag){ m_bEmulatePVSession = bPVSessionFlag;}voidRTPBaseTransport::setRTCPTransport(RTCPBaseTransport* pRTCPTran){ HX_ASSERT(pRTCPTran); HX_ASSERT(!m_pRTCPTran); m_pRTCPTran = pRTCPTran; m_pRTCPTran->AddRef(); // pointing to the same instatnce HX_ASSERT(m_pReportHandler); m_pRTCPTran->m_pReportHandler = m_pReportHandler;}void RTPBaseTransport::MBitRTPPktInfo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo){ BOOL b = FALSE; IHXRTPPacketInfo* pRTPPacketInfo = NULL; if (pPkt->QueryInterface(IID_IHXRTPPacketInfo, (void**) &pRTPPacketInfo) == HXR_OK) { if (pRTPPacketInfo->GetMarkerBit(b) == HXR_OK && b) { bMBit = TRUE; } else { bMBit = FALSE; } pRTPPacketInfo->Release(); } else { bMBit = FALSE; } }void RTPBaseTransport::MBitASMRuleNo(REF(UINT8)bMBit, IHXPacket* pPkt, UINT16 unRuleNo){ bMBit = m_bHasMarkerRule && ((unRuleNo & 0x1) == m_markerRuleNumber);}#ifdef RTP_MESSAGE_DEBUGvoidRTPBaseTransport::messageFormatDebugFileOut(const char* fmt, ...){ if(m_bMessageDebug) { va_list args; char buf[4096]; /* Flawfinder: ignore */ SafeSprintf(buf, 4096, "%s.%d", (const char*) m_messageDebugFileName, m_streamNumber); va_start(args, fmt); FILE* fp = fopen(buf, "a"); if (fp) { vsprintf(buf, fmt, args); fprintf(fp, "%s\n", buf); fclose(fp); } va_end(args); }}#endif // RTP_MESSAGE_DEBUG/* * RTP UDP */RTPUDPTransport::RTPUDPTransport(BOOL bIsSource) : RTPBaseTransport(bIsSource) , m_pUDPSocket(NULL) , m_foreignAddr(0) , m_foreignPort(0) , m_keepAliveSeq((UINT16)(random32(0) & 0xffff)) , m_ulCurrentMulticastAddress(0) , m_ulCurrentMulticastPort(0) , m_pMCastUDPSocket(NULL){}RTPUDPTransport::~RTPUDPTransport(){ Done();}RTSPTransportTypeEnumRTPUDPTransport::tag(){ return RTSP_TR_RTP_UDP;}voidRTPUDPTransport::Done(){ m_keepAlive.reset(); if (m_pMCastUDPSocket) { m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY); } HX_RELEASE(m_pMCastUDPSocket); HX_RELEASE(m_pUDPSocket); RTPBaseTransport::Done();}HX_RESULTRTPUDPTransport::init(IUnknown* pContext, IHXUDPSocket* pSocket, IHXRTSPTransportResponse* pResp){ m_pResp = pResp; m_pResp->AddRef(); m_pUDPSocket = pSocket; m_pUDPSocket->AddRef(); /* Set DiffServ Code Point */ IHXSetSocketOption* pOpt = NULL; if (SUCCEEDED(m_pUDPSocket->QueryInterface(IID_IHXSetSocketOption, (void**)&pOpt))) { IHXQoSDiffServConfigurator* pCfg = NULL; if (SUCCEEDED(pContext->QueryInterface(IID_IHXQoSDiffServConfigurator, (void**)&pCfg))) { pCfg->ConfigureSocket(pOpt, HX_QOS_DIFFSERV_CLASS_MEDIA); pCfg->Release(); pCfg = NULL; } pOpt->Release(); pOpt = NULL; } HX_RESULT hresult = Init(pContext); if(HXR_OK != hresult) { return hresult; }#ifdef DEBUG if (debug_func_level() & DF_DROP_PACKETS) { m_drop_packets = TRUE; }#endif /* DEBUG */ RTPBaseTransport::init(); return HXR_OK;}voidRTPUDPTransport::setForeignAddress(UINT32 foreignAddr, UINT16 foreignPort){ m_foreignAddr = foreignAddr; m_foreignPort = foreignPort; UINT32 natTimeout = GetNATTimeout(m_pContext); if (!m_bIsSource && natTimeout) { // Initialize keepalive object m_keepAlive.Init(m_pScheduler, natTimeout, new KeepAliveCB(this)); // Do initial "poke" through the NAT onNATKeepAlive(); }}HX_RESULT RTPUDPTransport::handlePacket(IHXBuffer* pBuffer){ m_keepAlive.OnActivity(); return RTPBaseTransport::handlePacket(pBuffer);}void RTPUDPTransport::JoinMulticast(UINT32 ulAddress, UINT32 ulPort, IHXUDPSocket* pUDP){ if (m_ulCurrentMulticastAddress) { m_pMCastUDPSocket->LeaveMulticastGroup(m_ulCurrentMulticastAddress, HXR_INADDR_ANY); } else { m_pMCastUDPSocket = pUDP; m_pMCastUDPSocket->AddRef(); } m_pMCastUDPSocket->JoinMulticastGroup(ulAddress, HXR_INADDR_ANY); m_bMulticast = TRUE; m_ulCurrentMulticastAddress = ulAddress; m_ulCurrentMulticastPort = ulPort; if (m_pStreamHandler) { RTSPStreamData* pStreamData = m_pStreamHandler->firstStreamData(); ASSERT(pStreamData); while(pStreamData) { pStreamData->m_pTransportBuffer->SetMulticast(); pStreamData = m_pStreamHandler->nextStreamData(); } } return;}HX_RESULT RTPUDPTransport::onNATKeepAlive(){ DPRINTF(D_INFO, ("RTP : onNATKeepAlive()\n")); // Send an RTP packet with PT=0 and no payload IHXBuffer* pPktBuf = NULL; if (m_pCommonClassFactory && (HXR_OK == m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**)&pPktBuf))) { RTPPacket pkt; pkt.version_flag = 2; pkt.padding_flag = 0; pkt.csrc_len = 0; pkt.marker_flag = 0; pkt.extension_flag = 0; pkt.data.data = 0; pkt.data.len = 0; pkt.ssrc = m_pReportHandler->GetSSRC(); pkt.payload = 0; pkt.seq_no = m_keepAliveSeq++; pkt.timestamp = HX_GET_TICKCOUNT() * 8; // Timestamp in 1/8000 sec UINT32 packetLen = pkt.static_size() + pkt.data.len; if (HXR_OK == pPktBuf->SetSize(packetLen)) { // Pack the data into the buffer pkt.pack(pPktBuf->GetBuffer(), packetLen); pPktBuf->SetSize(packetLen); // Update the packet size writePacket(pPktBuf); } } HX_RELEASE(pPktBuf); return HXR_OK;}HX_RESULTRTPUDPTransport::writePacket(IHXBuffer* pSendBuffer){ if (!m_pUDPSocket) return HXR_FAIL; m_keepAlive.OnActivity(); return m_pUDPSocket->WriteTo(m_foreignAddr, m_foreignPort, pSendBuffer);}/* * XXXMC * Special-case handling for PV clients */HX_RESULTRTPUDPTransport::sendPVHandshakeResponse(UINT8* pPktPayload){ IHXBuffer* pPktPayloadBuff = NULL; m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**) &pPktPayloadBuff); if (pPktPayloadBuff) { DPRINTF(D_INFO, ("RTP: Sending POKE PKT RESPONSE\n")); pPktPayloadBuff->Set((UCHAR*)pPktPayload, 8); writePacket(pPktPayloadBuff); pPktPayloadBuff->Release(); } return HXR_OK;}HX_RESULTRTPUDPTransport::sendPacket(BasePacket* pPacket){ HX_ASSERT(m_bActive); HX_RESULT theErr; if (m_ulPayloadWirePacket!=0) { IHXBuffer* pSendBuf = NULL; theErr = reflectPacket(pPacket, pSendBuf); if (HXR_OK == theErr) { theErr = writePacket(pSendBuf); pSendBuf->Release(); } else if (HXR_IGNORE == theErr) { return HXR_OK; } return theErr; } IHXBuffer* pPacketBuf = NULL; theErr = makePacket(pPacket, pPacketBuf); if (HXR_OK == theErr) { theErr = writePacket(pPacketBuf); /* send SR if necessary */ if (HXR_OK == theErr && m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP) { m_pRTCPTran->sendSenderReport(); m_pRTCPTran->m_bSendReport = FALSE; m_pRTCPTran->scheduleNextReport(); } } HX_RELEASE(pPacketBuf); return theErr;}RTPUDPTransport::KeepAliveCB::KeepAliveCB(RTPUDPTransport* pTransport): m_pTransport(pTransport), m_lRefCount(0){ if(m_pTransport) { m_pTransport->AddRef(); }}RTPUDPTransport::KeepAliveCB::~KeepAliveCB(){ HX_RELEASE(m_pTransport);}STDMETHODIMPRTPUDPTransport::KeepAliveCB::QueryInterface(REFIID riid, void** ppvObj){ if (IsEqualIID(riid, IID_IUnknown)) { AddRef(); *ppvObj = this; return HXR_OK; } else if (IsEqualIID(riid, IID_IHXCallback)) { AddRef(); *ppvObj = (IHXCallback*)this; return HXR_OK; } *ppvObj = NULL; return HXR_NOINTERFACE;}STDMETHODIMP_(UINT32)RTPUDPTransport::KeepAliveCB::AddRef(){ return InterlockedIncrement(&m_lRefCount);}STDMETHODIMP_(UINT32)RTPUDPTransport::KeepAliveCB::Release(){ if(InterlockedDecrement(&m_lRefCount) > 0) { return m_lRefCount; } delete this; return 0;}STDMETHODIMPRTPUDPTransport::KeepAliveCB::Func(){ if (m_pTransport) { m_pTransport->onNATKeepAlive(); } return HXR_OK;}/* * RTP TCP */RTPTCPTransport::RTPTCPTransport(BOOL bIsSource) : RTPBaseTransport(bIsSource) , m_pTCPSocket(0) , m_tcpInterleave(0){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -