📄 rtptran.cpp
字号:
{
pkt.timestamp =
pStreamData->m_pTSConverter->hxa2rtp(pPacket->GetTime());
}
else
{
pkt.timestamp = pPacket->GetTime();
}
/*
* Extension and asm rule
*/
if (RTP_OP_ASMRULES == m_ulExtensionSupport)
{
// this is the only one right now.
pkt.extension_flag = 1;
pkt.op_code = RTP_OP_ASMRULES;
pkt.op_code_data_length = 1;
pkt.asm_flags = ruleFlags;
pkt.asm_rule = ruleNumber;
}
else
{
pkt.extension_flag = 0;
}
if (pStreamData->m_bFirstPacket)
{
m_pRTCPTran->startScheduler();
m_pRTCPTran->m_bSendBye = TRUE;
pStreamData->m_bFirstPacket = FALSE;
// init report handler with starting NTP time and
// 0 RTP time as the reference point.
m_pReportHandler->Init(*m_pFirstPlayTime,
0,
pStreamData->m_pTSConverter);
// at this point, it should have the same stream number
HX_ASSERT(m_streamNumber == m_pRTCPTran->m_streamNumber);
HX_ASSERT(m_bRTPTimeSet);
}
// externally, we need to offset the timestamp...
//
// XXXGo
// In the RTSP PLAY Response msg, there is an RTP-Info header in which there
// is an RTP timestamp that corresponds to the NTP time specified in the PLAY Request.
// Since PLAY Response goes out before we ever receive the first pkt, it
// always returns RTP time equivalent of NTP time in a Range header as a timestamp.
// So, we need to offset the timestamp here.
// In future, we might want to change the calling sequence so we don't have to do this...
if (m_bRTPTimeSet)
{
INT64 nNewRTPOffset = 0;
UINT32 ulRawRTPTime = 0;
HXTimeval hxNow = m_pScheduler->GetCurrentSchedulerTime();
Timeval tvNow;
NTPTime ntpNow;
// Convert scheduler time to something we can use.
tvNow.tv_sec = hxNow.tv_sec;
tvNow.tv_usec = hxNow.tv_usec;
ntpNow = NTPTime(tvNow);
if (pStreamData->m_pTSConverter)
{
ulRawRTPTime = pStreamData->m_pTSConverter->hxa2rtp((UINT32)(ntpNow - m_pReportHandler->GetNTPBase()));
}
else
{
ulRawRTPTime = (UINT32)(ntpNow - m_pReportHandler->GetNTPBase());
}
nNewRTPOffset = CAST_TO_INT64 pkt.timestamp - CAST_TO_INT64 ulRawRTPTime;
m_pReportHandler->SetRTPBase(nNewRTPOffset);
// if this is true, there was a Range header in a PLAY request
m_bRTPTimeSet = FALSE;
if (m_bIsLive)
{
m_ulBaseTS = pkt.timestamp - m_lTimeOffsetRTP;
}
}
if (m_bIsLive)
{
pkt.timestamp -= m_ulBaseTS;
}
pStreamData->m_lastTimestamp = pkt.timestamp;
/*
* Create enough space to account for the op code and
* op code data if the extension bit is set
*/
packetLen = pkt.static_size() + pBuffer->GetSize() +
(pkt.extension_flag
? sizeof(UINT16) + (pkt.op_code_data_length * sizeof(UINT32))
: 0);
IHXBuffer* pPacketOut = NULL;
m_pCommonClassFactory->CreateInstance(IID_IHXBuffer,
(void**)&pPacketOut);
if(pPacketOut)
{
pPacketOut->SetSize(packetLen);
pkt.pack(pPacketOut->GetBuffer(), packetLen);
pPacketOut->SetSize(packetLen); //update with actual packed length
#ifdef DEBUG
if (m_drop_packets && ++m_packets_since_last_drop % 10 == 0)
{
goto RTPsendContinue;
}
#endif /* DEBUG */
updateQoSInfo(bufLen);
// out params...
pPacketBuf = pPacketOut;
/* update */
m_pReportHandler->OnRTPSend(pkt.seq_no, 1, pBasePacket->GetSize(), pkt.timestamp);
}
else
{
hresult = HXR_OUTOFMEMORY;
}
#ifdef DEBUG
RTPsendContinue:
#endif
pBuffer->Release();
pPacket->Release();
return hresult;
}
/*
 * Entry point for an RTP datagram that has just been received.
 *
 * While m_ulPacketsSent is still zero and m_bEmulatePVSession is set, the
 * two most significant bits of the first payload byte (the RTP version
 * field) are inspected. If they are not 2, the datagram is treated as a
 * PV client packet: it is answered via sendPVHandshakeResponse() and
 * consumed, returning HXR_OK. Every other datagram is forwarded to
 * _handlePacket() with bIsRealTime == TRUE and that result is returned.
 *
 * pBuffer - the received datagram.
 */
HX_RESULT
RTPBaseTransport::handlePacket(IHXBuffer* pBuffer)
{
    if (!m_ulPacketsSent && m_bEmulatePVSession)
    {
        /* XXXMC
         * Special-case handling for PV clients
         */
        UINT8* pUDPPktPayload = pBuffer->GetBuffer();

        // The datagram comes straight off the network: never read the
        // version byte of an empty (or buffer-less) datagram. Such a
        // datagram falls through to _handlePacket() instead.
        // NOTE(review): sendPVHandshakeResponse() may need more than one
        // byte of payload - confirm its minimum length requirement.
        if (pUDPPktPayload && pBuffer->GetSize() > 0)
        {
            UINT8 ucRTPVersion = (*pUDPPktPayload & 0xc0)>>6;
            if(ucRTPVersion != 2)
            {
                DPRINTF(D_INFO, ("RTP: PV CLIENT PKT RECVD\n"));
                this->sendPVHandshakeResponse(pUDPPktPayload);
                return HXR_OK;
            }
        }
    }
    return _handlePacket(pBuffer, TRUE);
}
/*
 * Parses one received RTP datagram and hands the resulting packet to the
 * transport buffer.
 *
 * Processing steps, in order:
 *   1. Return immediately (HXR_OK) when this transport is a source.
 *   2. Unpack the RTP header; reject packets that fail to unpack or whose
 *      version is not 2.
 *   3. Silently drop (HXR_OK) packets whose payload type differs from
 *      m_rtpPayloadType, or whose SSRC differs from the first SSRC seen.
 *   4. When bIsRealTime is set, feed the report handler and send an RTCP
 *      receiver report if one is pending.
 *   5. While m_bWaitForStartInfo is set, queue the buffer in
 *      m_StartInfoWaitQueue. Once the wait ends, the queued buffers are
 *      replayed through this function with bIsRealTime == FALSE.
 *   6. Otherwise translate the time stamp, wrap the payload in a
 *      CHXRTPPacket and pass it to storePacket().
 *
 * pBuffer     - the raw datagram. It is AddRef'd here whenever it is
 *               queued; the payload buffer built in step 6 is constructed
 *               from it (no copy is made in this function).
 * bIsRealTime - TRUE when called as the packet arrives, FALSE when a
 *               previously queued packet is replayed.
 *
 * Returns HXR_OK for ignored or queued packets, HXR_UNEXPECTED when the
 * packet cannot be unpacked or no stream data exists for m_streamNumber,
 * HXR_INVALID_VERSION for a non-v2 packet, HXR_OUTOFMEMORY on allocation
 * failure, and otherwise the result of delivering the packet.
 */
HX_RESULT
RTPBaseTransport::_handlePacket(IHXBuffer* pBuffer, BOOL bIsRealTime)
{
    RTPPacket pkt;
    UINT32 timeStampHX = 0;
    UINT32 timeStampRTP = 0;
    HX_RESULT hresult = HXR_OK;
    BOOL bHasASMRules = FALSE;

    if (m_bIsSource)
    {
        return HXR_OK;
    }

    if(pkt.unpack(pBuffer->GetBuffer(), pBuffer->GetSize()) == 0)
    {
        return HXR_UNEXPECTED;
    }

    if(pkt.version_flag != 2)
    {
        return HXR_INVALID_VERSION;
    }

    // ignore the packets not matching the payload type
    if (pkt.payload != m_rtpPayloadType)
    {
        return HXR_OK;
    }

    // stick with the 1st ssrc with the same payload type
    if (!m_bSSRCDetermined)
    {
        m_bSSRCDetermined = TRUE;
        m_ulSSRCDetermined = pkt.ssrc;
        m_pRTCPTran->setSSRC(m_ulSSRCDetermined);
    }
    // ignore the packets with different ssrc but with the same payload type
    else if (m_ulSSRCDetermined != pkt.ssrc)
    {
        return HXR_OK;
    }

    RTSPStreamData* pStreamData = m_pStreamHandler->getStreamData(m_streamNumber);
    HX_ASSERT(pStreamData != NULL);
    // The assert compiles away in release builds, and pStreamData is
    // dereferenced unconditionally further down, so bail out here rather
    // than crash on a missing stream entry.
    if (!pStreamData)
    {
        return HXR_UNEXPECTED;
    }

    // If this function is called in Real-Time, handle RTCP response as needed
    if (bIsRealTime)
    {
        HXTimeval rmatv = m_pScheduler->GetCurrentSchedulerTime();
        ULONG32 ulPktRcvTime = rmatv.tv_sec*1000 + rmatv.tv_usec/1000;

        // Convert reception time to the packet time stamp units
        if (m_pRTCPTran->GetTSConverter())
        {
            ulPktRcvTime = m_pRTCPTran->GetTSConverter()->hxa2rtp(ulPktRcvTime);
        }

        // Gather data for RTCP RR
        if (pStreamData->m_bFirstPacket)
        {
            if (!m_pRTCPTran->isShedulerStarted())
            {
                m_pRTCPTran->startScheduler();
            }
        }

        /* update */
        m_pReportHandler->OnRTPReceive(pkt.ssrc,
                                       pkt.seq_no,
                                       pkt.timestamp,
                                       ulPktRcvTime);

        /* send RR if necessary */
        if (m_pRTCPTran->m_bSendReport && m_pRTCPTran->m_bSendRTCP)
        {
            m_pRTCPTran->sendReceiverReport();
            m_pRTCPTran->m_bSendReport = FALSE;
            m_pRTCPTran->scheduleNextReport();
        }
    }

    // If we are waiting for the start info, we cannot place the
    // packets into the transport buffer yet since we need the
    // start info to properly scale and offset the packet time
    // stamps
    if (m_bWaitForStartInfo)
    {
        if (m_StartInfoWaitQueue.GetCount() == 0)
        {
            // First packet received
            m_ulStartInfoWaitStartTime = HX_GET_TICKCOUNT();
            m_uFirstSeqNum = pkt.seq_no;
            m_ulFirstRTPTS = pkt.timestamp;

            // For Live stream, postpone identification of first packet until we get
            // a contiguous sequence (some servers have discontinuity on start
            // of live streams)
            if (!m_bIsLive)
            {
                m_bFirstSet = TRUE;
            }
        }
        else
        {
            // NOTE(review): both operands are promoted to int before the
            // subtraction, so the delta is not reduced modulo 2^16 and a
            // sequence-number wrap (e.g. 65535 -> 0) yields a large
            // negative value. Presumably m_uFirstSeqNum is 16 bits wide;
            // confirm whether wrap-around at stream start needs handling.
            LONG32 lSeqNumDelta = ((LONG32) (((UINT16) pkt.seq_no) - m_uFirstSeqNum));

            // First SeqNum and First TS need not belong to the same packet
            // We are really looking for the lowest seq. num and lowest time
            // stamp so that we do not throw away any packets and so that the
            // time is not wrapped before 0
            if (lSeqNumDelta < 0)
            {
                m_uFirstSeqNum = pkt.seq_no;
            }
            else if (!m_bFirstSet)
            {
                // If we have not encountered continuity yet, look for it
                if (lSeqNumDelta > MAX_NUM_PACKETS_GAPPED_FOR_LIVE_START)
                {
                    resetStartInfoWaitQueue();
                    m_uFirstSeqNum = pkt.seq_no;
                    m_ulFirstRTPTS = pkt.timestamp;
                }
                else
                {
                    // Continuity found - we have the start
                    m_bFirstSet = TRUE;
                }
            }

            // Signed difference: true when pkt.timestamp is "earlier" than
            // the currently recorded first time stamp.
            if (((LONG32) (m_ulFirstRTPTS - pkt.timestamp)) > 0)
            {
                m_ulFirstRTPTS = pkt.timestamp;
            }
        }

        // The queue keeps its own reference until the buffer is replayed.
        pBuffer->AddRef();
        m_StartInfoWaitQueue.AddTail(pBuffer);

        /* If start Info has been at least partially set or the wait has been
           aborted for some reason (usually when we know it will not be set
           through out-of band methods <-> RTP Info did not contain start Info
           we need) or we time-out, stop waiting and hand off accumulated
           packets to the transport buffer.
           Also if starting seq. number is not explicitly communicated,
           scan through few starting packets until we have a good starting
           sequence number (contiguous) since some servers send lossy streams
           in the beginning. */
        if (m_bSeqNoSet ||
            ((m_bRTPTimeSet || m_bAbortWaitForStartInfo) &&
             ((!m_bIsLive) || (m_StartInfoWaitQueue.GetCount() >= MIN_NUM_PACKETS_SCANNED_FOR_LIVE_START))) ||
            ((HX_GET_TICKCOUNT() - m_ulStartInfoWaitStartTime) >
             MAX_STARTINFO_WAIT_TIME))
        {
            IHXBuffer* pStoredBuffer;

            // Clear the wait flag first so the replayed packets below take
            // the normal path instead of being queued again.
            m_bWaitForStartInfo = FALSE;
            m_bAbortWaitForStartInfo = FALSE;
            m_bFirstSet = TRUE;

            while (!m_StartInfoWaitQueue.IsEmpty())
            {
                pStoredBuffer = (IHXBuffer*) m_StartInfoWaitQueue.RemoveHead();
                if (pStoredBuffer)
                {
                    // Replay as non-real-time: the RTCP bookkeeping was
                    // already done when the packet first arrived.
                    _handlePacket(pStoredBuffer, FALSE);
                    pStoredBuffer->Release();
                }
            }
        }

        return HXR_OK;
    }

    /*
     * Extension and asm rule
     */
    if (pkt.extension_flag == 1)
    {
        HX_ASSERT(RTP_OP_PACKETFLAGS != pkt.op_code);
        if (RTP_OP_ASMRULES == pkt.op_code)
        {
            bHasASMRules = TRUE;
        }
    }

    /*
     * RTP-Info: if either one of them were not in RTP-Info, we need to set
     * it right here.
     */
    if (!m_bSeqNoSet)
    {
        if (!m_bFirstSet)
        {
            m_uFirstSeqNum = pkt.seq_no;
        }
#ifdef RTP_MESSAGE_DEBUG
        messageFormatDebugFileOut("INIT: StartSeqNum not in RTP-Info");
#endif // RTP_MESSAGE_DEBUG
        setFirstSeqNum(m_streamNumber, m_uFirstSeqNum);
    }

    if (!m_bRTPTimeSet)
    {
        if (!m_bFirstSet)
        {
            m_ulFirstRTPTS = pkt.timestamp;
        }
#ifdef RTP_MESSAGE_DEBUG
        messageFormatDebugFileOut("INIT: RTPOffset not in RTP-Info");
#endif // RTP_MESSAGE_DEBUG
        setFirstTimeStamp(m_streamNumber, m_ulFirstRTPTS);
        m_bWeakStartSync = TRUE;
    }

    /*
     * TimeStamp
     */
    // for RealMedia in scalable multicast, we don't want to adjust
    // the timestamp since the packets' time is in ms already and
    // A/V is always in sync
    if (m_bSkipTimeAdjustment)
    {
        timeStampRTP = timeStampHX = pkt.timestamp;
    }
    else if (m_bLastTSSet && (m_ulLastRawRTPTS == (ULONG32)pkt.timestamp))
    {
        // We want to preserve same time stamped packet sequences
        // since some payloads may depend on it for proper coded frame
        // assembly
        timeStampRTP = m_ulLastRTPTS;
        timeStampHX = m_ulLastHXTS;
    }
    else
    {
        if (pStreamData->m_pTSConverter)
        {
            timeStampHX = pStreamData->m_pTSConverter->rtp2hxa(pkt.timestamp);
        }
        else
        {
            timeStampHX = pkt.timestamp;
        }

        timeStampHX += (m_lSyncOffsetHX +
                        m_lOffsetToMasterHX -
                        m_lTimeOffsetHX);
        timeStampRTP = (pkt.timestamp +
                        m_lSyncOffsetRTP +
                        m_lOffsetToMasterRTP -
                        m_lTimeOffsetRTP);

        // Remember the translation so that a following packet carrying the
        // same raw time stamp gets exactly the same translated values.
        m_ulLastHXTS = timeStampHX;
        m_ulLastRTPTS = timeStampRTP;
        m_ulLastRawRTPTS = pkt.timestamp;
        m_bLastTSSet = TRUE;
    }

#ifdef RTP_MESSAGE_DEBUG
    if (m_bMessageDebug)
    {
        messageFormatDebugFileOut("PKT: (Seq=%6u,RTPTime=%10u) -> (HXTimeval=%10u,RTPTimeval=%10u)",
                                  ((UINT16) pkt.seq_no), pkt.timestamp,
                                  timeStampHX, timeStampRTP);
    }
#endif // RTP_MESSAGE_DEBUG

    pStreamData->m_bFirstPacket = FALSE;

    CHXPacket* pPacket = new CHXRTPPacket;
    if(pPacket)
    {
        pPacket->AddRef();
    }
    else
    {
        hresult = HXR_OUTOFMEMORY;
    }

    // Offset of the RTP payload within the received datagram.
    UINT32 dataOffset=
        (UINT32)((PTR_INT)pkt.data.data - (PTR_INT)pBuffer->GetBuffer());
    IHXBuffer* pPktBuffer =
        new CHXStaticBuffer(pBuffer, dataOffset, pkt.data.len);
    if(pPktBuffer)
    {
        pPktBuffer->AddRef();
    }
    else
    {
        hresult = HXR_OUTOFMEMORY;
    }

    if( hresult == HXR_OUTOFMEMORY )
    {
        // Either allocation may be the one that failed; release whichever
        // object was actually created so that neither is leaked.
        HX_RELEASE(pPacket);
        HX_RELEASE(pPktBuffer);
        return hresult;
    }

    if (bHasASMRules)
    {
        pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber,
                        (UINT8) pkt.asm_flags, pkt.asm_rule);
    }
    else if(pkt.marker_flag == 1 && m_bHasMarkerRule)
    {
        pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber,
                        HX_ASM_SWITCH_ON | HX_ASM_SWITCH_OFF, m_markerRuleNumber);
    }
    else
    {
        // No explicit rule information: the marker bit selects rule 1 or 0.
        pPacket->SetRTP(pPktBuffer, timeStampHX, timeStampRTP, m_streamNumber,
                        HX_ASM_SWITCH_ON | HX_ASM_SWITCH_OFF, pkt.marker_flag ? 1 : 0);
    }

    // NOTE(review): m_bIsSource already caused an early return at the top
    // of this function, so this branch looks unreachable unless the flag
    // changes during the calls above - confirm before removing.
    if (m_bIsSource)
    {
        hresult = m_pResp->PacketReady(HXR_OK,
                                       m_sessionID,
                                       pPacket);
    }
    else
    {
        hresult = storePacket(pPacket,
                              m_streamNumber,
                              pkt.seq_no,
                              0,
                              0);
    }

    // Drop the local references taken above.
    pPktBuffer->Release();
    pPacket->Release();

    return hresult;
}
HX_RESULT
RTPBaseTransport::handleRTCPSync(NTPTime ntpTime, ULONG32 ulRTPTime)
{
HX_RESULT retVal = HXR_IGNORE;
// We use RTCP synchronization on live streams only.
// Static streams have no reason not to be synchronized in RTP time.
// Making use of RTCP for static streams may result in unwanted sync.
// noise/error for servers who do not generate proper RTCP ntp-rtp
// mapping. RealServers prior to RealServer9 had error in RTCP reported
// ntp-rtp mapping (max. error 1s, avg 500ms).
if ((m_bIsLive || m_bWeakStartSync) && !m_bSkipTimeAdjustment)
{
ULONG32 ulNtpHX = ntpTime.toMSec();
RTSPStreamData* pStreamData =
m_pStreamHandler->getStreamData(m_streamNumber);
#ifdef RTP_MESSAGE_DEBUG
messageFormatDebugFileOut("RTCP-SYNC: Received NTPTime=%u RTPTime=%u",
ulNtpHX, ulRTPTime);
#endif // RTP_MESSAGE_DEBUG
// We ignore the RTCP sync until we can compute npt (m_bRTPTimeSet) or
// if the RTCP packet contains no synchronization information
// (ulNtpHX == 0)
if (pStreamData && (ulNtpHX != 0) && m_bRTPTimeSet)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -