📄 ppffsrc.cpp
字号:
// then, Buffer IHXBuffer* pBuf; if (m_pHeader->GetFirstPropertyBuffer(pCh, pBuf) == HXR_OK) { pHeader->SetPropertyBuffer(pCh, pBuf); HX_RELEASE(pBuf); while (HXR_OK == m_pHeader->GetNextPropertyBuffer(pCh, pBuf)) { pHeader->SetPropertyBuffer(pCh, pBuf); HX_RELEASE(pBuf); } } pHeader->SetPropertyULONG32("StreamNumber", unStreamNumber); pHeader->SetPropertyULONG32("LiveStream", 1);#ifndef _USE_MASTER_STREAM_SYNC_SCHEME if (!m_bIsRMSource) { // Non RM source are RTCP paced an synchronized. // We muits indicate to a renderer (audio) to pay close attention to // time stamps and produce appropriate gaps if needed to maintain // synchronization. pHeader->SetPropertyULONG32("NonContigTime", 1); }#endif // _USE_MASTER_STREAM_SYNC_SCHEME } BOOL CPurePlaySource::AreDependenciesOK(UINT16 uRule) { return m_pRules[uRule].m_bSyncOK; } void CPurePlaySource::RecvRule(BOOL bOn, UINT16 uRule) { HX_ASSERT(m_pRules != NULL); if (bOn) m_pRules[uRule].m_bRecvOn = TRUE; } void CPurePlaySource::SetSyncOK(UINT16 uRule, BOOL bOK) { m_pRules[uRule].m_bSyncOK = bOK; } BOOL CPurePlaySource::IsDependOK(BOOL bOn, UINT16 uRule) { /* * Right now return TRUE if there are no rules and only check the * dependencies if all rules that the current rule depends on have * been received. */ if (m_pRules == NULL) { return TRUE; } UINT16* pDeps = m_pRules[uRule].m_pOnDepends; if (!pDeps) { return m_pRules[uRule].m_bRecvOn; } for (; *pDeps != PP_DEPEND_END; pDeps++) { if (!m_pRules[*pDeps].m_bRecvOn) { return FALSE; } } return TRUE; }HX_RESULT CPurePlaySource::Distribute(ULONG32 ulDistributionMode, ULONG32 ulVal1, ULONG32 ulVal2){ CHXMapLongToObj::Iterator i; CStream* pStream; BOOL bDone; HX_RESULT retVal = HXR_OK; i = m_SsrcToStream.Begin(); do { bDone = TRUE; pStream = NULL; if (i != m_SsrcToStream.End()) { bDone = FALSE; pStream = ((CStream*)(*i)); ++i; } switch (ulDistributionMode) { case DISTRIBUTE_SYNC: m_ulHXMasterTime = ulVal1; m_lHXOffsetToMaster = ((LONG32) ulVal2); m_bSyncDistributed = TRUE; if (pStream) { pStream->m_Syncer.HandleMasterSync(m_ulHXMasterTime, m_lHXOffsetToMaster); } break; case DISTRIBUTE_SYNCANCHOR: m_ulHXAnchorTime = ulVal1; m_ulNTPHXTime = ulVal2; m_bSyncAnchorDistributed = TRUE; if (pStream) { pStream->m_Syncer.AnchorSync(m_ulHXAnchorTime, m_ulNTPHXTime); } break; case DISTRIBUTE_STARTTIME: m_ulStartTime = ulVal1; m_bStartTimeDistributed = TRUE; if (pStream) { pStream->m_Syncer.SetStartTime(ulVal1, TRUE); } break; default: // nothing to do retVal = HXR_UNEXPECTED; break; } } while ((!bDone) && (retVal == HXR_OK)); return retVal;}HX_RESULT CPurePlaySource::HandleRTPMsg(HX_RESULT status, IHXBuffer* pBuf){ if (m_pPlugin->m_state == Closed) {// it is closing... return HXR_OK; } if (status == HXR_OK) { IHXPacket* pPacket = NULL; IHXBuffer* pDataBuf = NULL; RTPPacket pkt; UINT16 uASMRule = 0; UINT8 chASMFlags = HX_ASM_SWITCH_ON; BOOL bSyncOK = TRUE; CStream* pStrm = NULL; ULONG32 ulTimeNow = HX_GET_BETTERTICKCOUNT(); ULONG32 ulHXTime; ULONG32 ulRTPTime; BOOL bWasInProbation; HX_ASSERT(pBuf != NULL); /* * Convert this RTP packet into an IHXPacket */ pkt.unpack(pBuf->GetBuffer(), pBuf->GetSize()); if (!GetStreamBySSRC(pkt.ssrc, pStrm)) { // never seen this ssrc. add it to the map if (!AddNewEntryToMap(pkt.ssrc, pStrm)) { m_pRTPSock->Read(1024); return HXR_OK; }#ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "HandleRTP: adding a new entry with ssrc = %u\n", pkt.ssrc); fflush(m_pLogFile); }#endif } HX_ASSERT(pStrm); if (pkt.payload != m_chRTPPayload) { // we have to ignore any packet whose payload does not match the // one in SDP file. m_pRTPSock->Read(1024); HX_ASSERT(FALSE && "Received packed with different payload type on the same stream"); return HXR_OK; } // yes, we heard from you! pStrm->m_bHeardSinceLastTime = TRUE; pStrm->m_ulNumRRIntervals = 0; if (!pStrm->m_bMadeCut) { m_pRTPSock->Read(1024); return HXR_OK; } bWasInProbation = pStrm->IsInProbation(); // check version and seq# if ((RTP_VERSION != pkt.version_flag) || !pStrm->UpdateSeqNum((UINT16)pkt.seq_no, pkt.timestamp)) { // invalid! m_pRTPSock->Read(1024); return HXR_OK; } if (bWasInProbation && (!pStrm->IsInProbation())) { m_ulStartedSourceCount++; } /************************ * calculate jitter (rfc 1889 Appendix A.8) */ INT32 lTransit = ulTimeNow / 1000 - pkt.timestamp; INT32 lD = lTransit - pStrm->m_ulTransit; pStrm->m_ulTransit = lTransit; if (lD < 0) { lD = -lD; } pStrm->m_ulJitter += (UINT32)((1./16.) * ((double)lD - pStrm->m_ulJitter)); /* * Store off the initial time value of the first packet */ if (pStrm->m_bInitialPkt) {#ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "ssrc=%u, num sources = %d\n", pkt.ssrc, GetNumSrc()); fflush(m_pLogFile); } #endif pStrm->m_bInitialPkt = FALSE; pStrm->m_ulInitialRTPTime = pkt.timestamp; pStrm->m_ulStartTimeMS = ulTimeNow; pStrm->m_ulStatStartMS = ulTimeNow; if (!IsRMSource()) { pStrm->m_Syncer.SetStartTime(ulTimeNow); pStrm->m_Syncer.SetStartSync(pkt.timestamp, STARTING_HX_TIMESTAMP, TRUE, ulTimeNow); ulHXTime = pStrm->m_Syncer.RTP2SyncHX(pkt.timestamp); ulRTPTime = pStrm->m_Syncer.RTP2SyncRTP(pkt.timestamp); pStrm->m_ulLastRawRTPTS = pkt.timestamp; pStrm->m_ulLastRTPTS = ulRTPTime; pStrm->m_ulLastHXTS = ulHXTime; } } /* * Update stats with Packet size plus UDP overhead (8) and IP (20) */ pStrm->SetStats(ulTimeNow, pBuf->GetSize() + 28); // XXXGo // we need this // this is the very first pkt of this "stream" if (m_bInitialPacket) {#ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "Very First Pk ssrc=%u\n", pkt.ssrc); fflush(m_pLogFile); }#endif // if we have failed to obtain payload from the stream header, // m_chRTPPayload is -1 if (m_chRTPPayload != -1 && m_chRTPPayload != pkt.payload) { // payload of packet received is not the same as the one in sdp // we have to ignore this. m_pRTPSock->Read(1024); return HXR_OK; } m_bInitialPacket = FALSE; /* * Schedule the initial RTCP receiver report */ UINT32 ulNextTime = (UINT32) (m_pRTCPInterval->GetRTCPInterval( m_pRTCPInterval->GetRTCPBW(), m_SsrcToMember.GetCount(), GetTrueNumSrc()) * 1000.0); Schedule(ulNextTime, RTCP_SR); } HX_ASSERT(pkt.ssrc == ((INT32) pStrm->m_ulSSRC)); /* * Convert the timestamp back to RMA format from RTP format * if non RealMedia presentation */ if (IsRMSource()) { // RealMedia source ulHXTime = (ULONG32) pkt.timestamp; #ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "transfering ssrc(%u): TS(%u) -> %u\n", pkt.ssrc, pkt.timestamp, ulHXTime); fflush(m_pLogFile); } #endif if ((pkt.extension_flag == 1) && (pkt.op_code == RTP_OP_ASMRULES)) { chASMFlags = (UINT8) pkt.asm_flags; uASMRule = pkt.asm_rule; } } else { // Non-RealMedia source // Wait for wall clock synchronization if (!pStrm->m_Syncer.IsStartSyncSet()) { // We can't convert to synchronized time stamp yet or // the synchronization point is not close enough to be accurate#ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "Ignoring packet before RTCP sync: ssrc(%u): TS(%u)\n", pkt.ssrc, pkt.timestamp); fflush(m_pLogFile); } #endif m_pRTPSock->Read(1024); return HXR_OK; } // Convert from RTP to synchronized RMA time if (pStrm->m_ulLastRawRTPTS == pkt.timestamp) { // We need to make sure the RTCP synchronization does not // cause subsequent, same time-stamped packets to take on // different time stamps as this may compromise payload // integrity. ulRTPTime = pStrm->m_ulLastRTPTS; ulHXTime = pStrm->m_ulLastHXTS; } else { ulHXTime = pStrm->m_Syncer.RTP2SyncHX(pkt.timestamp); ulRTPTime = pStrm->m_Syncer.RTP2SyncRTP(pkt.timestamp); pStrm->m_ulLastRawRTPTS = pkt.timestamp; pStrm->m_ulLastRTPTS = ulRTPTime; pStrm->m_ulLastHXTS = ulHXTime; } #ifdef _SYNC_TRACE if (m_sfile) { fprintf(m_sfile, "adjusting ssrc(%u): TS(%u) -> %u\n", pkt.ssrc, pkt.timestamp, ulHXTime); fflush(m_sfile); }#endif // _SYNC_TRACE#ifdef XXXGo_DEBUG if (m_pLogFile) { fprintf(m_pLogFile, "adjusting ssrc(%u): TS(%u) -> %u\n", pkt.ssrc, pkt.timestamp, ulHXTime); fflush(m_pLogFile); } #endif uASMRule = pkt.marker_flag ? 1 : 0; chASMFlags = HX_ASM_SWITCH_ON; } pDataBuf = new CHXBuffer; pDataBuf->AddRef(); pDataBuf->Set((unsigned char*)pkt.data.data, pkt.data.len); if (m_bIsRMSource) { pPacket = new CHXPacket; pPacket->AddRef(); pPacket->Set(pDataBuf, ulHXTime, pStrm->m_unStreamNum, chASMFlags, uASMRule); } else { pPacket = new CHXRTPPacket; pPacket->AddRef(); pStrm->SequentializeRMATS(ulHXTime); ((IHXRTPPacket*) pPacket)->SetRTP(pDataBuf, ulHXTime, ulRTPTime, pStrm->m_unStreamNum, chASMFlags, uASMRule); } RecvRule((chASMFlags & HX_ASM_SWITCH_ON), uASMRule); /* * Add this packet to the list of pending packets and process * the list */#ifdef _DO_BUFFER_OCCUPANCY_CHECKS if (pStrm->m_pTransBuf->GetQueuedPktCount() > MAX_BUFFERED_STREAM_PACKETS) { IHXPacket* pDeadPacket = NULL; while (pStrm->m_pTransBuf->GetQueuedPktCount() > UNLOCK_THRESHOLD_LEVEL) { if (pStrm->GetPacket(pDeadPacket, ulTimeNow) != HXR_OK) { break; } HX_RELEASE(pDeadPacket); } }#endif // _DO_BUFFER_OCCUPANCY_CHECKS if ((!IsRMSource()) || AreDependenciesOK(uASMRule)) {#ifdef XXXGo_DEBUG //fprintf(m_file, "+++ %u Adding Packet\n", iStream);#endif pStrm->m_pTransBuf->AddPacket(pkt.seq_no, pPacket, ulTimeNow); } else { /* * Check to see if the dependencies have been filled and add * the packet if they have */ if (IsDependOK((chASMFlags & HX_ASM_SWITCH_ON), uASMRule)) { SetSyncOK(uASMRule); pStrm->m_pTransBuf->AddPacket(pkt.seq_no, pPacket, ulTimeNow); } } pDataBuf->Release(); pPacket->Release(); #ifdef XXXGo_DEBUG//fprintf(m_pLogFile, "HandleRTP: ssrc = %u, t.s. = %u, #Sources = %d\n", // pkt.ssrc, lHXTime, GetNumSrc());#endif m_pPlugin->ProcessPendingPackets(); } else {#ifdef XXXGo_DEBUG if (m_pFFLog) { fprintf(m_pFFLog, "this is why!!!\n"); } #endif HX_ASSERT(FALSE); m_pPlugin->Close(); return HXR_FAIL; } if (m_pRTPSock) { m_pRTPSock->Read(1024); } return HXR_OK;}#define RTCP_VALID_MASK (0xe0fe)#define RTCP_VALID_VALUE (0x80c8)HX_RESULTCPurePlaySource::HandleRTCPMsg(HX_RESULT status, IHXBuffer* pBuf){ if (status != HXR_OK || !pBuf) { HX_ASSERT(FALSE); return HXR_OK; } pBuf->AddRef(); HX_RESULT theErr = HXR_OK; BYTE* pFirst = pBuf->GetBuffer(); BYTE* pNext = pFirst; // len of compound RTCP pkt in words UINT32 ulCompoundLen = pBuf->GetSize(); CStream* pStrm = NULL; UINT32 ulSsrc = 0; // put ssrc if this ssrc is not found in a map CMember* pMember = NULL; if (m_pRTCPInterval) { m_pRTCPInterval->UpdateAvgRTCPSize(ulCompoundLen); } /* ** validity check */ BYTE* pCompound = pFirst; BYTE* pCompoundEnd; UINT16 nRTCPHeader = getshort(pCompound); if ((nRTCPHeader & RTCP_VALID_MASK) != RTCP_VALID_VALUE) { HX_ASSERT(FALSE && "invalid RTCP header"); /* something is wrong */ goto done;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -