📄 rtptran.cpp
字号:
pkt.op_code_data_length = 1; pkt.asm_flags = ruleFlags; pkt.asm_rule = ruleNumber; } else { pkt.extension_flag = 0; } if (pStreamData->m_bFirstPacket) { m_pRTCPTran->startScheduler(); m_pRTCPTran->m_bSendBye = TRUE; pStreamData->m_bFirstPacket = FALSE; // init report handler with starting NTP time and // 0 RTP time as the reference point. m_pReportHandler->Init(*m_pFirstPlayTime, 0, pStreamData->m_pTSConverter); // at this point, it should have the same stream number HX_ASSERT(m_streamNumber == m_pRTCPTran->m_streamNumber); HX_ASSERT(m_bRTPTimeSet); } // externally, we need to offset the timestamp... // // XXXGo // In RTSP PLAY Response msg, there is a RTP-Info header in which there // is a rtp timestap that corresponds to NTP time spesified in PLAY Request. // Since PLAY Response goes out before we ever receive the first pkt, it // always returns RTP time equivalent of NTP time in a Range header as a timestamp. // So, we need to offset the timestamp here. // In future, we might want to change the calling sequence so we don't have to do this... if (m_bRTPTimeSet) { INT64 nNewRTPOffset = 0; UINT32 ulRawRTPTime = 0; HXTimeval hxNow = m_pScheduler->GetCurrentSchedulerTime(); Timeval tvNow; NTPTime ntpNow; // Convert scheduler time to something we can use. tvNow.tv_sec = hxNow.tv_sec; tvNow.tv_usec = hxNow.tv_usec; ntpNow = NTPTime(tvNow); if (pStreamData->m_pTSConverter) { ulRawRTPTime = pStreamData->m_pTSConverter->hxa2rtp((UINT32)(ntpNow - m_pReportHandler->GetNTPBase())); } else { ulRawRTPTime = (UINT32)(ntpNow - m_pReportHandler->GetNTPBase()); } nNewRTPOffset = CAST_TO_INT64 pkt.timestamp - CAST_TO_INT64 ulRawRTPTime; m_pReportHandler->SetRTPBase(nNewRTPOffset); // if this is true, there was a Range header in a PLAY request m_bRTPTimeSet = FALSE; if (m_bIsLive) { m_ulBaseTS = pkt.timestamp - m_lTimeOffsetRTP; } } if (m_bIsLive) { pkt.timestamp -= m_ulBaseTS; } pStreamData->m_lastTimestamp = pkt.timestamp; /* * Create enough space to account for the op code and * op code data if the extension bit is set */ packetLen = pkt.static_size() + pBuffer->GetSize() + (pkt.extension_flag ? sizeof(UINT16) + (pkt.op_code_data_length * sizeof(UINT32)) : 0); IHXBuffer* pPacketOut = NULL; m_pCommonClassFactory->CreateInstance(IID_IHXBuffer, (void**)&pPacketOut); if(pPacketOut) { pPacketOut->SetSize(packetLen); pkt.pack(pPacketOut->GetBuffer(), packetLen); pPacketOut->SetSize(packetLen); //update with actual packed length#ifdef DEBUG if (m_drop_packets && ++m_packets_since_last_drop % 10 == 0) { goto RTPsendContinue; }#endif /* DEBUG */ updateQoSInfo(bufLen); // out params... pPacketBuf = pPacketOut; /* update */ m_pReportHandler->OnRTPSend(pkt.seq_no, 1, pBasePacket->GetSize(), pkt.timestamp); } else { hresult = HXR_OUTOFMEMORY; }#ifdef DEBUGRTPsendContinue:#endif pBuffer->Release(); pPacket->Release(); return hresult;}HX_RESULTRTPBaseTransport::handlePacket(IHXBuffer* pBuffer){ if (!m_ulPacketsSent && m_bEmulatePVSession) { /* XXXMC * Special-case handling for PV clients */ UINT8* pUDPPktPayload = pBuffer->GetBuffer(); UINT8 ucRTPVersion = (*pUDPPktPayload & 0xc0)>>6; if(ucRTPVersion != 2) { DPRINTF(D_INFO, ("RTP: PV CLIENT PKT RECVD\n")); this->sendPVHandshakeResponse(pUDPPktPayload); return HXR_OK; } } return _handlePacket(pBuffer, TRUE);}HX_RESULTRTPBaseTransport::_handlePacket(IHXBuffer* pBuffer, BOOL bIsRealTime){ RTPPacket pkt; UINT32 timeStampHX = 0; UINT32 timeStampRTP = 0; HX_RESULT hresult = HXR_OK; BOOL bHasASMRules = FALSE; if (m_bIsSource) { return HXR_OK; } if(pkt.unpack(pBuffer->GetBuffer(), pBuffer->GetSize()) == 0) { return HXR_UNEXPECTED; } if(pkt.version_flag != 2) { return HXR_INVALID_VERSION; } // ignore the packets not matching the payload type if (pkt.payload != m_rtpPayloadType) { return HXR_OK; } // stick with the 1st ssrc with the same payload type if (!m_bSSRCDetermined) { m_bSSRCDetermined = TRUE; m_ulSSRCDetermined = pkt.ssrc; m_pRTCPTran->setSSRC(m_ulSSRCDetermined); } // ignore the packets with different ssrc but with the same payload time else if (m_ulSSRCDetermined != pkt.ssrc) { return HXR_OK; } RTSPStreamData* pStreamData = m_pStreamHandler->getStreamData(m_streamNumber); HX_ASSERT(pStreamData != NULL); // If this function is called in Real-Time, handle RTCP response as needed if (bIsRealTime) { HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime(); ULONG32 ulPktRcvTime = rmatv.tv_sec*1000 + rmatv.tv_usec/1000; // Convert reception time to the packet time stamp units if (m_pRTCPTran->GetTSConverter()) { ulPktRcvTime = m_pRTCPTran->GetTSConverter()->hxa2rtp(ulPktRcvTime); } // Gather data for RTCP RR if (pStreamData->m_bFirstPacket) { if (!m_pRTCPTran->isShedulerStarted()) { m_pRTCPTran->startScheduler(); } } /* update */ m_pReportHandler->OnRTPReceive(pkt.ssrc, pkt.seq_no, pkt.timestamp, ulPktRcvTime); /* send RR if necessary */ if (m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP) { m_pRTCPTran->sendReceiverReport(); m_pRTCPTran->m_bSendReport = FALSE; m_pRTCPTran->scheduleNextReport(); } } // If we are waiting for the start info, we cannot place the // packets into the transport buffer yet since we need the // start info to proper scale and offset the packet time // stamps if (m_bWaitForStartInfo) { if (m_StartInfoWaitQueue.GetCount() == 0) { // First packet received m_ulStartInfoWaitStartTime = HX_GET_TICKCOUNT(); m_uFirstSeqNum = pkt.seq_no; m_ulFirstRTPTS = pkt.timestamp; // For Live stream, postpone identification of first packet until we get // a contiguous sequence (some servers have discontinuity on start // of live streams) if (!m_bIsLive) { m_bFirstSet = TRUE; } } else { LONG32 lSeqNumDelta = ((LONG32) (((UINT16) pkt.seq_no) - m_uFirstSeqNum)); // First SeqNum and First TS need not belong to the same packet // We are really looking for the lowest seq. num and lowest time // stamp so that we do not throw away any packets and so that the // time is not wrapped before 0 if (lSeqNumDelta < 0) { m_uFirstSeqNum = pkt.seq_no; } else if (!m_bFirstSet) { // If we have not encountered continuty yet, look for it if (lSeqNumDelta > MAX_NUM_PACKETS_GAPPED_FOR_LIVE_START) { resetStartInfoWaitQueue(); m_uFirstSeqNum = pkt.seq_no; m_ulFirstRTPTS = pkt.timestamp; } else { // Continuity found - we have the start m_bFirstSet = TRUE; } } if (((LONG32) (m_ulFirstRTPTS - pkt.timestamp)) > 0) { m_ulFirstRTPTS = pkt.timestamp; } } pBuffer->AddRef(); m_StartInfoWaitQueue.AddTail(pBuffer); /* If start Info has been at least partially set or the wait has been aborted for some reason (usually when we know it will not be set through out-of band methods <-> RTP Info did not contain start Info we need) or we time-out, stop waiting and hand off acumulated packets to the transport buffer. Also if starting seq. number is not explicitly communicated, scan through few starting packets until we have a good starting sequence number (contiguous) since some servers send lossy streams in the beginning. */ if (m_bSeqNoSet || ((m_bRTPTimeSet || m_bAbortWaitForStartInfo) && ((!m_bIsLive) || (m_StartInfoWaitQueue.GetCount() >= MIN_NUM_PACKETS_SCANNED_FOR_LIVE_START))) || ((HX_GET_TICKCOUNT() - m_ulStartInfoWaitStartTime) > MAX_STARTINFO_WAIT_TIME)) { IHXBuffer* pStoredBuffer; m_bWaitForStartInfo = FALSE; m_bAbortWaitForStartInfo = FALSE; m_bFirstSet = TRUE; while (!m_StartInfoWaitQueue.IsEmpty()) { pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead(); if (pStoredBuffer) { _handlePacket(pStoredBuffer, FALSE); pStoredBuffer->Release(); } } } return HXR_OK; } /* * Extension and asm rule */ if (pkt.extension_flag == 1) { HX_ASSERT(RTP_OP_PACKETFLAGS != pkt.op_code); if (RTP_OP_ASMRULES == pkt.op_code) { bHasASMRules = TRUE; } } /* * RTP-Info: if either one of them were not in RTP-Info, we need to set * it right here. */ if (!m_bSeqNoSet) { if (!m_bFirstSet) { m_uFirstSeqNum = pkt.seq_no; }#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("INIT: StartSeqNum not in RTP-Info");#endif // RTP_MESSAGE_DEBUG setFirstSeqNum(m_streamNumber, m_uFirstSeqNum); } if (!m_bRTPTimeSet) { if (!m_bFirstSet) { m_ulFirstRTPTS = pkt.timestamp; }#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("INIT: RTPOffset not in RTP-Info");#endif // RTP_MESSAGE_DEBUG setFirstTimeStamp(m_streamNumber, m_ulFirstRTPTS); m_bWeakStartSync = TRUE; } /* * TimeStamp */ // for RealMedia in scalable multicast, we don't want to adjust // the timestamp since the packets' time is in ms already and // A/V is always in sync if (m_bSkipTimeAdjustment) { timeStampRTP = timeStampHX = pkt.timestamp; } else if (m_bLastTSSet && (m_ulLastRawRTPTS == (ULONG32)pkt.timestamp)) { // We want to preserve same time stamped packet sequences // since some payloads may depend on it for proper coded frame // assembly timeStampRTP = m_ulLastRTPTS; timeStampHX = m_ulLastHXTS; } else { if (pStreamData->m_pTSConverter) { timeStampHX = pStreamData->m_pTSConverter->rtp2hxa(pkt.timestamp); } else { timeStampHX = pkt.timestamp; } timeStampHX += (m_lSyncOffsetHX + m_lOffsetToMasterHX - m_lTimeOffsetHX); timeStampRTP = (pkt.timestamp + m_lSyncOffsetRTP + m_lOffsetToMasterRTP - m_lTimeOffsetRTP); m_ulLastHXTS = timeStampHX; m_ulLastRTPTS = timeStampRTP; m_ulLastRawRTPTS = pkt.timestamp; m_bLastTSSet = TRUE; }#ifdef RTP_MESSAGE_DEBUG if (m_bMessageDebug) { messageFormatDebugFileOut("PKT: (Seq=%6u,RTPTime=%10u) -> (HXTimeval=%10u,RTPTimeval=%10u)", ((UINT16) pkt.seq_no), pkt.timestamp, timeStampHX, timeStampRTP); }#endif // RTP_MESSAGE_DEBUG pStreamData->m_bFirstPacket = FALSE; CHXPacket* pPacket = new CHXRTPPacket; if(pPacket) { pPacket->AddRef(); } else { hresult = HXR_OUTOFMEMORY; } UINT32 dataOffset= (UINT32)((PTR_INT)pkt.data.data - (PTR_INT)pBuffer->GetBuffer()); IHXBuffer* pPktBuffer = new CHXStaticBuffer(pBuffer, dataOffset, pkt.data.len); if(pPktBuffer) { pPktBuffer->AddRef(); } else { hresult = HXR_OUTOFMEMORY; } if( hresult == HXR_OUTOFMEMORY ) { HX_RELEASE(pPacket); return hresult; } if (bHasASMRules) { pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber, (UINT8) pkt.asm_flags, pkt.asm_rule); } else if(pkt.marker_flag == 1 && m_bHasMarkerRule) { pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber, HX_ASM_SWITCH_ON | HX_ASM_SWITCH_OFF, m_markerRuleNumber); } else { pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber, HX_ASM_SWITCH_ON | HX_ASM_SWITCH_OFF, pkt.marker_flag ? 1 : 0); } if (m_bIsSource) { hresult = m_pResp->PacketReady(HXR_OK, m_sessionID, pPacket); } else { hresult = storePacket(pPacket, m_streamNumber, pkt.seq_no, 0, 0); } pPktBuffer->Release(); pPacket->Release(); return hresult;}HX_RESULT RTPBaseTransport::handleRTCPSync(NTPTime ntpTime, ULONG32 ulRTPTime){ HX_RESULT retVal = HXR_IGNORE; // We use RTCP synchronization on live streams only. // Static streams have no reason not to be synchronzied in RTP time. // Making use of RTCP for static streams may result in unwanted sync. // noise/error for servers who do not generate proper RTCP ntp-rtp // mapping. RealServers prior to RealServer9 had error in RTCP reported // ntp-rtp mapping (max. error 1s, avg 500ms). if ((m_bIsLive || m_bWeakStartSync) && !m_bSkipTimeAdjustment) { ULONG32 ulNtpHX = ntpTime.toMSec(); RTSPStreamData* pStreamData = m_pStreamHandler->getStreamData(m_streamNumber);#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("RTCP-SYNC: Received NTPTime=%u RTPTime=%u", ulNtpHX, ulRTPTime);#endif // RTP_MESSAGE_DEBUG // We ignore the RTCP sync until we can compute npt (m_bRTPTimeSet) or // if the RTCP packet contains no synchronization information // (ulNtpHX == 0) if (pStreamData && (ulNtpHX != 0) && m_bRTPTimeSet) { // Npt time can be computed (ulHXTime) ULONG32 ulHXTime = pStreamData->m_pTSConverter->rtp2hxa(ulRTPTime); retVal = HXR_OK; if ((!m_pSyncServer) && m_pResp) { m_pResp->QueryInterface(IID_IHXTransportSyncServer, (void**) &m_pSyncServer); } if (m_bNTPtoHXOffsetSet) { // We can sync - NTP to NPT offset is known ULONG32 ulExpectedHXTime = ulNtpHX + m_lNTPtoHXOffset; LONG32 lSyncOffsetHX = ulExpectedHXTime - ulHXTime; LONG32 lSyncOffsetChange = lSyncOffsetHX - m_lSyncOffsetHX; if ((lSyncOffsetChange > ACCEPTABLE_SYNC_NOISE) || (lSyncOffsetChange < (-ACCEPTABLE_SYNC_NOISE))) { if (m_bIsSyncMaster && m_pSyncServer) {#ifdef RTP_MESSAGE_DEBUG messageFormatDebugFileOut("RTCP-SYNC: Distribute Master Sync NPTTime=%u SyncOffset=%d", ulHXTime, -lSyncOffsetHX);#endif // RTP_MESSAGE_DEBUG m_pSyncServer->DistributeSync(ulHXTime, -lSyncOffsetHX); } else { m_lSyncOffsetHX = lSyncOffsetHX; if (lSyncOffsetHX >= 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -